In [0]:
%pip install missingno

In [0]:
from pyspark.sql.functions import col
import datetime
import requests
import json
import pandas as pd
import numpy as np
import missingno as msno
import matplotlib.pyplot as plt

In [0]:
# Databricks notebook source
# Unmounting

mount_point = "/mnt/capstone"
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

storage_account_name = "jscapstone"
container_name = "capstone"
sas_token = "sv=2024-11-04&ss=bfqt&srt=co&sp=rwdlacupyx&se=2026-11-05T06:48:55Z&st=2025-11-04T22:33:55Z&spr=https&sig=C9I%2BFRRtANmD1KDpe%2BthyWBuhsjgVQXRe2%2BwtFr0X6s%3D"  

# Mount if not already mounted
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point = mount_point,
        extra_configs = {f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net": sas_token}
    )

# List files to confirm
display(dbutils.fs.ls("/mnt/capstone"))

In [0]:
# Load CSV Files

google_media_path = '/mnt/capstone/bronze/google/'
meta_media_path = '/mnt/capstone/bronze/meta/'
internal_media_path = '/mnt/capstone/bronze/internal/'
# Google Media
google_media_df = spark.read.csv(google_media_path, header=True, inferSchema=True)
display(google_media_df)
# Meta Media
meta_media_df = spark.read.csv(meta_media_path, header=True, inferSchema=True)
display(meta_media_df)
# Internal Media
internal_df = spark.read.csv(internal_media_path, header=True, inferSchema=True)
display(internal_df)


In [0]:
# Checking Lengths 
print(f'Google DF length: {google_media_df.count()}')
print(f'Meta DF length: {meta_media_df.count()}')
print(f'Internal DF length: {internal_df.count()}')

In [0]:
# Change all columns to lowercase
def lowercase_columns(df):
    for old_col in df.columns:
        new_col = old_col.lower()
        df = df.withColumnRenamed(old_col, new_col)
    return df

google_media_df = lowercase_columns(google_media_df)
meta_media_df = lowercase_columns(meta_media_df)
internal_df = lowercase_columns(internal_df)

google_media_df.printSchema()
meta_media_df.printSchema()
internal_df.printSchema()


## Change Column Names for Readability

In [0]:
# Convert 'organisation_id' to 'organization_id'
google_media_df = google_media_df.withColumnRenamed('organisation_id', 'organization_id')
meta_media_df = meta_media_df.withColumnRenamed('organisation_id', 'organization_id')
internal_df = internal_df.withColumnRenamed('organisation_id', 'organization_id')

In [0]:
internal_df = internal_df.withColumnRenamed('first_purchases', 'new_customers')

## Check Nulls

In [0]:
# Convert Spark df to Pandas DF 
google_pd_df = google_media_df.toPandas()
meta_pd_df = meta_media_df.toPandas()
internal_pd_df = internal_df.toPandas()

There are null values in the spend, clicks, and metrics media. Let's use missingno to check missing at random.

In [0]:
# Calculate percentage of rows with missing values to see if we can delete all the rows
# for loop to generate percentage of missing values

def percent_missing(df):
    percent_rows_na = df.isnull().any(axis=1).sum() 
    perc = percent_rows_na / len(df) * 100
    print(f'Percentage of row with missing values against dataframe total rows: {perc}') 

for df in [google_pd_df, meta_pd_df, internal_pd_df]:
    percent_missing(df)


There is alot of missing rows in google media and meta media - check if these are missing at random or not.

### Visualize Missingness with missingno Matrix

In [0]:
# Sort by date_day
google_order = ['organization_id', 'date_day', 'google_paid_search_spend', 'google_paid_search_impressions', 'google_paid_search_clicks', 
    'google_shopping_spend', 'google_shopping_impressions', 'google_shopping_clicks', 'google_pmax_spend', 'google_pmax_impressions', 'google_pmax_clicks']

meta_order = ['organization_id', 'date_day', 'meta_facebook_spend', 'meta_facebook_impressions', 'meta_facebook_clicks', 'meta_instagram_spend', 'meta_instagram_impressions', 'meta_instagram_clicks']

msno.matrix(google_pd_df[google_order])
plt.title("Missing Data Matrix for Google Media Dataset")

msno.matrix(meta_pd_df[meta_order])
plt.title("Missing Data Matrix for Meta Media Dataset")


The missing values are missing not at random and as suspected are date specific, based no non-media spend dates. 

In [0]:
df.columns

In [0]:
# define media
google_cols = [
    'google_paid_search_spend','google_paid_search_impressions', 'google_paid_search_clicks',
    'google_shopping_spend', 'google_shopping_impressions', 'google_shopping_clicks',
    'google_pmax_spend', 'google_pmax_impressions', 'google_pmax_clicks',
    'google_display_spend', 'google_display_impressions', 'google_display_clicks',
    'google_video_spend', 'google_video_impressions', 'google_video_clicks',
    'meta_facebook_spend', 'meta_facebook_impressions', 'meta_facebook_clicks',
    'meta_instagram_spend', 'meta_instagram_impressions', 'meta_instagram_clicks',
    'meta_other_spend', 'meta_other_impressions', 'meta_other_clicks'
]

# merge meta and google on organization_id and date_day
media_pd_df = google_pd_df.merge(meta_pd_df, on=['organization_id', 'date_day'])
media_pd_df = media_pd_df.sort_values(by=['organization_id', 'date_day'])

# Create zero flags for each channel individually
for col in media_cols:
     media_pd_df[f'{col}_zero_flag'] = (media_pd_df[col] == 0).astype(int)

# Print zero-spend, zero-clicks, zero-impressions per channel

