In [0]:
%pip install polars
%pip install fastexcel

In [0]:
import polars as pl
import glob
import fastexcel
import datetime
import pandas as pd
import logging
import os.path
import sys
from datetime import datetime, timedelta, time
from pathlib import Path
from pyspark.sql import SparkSession
from typing import Literal
from zoneinfo import ZoneInfo

In [0]:
# Spark session setup, kept for reference; enable it if a Spark session is needed outside the runtime:
# spark = SparkSession.builder.config("spark.sql.execution.arrow.pyspark.enabled", "true").getOrCreate()
# Example: spark_main = spark.createDataFrame(df.to_arrow()).createOrReplaceTempView("spark_main") to create a spark dataframe from df

# Run date in Singapore time; the extract to load is the one dated the previous calendar day ("YYYYMMDD").
tz = ZoneInfo("Asia/Singapore")
today = datetime.now(tz)
mediatel_date = f"{today - timedelta(days=1):%Y%m%d}"

# Origin for Excel serial datetimes (fractional days): Incomingcalltime is converted relative to this.
excel_epoch = datetime(1899, 12, 30)

# Columns read from each Mediatel workbook.
select_cols = [
    "Queuedescription", "Incomingcalltime", "IVR_Time",
    "ExitType", "Calltype", "Call_Reason",
    "Call_Code", "Hierarchy", "Loan_ID",
]

In [0]:
def f_LoadLatestMediatel(
    mediatel_date: str,
    select_cols: list,
    base_dir: str = "/Workspace/Users/brett.kim@tilt.com/Mediatel Extracts",
):
    """Load the latest Mediatel extract for a date as a polars DataFrame.

    Files are matched as ``{base_dir}/{mediatel_date}*.xlsx``. When several files match,
    the lexicographically greatest path is used (presumably the latest, provided the
    suffix after the date sorts chronologically — TODO confirm the naming scheme).

    Args:
        mediatel_date: Date prefix of the extract file name, e.g. "20240131".
        select_cols: Columns to read from the workbook.
        base_dir: Directory holding the extracts. Defaults to the original workspace
            folder so existing callers are unaffected.

    Returns:
        polars DataFrame with ``select_cols``, excluding rows whose Queuedescription is
        "Inbound" or "Outbound".

    Raises:
        FileNotFoundError: if no file matches the pattern.
    """
    # Escape the literal parts so glob metacharacters (e.g. "[") in a path are not treated as patterns.
    pattern = os.path.join(glob.escape(base_dir), f"{glob.escape(mediatel_date)}*.xlsx")
    matches = glob.glob(pattern)
    if not matches:
        raise FileNotFoundError(f"No Mediatel extract found matching {pattern!r}")
    latest_file = max(matches)

    df_mediatel = (
        pl.read_excel(latest_file, columns=select_cols)
        .filter(~pl.col("Queuedescription").is_in(["Inbound", "Outbound"]))
    )
    return df_mediatel

