# Guide to Snowpark Connect for Apache Spark

This guide covers Snowpark Connect for Apache Spark, from the fundamentals of how it works to common data I/O, transformation, and performance patterns.

## What You'll Learn:
- What Snowpark Connect is and how it works
- Quick start and initialization
- Data I/O and transformation patterns
- Performance considerations and best practices

## What is Snowpark Connect?

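Snowpark Connect lets you run the **PySpark DataFrame API** on **Snowflake infrastructure**, with no Spark cluster needed! It builds on Spark Connect's client-server architecture: your PySpark client sends DataFrame operations as logical plans, and Snowflake executes them.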

### Key Concepts:

**Execution Model:**
- Your DataFrame operations are translated to Snowflake SQL
- Computation happens in Snowflake warehouses (not Spark executors)
- Results stream back to the client in Apache Arrow format
- No Spark cluster, driver, or executors required
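To make the execution model concrete, here is a minimal sketch of the core idea: DataFrame calls only *record* operations, and the whole chain becomes one SQL statement that runs where the data lives. `ToyFrame` and its string-based expressions are hypothetical stand-ins, not Snowpark Connect's actual translator; the SQL it emits only resembles what Snowflake might receive for the Quick Start query.

```python
# Toy illustration: lazy DataFrame-style calls rendered as one SQL query.
# ToyFrame is hypothetical; it is NOT how Snowpark Connect is implemented.
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class ToyFrame:
    table: str
    where: Tuple[str, ...] = ()
    group_by: Tuple[str, ...] = ()
    aggs: Tuple[str, ...] = ()
    order_by: Tuple[str, ...] = ()

    # Each method returns a new frame; nothing executes yet (lazy evaluation).
    def filter(self, condition: str) -> "ToyFrame":
        return replace(self, where=self.where + (condition,))

    def groupBy(self, *cols: str) -> "ToyFrame":
        return replace(self, group_by=self.group_by + cols)

    def agg(self, *exprs: str) -> "ToyFrame":
        return replace(self, aggs=self.aggs + exprs)

    def orderBy(self, *cols: str) -> "ToyFrame":
        return replace(self, order_by=self.order_by + cols)

    def to_sql(self) -> str:
        # Only at this point is the recorded chain turned into SQL text.
        select_list = list(self.group_by) + list(self.aggs) or ["*"]
        sql = f"SELECT {', '.join(select_list)} FROM {self.table}"
        if self.where:
            sql += " WHERE " + " AND ".join(self.where)
        if self.group_by:
            sql += " GROUP BY " + ", ".join(self.group_by)
        if self.order_by:
            sql += " ORDER BY " + ", ".join(self.order_by)
        return sql


query = (
    ToyFrame("MY_TABLE")
    .filter("status = 'active'")
    .groupBy("category")
    .agg("COUNT(*) AS count", "SUM(amount) AS total_amount")
    .orderBy("total_amount DESC")
)
print(query.to_sql())
```

The design point is that because operations are recorded rather than executed, the entire pipeline can be shipped as a single query instead of moving intermediate data between steps.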

**Query Pushdown:**
- ✅ **Fully Optimized:** DataFrame operations, SQL functions, and aggregations push down to Snowflake
- ⚠️ **Performance Impact:** Python UDFs run client-side (fetch data → process → send back)
- 💡 **Better Alternative:** Use built-in SQL functions instead of UDFs

### Feature Support Matrix:

Understanding what PySpark features are supported helps you write efficient code.

#### ✅ Fully Supported DataFrame Operations:
- `select`, `filter`, `where`
- `groupBy`, `agg` (all aggregation functions)
- `join` (inner, left, right, outer), including joins with a `broadcast()` hint
- `orderBy`, `sort`
- `distinct`, `dropDuplicates`
- Window functions (`row_number`, `rank`, `lag`, `lead`, etc.)
- Built-in functions (95%+ coverage)
- `cache`, `persist` (creates temp tables in Snowflake)

#### ⚠️ Limited Support:
- `repartition` (logical operation only)
- `coalesce` (logical operation only, like `repartition`)
- Python UDFs (work but slow - avoid if possible)
- Pandas UDFs (work but slow - avoid if possible)
- MLlib (partial - transformers work, estimators limited)

#### ❌ NOT Supported:
- The RDD API (entirely)
- `.rdd`, `.foreach()`, `.foreachPartition()`
- Structured Streaming
- GraphX
- Custom data sources
- `.checkpoint()`

### Data Type Support:

**✅ Supported:**
- String, Integer, Long, Float, Double, Decimal
- Boolean, Date, Timestamp
- Array, Map, Struct
- Binary

**❌ Not Supported:**
- DayTimeIntervalType
- YearMonthIntervalType
- UserDefinedTypes
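Before migrating a job, it can help to scan a schema for the unsupported types listed above, including ones nested inside arrays, maps, and structs. The sketch below uses a hypothetical tuple-based schema description whose type names mirror PySpark's class names; it is an illustration, not a Snowpark Connect API.

```python
# Find fields whose type the support lists above mark as unsupported.
# The schema format is hypothetical: a list of (name, type) pairs, where a
# type is a type-name string or a tuple for nested types:
#   ("ArrayType", element), ("MapType", key, value), ("StructType", [fields])
UNSUPPORTED = {"DayTimeIntervalType", "YearMonthIntervalType", "UserDefinedType"}


def unsupported_fields(fields, prefix=""):
    """Return (field_path, type_name) pairs for every unsupported type found."""
    problems = []
    for name, dtype in fields:
        problems.extend(_check(dtype, f"{prefix}{name}"))
    return problems


def _check(dtype, path):
    if isinstance(dtype, str):  # leaf type
        return [(path, dtype)] if dtype in UNSUPPORTED else []
    kind = dtype[0]
    if kind == "ArrayType":
        return _check(dtype[1], path + "[]")
    if kind == "MapType":
        return _check(dtype[1], path + "{key}") + _check(dtype[2], path + "{value}")
    if kind == "StructType":
        return unsupported_fields(dtype[1], prefix=path + ".")
    raise ValueError(f"Unknown nested type: {kind}")


schema = [
    ("id", "LongType"),
    ("tags", ("ArrayType", "StringType")),
    ("session", ("StructType", [
        ("started", "TimestampType"),
        ("duration", "DayTimeIntervalType"),
    ])),
]
print(unsupported_fields(schema))  # [('session.duration', 'DayTimeIntervalType')]
```

To apply the same walk to a real DataFrame, you would iterate `df.schema.fields`, inspect each `StructField.dataType`, recurse through `ArrayType.elementType`, `MapType.keyType`/`valueType`, and nested `StructType`s, and test each type with `isinstance` against the corresponding PySpark classes.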

### Supported File Formats:
- ✅ Parquet, CSV, JSON, Avro, ORC
- ❌ Delta Lake and Apache Hudi are not supported

## Quick Start

Let's get up and running with Snowpark Connect!

In [None]:
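# Step 1: Initialize the Spark session
# NOTE: treat the catalog class and the sf* keys below as placeholders. The exact
# initialization call depends on your Snowpark Connect version and where you run
# it (e.g., a Snowflake Notebook vs. a local client), so confirm it in the
# Snowflake documentation.
import os

from pyspark.sql import SparkSession
# Star imports keep the examples short, but they shadow Python built-ins such as
# sum, min, max, and abs with Spark's column functions of the same name.
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Read the password from an environment variable instead of hardcoding it
# (SNOWFLAKE_PASSWORD is an illustrative name; use whatever your setup provides).
spark = SparkSession.builder \
    .appName("SnowparkConnect-Guide") \
    .config("spark.sql.catalog.snowflake", "com.snowflake.snowpark.SnowflakeCatalog") \
    .config("sfURL", "your-account.snowflakecomputing.com") \
    .config("sfUser", "your_username") \
    .config("sfPassword", os.environ["SNOWFLAKE_PASSWORD"]) \
    .config("sfDatabase", "MY_DATABASE") \
    .config("sfSchema", "MY_SCHEMA") \
    .config("sfWarehouse", "MY_WAREHOUSE") \
    .getOrCreate()

print("✅ Spark session initialized successfully!")
print(f"Spark version: {spark.version}")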


In [None]:
# Step 2: Run Your First Query
# Read a table from Snowflake
df = spark.table("MY_TABLE")

# Standard PySpark operations
result = df.filter(col("status") == "active") \
    .groupBy("category") \
    .agg(count("*").alias("count"), 
         sum("amount").alias("total_amount")) \
    .orderBy(desc("total_amount"))

# Display results
result.show()

print("✅ First query executed successfully!")


In [None]:
# Step 3: Inspect the plan with explain()
# explain() prints the plan; it does not run the query
result.explain(mode="extended")

# Extended mode shows:
# - Parsed and analyzed logical plans: what you asked to compute
# - Optimized logical plan: the same query after rule-based rewrites
# - Physical plan: the chosen execution strategy
# The SQL that actually ran in Snowflake can also be checked in Query History.


In [None]:
# ===== READING DATA =====

# 1. Read from Snowflake Tables
# Simple table reference
df1 = spark.table("MY_TABLE")

# Fully qualified name (database.schema.table)
df2 = spark.table("MY_DB.MY_SCHEMA.MY_TABLE")

# Using SQL
df3 = spark.sql("SELECT * FROM MY_TABLE WHERE status = 'active'")

print("✅ Read from Snowflake tables")


In [None]:
# 2. Read from Snowflake Stages

# Internal stage - Parquet
df_parquet = spark.read.parquet("@my_stage/path/to/data/")

# External stage: CSV (header=true uses the first line as column names)
df_csv = spark.read.option("header", "true").csv("@external_stage/file.csv")

# S3 via external stage
df_s3 = spark.read.parquet("@s3_stage/prefix/")

# JSON with options
df_json = spark.read \
    .option("multiLine", "true") \
    .json("@my_stage/data.json")

print("✅ Read from Snowflake stages")


In [None]:
# ===== WRITING DATA =====

# 1. Write to Tables

# Overwrite existing table
df.write.mode("overwrite").saveAsTable("MY_OUTPUT_TABLE")

# Append to existing table
df.write.mode("append").saveAsTable("MY_OUTPUT_TABLE")

# Write only if the table does not exist yet; otherwise do nothing
df.write.mode("ignore").saveAsTable("MY_OUTPUT_TABLE")

# Default mode ("errorifexists"): fail if the table already exists
df.write.saveAsTable("MY_NEW_TABLE")

print("✅ Write to Snowflake tables")
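The four save modes differ only in what happens when the target table already exists. This in-memory model (a hypothetical `save_as_table` over a plain dict of row lists, not a Snowflake table) spells out those rules; standard Spark reports the "already exists" case as an error (typically an `AnalysisException`) rather than the `ValueError` used here.

```python
# In-memory model of Spark's save modes; "catalog" maps table names to row lists.
def save_as_table(catalog, name, rows, mode="errorifexists"):
    mode = mode.lower()
    exists = name in catalog
    if mode in ("error", "errorifexists"):
        if exists:
            raise ValueError(f"Table {name} already exists")
        catalog[name] = list(rows)
    elif mode == "overwrite":
        catalog[name] = list(rows)  # replace any existing contents
    elif mode == "append":
        catalog.setdefault(name, []).extend(rows)  # create if missing, else add rows
    elif mode == "ignore":
        if not exists:  # silently skip the write when the table exists
            catalog[name] = list(rows)
    else:
        raise ValueError(f"Unknown save mode: {mode}")


catalog = {}
save_as_table(catalog, "MY_OUTPUT_TABLE", [1, 2], mode="overwrite")
save_as_table(catalog, "MY_OUTPUT_TABLE", [3], mode="append")
save_as_table(catalog, "MY_OUTPUT_TABLE", [99], mode="ignore")
print(catalog["MY_OUTPUT_TABLE"])  # [1, 2, 3]
```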


In [None]:
# 2. Write to Stages

# Write Parquet to internal stage
df.write.parquet("@my_stage/output/")

# Write CSV with options (as in Spark, the path is an output directory of part files)
df.write \
    .option("header", "true") \
    .option("compression", "gzip") \
    .csv("@my_stage/output_csv/")

# Write with partitioning (assumes df has year and month columns)
df.write \
    .partitionBy("year", "month") \
    .parquet("@my_stage/partitioned_data/")

print("✅ Write to Snowflake stages")
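In Spark, `partitionBy` produces a Hive-style layout: one directory level per partition column, named `column=value`, with the partition columns encoded in the path rather than stored in the data files. The sketch below computes that layout for a few rows; `partition_paths` is a hypothetical helper, and it assumes Snowpark Connect writes stage paths using the same convention as Spark.

```python
# Compute the Hive-style directory layout produced by partitionBy(...).
from collections import defaultdict


def partition_paths(rows, base, partition_cols):
    """Group row dicts into {directory: rows}, dropping the partition columns."""
    groups = defaultdict(list)
    for row in rows:
        subdir = "/".join(f"{c}={row[c]}" for c in partition_cols)
        # Partition values live in the path, so they are removed from the row data.
        data = {k: v for k, v in row.items() if k not in partition_cols}
        groups[f"{base.rstrip('/')}/{subdir}/"].append(data)
    return dict(groups)


rows = [
    {"year": 2024, "month": 1, "amount": 10},
    {"year": 2024, "month": 1, "amount": 5},
    {"year": 2024, "month": 2, "amount": 7},
]
for path, data in partition_paths(rows, "@my_stage/partitioned_data/", ["year", "month"]).items():
    print(path, data)
```

This layout is also what makes filters on `year` and `month` cheap when the data is read back: directories whose values don't match the filter can be skipped entirely.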


In [None]:
# Examples of Fully Supported Operations

# 1. Select, filter, aggregation
result = spark.table("ORDERS") \
    .select("order_id", "customer_id", "amount", "order_date") \
    .filter(col("amount") > 100) \
    .groupBy("customer_id") \
    .agg(
        count("*").alias("order_count"),
        sum("amount").alias("total_spent"),
        avg("amount").alias("avg_order_value")
    )

# 2. Joins
customers = spark.table("CUSTOMERS")
orders = spark.table("ORDERS")

joined = customers.join(orders, "customer_id") \
    .select(
        customers["name"],
        orders["order_id"],
        orders["amount"]
    )

# 3. Window functions
from pyspark.sql.window import Window

window_spec = Window.partitionBy("customer_id").orderBy("order_date")

ranked_orders = orders \
    .withColumn("order_rank", row_number().over(window_spec)) \
    .withColumn("running_total", sum("amount").over(window_spec))

print("✅ Example transformations defined (lazy: nothing runs until an action such as show())")
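The window example is easy to misread, so here is a plain-Python reference (a hypothetical helper, not Spark code) for what `row_number()` and `sum("amount")` over `Window.partitionBy("customer_id").orderBy("order_date")` compute. In standard Spark, an ordered window with no explicit frame uses `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so orders that tie on `order_date` share one `running_total`, while `row_number()` still numbers them distinctly (in an arbitrary order among ties). This guide doesn't confirm that Snowpark Connect's translation preserves that default, so if tie behavior matters, an explicit `.rowsBetween(Window.unboundedPreceding, Window.currentRow)` frame removes the ambiguity. The helper avoids Python's `sum` because the notebook's `from pyspark.sql.functions import *` replaces it with Spark's `sum`.

```python
# Plain-Python reference for row_number() and a running sum over
# Window.partitionBy("customer_id").orderBy("order_date"), using Spark's
# default frame for ordered windows: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
from itertools import groupby
from operator import itemgetter


def rank_and_running_total(orders):
    result = []
    ordered = sorted(orders, key=itemgetter("customer_id", "order_date"))
    for _, group in groupby(ordered, key=itemgetter("customer_id")):
        group = list(group)
        for rank, row in enumerate(group, start=1):
            # RANGE frame: every row with order_date <= the current row's counts,
            # so ties on order_date share one running total.
            running = 0
            for other in group:
                if other["order_date"] <= row["order_date"]:
                    running += other["amount"]
            result.append({**row, "order_rank": rank, "running_total": running})
    return result


sample_orders = [
    {"customer_id": 1, "order_date": "2024-01-02", "amount": 10},
    {"customer_id": 1, "order_date": "2024-01-01", "amount": 5},
    {"customer_id": 1, "order_date": "2024-01-02", "amount": 20},
    {"customer_id": 2, "order_date": "2024-01-01", "amount": 7},
]
for row in rank_and_running_total(sample_orders):
    print(row)
```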


In [None]:

# Example: Time-series Analysis
orders = spark.table("ORDERS")

# Analyze sales trends over time
daily_sales = orders \
    .withColumn("order_date", to_date(col("order_timestamp"))) \
    .groupBy("order_date") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("amount").alias("daily_revenue"),
        avg("amount").alias("avg_order_value"),
        max("amount").alias("max_order"),
        min("amount").alias("min_order")
    ) \
    .orderBy(desc("order_date"))

daily_sales.show(10)
print("✅ Time-series analysis example")

In [None]:
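# Example: Menu Item Performance
# The table names below are placeholders for your own order and menu tables;
# the column names used in this cell (ORDER_ID, MENU_ITEM_ID, ...) must exist in them.
order_header = spark.table("ORDER_HEADER")
order_detail = spark.table("ORDER_DETAIL")
menu = spark.table("MENU")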

menu_performance = order_detail \
    .join(order_header, "ORDER_ID") \
    .join(menu, "MENU_ITEM_ID") \
    .groupBy("MENU_ITEM_NAME", "ITEM_CATEGORY") \
    .agg(
        count("ORDER_DETAIL_ID").alias("times_ordered"),
        sum("QUANTITY").alias("total_quantity"),
        sum("PRICE").alias("total_revenue"),
        avg("PRICE").alias("avg_price")
    ) \
    .withColumn("revenue_per_order", col("total_revenue") / col("times_ordered")) \
    .orderBy(desc("total_revenue"))

print("🍔 Top Menu Items by Revenue:")
menu_performance.show(15)

# Time-based analysis with date functions
seasonal_trends = order_header \
    .withColumn("month", month(col("ORDER_TS"))) \
    .withColumn("day_of_week", dayofweek(col("ORDER_TS"))) \
    .withColumn("hour", hour(col("ORDER_TS"))) \
    .groupBy("month", "day_of_week") \
    .agg(
        count("ORDER_ID").alias("order_count"),
        avg("ORDER_AMOUNT").alias("avg_amount")
    ) \
    .orderBy("month", "day_of_week")

print("📈 Seasonal Ordering Patterns:")
seasonal_trends.show()

## Performance Considerations

Follow these best practices to get optimal performance from Snowpark Connect.

### 1. Use SQL Functions Over UDFs
Python UDFs require data to be transferred to the client, processed, and sent back, which can be 10-100x slower than native operations!

### 2. Broadcast Joins for Small Tables
When joining a large table with a small dimension table, use `broadcast()` to optimize the join.
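A broadcast join ships the small table to every worker and joins it against the large table without shuffling the large side. The sketch below shows that build-and-probe pattern in plain Python (`broadcast_hash_join` is a hypothetical helper). In PySpark the hint is written `orders.join(broadcast(customers), "customer_id")`; how Snowflake's optimizer acts on the hint is not something this sketch shows.

```python
# Build-and-probe hash join: the pattern a broadcast join uses.
from collections import defaultdict


def broadcast_hash_join(large, small, key):
    """Inner-join two lists of row dicts on `key`, hashing only the small side."""
    # Build phase: index the small table once (this is what gets "broadcast").
    index = defaultdict(list)
    for row in small:
        index[row[key]].append(row)
    # Probe phase: a single pass over the large table; rows without a match are dropped.
    for row in large:
        for match in index.get(row[key], []):
            yield {**match, **row}


sample_customers = [{"customer_id": 1, "name": "Ada"}, {"customer_id": 2, "name": "Lin"}]
sample_orders = [
    {"order_id": 10, "customer_id": 1, "amount": 50},
    {"order_id": 11, "customer_id": 3, "amount": 5},
    {"order_id": 12, "customer_id": 2, "amount": 20},
]
for joined_row in broadcast_hash_join(sample_orders, sample_customers, "customer_id"):
    print(joined_row)
```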

### 3. Cache Frequently Accessed DataFrames
Caching creates temporary tables in Snowflake for faster repeated access. Remember to `unpersist()` when done!

### 4. Minimize Data Movement
Process data in Snowflake and only transfer final results. Avoid `collect()` on large datasets!

### 5. Partition Awareness
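Snowflake stores tables in micro-partitions and keeps min/max metadata for each column in every micro-partition. Filtering on columns the data is clustered or sorted by (or, for partitioned files on a stage, on the partition columns) lets Snowflake skip data it knows cannot match and scan less.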
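The pruning idea can be sketched with per-partition min/max metadata: a partition only needs to be read if its value range overlaps the filter's range. The `PartitionStats` records below are made-up illustrative metadata, not values read from Snowflake. In PySpark, a filter such as `col("order_date").between("2024-02-10", "2024-02-20")` expresses the range being tested.

```python
# Min/max pruning: skip any partition whose value range cannot match the filter.
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionStats:
    name: str
    min_value: str  # smallest order_date stored in the partition (ISO dates)
    max_value: str  # largest order_date stored in the partition


def partitions_to_scan(stats: List[PartitionStats], low: str, high: str) -> List[str]:
    """Return names of partitions whose [min, max] range overlaps [low, high]."""
    return [p.name for p in stats if p.max_value >= low and p.min_value <= high]


stats = [
    PartitionStats("p1", "2024-01-01", "2024-01-31"),
    PartitionStats("p2", "2024-02-01", "2024-02-29"),
    PartitionStats("p3", "2024-03-01", "2024-03-31"),
]
print(partitions_to_scan(stats, "2024-02-10", "2024-02-20"))  # ['p2']
```

Pruning works best when each partition covers a narrow, non-overlapping range, which is why data that is clustered (or loaded in order) on the filter column tends to filter cheaply.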


## Additional Resources

**Official Documentation:**
- [Snowflake Documentation](https://docs.snowflake.com/)
- [PySpark API Reference](https://spark.apache.org/docs/latest/api/python/)

**Next Steps:**
- Experiment with your own data
- Explore advanced window functions
- Try complex multi-table joins
- Optimize queries with explain plans

In [None]:
# Final cleanup and session management
# When you're done, clean up cached data
spark.catalog.clearCache()

print("✅ Guide complete! You're ready to use Snowpark Connect for Apache Spark!")
print("🎯 Remember: Use SQL functions, cache wisely, and process in Snowflake!")