In [9]:
# ============================================================================
# IMPORTS
# ============================================================================
import os
import re
from datetime import datetime, timedelta

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, current_date, current_timestamp, explode_outer, lit, max
)
from pyspark.sql.types import (
    ArrayType, DoubleType, NullType, StringType, StructType
)

# ============================================================================
# SPARK SESSION
# ============================================================================
# The driver and the executors are given the same classpath entries (the
# hadoop-aws and AWS SDK bundle jars), so the value is written once and
# applied to both settings.
_S3A_JARS = "/opt/spark/jars/hadoop-aws-3.3.4.jar:/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar"

_builder = SparkSession.builder.appName("MailshakeCampaignCurations")
_builder = _builder.config("spark.driver.extraClassPath", _S3A_JARS)
_builder = _builder.config("spark.executor.extraClassPath", _S3A_JARS)
spark = _builder.getOrCreate()

# With "dynamic", an overwrite replaces only the partitions that receive data
# instead of clearing the whole target path (see the Spark SQL configuration
# docs); the per-(client_id, source_date) writes below depend on this.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# ============================================================================
# S3 CONFIG
# ============================================================================
# Hadoop configuration of the running session; the s3a:// settings go here.
hadoop_conf = spark._jsc.hadoopConfiguration()

# os.getenv() returns None for an unset variable, and Hadoop's
# Configuration.set() rejects a null value. Static keys are therefore only
# pushed when both are present; otherwise credential lookup is left to the
# S3A connector's default provider chain.
_aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
_aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
if _aws_access_key and _aws_secret_key:
    hadoop_conf.set("fs.s3a.access.key", _aws_access_key)
    hadoop_conf.set("fs.s3a.secret.key", _aws_secret_key)
else:
    print(
        "AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY not set; "
        "relying on the default S3A credential providers"
    )

hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# ============================================================================
# CONFIG
# ============================================================================
# Root of the raw input; process_dataset reads
# {RAW_PATH}/client_id=<client>/entity=<dataset>/date=<YYYY-MM-DD>/ below it.
RAW_PATH = "s3a://mailshake-analytics/raw"
# Root of the curated output; each dataset is written under entity=<dataset>.
CURATED_PATH = "s3a://mailshake-analytics/curated"
# Clients whose partitions are processed by every process_dataset call.
CLIENT_IDS = ["client_1", "client_2", "client_3"]

# When truthy, every client is processed for exactly this one date and the
# curated data is not consulted; set to None to continue from the latest
# curated source_date of each client.
SINGLE_DATE = "2025-12-25"   # set None for incremental
# First date processed in incremental mode for a client that has no curated
# data yet.
BOOTSTRAP_DATE = "2025-12-20"

# ============================================================================
# HELPERS
# ============================================================================

def sanitize_column_names(df):
    """Return *df* with every column name normalised to ``[a-z0-9_]``.

    Each character outside ``a-zA-Z0-9_`` (dots included) becomes an
    underscore, runs of underscores collapse to a single one, and the result
    is lower-cased. Columns whose name is already in that form are left alone.
    """
    # Work out all renames from the incoming column list first ...
    pending = []
    for original in df.columns:
        normalized = re.sub(r'_+', '_', re.sub(r'[^a-zA-Z0-9_]', '_', original)).lower()
        if normalized != original:
            pending.append((original, normalized))

    # ... then apply them one by one, in column order.
    for original, normalized in pending:
        df = df.withColumnRenamed(original, normalized)
    return df

def fix_void_columns(df):
    """Cast every NullType column of *df* to string so Parquet can write it.

    Columns of any other type are returned unchanged.
    """
    void_names = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, NullType)
    ]
    for name in void_names:
        df = df.withColumn(name, col(name).cast(StringType()))
    return df



def ensure_columns_and_reorder(df: DataFrame, column_order: list, column_types: dict = None) -> DataFrame:
    """
    Project *df* onto exactly ``column_order``, creating absent columns as NULL.

    Args:
        df (DataFrame): input DataFrame
        column_order (list): names of the output columns, in output order;
            columns of ``df`` that are not listed are dropped by the select
        column_types (dict, optional): {column_name: DataType} used to cast
            the NULL placeholder of a missing column; a missing column
            without an entry is added as an uncast NULL literal
    Returns:
        DataFrame containing only ``column_order``, in that order
    """
    if not column_types:
        column_types = {}

    for col_name in column_order:
        if col_name in df.columns:
            continue

        placeholder = lit(None)
        wanted_type = column_types.get(col_name)
        if wanted_type:
            placeholder = placeholder.cast(wanted_type)

        df = df.withColumn(col_name, placeholder)
        print(f"‚ö†Ô∏è Adding missing column: {col_name}")

    # Final projection fixes both the column set and the column order.
    return df.select([col(name) for name in column_order])


