In [0]:
# Databricks Notebook: Unified_Marketing_Sales_ETL
# Author: Rodger Roberts
# Description:
# Integrate multiple data sources (CRM, Ads, ERP) into a unified data model
# for enterprise-wide reporting and analytics.


In [0]:

#  Import Libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, trim, upper, countDistinct

# Obtain the Spark session for this notebook: getOrCreate() returns the
# already-active session if there is one, otherwise it builds a new session
# with the application name below.
spark = (
    SparkSession.builder
    .appName("Unified_Marketing_Sales_ETL")
    .getOrCreate()
)


In [0]:

# Load Raw Data from Multiple Sources (Ingested via Fivetran)

# Read each raw source from its mounted path as a Delta dataset.
# One DataFrame per upstream system: CRM customers, Google Ads, ERP sales orders.
crm_df = spark.read.load("/mnt/raw/crm/customers", format="delta")
ads_df = spark.read.load("/mnt/raw/ads/google_ads", format="delta")
sales_df = spark.read.load("/mnt/raw/erp/sales_orders", format="delta")


In [0]:

# Basic Cleaning and Standardization

# E-mail: upper-case, then strip leading/trailing spaces.
email_normalized = trim(upper(col("email")))
crm_df = crm_df.withColumn("email", email_normalized)

# Campaign name: strip leading/trailing spaces only (case is left untouched).
campaign_name_trimmed = trim(col("campaign_name"))
ads_df = ads_df.withColumn("campaign_name", campaign_name_trimmed)

# Order date: parse into a DateType column using the yyyy-MM-dd pattern.
order_date_parsed = to_date(col("order_date"), "yyyy-MM-dd")
sales_df = sales_df.withColumn("order_date", order_date_parsed)


In [0]:

# Join Datasets on Common Keys (Orders → Customers → Campaigns)

# Build the unified frame starting from sales orders. Both joins are LEFT
# joins, so every order row is kept even when no matching customer or
# campaign exists (the enriched columns are then null).
# NOTE(review): assumes ads_df has at most one row per campaign_id; if it
# holds several rows per campaign, this join multiplies order rows — confirm.
joined_df = (
    sales_df
    # Name-based join: the result carries a single customer_id column.
    .join(crm_df, "customer_id", "left")
    .join(ads_df, sales_df.campaign_id == ads_df.campaign_id, "left")
    # An expression join keeps BOTH sides' campaign_id. Drop the ads-side copy
    # so the result has one unambiguous campaign_id (the one from sales_df).
    .drop(ads_df.campaign_id)
)


In [0]:

# Data Quality Checks

# Distinct-count sanity metrics over the joined frame: one aggregate per
# (source column, output alias) pair, computed in a single agg() pass.
dq_metrics = [
    countDistinct(key_column).alias(metric_name)
    for key_column, metric_name in (
        ("customer_id", "unique_customers"),
        ("order_id", "unique_orders"),
    )
]
dq_summary = joined_df.agg(*dq_metrics)

# Render the one-row summary in the notebook output.
display(dq_summary)


In [0]:

# Transform to Unified Reporting Schema

# Derived status flag: "Completed" when order_amount is strictly positive;
# everything else (zero, negative, or null amount) falls through to "Pending".
order_status = (
    when(col("order_amount") > 0, "Completed")
    .otherwise("Pending")
    .alias("order_status")
)

# Columns exposed to reporting, in output order.
report_columns = [
    "customer_id",
    "customer_name",
    "campaign_name",
    "order_id",
    "order_date",
    "order_amount",
    order_status,
]

report_df = joined_df.select(*report_columns)


In [0]:

# Write to Curated Layer (Delta Lake)

# Persist the reporting frame to the curated path as Delta.
# mode="overwrite" replaces whatever data is currently stored at that path.
report_df.write.save(
    "/mnt/curated/unified_sales_marketing",
    format="delta",
    mode="overwrite",
)


In [0]:

#  Register as SQL Table for Power BI Access

# Register the curated Delta path as a table so SQL/BI clients can query it
# by name. The table is defined purely by LOCATION: its schema and data come
# from the Delta files written to that path.
#
# CREATE TABLE IF NOT EXISTS is used instead of CREATE OR REPLACE TABLE:
# a REPLACE without a column list or AS SELECT has no schema to replace with
# and is rejected by Delta, whereas a plain CREATE over an existing Delta
# location picks the schema up from the data. It is also safe to re-run;
# later overwrites of the path are visible through the table automatically.
# NOTE(review): if a table with this name already exists at a different
# location, this statement leaves it untouched — confirm that is acceptable.
spark.sql("""
CREATE TABLE IF NOT EXISTS unified_sales_marketing
USING DELTA
LOCATION '/mnt/curated/unified_sales_marketing'
""")


In [0]:

# Result:
# Clean, standardized dataset ready for enterprise BI reporting (Power BI / Tableau)
# Demonstrates multi-source integration, data quality validation,
# and scalable transformation architecture.