print("Zero-spend days per channel:")
for col in media_cols:

    # if column name has has 'spend' in the name, then do calculation
    if 'spend' in col:
        zero_count = media_pd_df[f'{col}_zero_flag'].sum()
        print(f"{col}: {zero_count} days ({zero_count / len(media_pd_df) * 100:.2f}%)")

print("Zero-impressions days per channel:")
for col in media_cols:
    # if column *_zero_flag has 'impressions' in the name, then do calculation
    if 'impressions' in col:
        zero_count = media_pd_df[f'{col}_zero_flag'].sum()
        print(f"{col}: {zero_count} days ({zero_count / len(media_pd_df) * 100:.2f}%)")

print("Zero-clicks days per channel:")
for col in media_cols:
    # if column *_zero_flag has 'clicks' in the name, then do calculation
    if 'clicks' in col:
        zero_count = media_pd_df[f'{col}_zero_flag'].sum()
        print(f"{col}: {zero_count} days ({zero_count / len(media_pd_df) * 100:.2f}%)")



# Filter for zero-spend days where new_customers > 0
zero_spend_with_customers = df[(df['no_spend_day'] == 1) & (df['new_customers'] > 0)]

# # Show counts and proportions
# total_zero_spend = df['no_spend_day'].sum()
# rows_with_customers = len(zero_spend_with_customers)

# print("\nOverall zero-spend days across all channels:")
# print(f"Zero-spend days with new customers: {rows_with_customers}")
# print(f"Total zero-spend days: {total_zero_spend}")
# print(f"Percentage of zero-spend days with customers: {rows_with_customers / total_zero_spend:.2%}")


[0;31m---------------------------------------------------------------------------[0m
[0;31mKeyError[0m                                  Traceback (most recent call last)
File [0;32m/databricks/python/lib/python3.12/site-packages/pandas/core/indexes/base.py:3802[0m, in [0;36mIndex.get_loc[0;34m(self, key, method, tolerance)[0m
[1;32m   3801[0m [38;5;28;01mtry[39;00m:
[0;32m-> 3802[0m     [38;5;28;01mreturn[39;00m [38;5;28mself[39m[38;5;241m.[39m_engine[38;5;241m.[39mget_loc(casted_key)
[1;32m   3803[0m [38;5;28;01mexcept[39;00m [38;5;167;01mKeyError[39;00m [38;5;28;01mas[39;00m err:

File [0;32m/databricks/python/lib/python3.12/site-packages/pandas/_libs/index.pyx:138[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File [0;32m/databricks/python/lib/python3.12/site-packages/pandas/_libs/index.pyx:165[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File [0;32mpandas/_libs/hashtable_class_helper.pxi:5745[0m, in [0;

In [0]:
# Go back to Spark df: Fill in missing columns in google_media_df with 0 for no-spend and inactive media days 
google_media_df = google_media_df.na.fill(0)

# Fill in missing in meta_media_df with 0 for no-spend and inactive media days
meta_media_df = meta_media_df.na.fill(0)

print(f"Rows: {google_media_df.count()}, Columns: {len(google_media_df.columns)}")
print(f"Rows: {google_media_df.count()}, Columns: {len(google_media_df.columns)}")

## Check for Duplicates


In [0]:
# Check duplicates using pandas dataframe

display(google_pd_df.duplicated().value_counts())
display(meta_pd_df.duplicated().value_counts())
display(internal_pd_df.duplicated().value_counts())

In [0]:
# Fetch Holiday Data from Nager API using internal file min and max dates and store raw JSON load into bronze folder
# Get date range from internal data
min_date = internal_df.agg({"date_day": "min"}).collect()[0][0]
max_date = internal_df.agg({"date_day": "max"}).collect()[0][0]

start_year = min_date.year
end_year = max_date.year

# Set container for holidays so we don't get duplicates
fetched_holidays = set()

excluded_holidays = ["Good Friday", "Lincoln's Birthday", "Truman Day"]

for year in range(start_year, end_year + 1):
    try:
        url = f'https://date.nager.at/api/v3/PublicHolidays/{year}/US'
        response = requests.get(url)
        response.raise_for_status()
        holidays = response.json()

        for h in holidays:
            if h["name"] not in excluded_holidays:
                fetched_holidays.add(datetime.datetime.strptime(h['date'], "%Y-%m-%d").date())

    except requests.exceptions.RequestException as e:
        print(f"Error fetching data for {year}: {e}")
    
    # Print completed fetch request and year of holiday
    print(f'Completed holiday fetch request for {year}')

# Save holiday array as JSON to bronze folder
holiday_path = "/dbfs/mnt/capstone/bronze/nager_api/public_holidays.json"
with open(holiday_path, 'w') as f:
    json.dump(holiday_array, f)

# Sort list
fetched_holidays = sorted(fetched_holidays)

print(fetched_holidays)


In [0]:
# Save cleaned holidays data as JSON file for silver stage
cleaned_holiday_path = "/dbfs/mnt/capstone/silver/holiday_cleaned/cleaned_public_holidays.json"
with open(cleaned_holiday_path, "w") as f:
    # Write each holiday date as a separate line in the file for reading in spark in later stages
     for d in fetched_holidays:
        json.dump({"holiday_date": d.isoformat()}, f)
        f.write("\n")

print(f"Saved cleaned holidays to {cleaned_holiday_path}")

In [0]:
# Save cleaned data as csv in silver folder

# Array must stay in same order since they make pairs using zip
silver_files = ['google', 'meta', 'internal']
df_list = [google_media_df, meta_media_df, internal_df]

for df_name, endpoint in zip(df_list, silver_files):
    path_name = f'/mnt/capstone/silver/{endpoint}'
    df_name.write.format('delta').mode('overwrite').save(path_name)

    print(f'Saved cleaned {endpoint} data to {path_name}')