def _quote_column(name):
    """Wrap *name* in backticks so col() reads it as a single identifier."""
    return "`" + name.replace("`", "``") + "`"


def _expand_struct_columns(df):
    """Replace each StructType column by ``<parent>_<field>`` columns until none is left."""
    while True:
        struct_cols = [
            field.name
            for field in df.schema.fields
            if isinstance(field.dataType, StructType)
        ]
        if not struct_cols:
            return df

        for col_name in struct_cols:
            parent = col(_quote_column(col_name))
            for nested in df.schema[col_name].dataType.fields:
                # getField() takes the field name literally, so a field whose
                # name contains a dot is not mistaken for a nested path.
                df = df.withColumn(
                    f"{col_name}_{nested.name}",
                    parent.getField(nested.name)
                )
            df = df.drop(col_name)


def flatten_struct_columns(df):
    """Flatten all StructType columns and explode arrays of structs.

    Steps:
      1. Every struct column is replaced by one ``<parent>_<field>`` column
         per field, repeated until no struct column remains.
      2. Every ``array<struct>`` column present at that point is exploded
         with ``explode_outer`` (one output row per element; explode_outer
         keeps rows whose array is null or empty). Exploding several such
         columns multiplies the rows.
      3. The element structs produced by step 2 are flattened the same way as
         in step 1, including structs nested inside them.

    Arrays of structs that only become visible in step 3 are NOT exploded, so
    the row count depends solely on the arrays found in step 2.

    Args:
        df: input DataFrame.
    Returns:
        DataFrame without StructType columns.
    """
    df = _expand_struct_columns(df)

    # Explode array<struct> columns
    array_struct_cols = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType)
    ]
    for col_name in array_struct_cols:
        df = df.withColumn(col_name, explode_outer(col(_quote_column(col_name))))

    # Each exploded column is now a plain struct; flatten it completely.
    return _expand_struct_columns(df)

def get_dates_to_process(curated_path, client_ids, single_date=None, bootstrap_date="2025-12-20"):
    """Return ``{client_id: [YYYY-MM-DD, ...]}`` listing the dates to process.

    Args:
        curated_path: root of the curated Parquet data; only read in
            incremental mode.
        client_ids: clients to build a date list for.
        single_date: when truthy, every client gets exactly this one date and
            the curated data is not read.
        bootstrap_date: first date (YYYY-MM-DD) for a client that has no
            usable ``source_date`` in the curated data.

    Returns:
        dict mapping each client id to the consecutive dates from its start
        date through today inclusive. The start date is the day after the
        client's latest curated ``source_date``, or ``bootstrap_date``. The
        list is empty when the start date lies after today.
    """
    if single_date:
        print(f"üìÖ Single-date mode enabled: {single_date}")
        return {c: [single_date] for c in client_ids}

    # NOTE(review): this reads the whole curated root, so the maximum is taken
    # over whatever datasets live below it, not per dataset - confirm that is
    # intended.
    try:
        existing = spark.read.parquet(curated_path)
        rows = (
            existing.groupBy("client_id")
            .agg(max("source_date").alias("last_date"))
            .collect()
        )
        # A NULL maximum carries no information; such a client is treated as
        # having no curated data.
        last_map = {
            r["client_id"]: r["last_date"]
            for r in rows
            if r["last_date"] is not None
        }
    except Exception as exc:
        # Still best-effort (e.g. nothing curated yet), but say so: falling
        # back silently would hide a failed read behind a full re-bootstrap.
        print(
            f"Could not read curated data at {curated_path} ({exc}); "
            f"starting every client from {bootstrap_date}"
        )
        last_map = {}

    today = datetime.today().date()
    dates = {}
    for client in client_ids:
        if client in last_map:
            last_done = datetime.strptime(str(last_map[client]), "%Y-%m-%d").date()
            start = last_done + timedelta(days=1)
        else:
            start = datetime.strptime(bootstrap_date, "%Y-%m-%d").date()

        # range() of a non-positive count is empty, which covers start > today.
        day_count = (today - start).days + 1
        dates[client] = [
            (start + timedelta(days=offset)).strftime("%Y-%m-%d")
            for offset in range(day_count)
        ]
    return dates