In [0]:
def f_CreateMetrics(df, timeframe: Literal["daily", "weekly", "monthly"] = "daily"):
    """Add per-call connect-funnel flags and account-level dial-type flags.

    Every derived flag is a 0/1 integer per call row. The account flags
    (HC_Account, VB_Account, VB_Only_Account) are then spread over a window keyed by
    ``timeframe``: an account gets 1 on all its rows in the window if any row qualifies.

    Args:
        df: polars DataFrame of Mediatel calls containing Queuedescription,
            Incomingcalltime (Excel serial number), IVR_Time ("HH:MM:SS" string),
            ExitType, Calltype, Call_Reason, Call_Code, Hierarchy and Loan_ID.
            The "weekly" timeframe also needs ``week_num``; "monthly" needs ``call_month``.
        timeframe: "daily", "weekly" or "monthly"; selects the window keys. Defaults to "daily".

    Returns:
        The input frame with the derived columns added, sorted by Loan_ID, call_date
        and then the AC_* flags (descending).

    Raises:
        ValueError: if ``timeframe`` is not one of the supported values.
    """
    # Window keys per timeframe for counting unique accounts.
    group_vars_by_timeframe = {
        # DPD type is not needed daily: accounts do not move between DPD groups in the same day.
        # NOTE(review): this assumes the daily frame holds a single call_date.
        "daily": ["Loan_ID"],
        # Weekly & monthly include DPD type because accounts move DPD groups over time.
        "weekly": ["week_num", "DPD_type", "Loan_ID"],
        "monthly": ["call_month", "DPD_type", "Loan_ID"],
    }
    # Fail fast: without valid window keys the `.over(group_vars)` steps below cannot run.
    if timeframe not in group_vars_by_timeframe:
        raise ValueError(
            f"Invalid timeframe {timeframe} provided. Can only take 'daily', 'weekly', or 'monthly'"
        )
    group_vars = group_vars_by_timeframe[timeframe]

    # From 30 January 2026, accounts in these queues receive Voice Blaster calls only (no human calls).
    vb_only_queues = ["COL VB Low Risk Predue -5 to -1", "COL VB Med Risk Predue -5 to -4"]

    # Granular Agent Connect outcomes keyed by Hierarchy code; insertion order is also the sort order below.
    ac_codes = {
        "AC_PTP": 1,
        "AC_NO_PTP": 2,
        "AC_Paid_Loan": 3,
        "AC_DNC": 4,
        "AC_Call_Back": 5,
        "AC_Hang_Up": 6,
        "AC_PTP_FFUP": 7,
    }

    def _flag(condition):
        """Map a boolean expression to a 0/1 integer expression (null -> 0)."""
        return pl.when(condition).then(1).otherwise(0)

    # HC (human call) connect:
    #   Manual Outgoing + CONNECTION FAILED     -> 0
    #   Outbound predictive + CONNECTION FAILED -> 1
    #   otherwise 1 only for Daytime calls with Hierarchy 1-12.
    hc_logic = (
        pl.when((pl.col("Calltype") == "Manual Outgoing") & (pl.col("Call_Code") == "CONNECTION FAILED")).then(0)
        .when((pl.col("Calltype") == "Outbound predictive") & (pl.col("Call_Code") == "CONNECTION FAILED")).then(1)
        .when((pl.col("call_time_of_day") == pl.lit("Daytime")) & pl.col("Hierarchy").is_between(1, 12)).then(1)
        .otherwise(0)
        .alias("HC_Connect")
    )
    hc_connected = hc_logic == 1

    df = (
        df
        # Group Queuedescription into pre-due and M1 buckets.
        .with_columns(
            pl.when(pl.col("Queuedescription").str.contains("M1"))
            .then(pl.lit("M1"))
            .otherwise(pl.lit("Pre-Due"))
            .alias("DPD_type")
        )
        # Excel serial number (fractional days since excel_epoch) -> naive datetime.
        # return_dtype pins the output type instead of inferring it from the first value.
        .with_columns(
            pl.col("Incomingcalltime")
            .cast(pl.Float64)
            .map_elements(
                lambda x: excel_epoch + timedelta(days=x) if x is not None else None,
                return_dtype=pl.Datetime("us"),
            )
            .alias("incoming_calltime_converted")
        )
        # Split into date and time-of-day; parse IVR_Time ("HH:MM:SS") as a time.
        .with_columns(
            pl.col("incoming_calltime_converted").dt.date().alias("call_date"),
            pl.col("incoming_calltime_converted").dt.time().alias("call_time"),
            pl.col("IVR_Time").str.strptime(pl.Time, "%H:%M:%S").alias("ivr_time_converted"),
        )
        # 08:00 <= call_time < 17:00 is Daytime; every other (or missing) time is Afterhours.
        .with_columns(
            pl.when((pl.col("call_time") >= time(8, 0)) & (pl.col("call_time") < time(17, 0)))
            .then(pl.lit("Daytime"))
            .otherwise(pl.lit("Afterhours"))
            .alias("call_time_of_day")
        )
        # HC Connect and its split by Hierarchy: <=7 agent connect, 8 drop-off, 9-10 voicemail, 11-12 other.
        .with_columns(
            hc_logic.alias("HC_Connect"),
            _flag(hc_connected & (pl.col("Hierarchy") == 8)).alias("HC_Connect_DropOff"),
            _flag(hc_connected & pl.col("Hierarchy").is_between(9, 10)).alias("HC_Connect_Voicemail"),
            _flag(hc_connected & pl.col("Hierarchy").is_between(11, 12)).alias("HC_Connect_Other"),
            # Agent Connect: HC connects with Hierarchy <= 7 (presumably the customer reached an agent).
            _flag(hc_connected & (pl.col("Hierarchy") <= 7)).alias("HC_Connect_AgentConnect"),
            *[
                _flag(hc_connected & (pl.col("Hierarchy") == code)).alias(name)
                for name, code in ac_codes.items()
            ],
        )
        .with_columns(
            # IVR Connect: Daytime call that ended in IVR ("In Queue" / "Ring Agent") with
            # "Closed waiting", i.e. the customer picked up but did not reach an agent.
            # A successful IVR-to-agent transfer is counted under the HC connects instead.
            _flag(
                pl.col("ExitType").is_in(["In Queue", "Ring Agent"])
                & (pl.col("Call_Reason") == pl.lit("Closed waiting"))
                & (pl.col("call_time_of_day") == pl.lit("Daytime"))
            ).alias("IVR_Connect"),
            # VB (Voice Blaster) Connect: like IVR, but Afterhours or a VB-only queue, and the
            # IVR time must be at least 3 seconds in total. Comparing the whole time value
            # (not just its seconds component) keeps e.g. 00:01:02 from being rejected.
            _flag(
                (pl.col("ExitType") == pl.lit("In IVR"))
                & (pl.col("Call_Reason") == pl.lit("Closed In Script"))
                & (
                    (pl.col("call_time_of_day") == pl.lit("Afterhours"))
                    | pl.col("Queuedescription").is_in(vb_only_queues)
                )
                & (pl.col("ivr_time_converted") >= pl.lit(time(0, 0, 3)))
            ).alias("VB_Connect"),
            # Processed accounts (Hierarchy 1-4) and PTP-specific count (Hierarchy 1).
            _flag(pl.col("Hierarchy").is_between(1, 4)).alias("Processed_account"),
            _flag(pl.col("Hierarchy") == 1).alias("PTP"),
        )
        # Per-row dial-type indicators by queue / time of day.
        .with_columns(
            pl.when(pl.col("Queuedescription").is_in(vb_only_queues)).then(0).otherwise(1).alias("HC_Account"),
            _flag(
                pl.col("Queuedescription").is_in(vb_only_queues)
                | (pl.col("call_time_of_day") == pl.lit("Afterhours"))
            ).alias("VB_Account"),
            _flag(pl.col("Queuedescription").is_in(vb_only_queues)).alias("VB_Only_Account"),
        )
        # Spread to account level: 1 on every row of the window if any row is flagged.
        .with_columns(
            [
                (pl.col(name).sum().over(group_vars) > 0).cast(pl.Int8).alias(name)
                for name in ("HC_Account", "VB_Account", "VB_Only_Account")
            ]
        )
        # Spins by dial type: Daytime calls on HC accounts are HC spins; everything else is a VB spin.
        .with_columns(
            _flag((pl.col("HC_Account") == 1) & (pl.col("call_time_of_day") == "Daytime")).alias("HC_Spin"),
            pl.when((pl.col("HC_Account") == 1) & (pl.col("call_time_of_day") == "Daytime"))
            .then(0)
            .otherwise(1)
            .alias("VB_Spin"),
        )
        .sort(
            ["Loan_ID", "call_date", *ac_codes],
            descending=[False, False, *([True] * len(ac_codes))],
        )
    )

    return df

