# 🧪 Customer 360 Data Platform with Spark + Delta Lake + AWS S3
This notebook demonstrates how to build a simple Customer 360 data pipeline using Delta Lake on AWS S3.

We'll simulate three layers:
- **Bronze**: Raw data ingestion
- **Silver**: Cleaned and standardized data
- **Gold**: Final Customer 360 dataset
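
The three layers map onto a simple folder layout in the bucket. As a minimal sketch, the helper below builds the `s3a://` paths used throughout this notebook. The `Lakehouse` class and its `path` method are hypothetical names introduced here for illustration, and `mybucket` is the placeholder bucket from the cells that follow.

```python
from dataclasses import dataclass

# The three layers described above.
LAYERS = ("bronze", "silver", "gold")


@dataclass(frozen=True)
class Lakehouse:
    """Hypothetical helper that centralizes the bucket/layer/table path layout."""

    bucket: str

    def path(self, layer: str, table: str) -> str:
        # Reject typos such as "sliver" before they become stray S3 prefixes.
        if layer not in LAYERS:
            raise ValueError(f"unknown layer {layer!r}, expected one of {LAYERS}")
        return f"s3a://{self.bucket}/{layer}/{table}/"


lake = Lakehouse("mybucket")
print(lake.path("bronze", "crm"))
print(lake.path("gold", "customer_360"))
```

Keeping the layout in one place means a bucket rename touches a single line instead of every read and write call.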

In [None]:
# ✅ Spark session with Delta Lake and S3 access
# Replace the <YOUR_...> placeholders below before running, and avoid committing real keys.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("Customer360") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.access.key", "<YOUR_ACCESS_KEY>") \
    .config("spark.hadoop.fs.s3a.secret.key", "<YOUR_SECRET_KEY>") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

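# configure_spark_with_delta_pip adds the Delta Lake package. The s3a:// scheme also
# needs hadoop-aws on the classpath. Version 3.3.4 is an assumption: match it to the
# Hadoop version bundled with your Spark build.
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"]
).getOrCreate()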
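
Hardcoding keys in a notebook is easy to leak. One possible alternative, sketched here under assumptions, is to read them from the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables and build the S3A settings as a dictionary. The function name `s3a_conf_from_env` is hypothetical. When the variables are absent, the key settings are simply left out, so S3A falls back to its default credential provider chain.

```python
import os
from typing import Dict, Mapping


def s3a_conf_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build S3A Spark settings, taking credentials from environment variables."""
    conf = {
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }
    access_key = env.get("AWS_ACCESS_KEY_ID")
    secret_key = env.get("AWS_SECRET_ACCESS_KEY")
    # Only set explicit keys when both are present; otherwise leave them out.
    if access_key and secret_key:
        conf["spark.hadoop.fs.s3a.access.key"] = access_key
        conf["spark.hadoop.fs.s3a.secret.key"] = secret_key
    return conf


# Example with a fake environment, so no real credentials are needed here.
example = s3a_conf_from_env({"AWS_ACCESS_KEY_ID": "AKIA_FAKE", "AWS_SECRET_ACCESS_KEY": "fake"})
print(sorted(example))
```

The resulting settings could then be applied to the builder with a loop such as `for key, value in s3a_conf_from_env().items(): builder = builder.config(key, value)`.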

## 🥉 Step 1: Ingest CRM and Web Log data (Bronze layer)

In [None]:
crm_df = spark.createDataFrame([
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com")
], ["customer_id", "name", "email"])

web_df = spark.createDataFrame([
    (1, "page_view", "2025-07-01"),
    (1, "purchase", "2025-07-02"),
    (2, "page_view", "2025-07-01")
], ["customer_id", "event", "event_date"])

# Save raw data to Bronze layer
crm_df.write.format("delta").mode("overwrite").save("s3a://mybucket/bronze/crm/")
web_df.write.format("delta").mode("overwrite").save("s3a://mybucket/bronze/web_logs/")

## 🥈 Step 2: Clean and standardize (Silver layer)

In [None]:
# Read the raw Bronze tables
clean_crm = spark.read.format("delta").load("s3a://mybucket/bronze/crm/")
clean_web = spark.read.format("delta").load("s3a://mybucket/bronze/web_logs/")

# Drop web events that have no event date; CRM records pass through unchanged
clean_web = clean_web.filter("event_date IS NOT NULL")

# Write the cleaned tables to the Silver layer
clean_crm.write.format("delta").mode("overwrite").save("s3a://mybucket/silver/crm/")
clean_web.write.format("delta").mode("overwrite").save("s3a://mybucket/silver/web_logs/")

## 🥇 Step 3: Join and create Customer 360 (Gold layer)

In [None]:
from pyspark.sql.functions import count

crm = spark.read.format("delta").load("s3a://mybucket/silver/crm/")
web = spark.read.format("delta").load("s3a://mybucket/silver/web_logs/")

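# Count web events per customer
activity = web.groupBy("customer_id").agg(count("*").alias("total_events"))
# The left join keeps customers with no events; set only their missing count to 0
customer_360 = crm.join(activity, on="customer_id", how="left").fillna(0, subset=["total_events"])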

customer_360.write.format("delta").mode("overwrite").save("s3a://mybucket/gold/customer_360/")
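
To make the Gold logic concrete, here is a plain-Python sketch of the same steps (count events per customer, left join onto the CRM rows, fill missing counts with 0). It is not Spark code and is only meant to show the expected result. The function name `build_customer_360` is hypothetical, and the third customer, Carol, is an assumption added purely to exercise the zero fill.

```python
from collections import Counter

crm_rows = [
    {"customer_id": 1, "name": "Alice", "email": "alice@example.com"},
    {"customer_id": 2, "name": "Bob", "email": "bob@example.com"},
    # Hypothetical customer with no web events (not in the notebook's sample data).
    {"customer_id": 3, "name": "Carol", "email": "carol@example.com"},
]
web_rows = [
    {"customer_id": 1, "event": "page_view", "event_date": "2025-07-01"},
    {"customer_id": 1, "event": "purchase", "event_date": "2025-07-02"},
    {"customer_id": 2, "event": "page_view", "event_date": "2025-07-01"},
]


def build_customer_360(crm, web):
    # groupBy("customer_id") + count("*"): number of events per customer.
    events_per_customer = Counter(row["customer_id"] for row in web)
    # Left join: every CRM row is kept; customers without events get 0.
    return [
        {**customer, "total_events": events_per_customer.get(customer["customer_id"], 0)}
        for customer in crm
    ]


customer_360_rows = build_customer_360(crm_rows, web_rows)
for row in customer_360_rows:
    print(row["customer_id"], row["name"], row["total_events"])
# 1 Alice 2
# 2 Bob 1
# 3 Carol 0
```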

## 🔍 Step 4: Read final Customer 360 table

In [None]:
df = spark.read.format("delta").load("s3a://mybucket/gold/customer_360/")
df.show()
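
Beyond eyeballing `df.show()`, a small sanity check can catch obvious problems in the Gold table. The sketch below is one possible approach under assumptions: it works on plain dictionaries, and the function name `check_customer_360` and its rules (unique `customer_id`, non-negative `total_events`) are hypothetical choices, not part of the pipeline above.

```python
from typing import Dict, Iterable, List


def check_customer_360(rows: Iterable[Dict]) -> List[str]:
    """Return a list of problems found; an empty list means the rows look sane."""
    problems = []
    seen_ids = set()
    for row in rows:
        customer_id = row.get("customer_id")
        if customer_id is None:
            problems.append("row with missing customer_id")
        elif customer_id in seen_ids:
            problems.append(f"duplicate customer_id {customer_id}")
        else:
            seen_ids.add(customer_id)
        total = row.get("total_events")
        # After the zero fill, every customer should have a non-negative count.
        if total is None or total < 0:
            problems.append(f"invalid total_events for customer_id {customer_id}: {total}")
    return problems


sample_rows = [
    {"customer_id": 1, "name": "Alice", "email": "alice@example.com", "total_events": 2},
    {"customer_id": 2, "name": "Bob", "email": "bob@example.com", "total_events": 1},
]
print(check_customer_360(sample_rows))
```

With the notebook's DataFrame, the rows could be supplied as `check_customer_360(row.asDict() for row in df.collect())`, which is reasonable only for small tables like this one.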