# ============================================================================
# DATASET PROCESSOR
# ============================================================================
def process_dataset(
    raw_base_path,
    curated_base_path,
    client_ids,
    dataset_name,
    unique_keys,
    explode_col=None,
    dates_per_client=None,
    desired_columns=None,
    column_types=None
):
    """Curate one dataset for every requested (client, date) partition.

    For each client and each of its dates the raw Parquet partition
    ``{raw_base_path}/client_id=<client>/entity=<dataset_name>/date=<date>/``
    is read, flattened, optionally exploded on ``explode_col``, renamed to
    sanitized column names, projected onto ``desired_columns``, tagged with
    metadata columns, de-duplicated and written to
    ``{curated_base_path}/entity=<dataset_name>`` partitioned by
    ``client_id`` and ``source_date``.

    A failure in one partition is printed and that partition is skipped; the
    remaining partitions are still processed.

    Args:
        raw_base_path: root of the raw data.
        curated_base_path: root of the curated output.
        client_ids: clients to process.
        dataset_name: value of the ``entity=`` path component.
        unique_keys: de-duplication keys; dots are replaced by underscores to
            match the flattened column names.
        explode_col: optional array column to explode after flattening (dots
            replaced by underscores); ignored when that column is absent.
        dates_per_client: ``{client_id: [date, ...]}``; a missing mapping or
            a client without an entry means nothing is processed.
        desired_columns: output columns in output order; when None the
            projection step is skipped and all columns are kept.
        column_types: ``{column: DataType}`` for desired columns that have to
            be created because the input lacks them.
    """
    entity_path = f"{curated_base_path}/entity={dataset_name}"
    dates_per_client = dates_per_client or {}

    # Loop invariants, computed once instead of once per partition.
    flat_explode_col = explode_col.replace(".", "_") if explode_col else None
    load_type = "single_date" if SINGLE_DATE else "incremental"

    for client_id in client_ids:
        for process_date in dates_per_client.get(client_id, []):
            input_path = (
                f"{raw_base_path}/client_id={client_id}/entity={dataset_name}/date={process_date}/"
            )
            try:
                print(f"üìÇ {dataset_name} | {client_id} | {process_date}")

                df = spark.read.parquet(input_path)

                df = flatten_struct_columns(df)   # flatten structs and arrays first

                if flat_explode_col and flat_explode_col in df.columns:
                    df = df.withColumn(flat_explode_col, explode_outer(col(flat_explode_col)))

                # Sanitize column names
                df = sanitize_column_names(df)

                # Ensure all columns exist and reorder
                if desired_columns is not None:
                    df = ensure_columns_and_reorder(df, desired_columns, column_types)

                # Add metadata. client_id / source_date are the partition
                # columns of the write below; the *_col copies hold the same
                # values (presumably so they also remain inside the data
                # files - confirm).
                df = (
                    df.withColumn("client_id", lit(client_id))
                      .withColumn("source_date", lit(process_date))
                      .withColumn("client_id_col", lit(client_id))
                      .withColumn("source_date_col", lit(process_date))
                      .withColumn("processing_timestamp", current_timestamp())
                      .withColumn("processing_date", current_date())
                      .withColumn("load_type", lit(load_type))
                )

                # Deduplicate.
                # NOTE(review): rows that differ only in columns outside
                # unique_keys (for instance values produced by an explode)
                # collapse into one row here - confirm the keys passed by the
                # callers are meant to be that coarse.
                safe_keys = [k.replace(".", "_") for k in unique_keys]
                df = df.dropDuplicates(safe_keys + ["client_id", "source_date"])

                # Keep the result around so that the count for the log line
                # does not re-run the whole read/flatten/dedup pipeline after
                # the write.
                df = fix_void_columns(df).persist()
                try:
                    # Write
                    df.write.mode("overwrite").partitionBy("client_id", "source_date").parquet(entity_path)
                    record_count = df.count()
                    print(f"‚úÖ Written {record_count} records")
                finally:
                    df.unpersist()

            except Exception as e:
                print(f"‚ö†Ô∏è Skipped {dataset_name} | {client_id} | {process_date}: {e}")

# ============================================================================
# RUN
# ============================================================================
# One date list per client, shared by every process_dataset call below.
dates_per_client = get_dates_to_process(
    CURATED_PATH,
    CLIENT_IDS,
    single_date=SINGLE_DATE,
    bootstrap_date=BOOTSTRAP_DATE,
)