In [0]:
def f_CreateSummary(df, timeframe: Literal["daily", "weekly", "monthly"] = "daily"):
    """Roll per-call metrics up to one row per period and DPD type.

    Two stages:
      1. Per account (period keys + DPD_type + Loan_ID): 0/1 "unique account" flags.
         Within each funnel family an account is credited only to the first bucket,
         in priority order, that it reached. Spin (call) counts are plain sums.
      2. Per period + DPD_type: sum of every stage-1 column.

    Args:
        df: polars DataFrame as produced by ``f_CreateMetrics``. It must contain
            ``call_date`` (daily/weekly), ``week_num`` (weekly) or ``call_month`` (monthly).
        timeframe: "daily", "weekly" or "monthly". Defaults to "daily".

    Returns:
        polars DataFrame of the period key columns followed by the metric columns, in
        the column order the ``f_*CreateSparkTable`` schemas expect, sorted by the keys.

    Raises:
        ValueError: if ``timeframe`` is not one of the supported values.
    """
    import operator
    from functools import reduce

    if timeframe == "daily":
        group_vars = ["call_date", "DPD_type", "Loan_ID"]
    elif timeframe == "weekly":
        group_vars = ["week_num", "min_date", "max_date", "week_title", "DPD_type", "Loan_ID"]
        # Label each week with the first and last call date seen in it (across all DPD types).
        df = (
            df
            .with_columns(
                pl.col("call_date").min().over("week_num").alias("min_date"),
                pl.col("call_date").max().over("week_num").alias("max_date"),
            )
            .with_columns(
                pl.format(
                    "Week {}: {} - {}",
                    pl.col("week_num"),
                    pl.col("min_date").dt.strftime("%b-%d"),
                    pl.col("max_date").dt.strftime("%b-%d"),
                ).alias("week_title")
            )
        )
    elif timeframe == "monthly":
        group_vars = ["call_month", "DPD_type", "Loan_ID"]
    else:
        # Fail fast instead of continuing with an undefined `group_vars`.
        raise ValueError(
            f"Invalid timeframe {timeframe} provided. Can only take 'daily', 'weekly', or 'monthly'"
        )

    def _flag(condition):
        """Map a boolean aggregate expression to 0/1."""
        return pl.when(condition).then(1).otherwise(0)

    def _first_hit(priority):
        """One 0/1 flag per column: 1 only for the first column in `priority` with any hit."""
        exprs = []
        for i, name in enumerate(priority):
            conditions = [pl.col(higher).sum() == 0 for higher in priority[:i]]
            conditions.append(pl.col(name).sum() > 0)
            exprs.append(_flag(reduce(operator.and_, conditions)).alias(name))
        return exprs

    # Per-call flag column -> per-account spin (call count) column.
    spin_sums = {
        "HC_Connect": "HC_Spin_Connect",
        "IVR_Connect": "IVR_Spin_Connect",
        "VB_Connect": "VB_Spin_Connect",
        "HC_Connect_DropOff": "HC_Spin_Connect_DropOff",
        "HC_Connect_Voicemail": "HC_Spin_Connect_Voicemail",
        "HC_Connect_Other": "HC_Spin_Connect_Other",
        "HC_Connect_AgentConnect": "HC_Spin_Connect_AgentConnect",
        "AC_PTP": "AC_Spin_PTP",
        "AC_NO_PTP": "AC_Spin_NO_PTP",
        "AC_Paid_Loan": "AC_Spin_Paid_Loan",
        "AC_DNC": "AC_Spin_DNC",
        "AC_Call_Back": "AC_Spin_Call_Back",
        "AC_Hang_Up": "AC_Spin_Hang_Up",
        "AC_PTP_FFUP": "AC_Spin_PTP_FFUP",
    }

    # Stage 1: unique accounts on a hierarchical basis, plus spin counts.
    account_level = df.group_by(group_vars).agg(
        _flag(pl.col("Loan_ID").n_unique() > 0).alias("Unique_Accounts"),
        *_first_hit(["HC_Connect", "IVR_Connect", "VB_Connect"]),
        _flag(
            (pl.col("HC_Connect").sum() + pl.col("IVR_Connect").sum() + pl.col("VB_Connect").sum()) > 0
        ).alias("Total_Connect"),
        *_first_hit(["HC_Connect_AgentConnect", "HC_Connect_DropOff", "HC_Connect_Voicemail", "HC_Connect_Other"]),
        *_first_hit(["AC_PTP", "AC_NO_PTP", "AC_Paid_Loan", "AC_DNC", "AC_Call_Back", "AC_Hang_Up", "AC_PTP_FFUP"]),
        *[
            _flag(pl.col(name).sum() > 0).alias(name)
            for name in ("Processed_account", "HC_Account", "VB_Account", "VB_Only_Account")
        ],
        # Spin basis
        pl.col("HC_Spin").sum(),
        pl.col("VB_Spin").sum(),
        *[pl.col(src).sum().alias(dst) for src, dst in spin_sums.items()],
    )

    # Stage 2: sum the account-level rows into the Tableau summary.
    period_vars = [c for c in group_vars if c != "Loan_ID"]
    # Output order must match the f_*CreateSparkTable schemas.
    summary_cols = [
        "Unique_Accounts",
        "HC_Connect",
        "IVR_Connect",
        "VB_Connect",
        "Total_Connect",
        "HC_Connect_AgentConnect",
        "HC_Connect_DropOff",
        "HC_Connect_Voicemail",
        "HC_Connect_Other",
        "AC_PTP",
        "AC_NO_PTP",
        "AC_Paid_Loan",
        "AC_DNC",
        "AC_Call_Back",
        "AC_Hang_Up",
        "AC_PTP_FFUP",
        "Processed_account",
        "HC_Account",
        "VB_Account",
        "HC_Spin",
        "VB_Spin",
        "HC_Spin_Connect",
        "IVR_Spin_Connect",
        "VB_Spin_Connect",
        "HC_Spin_Connect_DropOff",
        "HC_Spin_Connect_Voicemail",
        "HC_Spin_Connect_Other",
        "HC_Spin_Connect_AgentConnect",
        "AC_Spin_PTP",
        "AC_Spin_NO_PTP",
        "AC_Spin_Paid_Loan",
        "AC_Spin_DNC",
        "AC_Spin_Call_Back",
        "AC_Spin_Hang_Up",
        "AC_Spin_PTP_FFUP",
        "VB_Only_Account",
    ]

    df_summary = (
        account_level
        .group_by(period_vars)
        .agg([pl.col(name).sum() for name in summary_cols])
        # group_by output order is not guaranteed; sort for reproducible output.
        .sort(period_vars)
    )

    return df_summary

