# Spark Connect Interactive Notebook

This notebook demonstrates how to use Spark Connect for interactive data exploration and feature engineering at DoorDash.

## Prerequisites

```bash
pip install "pyspark[connect]" pandas
```

## Key Benefits

- **No local Spark installation** - Just pip install the thin client
- **Unity Catalog access** - Pre-configured catalog access
- **Team isolation** - Each team gets their own cluster
- **Environment separation** - dev/staging/prod clusters

## 1. Connect to Spark

In [None]:
# Option 1: Using our helper library
import sys
sys.path.insert(0, '../../spark-connect-client/python')
from spark_connect_client import SparkConnectClient

spark = SparkConnectClient.create_session(
    team="feature-engineering",
    environment="dev",
    region="us-west-2",
    app_name="interactive-exploration"
)

print(f"Connected to Spark: {spark.version}")

In [None]:
# Option 2: Direct connection (if you know the endpoint).
# Uncomment the lines below to use this instead of Option 1.
from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .remote("sc://feature-eng-dev-uswest2.doordash.team:15002") \
#     .appName("interactive-exploration") \
#     .getOrCreate()
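
The endpoint in Option 2 follows Spark Connect's `sc://host:port/;key=value` connection-string form. The sketch below assembles such a string with a hypothetical helper, `build_connect_url`, which is not part of pyspark or the helper library. The default port and the example host are taken from the commented-out line above; any parameters you pass (such as `use_ssl`) are assumptions about what your cluster expects.

```python
from urllib.parse import quote


def build_connect_url(host, port=15002, **params):
    """Return a Spark Connect URL such as sc://host:15002/;use_ssl=true.

    Hypothetical helper for illustration only.
    """
    url = f"sc://{host}:{port}"
    if params:
        # Parameters follow "/;" and are separated by ";" in key=value form.
        pairs = ";".join(
            f"{key}={quote(str(value), safe='')}" for key, value in params.items()
        )
        url = f"{url}/;{pairs}"
    return url


print(build_connect_url("feature-eng-dev-uswest2.doordash.team"))
```

The resulting string is what you would pass to `SparkSession.builder.remote(...)`.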

## 2. Explore Unity Catalog

In [None]:
# List available catalogs
spark.sql("SHOW CATALOGS").show()

In [None]:
# List databases in the pedregal catalog
spark.sql("SHOW DATABASES IN pedregal").show()

In [None]:
# List tables in the raw database
spark.sql("SHOW TABLES IN pedregal.raw").show()

In [None]:
# Describe a table
spark.sql("DESCRIBE TABLE pedregal.raw.events").show(truncate=False)

## 3. Query Data

In [None]:
# Sample data from the events table
events_df = spark.sql("""
    SELECT *
    FROM pedregal.raw.events
    WHERE ds = '2024-01-01'
    LIMIT 10
""")

events_df.show()
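
The queries in this section hard-code `ds = '2024-01-01'`. A minimal sketch of computing the partition value instead, assuming `ds` is a `YYYY-MM-DD` string as the literal suggests. The helper name `partition_date` is hypothetical.

```python
from datetime import date, timedelta


def partition_date(days_ago=1, today=None):
    """Return the ds partition string for `days_ago` days before `today`."""
    today = today or date.today()
    return (today - timedelta(days=days_ago)).isoformat()


# A fixed `today` keeps the example reproducible; omit it to use the current date.
ds = partition_date(days_ago=1, today=date(2024, 1, 2))

query = f"""
    SELECT *
    FROM pedregal.raw.events
    WHERE ds = '{ds}'
    LIMIT 10
"""
print(query)
```

The `query` string can be passed to `spark.sql(...)` as in the cell above. On Spark 3.4 or later, `spark.sql` also accepts an `args` mapping for named parameters (`WHERE ds = :ds`), which avoids string interpolation. This notebook does not state the cluster's Spark version, so treat that as an option to verify.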

In [None]:
# Count events by type
event_counts = spark.sql("""
    SELECT
        event_type,
        COUNT(*) as count,
        COUNT(DISTINCT user_id) as unique_users
    FROM pedregal.raw.events
    WHERE ds = '2024-01-01'
    GROUP BY event_type
    ORDER BY count DESC
""")

event_counts.show()

In [None]:
# Convert to Pandas for visualization
event_counts_pd = event_counts.toPandas()
event_counts_pd

## 4. Feature Engineering

In [None]:
# Compute user-level features
user_features = spark.sql("""
    WITH user_events AS (
        SELECT
            user_id,
            event_type,
            event_timestamp,
            properties
        FROM pedregal.raw.events
        WHERE ds >= date_sub(current_date(), 7)
          AND user_id IS NOT NULL
    )
    SELECT
        user_id,
        
        -- Activity features
        COUNT(*) as total_events_7d,
        COUNT(DISTINCT DATE(event_timestamp)) as active_days_7d,
        
        -- Event type features
        SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) as views_7d,
        SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) as clicks_7d,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchases_7d,
        
        -- Derived features
        CASE
            WHEN SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) > 0
            THEN ROUND(
                SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) * 1.0 /
                SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END), 4
            )
            ELSE 0
        END as ctr_7d
        
    FROM user_events
    GROUP BY user_id
""")

# count() runs the full aggregation; show() below re-executes the query for its preview
print(f"Computed features for {user_features.count()} users")
user_features.show()
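
To spot-check the aggregation logic on a handful of rows, the counting and `ctr_7d` rules above can be mirrored in plain Python. This is a sketch for validation only, not a replacement for the Spark query. The function name `reference_features` and the sample events are made up for illustration, and the date-window and `active_days_7d` logic are left out.

```python
from collections import Counter


def reference_features(events):
    """Map (user_id, event_type) pairs to per-user feature dicts."""
    counts = {}
    for user_id, event_type in events:
        if user_id is None:  # mirrors `user_id IS NOT NULL`
            continue
        counts.setdefault(user_id, Counter())[event_type] += 1

    features = {}
    for user_id, c in counts.items():
        views, clicks = c["view"], c["click"]
        features[user_id] = {
            "total_events": sum(c.values()),
            "views": views,
            "clicks": clicks,
            "purchases": c["purchase"],
            # Same guard as the SQL CASE: no views means a CTR of 0.
            "ctr": round(clicks / views, 4) if views > 0 else 0,
        }
    return features


sample = [("u1", "view"), ("u1", "view"), ("u1", "click"), ("u2", "click"), (None, "view")]
result = reference_features(sample)
print(result)
```

Spark's `ROUND` rounds half up, while Python's `round()` rounds half to even on binary floats, so the two can differ in the last digit at exact ties.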

In [None]:
# Feature statistics
user_features.describe().show()

In [None]:
# Export to Pandas for model training
features_pd = user_features.toPandas()
print(f"Exported {len(features_pd)} rows to Pandas")
features_pd.head()
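
`toPandas()` collects every row into the notebook's memory, so an unexpectedly large result can exhaust the client. A minimal sketch of a guard, using a hypothetical helper `to_pandas_capped` with an arbitrary default cap that you should tune:

```python
def to_pandas_capped(df, max_rows=100_000):
    """Convert a Spark DataFrame to pandas, refusing results above max_rows.

    Hypothetical helper; relies only on DataFrame.limit() and DataFrame.toPandas().
    """
    # Fetch one extra row so an oversized result is detected without a separate count().
    pdf = df.limit(max_rows + 1).toPandas()
    if len(pdf) > max_rows:
        raise ValueError(
            f"Result exceeds {max_rows} rows; filter or aggregate before converting"
        )
    return pdf
```

In the cell above this would be used as `features_pd = to_pandas_capped(user_features)`.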

## 5. Write Results Back to Unity Catalog

In [None]:
# Add a date partition column
from pyspark.sql.functions import current_date

user_features_with_date = user_features.withColumn("ds", current_date())
user_features_with_date.show(5)

In [None]:
# Write to the feature store. mode("overwrite") replaces existing data; it does not append.
# Note: In the dev environment, we use a dev-specific table

user_features_with_date.write \
    .format("iceberg") \
    .mode("overwrite") \
    .option("partitionOverwriteMode", "dynamic") \
    .saveAsTable("pedregal.feature_store_dev.user_features_exploration")

print("Features written to Unity Catalog!")

## 6. Using DataFrame API

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read events using DataFrame API
events = spark.table("pedregal.raw.events") \
    .filter(F.col("ds") == "2024-01-01") \
    .filter(F.col("user_id").isNotNull())

# Compute running totals per user
user_window = Window.partitionBy("user_id").orderBy("event_timestamp")
window_spec = user_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

events_with_running = events \
    .withColumn("event_number", F.row_number().over(user_window)) \
    .withColumn("running_events", F.count("*").over(window_spec))

events_with_running.show()

## 7. Cleanup

In [None]:
# Stop the Spark session when done
spark.stop()
print("Spark session stopped.")

---

## Tips for Interactive Development

1. **Use `.cache()` wisely** - Cache DataFrames you'll reuse multiple times
2. **Use `.limit()` for exploration** - Don't pull full tables unnecessarily
3. **Use `.explain()` to understand query plans** - Optimize before running expensive queries
4. **Use the dev environment for exploration** - Switch to staging/prod for validation
5. **Stop your session when done** - Free up cluster resources for others