In [97]:
import datetime
import json
import os

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, lit

StatementMeta(practice, 6, 98, Finished, Available, Finished)

In [98]:
# Spark entry point for this backfill job (reuses an existing session if one is active)
spark = (
    SparkSession.builder
    .appName("FacebookAdsBackfillFactMetrics")
    .getOrCreate()
)

# ADLS Gen2 target: storage account, container and medallion stage
storage_account_name = "learningstorage1093"
container_name = "learning"
stage = "bronze"

# Root folder that receives the historical fact-metrics files
adls_path = (
    f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"
    f"{stage}/facebook_ads/fact_facebook_ads_metrics/historical/"
)

StatementMeta(practice, 6, 99, Finished, Available, Finished)

In [99]:
# Facebook Ads API Config
# The API key is a credential, so it is read from the WINDSOR_API_KEY
# environment variable instead of being stored in the notebook.
# NOTE(review): the key that used to be hardcoded here has been exposed in
# notebook history/exports and should be rotated.
API_KEY = os.environ.get("WINDSOR_API_KEY", "")
if not API_KEY:
    print("❌ WINDSOR_API_KEY is not set; requests will be sent without an API key.")
BASE_URL = "https://connectors.windsor.ai/facebook"
ACCOUNT_ID = "2977132755882180"
# Comma-separated list of fields requested for the fact table; the column
# names here must match the DataFrame schema used when loading the response.
FIELDS = "date,ad_id,adset_id,campaign_id,account_id,platform_position,publisher_platform,spend,impressions,clicks,frequency,actions_complete_registration,actions_lead,actions_video_view,actions_post_engagement,unique_link_clicks_ctr,video_thruplay_watched_actions_video_view"

StatementMeta(practice, 6, 100, Finished, Available, Finished)

In [100]:
# Historical backfill window: first day to load, and the last day (yesterday)
start_date = datetime.date(year=2023, month=1, day=1)
end_date = datetime.date.today() + datetime.timedelta(days=-1)

StatementMeta(practice, 6, 101, Finished, Available, Finished)

In [101]:
# Accumulator for the per-day DataFrames built by the fetch loop
all_dataframes = list()

StatementMeta(practice, 6, 102, Finished, Available, Finished)

In [102]:
def fetch_facebook_ads_data(date_from, date_to, timeout=60):
    """Fetch Facebook Ads rows for a date range from the connector API.

    Args:
        date_from: Value sent as the ``date_from`` query parameter.
        date_to: Value sent as the ``date_to`` query parameter.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        The value stored under the ``"data"`` key of the JSON response, or an
        empty list when the request fails, the status is not 200, the body is
        not valid JSON, or the body has no ``"data"`` key. Every failure is
        printed; none is raised to the caller.
    """
    params = {
        "api_key": API_KEY,
        "date_from": date_from,
        "date_to": date_to,
        "fields": FIELDS,
        "select_accounts": ACCOUNT_ID
    }

    try:
        # Without a timeout a single stalled connection would block the
        # whole backfill indefinitely.
        response = requests.get(BASE_URL, params=params, timeout=timeout)
    except requests.RequestException as e:
        # Print only the exception class: the message can embed the request
        # URL, whose query string carries the API key.
        print(f"Request failed for {date_from} to {date_to}: {type(e).__name__}")
        return []

    if response.status_code != 200:
        print(f"Error fetching data for {date_from} to {date_to}: {response.text}")
        return []

    try:
        json_data = response.json()
    except ValueError as e:
        # ValueError is the shared base class of the JSON decode errors that
        # response.json() can raise, whichever JSON backend is in use.
        print(f"JSON parsing error: {e}")
        return []

    # Guard against non-object bodies (null, list, ...) before the key lookup.
    if isinstance(json_data, dict) and "data" in json_data:
        return json_data["data"]

    print(f"No 'data' key found in response for {date_from} to {date_to}")
    return []

StatementMeta(practice, 6, 103, Finished, Available, Finished)

In [103]:
from pyspark.sql.types import StructType, StructField, StringType

# Every column is declared as a nullable string so that values arriving as a
# mix of ints and floats load without type errors; proper typing is left to
# the silver stage.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        # identifiers / dimensions
        "date",
        "ad_id",
        "adset_id",
        "campaign_id",
        "account_id",
        "platform_position",
        "publisher_platform",
        # metrics
        "spend",
        "impressions",
        "clicks",
        "frequency",
        "actions_complete_registration",
        "actions_lead",
        "actions_video_view",
        "actions_post_engagement",
        "unique_link_clicks_ctr",
        "video_thruplay_watched_actions_video_view",
    )
])