# Daily functions

In [0]:
def f_DailyCreateSparkTable(df, table_name:str):
    """Build a Spark DataFrame for the daily summary and drop the existing target table.

    The DataFrame is created *before* the table is dropped. If the data does not fit
    the schema, the existing table is left untouched. Writing the returned DataFrame to
    ``table_name`` is left to the caller (not shown here).

    Args:
        df: Daily summary (from ``f_CreateSummary(..., "daily")``) in a form
            ``SparkSession.createDataFrame`` accepts — TODO confirm (e.g. pandas).
            Its columns must be in the same order as the schema below.
        table_name: Name of the table to drop and later recreate.

    Returns:
        pyspark DataFrame with an explicit schema.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

    # Reuse the active session (e.g. one provided by the notebook runtime) or create one,
    # rather than relying on a global `spark` that this notebook never defines.
    session = SparkSession.builder.getOrCreate()

    # Ensure that the order of the schema columns is the same as the column ordering in the dataframe.
    metric_cols = [
        "Unique_Accounts",
        "HC_Connect",
        "IVR_Connect",
        "VB_Connect",
        "Total_Connect",
        "HC_Connect_AgentConnect",
        "HC_Connect_DropOff",
        "HC_Connect_Voicemail",
        "HC_Connect_Other",
        "AC_PTP",
        "AC_NO_PTP",
        "AC_Paid_Loan",
        "AC_DNC",
        "AC_Call_Back",
        "AC_Hang_Up",
        "AC_PTP_FFUP",
        "Processed_account",
        "HC_Account",
        "VB_Account",
        "HC_Spin",
        "VB_Spin",
        "HC_Spin_Connect",
        "IVR_Spin_Connect",
        "VB_Spin_Connect",
        "HC_Spin_Connect_DropOff",
        "HC_Spin_Connect_Voicemail",
        "HC_Spin_Connect_Other",
        "HC_Spin_Connect_AgentConnect",
        "AC_Spin_PTP",
        "AC_Spin_NO_PTP",
        "AC_Spin_Paid_Loan",
        "AC_Spin_DNC",
        "AC_Spin_Call_Back",
        "AC_Spin_Hang_Up",
        "AC_Spin_PTP_FFUP",
        "VB_Only_Account",
    ]
    schema = StructType([
        StructField("call_date", DateType(), True),
        StructField("DPD_type", StringType(), True),
        *[StructField(name, IntegerType(), True) for name in metric_cols],
    ])

    # Build first so a schema/data error does not leave the target table already dropped.
    spark_df = session.createDataFrame(df, schema=schema)
    session.sql(f"DROP TABLE IF EXISTS {table_name}")

    return spark_df

# Weekly functions

In [0]:
def f_WeeklyCreateWeekNum(df):
    """Prepare a Mediatel frame for weekly metrics by deriving call date/time and ``week_num``.

    Adds DPD_type, incoming_calltime_converted, call_date, call_time,
    ivr_time_converted, call_time_of_day and week_num.

    NOTE(review): ``week_num`` comes from polars ``dt.week()`` (ISO week number, no year).
    If the data spans a year boundary, dates from different years share a week number
    (late-December dates can fall in ISO week 1) and are merged downstream. Consider
    adding the ISO year to the weekly keys if multi-year data is expected.

    Args:
        df: polars DataFrame with Queuedescription, Incomingcalltime (Excel serial
            number) and IVR_Time ("HH:MM:SS" string).

    Returns:
        The frame with the derived columns added.
    """
    return (
        df
        # Group Queuedescription into pre-due and M1 buckets.
        .with_columns(
            pl.when(pl.col("Queuedescription").str.contains("M1"))
            .then(pl.lit("M1"))
            .otherwise(pl.lit("Pre-Due"))
            .alias("DPD_type")
        )
        # Excel serial number (fractional days since excel_epoch) -> naive datetime.
        # A pinned return_dtype keeps the column temporal even when every value is null,
        # so the .dt calls below still work.
        .with_columns(
            pl.col("Incomingcalltime")
            .cast(pl.Float64)
            .map_elements(
                lambda x: excel_epoch + timedelta(days=x) if x is not None else None,
                return_dtype=pl.Datetime("us"),
            )
            .alias("incoming_calltime_converted")
        )
        # Split into date and time-of-day; parse IVR_Time ("HH:MM:SS") as a time.
        .with_columns(
            pl.col("incoming_calltime_converted").dt.date().alias("call_date"),
            pl.col("incoming_calltime_converted").dt.time().alias("call_time"),
            pl.col("IVR_Time").str.strptime(pl.Time, "%H:%M:%S").alias("ivr_time_converted"),
        )
        # 08:00 <= call_time < 17:00 is Daytime; every other (or missing) time is Afterhours.
        .with_columns(
            pl.when((pl.col("call_time") >= time(8, 0)) & (pl.col("call_time") < time(17, 0)))
            .then(pl.lit("Daytime"))
            .otherwise(pl.lit("Afterhours"))
            .alias("call_time_of_day")
        )
        # ISO week number of the call date.
        .with_columns(pl.col("call_date").dt.week().alias("week_num"))
    )

In [0]:
def f_WeeklyCreateSparkTable(df, table_name:str):
    """Build a Spark DataFrame for the weekly summary and drop the existing target table.

    The DataFrame is created *before* the table is dropped. If the data does not fit
    the schema, the existing table is left untouched. Writing the returned DataFrame to
    ``table_name`` is left to the caller (not shown here).

    Args:
        df: Weekly summary (from ``f_CreateSummary(..., "weekly")``) in a form
            ``SparkSession.createDataFrame`` accepts — TODO confirm (e.g. pandas).
            Its columns must be in the same order as the schema below.
        table_name: Name of the table to drop and later recreate.

    Returns:
        pyspark DataFrame with an explicit schema.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

    # Reuse the active session (e.g. one provided by the notebook runtime) or create one,
    # rather than relying on a global `spark` that this notebook never defines.
    session = SparkSession.builder.getOrCreate()

    # Ensure that the order of the schema columns is the same as the column ordering in the dataframe.
    metric_cols = [
        "Unique_Accounts",
        "HC_Connect",
        "IVR_Connect",
        "VB_Connect",
        "Total_Connect",
        "HC_Connect_AgentConnect",
        "HC_Connect_DropOff",
        "HC_Connect_Voicemail",
        "HC_Connect_Other",
        "AC_PTP",
        "AC_NO_PTP",
        "AC_Paid_Loan",
        "AC_DNC",
        "AC_Call_Back",
        "AC_Hang_Up",
        "AC_PTP_FFUP",
        "Processed_account",
        "HC_Account",
        "VB_Account",
        "HC_Spin",
        "VB_Spin",
        "HC_Spin_Connect",
        "IVR_Spin_Connect",
        "VB_Spin_Connect",
        "HC_Spin_Connect_DropOff",
        "HC_Spin_Connect_Voicemail",
        "HC_Spin_Connect_Other",
        "HC_Spin_Connect_AgentConnect",
        "AC_Spin_PTP",
        "AC_Spin_NO_PTP",
        "AC_Spin_Paid_Loan",
        "AC_Spin_DNC",
        "AC_Spin_Call_Back",
        "AC_Spin_Hang_Up",
        "AC_Spin_PTP_FFUP",
        "VB_Only_Account",
    ]
    schema = StructType([
        StructField("week_num", IntegerType(), True),
        StructField("min_date", DateType(), True),
        StructField("max_date", DateType(), True),
        StructField("week_title", StringType(), True),
        StructField("DPD_type", StringType(), True),
        *[StructField(name, IntegerType(), True) for name in metric_cols],
    ])

    # Build first so a schema/data error does not leave the target table already dropped.
    spark_df = session.createDataFrame(df, schema=schema)
    session.sql(f"DROP TABLE IF EXISTS {table_name}")

    return spark_df