# -------------------- activity_open --------------------
process_dataset(
    RAW_PATH,
    CURATED_PATH,
    CLIENT_IDS,
    "activity_open",
    unique_keys=["id", "recipient.id", "campaign.id"],
    dates_per_client=dates_per_client,
    # Output columns, in output order.
    desired_columns=[
        # Event
        "object", "id", "actiondate", "isduplicate",
        # recipient_*
        "recipient_object", "recipient_id", "recipient_emailaddress",
        "recipient_fullname", "recipient_created", "recipient_ispaused",
        "recipient_contactid", "recipient_first", "recipient_last",
        # recipient_fields_*
        "recipient_fields_link", "recipient_fields_status",
        "recipient_fields_first", "recipient_fields_position",
        "recipient_fields_date_applied", "recipient_fields_account",
        "recipient_fields_phonenumber", "recipient_fields_facebookurl",
        "recipient_fields_instagramid", "recipient_fields_linkedinurl",
        "recipient_fields_twitterid",
        # campaign_*
        "campaign_object", "campaign_id", "campaign_title",
        "campaign_wizardstatus",
        # parent_* and parent_message_*
        "parent_object", "parent_id", "parent_type",
        "parent_message_object", "parent_message_id", "parent_message_type",
        "parent_message_subject", "parent_message_replytoid",
    ],
    # Types used when one of these columns is missing from the input and has
    # to be created as NULL.
    column_types={
        "recipient_fields_status": StringType(),
        "recipient_fields_first": StringType(),
    },
)

# -------------------- activity_reply --------------------
process_dataset(
    RAW_PATH,
    CURATED_PATH,
    CLIENT_IDS,
    "activity_reply",
    unique_keys=["id", "recipient.id", "campaign.id"],
    dates_per_client=dates_per_client,
    # Output columns, in output order.
    desired_columns=[
        # Event and message content
        "object", "id", "actiondate", "type", "subject", "externalid",
        "externalrawmessageid", "externalconversationid", "rawbody",
        "body", "plaintextbody",
        # recipient_*
        "recipient_object", "recipient_id", "recipient_emailaddress",
        "recipient_fullname", "recipient_created", "recipient_ispaused",
        "recipient_contactid", "recipient_first", "recipient_last",
        # recipient_fields_*
        "recipient_fields_link", "recipient_fields_status",
        "recipient_fields_first", "recipient_fields_position",
        "recipient_fields_date_applied", "recipient_fields_account",
        "recipient_fields_phonenumber", "recipient_fields_facebookurl",
        "recipient_fields_instagramid", "recipient_fields_linkedinurl",
        "recipient_fields_twitterid",
        # campaign_*
        "campaign_object", "campaign_id", "campaign_title",
        "campaign_wizardstatus",
        # parent_* and parent_message_*
        "parent_object", "parent_id", "parent_type",
        "parent_message_object", "parent_message_id", "parent_message_type",
        "parent_message_subject", "parent_message_replytoid",
        # from_*
        "from_object", "from_address", "from_fullname",
        "from_first", "from_last",
    ],
    # Types used when one of these columns is missing from the input and has
    # to be created as NULL.
    column_types={
        "recipient_fields_status": StringType(),
        "recipient_fields_first": StringType(),
    },
)




# -------------------- activity_sent --------------------
process_dataset(
    RAW_PATH,
    CURATED_PATH,
    CLIENT_IDS,
    "activity_sent",
    unique_keys=["id", "recipient.id", "campaign.id"],
    explode_col="to",
    dates_per_client=dates_per_client,
    # Output columns, in output order.
    desired_columns=[
        # Core
        "object", "id", "actiondate", "type", "excludebody",
        # To (exploded)
        "to_address", "to_first", "to_fullname", "to_last", "to_object",
        # Message content
        "subject", "externalid", "externalrawmessageid",
        "externalconversationid", "rawbody", "body", "plaintextbody",
        # Recipient
        "recipient_object", "recipient_id", "recipient_emailaddress",
        "recipient_fullname", "recipient_created", "recipient_ispaused",
        "recipient_first", "recipient_last",
        # Recipient fields
        "recipient_fields_account", "recipient_fields_phonenumber",
        "recipient_fields_facebookurl", "recipient_fields_instagramid",
        "recipient_fields_linkedinurl", "recipient_fields_twitterid",
        "recipient_fields_link", "recipient_fields_position",
        "recipient_fields_date_applied", "recipient_fields_status",
        # Campaign
        "campaign_object", "campaign_id", "campaign_title",
        "campaign_wizardstatus",
        # Message (parent)
        "message_object", "message_id", "message_type",
        "message_subject", "message_replytoid",
        # From
        "from_object", "from_address", "from_fullname",
        "from_first", "from_last",
    ],
    # Type used when this column is missing from the input and has to be
    # created as NULL.
    column_types={"recipient_fields_status": StringType()},
)

