In [0]:
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd
# Import the custom function from your project library
from myfunctions import check_for_alert

In [0]:
# Names of the datasets this recipe reads from and writes to
INPUT_DATASET_NAME = "unified_weekly_metrics_windows"
OUTPUT_DATASET_NAME = "python_leading_indicators_w_alerts"

# Dataset handles used by the read and write cells further down
input_dataset = dataiku.Dataset(INPUT_DATASET_NAME)
output_dataset = dataiku.Dataset(OUTPUT_DATASET_NAME)

In [0]:
# Load the input dataset as a pandas DataFrame. Every later cell adds its
# derived columns to this frame, which is what finally gets written out.
df = input_dataset.get_dataframe()

In [0]:
# --- Normalise the week column and put the rows in a stable order ---
# Parse 'week_start' into datetimes, then order the rows by region and,
# within each region, by week.
df = (
    df
    .assign(week_start=pd.to_datetime(df['week_start']))
    .sort_values(by=['region', 'week_start'])
)

In [0]:
# --- Prepare the rolling-average baselines used for the % change metrics ---
# Cast every '<metric>_rolling_avg' column to a numeric dtype. Values that
# cannot be parsed become NaN (errors='coerce') instead of raising.
for rolling_avg_col in (
    'new_applications_rolling_avg',
    'logins_rolling_avg',
    'quotesCreated_rolling_avg',
    'revenueUSD_rolling_avg',
):
    df[rolling_avg_col] = pd.to_numeric(df[rolling_avg_col], errors='coerce')

In [0]:
def calculate_change(current_value, rolling_avg):
    """Return the fractional change of ``current_value`` versus ``rolling_avg``.

    The result is ``(current_value - rolling_avg) / rolling_avg``, i.e. a
    fraction rather than a percentage (0.1 means 10% above the baseline).

    When the baseline is missing (NaN/None) or exactly zero the ratio is
    undefined and 0 is returned instead. A missing ``current_value`` is not
    special-cased and propagates through the arithmetic.
    """
    baseline_unusable = pd.isna(rolling_avg) or rolling_avg == 0
    if baseline_unusable:
        # Fallback for "no baseline"; NaN would be the alternative if a
        # missing baseline should stay distinguishable from "no change".
        return 0
    return (current_value - rolling_avg) / rolling_avg

# Fractional change of each weekly total against its rolling average.
# For every metric <m> this reads '<m>_sum' and '<m>_rolling_avg' and writes
# '<m>_change_vs_avg', in the order the metrics are listed.
#
# The two input columns are walked in lockstep with zip() instead of
# df.apply(..., axis=1): the row-wise apply materialises a Series for every
# row only to read two cells from it, whereas zip() hands the same pair of
# values straight to calculate_change().
for metric in ('new_applications', 'logins', 'quotesCreated', 'revenueUSD'):
    df[metric + '_change_vs_avg'] = [
        calculate_change(current_value, rolling_avg)
        for current_value, rolling_avg in zip(df[metric + '_sum'],
                                              df[metric + '_rolling_avg'])
    ]

In [0]:
# --- Flag alerts per metric, then roll them up into one overall flag ---
# The threshold comes from the "alert_threshold" custom variable and is
# converted to float before use.
alert_threshold = float(dataiku.get_custom_variables()["alert_threshold"])

# (change column, alert-flag column) pairs; the list order is also the order
# in which the flag columns are added to the frame.
change_to_flag_columns = [
    ('new_applications_change_vs_avg', 'alert_flag_applications'),
    ('logins_change_vs_avg', 'alert_flag_logins'),
    ('quotesCreated_change_vs_avg', 'alert_flag_quotesCreated'),
    ('revenueUSD_change_vs_avg', 'alert_flag_revenueUSD'),
]
for change_col, flag_col in change_to_flag_columns:
    # check_for_alert is called once per value with the threshold as a keyword
    df[flag_col] = df[change_col].apply(check_for_alert, alert_threshold=alert_threshold)

# OR the per-metric flags together, left to right, and store the result as an
# integer column.
# NOTE(review): presumably check_for_alert returns booleans, making this 1 when
# any metric alerts and 0 otherwise -- confirm against myfunctions.
overall_flag = df['alert_flag_applications']
for _, flag_col in change_to_flag_columns[1:]:
    overall_flag = overall_flag | df[flag_col]
df['overall_alert_flag'] = overall_flag.astype(int)

In [0]:
# Persist the enriched frame (input columns plus the change-vs-average
# metrics and alert flags added above) to the output dataset.
output_dataset.write_with_schema(df)

In [0]:
# Recipe outputs
# `df` has already been written to "python_leading_indicators_w_alerts" through
# `output_dataset.write_with_schema(df)` earlier in this notebook. Writing the
# same frame to the same dataset a second time only repeats the I/O, so no
# further write is issued here. The name is kept as an alias of that handle.
python_leading_indicators_w_alerts = output_dataset