# Monthly functions

In [0]:
def f_MonthlyCreateMonthNum(df):
    """Prepare a Mediatel frame for monthly metrics by deriving call date/time and ``call_month``.

    Adds DPD_type, incoming_calltime_converted, call_date, call_time,
    ivr_time_converted, call_time_of_day and call_month (call_date floored to the
    first day of its month).

    Args:
        df: polars DataFrame with Queuedescription, Incomingcalltime (Excel serial
            number) and IVR_Time ("HH:MM:SS" string).

    Returns:
        The frame with the derived columns added.
    """
    return (
        df
        # Group Queuedescription into pre-due and M1 buckets.
        .with_columns(
            pl.when(pl.col("Queuedescription").str.contains("M1"))
            .then(pl.lit("M1"))
            .otherwise(pl.lit("Pre-Due"))
            .alias("DPD_type")
        )
        # Excel serial number (fractional days since excel_epoch) -> naive datetime.
        # A pinned return_dtype keeps the column temporal even when every value is null,
        # so the .dt calls below still work.
        .with_columns(
            pl.col("Incomingcalltime")
            .cast(pl.Float64)
            .map_elements(
                lambda x: excel_epoch + timedelta(days=x) if x is not None else None,
                return_dtype=pl.Datetime("us"),
            )
            .alias("incoming_calltime_converted")
        )
        # Split into date and time-of-day; parse IVR_Time ("HH:MM:SS") as a time.
        .with_columns(
            pl.col("incoming_calltime_converted").dt.date().alias("call_date"),
            pl.col("incoming_calltime_converted").dt.time().alias("call_time"),
            pl.col("IVR_Time").str.strptime(pl.Time, "%H:%M:%S").alias("ivr_time_converted"),
        )
        # 08:00 <= call_time < 17:00 is Daytime; every other (or missing) time is Afterhours.
        .with_columns(
            pl.when((pl.col("call_time") >= time(8, 0)) & (pl.col("call_time") < time(17, 0)))
            .then(pl.lit("Daytime"))
            .otherwise(pl.lit("Afterhours"))
            .alias("call_time_of_day")
        )
        # Floor call_date to the start of its month.
        .with_columns(pl.col("call_date").dt.month_start().alias("call_month"))
    )

