### Library Requirements

In [1]:
# !pip3 install -r requirements.txt

In [2]:
import pandas as pd
import numpy as np
import random
import string
import json

from tqdm import tqdm

import warnings
warnings.filterwarnings("ignore")

#### Extract Config Data and Define Processing Methods

In [3]:
# Load the notebook configuration. The encoding is passed explicitly so the
# file is decoded the same way on every platform instead of depending on the
# locale's default encoding.
with open('config.json', 'r', encoding='utf-8') as f:
    config_data = json.load(f)

# Settings used as the default ``processing_info`` argument of process_df.
PROCESSING_INFO = config_data['PROCESSING_INFO']

In [4]:
def generate_identifier(length=30):
    """Build a random alphanumeric token.

    Args:
        length (int, optional): Number of characters to draw. Defaults to 30.

    Returns:
        str: ``length`` characters drawn from ASCII letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)

def preprocess_df(df, processing_info):
    """Preprocesses the input DataFrame by setting correct dtypes, filling NaN, filtering unnecessary fields.

    Steps, in order:
        1. Keep ``raw_columns`` and rename them to ``new_columns``.
        2. Parse ``day`` as datetime and replace every ``ad_set`` value with a
           random identifier (one identifier per distinct value).
        3. Cast ``clicks`` / ``impressions`` to float and derive ``hour``.
        4. Keep ``init_fields`` and, per ad set, only the first ``days``
           distinct days, numbered 1..days in ``day_num``.
        5. Fill missing metrics with 0 and add ``revenue``, ``cpl``, ``ctr``, ``cpm``.
        6. Drop ad sets with fewer than ``24 * days`` rows.

    Args:
        df (pandas.DataFrame): Input DataFrame to be preprocessed. Must contain
            every column listed in ``processing_info["raw_columns"]``.
        processing_info (dict): Information about the processing setup. The keys
            read here are ``raw_columns``, ``new_columns``, ``init_fields``,
            ``days`` and ``ltv``.

    Returns:
        pandas.DataFrame: Preprocessed DataFrame sorted by ad set, day number
        and hour. ``day_num`` is an integer column.
    """
    raw_columns = processing_info["raw_columns"]
    rename_map = dict(zip(raw_columns, processing_info["new_columns"]))
    data = df[raw_columns].copy()
    data.columns = [rename_map[col] for col in data.columns]

    data["day"] = pd.to_datetime(data["day"])

    # Anonymise ad sets. Identifiers are drawn one per distinct value; on the
    # (astronomically unlikely) event of a collision a new one is drawn instead
    # of relying on an assert, which would be stripped under ``python -O``.
    id_map = {}
    used_ids = set()
    for ad_set in data["ad_set"].unique():
        identifier = generate_identifier()
        while identifier in used_ids:
            identifier = generate_identifier()
        used_ids.add(identifier)
        id_map[ad_set] = identifier
    data["ad_set"] = data["ad_set"].map(id_map)

    data["clicks"] = data["clicks"].astype(float)
    data["impressions"] = data["impressions"].astype(float)

    # ``dtype == int`` only matches the platform's default integer width;
    # is_integer_dtype accepts any integer dtype (int32, int64, nullable Int64, ...).
    if pd.api.types.is_integer_dtype(data["time_of_day"]):
        # Integer input is used as the hour unchanged.
        # NOTE(review): presumably already on the same 1-based scale as the
        # string branch below - confirm against the raw exports.
        data["hour"] = data["time_of_day"]
    else:
        # Text such as "<start> - <end>": take the start time and shift its
        # hour by one so hours start at 1.
        window_start = data["time_of_day"].apply(lambda x: x.split(" - ")[0])
        data["hour"] = pd.to_datetime(window_start).dt.hour + 1

    # Explicit copy so the column assignments below never target a view.
    data = data[processing_info["init_fields"]].copy()

    # Number each ad set's days in order of first appearance in the data (the
    # rows are not re-sorted chronologically first) and keep only the first
    # ``days`` of them. Done with one drop_duplicates/cumcount/merge instead of
    # concatenating one slice per ad set onto an empty frame.
    first_seen = data[["ad_set", "day"]].drop_duplicates()
    first_seen["day_num"] = first_seen.groupby("ad_set").cumcount() + 1
    first_seen = first_seen[first_seen["day_num"] <= processing_info["days"]]
    data = data.merge(first_seen, on=["ad_set", "day"], how="inner")

    data = data.sort_values(["ad_set", "day_num", "hour"]).reset_index(drop=True)

    # Assign the filled columns back: ``data[col].fillna(0, inplace=True)``
    # operates on a temporary under pandas Copy-on-Write and changes nothing.
    fill_columns = ["clicks", "spend", "purchases", "leads", "impressions"]
    data[fill_columns] = data[fill_columns].fillna(0)
    data["revenue"] = data["purchases"] * processing_info["ltv"]

    # Ratios may be inf/NaN where the denominator is 0; they are left as-is here.
    data["cpl"] = data["spend"] / data["leads"]
    data["ctr"] = data["clicks"] / data["impressions"]
    data["cpm"] = data["spend"] / data["impressions"] * 1000

    # Keep only ad sets with at least 24 rows per requested day
    # (presumably one row per hour - confirm against the raw exports).
    rows_per_ad_set = data.groupby("ad_set")["spend"].count()
    min_rows = 24 * processing_info["days"]
    complete_ad_sets = rows_per_ad_set[rows_per_ad_set >= min_rows].index

    return data[data["ad_set"].isin(complete_ad_sets)]

def label_sucess(df):
    """Flags every row with whether its ad set was profitable overall.

    An ad set counts as successful when its total revenue divided by its
    total spend (ROAS) is greater than 1.

    Args:
        df (pandas.DataFrame): Frame with ``ad_set``, ``revenue`` and ``spend`` columns.

    Returns:
        pandas.DataFrame: Copy of ``df`` with an added ``success`` column (1 or 0).
    """
    labelled = df.copy()

    per_ad_set = labelled.groupby("ad_set")
    roas_by_ad_set = per_ad_set["revenue"].sum() / per_ad_set["spend"].sum()

    # A NaN ROAS (0 revenue / 0 spend) compares False and is labelled 0.
    success = {}
    for ad_set, roas in roas_by_ad_set.items():
        success[ad_set] = 1 if roas > 1 else 0

    labelled["success"] = labelled["ad_set"].apply(lambda name: success[name])
    return labelled

def calculate_cummulatives(df, processing_info):
    """Calculates cumulative metrics for specified fields.

    For every ad set (in order of first appearance) the rows are ordered by
    ``day_num`` and ``hour``; each field in ``processing_info["cumm_fields"]``
    gets a running total ``<field>_cumm`` (non-finite values count as 0), and
    the running ratios ``cpl_sub``, ``ctr_sub``, ``cpm_sub`` and ``roas_sub``
    are derived from those totals, with non-finite results replaced by 0.

    Args:
        df (pandas.DataFrame): Input DataFrame with ``ad_set``, ``day_num``,
            ``hour`` and every column named in ``cumm_fields``.
        processing_info (dict): Information about processing setup. ``cumm_fields``
            must include ``spend``, ``leads``, ``clicks``, ``impressions`` and
            ``revenue``, because the ratio columns read their ``_cumm`` totals.

    Returns:
        pandas.DataFrame: DataFrame with cumulative metrics calculated and a
        fresh 0..n-1 index. For an input without any ad set, an empty frame
        with the input's columns.
    """
    ratio_columns = ("cpl_sub", "ctr_sub", "cpm_sub", "roas_sub")

    # Collect the per-ad-set results and concatenate once at the end; growing a
    # frame with pd.concat inside the loop copies all earlier rows every time.
    pieces = []
    for _, group in df.groupby("ad_set", sort=False):
        subset = group.sort_values(["day_num", "hour"]).reset_index(drop=True)

        for metric in processing_info["cumm_fields"]:
            values = subset[metric]
            # inf/NaN would poison every later running total, so count them as 0.
            subset[f"{metric}_cumm"] = np.where(np.isfinite(values), values, 0).cumsum()

        subset["cpl_sub"] = subset["spend_cumm"] / subset["leads_cumm"]
        subset["ctr_sub"] = subset["clicks_cumm"] / subset["impressions_cumm"]
        subset["cpm_sub"] = subset["spend_cumm"] / subset["impressions_cumm"] * 1000
        subset["roas_sub"] = subset["revenue_cumm"] / subset["spend_cumm"]

        # Division by a zero running total yields inf/NaN; report those as 0.
        for column in ratio_columns:
            ratio = subset[column]
            subset[column] = np.where(np.isfinite(ratio), ratio, 0)

        pieces.append(subset)

    if not pieces:
        return pd.DataFrame(columns=df.columns)
    return pd.concat(pieces, ignore_index=True)

def calculate_increases(df, processing_info):
    """Calculates percentage increases for specified fields.

    For every ad set (in order of first appearance) the rows are ordered by
    ``day`` and ``hour``; each field in ``processing_info["incr_fields"]`` gets
    a ``<field>_incr`` column holding the percentage change from the previous
    row. The first row of an ad set, and any change that is not finite (for
    example a previous value of 0), is reported as 0.

    Args:
        df (pandas.DataFrame): Input DataFrame with ``ad_set``, ``day``, ``hour``
            and every column named in ``incr_fields``.
        processing_info (dict): Information about processing setup; only
            ``incr_fields`` is read here.

    Returns:
        pandas.DataFrame: DataFrame with percentage increases calculated and a
        fresh 0..n-1 index. For an input without any ad set, an empty frame
        with the input's columns.
    """
    # Collect the per-ad-set results and concatenate once at the end; growing a
    # frame with pd.concat inside the loop copies all earlier rows every time.
    pieces = []
    for _, group in df.groupby("ad_set", sort=False):
        # NOTE(review): calculate_cummulatives orders rows by "day_num", this
        # function by "day"; the two agree only when day numbers follow
        # calendar order - confirm that is guaranteed upstream.
        subset = group.sort_values(["day", "hour"]).reset_index(drop=True)

        for metric in processing_info["incr_fields"]:
            previous = subset[metric].shift(1)
            change = (subset[metric] - previous) / previous * 100
            # Covers both the NaN of the first row and inf/NaN from a zero base.
            subset[f"{metric}_incr"] = np.where(np.isfinite(change), change, 0)

        pieces.append(subset)

    if not pieces:
        return pd.DataFrame(columns=df.columns)
    return pd.concat(pieces, ignore_index=True)

def process_df(filename, processing_info=PROCESSING_INFO, prefix="", to_return=False):
    """Runs the full preparation pipeline on one raw CSV file and saves the result.

    The file is read from ``processing_info["raw_folder"]``, passed through
    preprocessing, success labelling, cumulative metrics and percentage
    increases, and written (without the index) to
    ``processing_info["processed_folder"]`` under ``prefix + filename``.

    Args:
        filename (str): Name of the file to be processed.
        processing_info (dict, optional): Information about processing setup. Defaults to PROCESSING_INFO.
        prefix (str, optional): Prefix to be added to the processed filename. Defaults to "".
        to_return (bool, optional): Whether to return the processed DataFrame. Defaults to False.

    Returns:
        pandas.DataFrame or None: Processed DataFrame if to_return is True, else None.
    """
    source_path = processing_info["raw_folder"] + filename
    raw = pd.read_csv(source_path)

    processed = label_sucess(preprocess_df(raw, processing_info))
    processed = calculate_cummulatives(processed, processing_info)
    processed = calculate_increases(processed, processing_info)

    target_path = processing_info["processed_folder"] + prefix + filename
    processed.to_csv(target_path, index=False)

    return processed if to_return else None

### Process Data

In [5]:
# Raw export files to process; process_df reads each one from the raw folder
# of its processing configuration.
FILES = [
    "01-14.04.2023.csv",
    "03-09.09.2023.csv",
    "01-07.11.2023.csv",
    "12-18.12.2023.csv",
    "15-17.02.2024.csv",
]

# Run the pipeline for every file behind a progress bar, using the default
# (empty) output prefix. ``data`` is rebound on each pass, so after the loop
# it holds the result for the last file only.
for filename in tqdm(FILES):
    data = process_df(filename, to_return=True)

  0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 5/5 [00:41<00:00,  8.26s/it]
