# Sample ETL Pipeline - User Activity Analysis
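This notebook demonstrates a typical PySpark ETL pipeline: it reads raw events, users, and products from S3, cleans and enriches the events, aggregates activity per user, and writes the top 100 spenders per country.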

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum as spark_sum, avg, when, broadcast
from pyspark.sql.window import Window

# Build the Spark session for this job, or reuse one that is already running
# (getOrCreate). Pinning spark.sql.shuffle.partitions to 200 matches Spark's
# built-in default; it is stated explicitly so the shuffle width is visible.
spark = (
    SparkSession.builder
    .appName('UserActivityETL')
    .config('spark.sql.shuffle.partitions', '200')
    .getOrCreate()
)

In [None]:
# Load the raw inputs from the S3 data lake:
#   events_df   - raw event logs (Parquet)
#   users_df    - user records (Parquet)
#   products_df - product catalogue (CSV whose first line is a header row)
# NOTE(review): the CSV is read without a schema or inferSchema, so every
# products column (product_id included) arrives as a string - confirm that
# matches the type of events.product_id used in the product join.
events_df = spark.read.parquet('s3://data-lake/raw/events/')
users_df = spark.read.parquet('s3://data-lake/raw/users/')
products_df = spark.read.option('header', True).csv('s3://data-lake/raw/products.csv')

In [None]:
# Keep only non-bot events from the last LOOKBACK_DAYS days, then drop
# repeated rows that share an event_id.
from pyspark.sql.functions import current_date, date_sub

# The cutoff is computed from the run date so the window is a rolling
# 30 days rather than a fixed calendar date that grows stale over time.
# NOTE(review): assumes event_date is a DATE or a 'yyyy-MM-dd' string so the
# comparison against a date value is meaningful - confirm against the schema.
LOOKBACK_DAYS = 30
cutoff_date = date_sub(current_date(), LOOKBACK_DAYS)

clean_events = (
    events_df
    .filter(col('event_date') >= cutoff_date)
    # Negation instead of `== False`; rows whose is_bot is null are still
    # dropped, because ~null evaluates to null and filter() keeps only true.
    .filter(~col('is_bot'))
    .dropDuplicates(['event_id'])
)

In [None]:
# Enrich each cleaned event with user attributes (left join on user_id) and
# product attributes (left join on product_id); unmatched events are kept.
# The product table carries a broadcast hint so it is shipped to every
# executor instead of being shuffled for the join.
# NOTE(review): very active users may skew the user_id join - if that shows
# up in stage metrics, consider AQE skew-join handling
# (spark.sql.adaptive.skewJoin.enabled); verify on the target Spark version.
enriched = (
    clean_events
    .join(users_df, on='user_id', how='left')
    .join(broadcast(products_df), on='product_id', how='left')
)

In [None]:
# One summary row per (user_id, country); the groupBy triggers a shuffle.
#   total_events - count of non-null event_id values
#   total_spend  - sum of amount over 'purchase' events only; when() yields
#                  null for other event types and sum() skips nulls, so this
#                  is null for a user with no purchase rows
#   avg_session  - mean of session_duration
user_summary = (
    enriched
    .groupBy('user_id', 'country')
    .agg(
        count(col('event_id')).alias('total_events'),
        spark_sum(
            when(col('event_type') == 'purchase', col('amount'))
        ).alias('total_spend'),
        avg(col('session_duration')).alias('avg_session'),
    )
)

In [None]:
# Rank users by spend within each country and keep the top TOP_N.
from pyspark.sql.functions import row_number

TOP_N = 100

# total_spend is null for users with no purchases. desc() places nulls last,
# but they would still be numbered and could fill "top spender" slots in
# countries with fewer than TOP_N purchasers, so exclude them up front.
spenders = user_summary.filter(col('total_spend').isNotNull())

# user_id breaks ties on equal spend. It is unique within a country after the
# (user_id, country) groupBy, which makes row_number() deterministic across runs.
window_spec = Window.partitionBy('country').orderBy(
    col('total_spend').desc(), col('user_id')
)
ranked = spenders.withColumn('rank', row_number().over(window_spec))

# Get top TOP_N spenders per country
top_spenders = ranked.filter(col('rank') <= TOP_N)

In [None]:
# Write results, replacing any previous output, partitioned by country.
OUTPUT_PATH = 's3://data-lake/curated/top_spenders/'

(
    top_spenders.write
    .mode('overwrite')
    .partitionBy('country')
    .parquet(OUTPUT_PATH)
)

# Count what was actually written instead of calling top_spenders.count():
# top_spenders is not cached, so that would re-run the entire pipeline (S3
# reads, dedup, joins, aggregation, window) a second time. Counting the
# written Parquet is cheap and reports exactly what landed in OUTPUT_PATH.
written_count = spark.read.parquet(OUTPUT_PATH).count()
print(f'Wrote {written_count} records')