# -------------------- created_leads --------------------
process_dataset(
    RAW_PATH,
    CURATED_PATH,
    CLIENT_IDS,
    "created_leads",
    unique_keys=["id", "recipient.id", "campaign.id"],
    dates_per_client=dates_per_client,
    # Output columns, in output order.
    desired_columns=[
        # Lead
        "object", "id", "created", "openeddate", "laststatuschangedate",
        "annotation", "status",
        # recipient_*
        "recipient_object", "recipient_id", "recipient_emailaddress",
        "recipient_fullname", "recipient_created", "recipient_ispaused",
        "recipient_contactid", "recipient_first", "recipient_last",
        # recipient_fields_*
        "recipient_fields_link", "recipient_fields_first",
        "recipient_fields_status", "recipient_fields_position",
        "recipient_fields_date_applied", "recipient_fields_account",
        "recipient_fields_phonenumber", "recipient_fields_facebookurl",
        "recipient_fields_instagramid", "recipient_fields_linkedinurl",
        "recipient_fields_twitterid",
        # campaign_*
        "campaign_object", "campaign_id", "campaign_title",
        "campaign_wizardstatus",
        # assignedto_*
        "assignedto_object", "assignedto_id", "assignedto_emailaddress",
        "assignedto_fullname", "assignedto_first", "assignedto_last",
    ],
    # Types used when one of these columns is missing from the input and has
    # to be created as NULL.
    column_types={
        "recipient_fields_status": StringType(),
        "recipient_fields_first": StringType(),
        "assignedto_object": StringType(),
        "assignedto_id": DoubleType(),
        "assignedto_emailaddress": StringType(),
        "assignedto_fullname": StringType(),
        "assignedto_first": StringType(),
        "assignedto_last": StringType(),
    },
)

# Release the Spark session once every dataset call has returned.
spark.stop()
# NOTE(review): this line is printed unconditionally. process_dataset catches
# per-partition exceptions and only prints a "Skipped" line for them, so
# skipped partitions do not prevent this message.
print("üéâ All datasets processed successfully!")


üìÖ Single-date mode enabled: 2025-12-25
üìÇ activity_sent | client_1 | 2025-12-25
‚ö†Ô∏è Adding missing column: recipient_fields_status


                                                                                

‚úÖ Written 25 records
üìÇ activity_sent | client_2 | 2025-12-25


                                                                                

‚úÖ Written 25 records
üìÇ activity_sent | client_3 | 2025-12-25
‚ö†Ô∏è Adding missing column: recipient_fields_status


                                                                                

‚úÖ Written 25 records
üìÇ created_leads | client_1 | 2025-12-25
‚ö†Ô∏è Adding missing column: recipient_fields_first
‚ö†Ô∏è Adding missing column: recipient_fields_status
‚ö†Ô∏è Adding missing column: assignedto_object
‚ö†Ô∏è Adding missing column: assignedto_id
‚ö†Ô∏è Adding missing column: assignedto_emailaddress
‚ö†Ô∏è Adding missing column: assignedto_fullname
‚ö†Ô∏è Adding missing column: assignedto_first
‚ö†Ô∏è Adding missing column: assignedto_last


                                                                                

‚úÖ Written 100 records
üìÇ created_leads | client_2 | 2025-12-25
‚ö†Ô∏è Adding missing column: recipient_fields_first
‚ö†Ô∏è Adding missing column: assignedto_object
‚ö†Ô∏è Adding missing column: assignedto_id
‚ö†Ô∏è Adding missing column: assignedto_emailaddress
‚ö†Ô∏è Adding missing column: assignedto_fullname
‚ö†Ô∏è Adding missing column: assignedto_first
‚ö†Ô∏è Adding missing column: assignedto_last


                                                                                

‚úÖ Written 43 records
üìÇ created_leads | client_3 | 2025-12-25
‚ö†Ô∏è Adding missing column: recipient_fields_status


                                                                                

‚úÖ Written 16 records
üéâ All datasets processed successfully!