StatementMeta(practice, 6, 104, Finished, Available, Finished)

In [104]:
# Fetch the backfill window one day at a time and build one DataFrame per day.

# Empty the accumulator first so that re-running this cell does not append
# the same days a second time (which would duplicate rows at write time).
all_dataframes.clear()

# Metric columns that are loaded as strings and cast to numbers below.
# Defined once here because the list is the same for every day.
numeric_columns = ["spend", "impressions", "clicks", "frequency",
                   "actions_complete_registration", "actions_lead",
                   "actions_video_view", "actions_post_engagement",
                   "unique_link_clicks_ctr", "video_thruplay_watched_actions_video_view"]

current_date = start_date
while current_date <= end_date:
    date_str = current_date.strftime("%Y-%m-%d")
    print(f"📅 Fetching data for: {date_str}")

    # Same value for both bounds: request exactly one day
    data = fetch_facebook_ads_data(date_str, date_str)

    if data:  # Empty result means nothing to load for this day
        try:
            # Load the raw rows using the all-string schema
            df = spark.createDataFrame(data, schema=schema)

            # Cast metrics to double rather than float: a 32-bit float keeps
            # only about 7 significant digits, which distorts spend values
            # and large impression/click counts.
            for col_name in numeric_columns:
                df = df.withColumn(col_name, col(col_name).cast("double"))

            # Metadata columns: when the row was loaded and where it came from
            df = df.withColumn("load_date", lit(datetime.datetime.now().isoformat())) \
                   .withColumn("source", lit("facebook_ads"))

            # Partition columns derived from the row's date
            df = df.withColumn("year", year(col("date"))) \
                   .withColumn("month", month(col("date"))) \
                   .withColumn("day", dayofmonth(col("date")))

            all_dataframes.append(df)

        except Exception as e:
            # Best effort: report the failing day and carry on with the next one
            print(f"❌ Error processing data for {date_str}: {e}")

    else:
        print(f"❌ No data found for {date_str}, skipping.")

    # Move to the next day
    current_date += datetime.timedelta(days=1)


StatementMeta(practice, 6, 105, Finished, Available, Finished)

📅 Fetching data for: 2023-01-01
📅 Fetching data for: 2023-01-02
📅 Fetching data for: 2023-01-03
📅 Fetching data for: 2023-01-04
📅 Fetching data for: 2023-01-05
📅 Fetching data for: 2023-01-06
📅 Fetching data for: 2023-01-07
📅 Fetching data for: 2023-01-08
📅 Fetching data for: 2023-01-09
📅 Fetching data for: 2023-01-10
📅 Fetching data for: 2023-01-11
📅 Fetching data for: 2023-01-12
📅 Fetching data for: 2023-01-13
📅 Fetching data for: 2023-01-14
📅 Fetching data for: 2023-01-15
📅 Fetching data for: 2023-01-16
📅 Fetching data for: 2023-01-17
📅 Fetching data for: 2023-01-18
📅 Fetching data for: 2023-01-19
📅 Fetching data for: 2023-01-20
📅 Fetching data for: 2023-01-21
📅 Fetching data for: 2023-01-22
📅 Fetching data for: 2023-01-23
📅 Fetching data for: 2023-01-24
📅 Fetching data for: 2023-01-25
📅 Fetching data for: 2023-01-26
📅 Fetching data for: 2023-01-27
📅 Fetching data for: 2023-01-28
📅 Fetching data for: 2023-01-29
📅 Fetching data for: 2023-01-30
📅 Fetching data for: 2023-01-31
📅 Fetchi

In [105]:
# Combine every daily DataFrame and persist the result with one write
if not all_dataframes:
    print("❌ No data fetched. Skipping save operation.")
else:
    # Seed with the first day, then stack the remaining days onto it in order
    final_df = all_dataframes[0]
    for df in all_dataframes[1:]:
        final_df = final_df.union(df)

    print("🚀 Saving data to ADLS in a single batch...")

    # JSON output laid out in year/month/day folders; existing output at the
    # target path is overwritten
    (
        final_df.write
        .mode("overwrite")
        .partitionBy("year", "month", "day")
        .json(adls_path)
    )

    print(f"✅ Successfully saved data to {adls_path}")

StatementMeta(practice, 6, 106, Finished, Available, Finished)

🚀 Saving data to ADLS in a single batch...
✅ Successfully saved data to abfss://learning@learningstorage1093.dfs.core.windows.net/bronze/facebook_ads/fact_facebook_ads_metrics/historical/