In [0]:
def f_MonthlyCreateSparkTable(df, table_name:str):
    """Build a Spark DataFrame for the monthly summary and drop the existing target table.

    The DataFrame is created *before* the table is dropped. If the data does not fit
    the schema, the existing table is left untouched. Writing the returned DataFrame to
    ``table_name`` is left to the caller (not shown here).

    Args:
        df: Monthly summary (from ``f_CreateSummary(..., "monthly")``) in a form
            ``SparkSession.createDataFrame`` accepts — TODO confirm (e.g. pandas).
            Its columns must be in the same order as the schema below.
        table_name: Name of the table to drop and later recreate.

    Returns:
        pyspark DataFrame with an explicit schema.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

    # Reuse the active session (e.g. one provided by the notebook runtime) or create one,
    # rather than relying on a global `spark` that this notebook never defines.
    session = SparkSession.builder.getOrCreate()

    # Ensure that the order of the schema columns is the same as the column ordering in the dataframe.
    metric_cols = [
        "Unique_Accounts",
        "HC_Connect",
        "IVR_Connect",
        "VB_Connect",
        "Total_Connect",
        "HC_Connect_AgentConnect",
        "HC_Connect_DropOff",
        "HC_Connect_Voicemail",
        "HC_Connect_Other",
        "AC_PTP",
        "AC_NO_PTP",
        "AC_Paid_Loan",
        "AC_DNC",
        "AC_Call_Back",
        "AC_Hang_Up",
        "AC_PTP_FFUP",
        "Processed_account",
        "HC_Account",
        "VB_Account",
        "HC_Spin",
        "VB_Spin",
        "HC_Spin_Connect",
        "IVR_Spin_Connect",
        "VB_Spin_Connect",
        "HC_Spin_Connect_DropOff",
        "HC_Spin_Connect_Voicemail",
        "HC_Spin_Connect_Other",
        "HC_Spin_Connect_AgentConnect",
        "AC_Spin_PTP",
        "AC_Spin_NO_PTP",
        "AC_Spin_Paid_Loan",
        "AC_Spin_DNC",
        "AC_Spin_Call_Back",
        "AC_Spin_Hang_Up",
        "AC_Spin_PTP_FFUP",
        "VB_Only_Account",
    ]
    schema = StructType([
        StructField("call_month", DateType(), True),
        StructField("DPD_type", StringType(), True),
        *[StructField(name, IntegerType(), True) for name in metric_cols],
    ])

    # Build first so a schema/data error does not leave the target table already dropped.
    spark_df = session.createDataFrame(df, schema=schema)
    session.sql(f"DROP TABLE IF EXISTS {table_name}")

    return spark_df