# Introduction to Snowpark Connect for Apache Spark

### What You'll Learn:
- How Snowpark Connect executes PySpark on Snowflake infrastructure
- Data ingestion patterns (tables, stages, cloud storage)
- Transformations, joins, and aggregations
- Writing data with partitioning and compression
- Building production pipelines with telemetry

## What is Snowpark Connect?

Snowpark Connect allows you to run the **PySpark DataFrame API** on **Snowflake infrastructure**.

In [None]:
# import packages
# initialize session
# print 

### Key Concepts:

**Execution Model:**
- Your DataFrame operations are translated to Snowflake SQL
- Computation happens in Snowflake warehouses
- Results stream back via Apache Arrow format
- No Spark cluster, driver, or executors

**Query Pushdown:**
- ✅ **Fully Optimized:** DataFrame operations, SQL functions, aggregations push down to Snowflake
- ⚠️ **Performance Impact:** Python UDFs run client-side (fetch data → process → send back)
- 💡 **Better Alternative:** Use built-in SQL functions instead of UDFs
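
To make the last point concrete, here is a minimal sketch that does a simple name-cleaning step with built-in SQL functions through `selectExpr` instead of a Python UDF. The column `name` and the helper `clean_names` are hypothetical. `upper` and `trim` are standard Spark SQL functions, but that your Snowpark Connect version pushes them down is an assumption.

```python
# Hypothetical column: `name`. The expression uses only built-in SQL
# functions, so no Python UDF is involved.
CLEAN_NAME_EXPR = "upper(trim(name)) AS name_clean"


def clean_names(df):
    """Return `df` plus a `name_clean` column computed by built-in functions."""
    return df.selectExpr("*", CLEAN_NAME_EXPR)
```

Because the whole expression is SQL, it can be translated as one unit rather than sending rows through Python.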

## Data Ingestion

In [None]:
# =============================================================================
# DATA INGESTION METHODS
# =============================================================================
#
# [Import statements for session and data types go here]
#
# =============================================================================

# -----------------------------------------------------------------------------
# 1. READ FROM SNOWFLAKE TABLES
# -----------------------------------------------------------------------------
#
# [Read table by name]
#   - Fastest method - data is already in Snowflake, no file parsing
#
# [Execute SQL query and return results]
#   - Useful for filtering at read time to reduce data transfer
#   - Supports all Snowflake SQL syntax
#
# -----------------------------------------------------------------------------
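
The two read paths above might look like the following sketch. The table name `ORDERS`, the column `order_date`, and the helper names are hypothetical. `spark` is assumed to be an already initialized session.

```python
def read_orders(spark, table_name="ORDERS"):
    """Read a whole table by name (hypothetical table name)."""
    return spark.read.table(table_name)


def recent_orders_query(table_name, min_date):
    """Build a SQL string that filters at read time.

    Both arguments are interpolated directly, so they are assumed to come
    from trusted configuration rather than user input.
    """
    return f"SELECT * FROM {table_name} WHERE order_date >= '{min_date}'"


def read_recent_orders(spark, table_name="ORDERS", min_date="2024-01-01"):
    """Run the filtering query and return the result as a DataFrame."""
    return spark.sql(recent_orders_query(table_name, min_date))
```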

# -----------------------------------------------------------------------------
# 2. READ FROM SNOWFLAKE STAGES
# -----------------------------------------------------------------------------
#
# PARQUET:
#   [Read parquet from stage path]
#
# CSV:
#   [Read CSV from stage path]
#   Important options to specify:
#     - Whether first row is a header
#     - Whether to auto-detect column types (avoid for large files!)
#     - Field delimiter character
#     - Quote and escape characters
#     - How to handle null values
#     - Date format pattern
#
# JSON:
#   [Read JSON from stage path]
#   Important options to specify:
#     - Whether JSON spans multiple lines (required if it does)
#     - Whether to allow comments
#     - Date format pattern
#
# -----------------------------------------------------------------------------
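
The reader options listed above map onto standard PySpark option names, sketched below. The stage directory `@my_stage/raw`, the file names, and all option values are hypothetical. That stage paths are passed with an `@` prefix is an assumption to verify against your environment.

```python
# Option names are standard PySpark reader options; values are illustrative.
CSV_OPTIONS = {
    "header": "true",            # first row is a header
    "inferSchema": "false",      # skip type auto-detection (columns load as strings)
    "sep": ",",                  # field delimiter
    "quote": '"',                # quote character
    "escape": "\\",              # escape character
    "nullValue": "",             # text that should be read as null
    "dateFormat": "yyyy-MM-dd",  # date format pattern
}

JSON_OPTIONS = {
    "multiLine": "true",         # a record may span several lines
    "allowComments": "false",    # reject /* */ and // comments
    "dateFormat": "yyyy-MM-dd",
}


def read_stage_files(spark, stage_dir="@my_stage/raw"):
    """Read one file of each format from a hypothetical stage directory."""
    parquet_df = spark.read.parquet(f"{stage_dir}/orders.parquet")
    csv_df = spark.read.options(**CSV_OPTIONS).csv(f"{stage_dir}/orders.csv")
    json_df = spark.read.options(**JSON_OPTIONS).json(f"{stage_dir}/orders.json")
    return parquet_df, csv_df, json_df
```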

# -----------------------------------------------------------------------------
# 3. DIRECT CLOUD STORAGE ACCESS
# -----------------------------------------------------------------------------
#
# [Read from S3]
#   - Requires AWS credentials configured
#
# [Read from Google Cloud Storage]
#   - Requires GCP credentials configured
#
# [Read from Azure Blob Storage]
#   - Requires Azure credentials configured
#
# NOTE: Using Snowflake stages is often simpler (handles auth automatically)
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 4. READ MULTIPLE FILES
# -----------------------------------------------------------------------------
#
# [Read with wildcard patterns]
#   - Match all files of a type in a directory
#   - Match files with a naming pattern
#
# [Read recursively from nested directories]
#   - Searches all subdirectories
#
# -----------------------------------------------------------------------------

# =============================================================================
# HANDLING LARGE CSVs
# =============================================================================
#
# ❌ SLOW PATTERN:
#    [Read CSV with auto-detect types enabled]
#    Problem: Scans entire file just to guess column types
#
# ✅ FAST PATTERN:
#    [Define schema with column names and types]
#    [Read CSV with predefined schema]
#    Benefit: No extra scan
#
# ✅ COMPRESSED FILES:
#    [Read gzipped CSV or JSON files]
#    Benefit: Smaller transfer size, auto-decompressed on read
#
# ✅ BEST PRACTICE - Convert to Parquet:
#    [Read CSV once with schema]
#    [Write out as Parquet]
#    [Read from Parquet for all future queries]
#    Benefit: Fast columnar reads for repeated analysis
#
# =============================================================================
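
A minimal sketch of the fast pattern and the convert-to-Parquet practice described above. The schema, the column names, and the helper `csv_to_parquet` are hypothetical. The schema is written as a DDL string, which `DataFrameReader.schema` accepts in PySpark.

```python
# Hypothetical schema, declared up front so no inference scan is needed.
ORDERS_SCHEMA = (
    "order_id BIGINT, customer_id BIGINT, "
    "order_date DATE, amount DECIMAL(12,2)"
)


def csv_to_parquet(spark, csv_path, parquet_path):
    """Read a CSV once with a fixed schema, persist it as Parquet, and
    return a DataFrame backed by the Parquet copy."""
    # A gzipped file (for example `orders.csv.gz`) is read the same way.
    df = spark.read.schema(ORDERS_SCHEMA).option("header", "true").csv(csv_path)
    df.write.mode("overwrite").parquet(parquet_path)
    return spark.read.parquet(parquet_path)
```

Later queries can then read from `parquet_path` directly and skip CSV parsing.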


## Data Transformations


In [None]:
# =============================================================================
# DATA TRANSFORMATIONS
# =============================================================================
#
# Built-in transformations push down to Snowflake SQL - no data leaves the
# warehouse until you explicitly collect results (Python UDFs are the exception)
#
# =============================================================================

# -----------------------------------------------------------------------------
# 1. SELECTING AND FILTERING
# -----------------------------------------------------------------------------
#
# [Select specific columns from a DataFrame]
#   - Choose which columns to keep
#   - Can rename columns during selection
#
# [Filter rows based on conditions]
#   - Keep only rows matching criteria
#   - Combine multiple conditions with AND/OR logic
#
# [Remove duplicate rows]
#   - Based on all columns or specific subset
#
# -----------------------------------------------------------------------------
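
Selecting, renaming, filtering, and de-duplicating can be chained as in this sketch. The column names and the `status` value are hypothetical.

```python
def completed_orders(orders):
    """Keep a few columns, rename one, filter rows, and drop duplicate orders."""
    return (
        orders
        .select("order_id", "customer_id", "amount", "status")
        .withColumnRenamed("amount", "order_amount")
        # Conditions combined with AND in a SQL expression string.
        .filter("status = 'COMPLETE' AND order_amount > 0")
        # De-duplicate on a subset of columns rather than the whole row.
        .dropDuplicates(["order_id"])
    )
```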

# -----------------------------------------------------------------------------
# 2. ADDING AND MODIFYING COLUMNS
# -----------------------------------------------------------------------------
#
# [Add a new column with calculated values]
#   - Derived from existing columns
#   - Can use built-in functions (math, string, date, etc.)
#
# [Rename existing columns]
#   - Single column or multiple at once
#
# [Cast column to different data type]
#   - String to number, timestamp to date, etc.
#
# [Replace null values with defaults]
#   - Fill missing data with specified values
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 3. AGGREGATIONS
# -----------------------------------------------------------------------------
#
# [Group rows by one or more columns]
#   - Foundation for all aggregate calculations
#
# [Apply aggregate functions to groups]
#   Common aggregations:
#     - Count rows or non-null values
#     - Sum numeric values
#     - Calculate average/mean
#     - Find min/max values
#     - Collect values into a list
#     - Count distinct values
#
# [Rename aggregated columns]
#   - Give meaningful names to results
#
# -----------------------------------------------------------------------------
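
A small sketch of grouping, aggregating, and renaming the result. The columns `region` and `amount` and the helper name are hypothetical. It uses the dictionary form of `agg`, which takes an aggregate function name per column.

```python
SUPPORTED_AGGREGATES = ("count", "sum", "avg", "min", "max")


def amount_by_region(orders, how="sum"):
    """Aggregate `amount` per `region` using the named aggregate function."""
    if how not in SUPPORTED_AGGREGATES:
        raise ValueError(f"unsupported aggregate: {how}")
    grouped = orders.groupBy("region").agg({"amount": how})
    # Rename positionally so the result does not depend on the generated
    # column name (for example `sum(amount)`).
    return grouped.toDF("region", f"{how}_amount")
```

For several aggregates at once, `pyspark.sql.functions` with `.alias(...)` is the more common style. The dictionary form just keeps this sketch free of imports.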

# -----------------------------------------------------------------------------
# 4. JOINS
# -----------------------------------------------------------------------------
#
# [Join two DataFrames on matching columns]
#   Join types:
#     - Inner: only matching rows from both sides
#     - Left: all rows from left, matching from right
#     - Right: all rows from right, matching from left
#     - Outer/Full: all rows from both sides
#     - Cross: every combination (cartesian product)
#
# [Broadcast small tables for faster joins]
#   - Hint to replicate small table to all nodes
#   - Significantly faster when one side is small (< few MB)
#
# [Handle column name conflicts after join]
#   - Disambiguate when both tables have same column names
#
# -----------------------------------------------------------------------------
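
The join guidance above could be sketched as follows. The tables, the join key `customer_id`, and the clashing column `name` are hypothetical. `DataFrame.hint("broadcast")` is a standard PySpark call, but whether the hint changes the plan under Snowpark Connect is an assumption.

```python
def enrich_orders(orders, customers):
    """Left-join orders to a small customer dimension table."""
    # Rename the overlapping non-key column first so the joined result has
    # no ambiguous `name` column.
    customers = customers.withColumnRenamed("name", "customer_name")
    # Joining on a column name (not an expression) keeps a single
    # `customer_id` column in the output.
    return orders.join(customers.hint("broadcast"), on="customer_id", how="left")
```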

# -----------------------------------------------------------------------------
# 5. WINDOW FUNCTIONS
# -----------------------------------------------------------------------------
#
# [Define a window specification]
#   - Partition by: group rows for calculation
#   - Order by: sequence within each partition
#   - Frame: which rows to include in calculation
#
# [Ranking functions]
#   - Assign row numbers within partitions
#   - Rank with gaps for ties
#   - Dense rank without gaps
#   - Percentile rank
#
# [Analytic functions]
#   - Access previous row value (lag)
#   - Access next row value (lead)
#   - First/last value in window
#
# [Running calculations]
#   - Running sum/total
#   - Running average
#   - Running min/max
#
# -----------------------------------------------------------------------------
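
One way to express a ranking, an analytic, and a running calculation is with SQL window expressions passed to `selectExpr`, sketched below. The column names are hypothetical. The `pyspark.sql.Window` API is the DataFrame-style equivalent.

```python
# Each expression defines its own window: partition, order, and (for the
# running total) an explicit frame.
WINDOW_EXPRS = [
    "*",
    "row_number() OVER (PARTITION BY customer_id ORDER BY order_date DESC) AS rn",
    "lag(amount) OVER (PARTITION BY customer_id ORDER BY order_date) AS prev_amount",
    "sum(amount) OVER (PARTITION BY customer_id ORDER BY order_date "
    "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total",
]


def with_order_windows(orders):
    """Add row number, previous amount, and running total per customer."""
    return orders.selectExpr(*WINDOW_EXPRS)


def latest_order_per_customer(orders):
    """Keep only the most recent order of each customer."""
    return with_order_windows(orders).filter("rn = 1").drop("rn")
```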

# -----------------------------------------------------------------------------
# 6. DATE AND TIME OPERATIONS
# -----------------------------------------------------------------------------
#
# [Extract components from timestamps]
#   - Year, month, day
#   - Hour, minute, second
#   - Day of week, day of year
#   - Week of year, quarter
#
# [Convert strings to dates/timestamps]
#   - Parse with specified format pattern
#
# [Date arithmetic]
#   - Add/subtract days, months, years
#   - Calculate difference between dates
#   - Truncate to start of period (month, week, etc.)
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 7. STRING OPERATIONS
# -----------------------------------------------------------------------------
#
# [Transform string case]
#   - Upper, lower, title case
#
# [Extract and manipulate substrings]
#   - Get portion of string by position
#   - Split string into array
#   - Concatenate multiple strings
#
# [Pattern matching and replacement]
#   - Check if string contains pattern
#   - Replace matching text
#   - Extract using regular expressions
#
# [Trim and pad strings]
#   - Remove leading/trailing whitespace
#   - Pad to fixed length
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 8. SORTING AND LIMITING
# -----------------------------------------------------------------------------
#
# [Sort results by one or more columns]
#   - Ascending or descending order
#   - Handle nulls first or last
#
# [Limit number of rows returned]
#   - Take first N rows
#   - Useful for previewing or top-N queries
#
# =============================================================================


### Feature Support Matrix:

Understanding what PySpark features are supported helps you write efficient code.

#### ✅ Fully Supported DataFrame Operations:
- `select`, `filter`, `where`
- `groupBy`, `agg` (all aggregation functions)
- `join` (inner, left, right, outer, broadcast)
- `orderBy`, `sort`
- `distinct`, `dropDuplicates`
- Window functions (`row_number`, `rank`, `lag`, `lead`, etc.)
- Built-in functions (95%+ coverage)
- `cache`, `persist` (creates temp tables in Snowflake)

#### ⚠️ Limited Support:
- `repartition` (logical operation only)
- `coalesce` (similar to repartition)
- Python UDFs (work but slow - avoid if possible)
- Pandas UDFs (work but slow - avoid if possible)
- MLlib (partial - transformers work, estimators limited)

#### ❌ NOT Supported:
- The entire RDD API
- `.rdd`, `.foreach()`, `.foreachPartition()`
- Structured Streaming
- GraphX
- Custom data sources
- `.checkpoint()`


### Data Types Support

**✅ Supported:**
- String, Integer, Long, Float, Double, Decimal
- Boolean, Date, Timestamp
- Array, Map, Struct
- Binary

**❌ Not Supported:**
- DayTimeIntervalType
- YearMonthIntervalType
- UserDefinedTypes

#### Supported File Formats:
- ✅ Parquet, CSV, JSON, Avro, ORC
- ❌ Delta Lake, Hudi not supported


In [None]:
# =============================================================================
# WRITING DATA
# =============================================================================
#
# Write transformed data back to Snowflake tables, stages, or cloud storage
#
# =============================================================================

# -----------------------------------------------------------------------------
# 1. WRITE TO SNOWFLAKE TABLES
# -----------------------------------------------------------------------------
#
# [Save DataFrame as a Snowflake table]
#   Write modes:
#     - Overwrite: replace existing table entirely
#     - Append: add rows to existing table
#     - Ignore: skip if table already exists
#     - Error (default): fail if table exists
#
# [Write to fully qualified table name]
#   - Specify database.schema.table explicitly
#   - Useful when writing to different schema than default
#
# NOTE: This is the fastest write path - data stays in Snowflake
#
# -----------------------------------------------------------------------------
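
A minimal sketch of a table write with an explicit mode. The helper `save_table` and the table name in the usage note are hypothetical. The mode strings are the standard PySpark ones.

```python
WRITE_MODES = ("overwrite", "append", "ignore", "error")


def save_table(df, table_name, mode="error"):
    """Write `df` to a table, rejecting mode strings that are not recognised."""
    if mode not in WRITE_MODES:
        raise ValueError(f"unknown write mode: {mode}")
    df.write.mode(mode).saveAsTable(table_name)
```

For example, `save_table(df, "ANALYTICS.PUBLIC.ORDERS_CLEAN", mode="append")` would target a fully qualified name.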

# -----------------------------------------------------------------------------
# 2. WRITE TO SNOWFLAKE STAGES
# -----------------------------------------------------------------------------
#
# PARQUET (recommended):
#   [Write DataFrame as Parquet to stage]
#   - Best for analytical workloads
#   - Columnar format, compressed by default
#   - Preserves schema information
#
# CSV:
#   [Write DataFrame as CSV to stage]
#   Options to consider:
#     - Include header row or not
#     - Compression format (gzip, none, etc.)
#     - Field delimiter
#     - Quote character for strings
#
# JSON:
#   [Write DataFrame as JSON to stage]
#   - One JSON object per row
#   - Consider compression for large outputs
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 3. WRITE TO CLOUD STORAGE
# -----------------------------------------------------------------------------
#
# [Write to S3]
#   - Requires AWS credentials configured
#
# [Write to Google Cloud Storage]
#   - Requires GCP credentials configured
#
# [Write to Azure Blob Storage]
#   - Requires Azure credentials configured
#
# NOTE: Using Snowflake stages is often simpler (handles auth automatically)
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 4. PARTITIONING FOR PERFORMANCE
# -----------------------------------------------------------------------------
#
# [Write with partitioning by columns]
#   - Creates directory structure based on partition values
#   - Enables partition pruning on reads
#   - Common patterns: partition by date, region, category
#
# Benefits:
#   - Faster queries that filter on partition columns
#   - Enables parallel reads of different partitions
#   - Easier data management (delete old partitions)
#
# Considerations:
#   - Too many partitions = too many small files
#   - Choose columns with moderate cardinality
#   - Date-based partitioning is very common
#
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# 5. COMPRESSION OPTIONS
# -----------------------------------------------------------------------------
#
# [Specify compression when writing]
#   Supported formats:
#     - Snappy: fast, moderate compression (default for Parquet)
#     - Gzip: slower, better compression
#     - LZ4: very fast, moderate compression
#     - Zstd: good balance of speed and compression
#     - None: no compression (rarely recommended)
#
# Trade-offs:
#   - Higher compression = smaller files but slower writes
#   - Snappy/LZ4 good for frequently accessed data
#   - Gzip/Zstd good for archival or infrequent access
#
# -----------------------------------------------------------------------------
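
Partitioning and compression can be set on the same write, as in this sketch. The partition column `order_date`, the default codec, and the helper name are illustrative choices. `partitionBy` and the `compression` option are standard `DataFrameWriter` features.

```python
def write_partitioned_parquet(df, path, partition_cols=("order_date",),
                              compression="snappy"):
    """Write Parquet files laid out in one directory per partition value."""
    (
        df.write.mode("overwrite")
        .partitionBy(*partition_cols)           # e.g. .../order_date=2024-01-01/
        .option("compression", compression)     # snappy, gzip, lz4, zstd, none
        .parquet(path)
    )
```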

# -----------------------------------------------------------------------------
# 6. CONTROLLING OUTPUT FILE COUNT
# -----------------------------------------------------------------------------
#
# [Reduce number of output files]
#   - Combine partitions before writing
#   - Useful when you want fewer, larger files
#   - Single file output for small datasets
#
# [Increase number of output files]
#   - Repartition to more partitions before writing
#   - Useful for parallelism on very large datasets
#
# Best practices:
#   - Target 100MB-1GB per file for optimal read performance
#   - Too many small files = slow reads (file listing overhead)
#   - Too few large files = limited parallelism
#
# -----------------------------------------------------------------------------
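
The file-size target above reduces to simple arithmetic, sketched here. The 256 MiB default is an illustrative value inside the 100MB-1GB range. Since this document lists `repartition` as a logical operation only, the resulting file count under Snowpark Connect may differ from the request.

```python
import math


def target_file_count(total_bytes, target_file_bytes=256 * 1024**2):
    """Number of output files needed so each is roughly `target_file_bytes`."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative and target must be positive")
    return max(1, math.ceil(total_bytes / target_file_bytes))


def write_with_target_size(df, path, estimated_bytes):
    """Repartition to the computed file count, then write Parquet."""
    files = target_file_count(estimated_bytes)
    df.repartition(files).write.mode("overwrite").parquet(path)
    return files
```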

# -----------------------------------------------------------------------------
# 7. WRITE MODES SUMMARY
# -----------------------------------------------------------------------------
#
# Overwrite:
#   - Replaces all existing data
#   - Use when you want a fresh snapshot
#   - ⚠️  Destructive - existing data is lost
#
# Append:
#   - Adds new rows to existing data
#   - Use for incremental loads
#   - Beware of duplicates if re-running
#
# Ignore:
#   - Skips write if destination exists
#   - Use for idempotent operations
#   - No error, but data not written
#
# Error (default):
#   - Fails if destination exists
#   - Safest option to prevent accidental overwrites
#   - Must explicitly choose overwrite/append
#
# =============================================================================


In [None]:
# =============================================================================
# END-TO-END DATA PIPELINE WITH TELEMETRY
# =============================================================================
#
# A production-ready pipeline structure with monitoring, error handling,
# and best practices integrated throughout
#
# =============================================================================

# -----------------------------------------------------------------------------
# PIPELINE STRUCTURE OVERVIEW
# -----------------------------------------------------------------------------
#
# 1. Setup & Configuration
# 2. Telemetry Initialization
# 3. Data Ingestion (with monitoring)
# 4. Data Validation
# 5. Transformations (with monitoring)
# 6. Data Quality Checks
# 7. Write Output (with monitoring)
# 8. Cleanup & Summary
#
# -----------------------------------------------------------------------------

# =============================================================================
# STEP 1: SETUP & CONFIGURATION
# =============================================================================
#
# [Import required packages]
#   - Session management
#   - DataFrame functions
#   - Data types for schema definition
#   - Telemetry module for monitoring
#
# [Define configuration variables]
#   - Source table/stage paths
#   - Output table/stage paths
#   - Date ranges or filter parameters
#   - Environment-specific settings (dev/prod)
#
# [Initialize session with Snowflake connection]
#   - Best practice: Use environment variables for credentials
#   - Best practice: Set appropriate warehouse size for workload
#
# =============================================================================

# =============================================================================
# STEP 2: TELEMETRY INITIALIZATION
# =============================================================================
#
# [Initialize telemetry/logging]
#   - Set up custom event tracking
#   - Configure pipeline run identifier (for tracing)
#   - Record pipeline start time
#
# [Log pipeline metadata]
#   - Pipeline name and version
#   - Input parameters
#   - Environment (dev/staging/prod)
#
# Why telemetry matters:
#   - Track execution time and performance
#   - Identify bottlenecks in processing
#   - Debug failures with detailed context
#   - Monitor data volumes over time
#   - Alert on anomalies or failures
#
# =============================================================================
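
A minimal sketch of run-level telemetry using only the Python standard library. It is a stand-in for whatever telemetry module your platform provides. `PipelineTelemetry`, its fields, and the span attributes are hypothetical names.

```python
import logging
import time
import uuid
from contextlib import contextmanager

logger = logging.getLogger("pipeline")


class PipelineTelemetry:
    """Collects timed spans for one pipeline run."""

    def __init__(self, pipeline_name, environment="dev"):
        self.pipeline_name = pipeline_name
        self.environment = environment
        self.run_id = uuid.uuid4().hex      # identifier used for tracing
        self.started_at = time.time()       # pipeline start time
        self.spans = []

    @contextmanager
    def span(self, name, **attributes):
        """Time a block of work and record its status and attributes."""
        start = time.perf_counter()
        record = {"name": name, "status": "ok", **attributes}
        try:
            yield record
        except Exception:
            record["status"] = "error"
            raise
        finally:
            record["seconds"] = time.perf_counter() - start
            self.spans.append(record)
            logger.info("run=%s span=%s %s", self.run_id, name, record)


telemetry = PipelineTelemetry("orders_daily", environment="dev")
with telemetry.span("ingest", source="ORDERS") as ingest:
    ingest["rows_read"] = 0  # replace with the real row count
```

Each later step can open its own span, so timings and row counts end up in one list keyed by `run_id`.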

# =============================================================================
# STEP 3: DATA INGESTION (with monitoring)
# =============================================================================
#
# [Start ingestion telemetry span]
#   - Record: source location, expected row count
#
# [Read source data]
#   - Best practice: Use predefined schema (avoid inferSchema)
#   - Best practice: Read from Snowflake tables when possible
#   - Best practice: Filter at read time to reduce data volume
#
# [Record ingestion metrics]
#   - Rows read
#   - Columns loaded
#   - Time elapsed
#
# [End ingestion telemetry span]
#
# =============================================================================

# =============================================================================
# STEP 4: DATA VALIDATION
# =============================================================================
#
# [Validate source data before processing]
#   Checks to perform:
#     - Row count within expected range
#     - Required columns present
#     - No unexpected nulls in key columns
#     - Data types match expectations
#
# [Handle validation failures]
#   - Log detailed error information
#   - Decide: fail pipeline or continue with warnings
#   - Best practice: Fail fast on critical validation errors
#
# [Record validation metrics]
#   - Pass/fail status
#   - Specific checks that failed
#
# =============================================================================
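
The validation checks above could be collected by a helper like this sketch. The function name, its parameters, and the messages are hypothetical. It relies only on `df.columns`, `df.count()`, and `df.filter(...)` with a SQL string.

```python
def validate_source(df, required_columns, key_columns, min_rows=1, max_rows=None):
    """Return a list of failure messages; an empty list means all checks passed."""
    failures = []

    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        failures.append(f"missing columns: {missing}")

    row_count = df.count()
    if row_count < min_rows or (max_rows is not None and row_count > max_rows):
        failures.append(f"row count {row_count} outside expected range")

    for column in key_columns:
        if column in df.columns:
            nulls = df.filter(f"{column} IS NULL").count()
            if nulls:
                failures.append(f"{nulls} null values in key column {column}")

    return failures
```

To fail fast, the caller can raise as soon as the returned list is non-empty, for example `if failures: raise ValueError(failures)`.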

# =============================================================================
# STEP 5: TRANSFORMATIONS (with monitoring)
# =============================================================================
#
# [Start transformation telemetry span]
#
# --- STAGE 5A: Data Cleaning ---
#
# [Clean and standardize data]
#   - Handle null values (fill or filter)
#   - Trim whitespace from strings
#   - Standardize date formats
#   - Remove or flag invalid records
#
# --- STAGE 5B: Business Logic ---
#
# [Apply business transformations]
#   - Calculate derived columns
#   - Apply business rules
#   - Best practice: Use built-in SQL functions (not Python UDFs)
#
# --- STAGE 5C: Joins & Enrichment ---
#
# [Join with dimension/reference tables]
#   - Best practice: Broadcast small tables (< few MB)
#   - Best practice: Filter before joining to reduce volume
#
# [Log join metrics]
#   - Match rate (rows matched vs total)
#   - Unmatched records count
#
# --- STAGE 5D: Aggregations ---
#
# [Perform aggregations]
#   - Group by required dimensions
#   - Calculate summary statistics
#
# [Cache intermediate results if reused]
#   - Best practice: Cache only when accessed multiple times
#   - Best practice: Unpersist when done
#
# [End transformation telemetry span]
#
# =============================================================================

# =============================================================================
# STEP 6: DATA QUALITY CHECKS
# =============================================================================
#
# [Validate output data before writing]
#   Checks to perform:
#     - Output row count reasonable (not zero, not exploded)
#     - No duplicate keys (if applicable)
#     - Aggregations sum correctly (reconciliation)
#     - Value ranges within expectations
#
# [Compare with previous run (if applicable)]
#   - Row count variance within threshold
#   - Key metrics variance within threshold
#
# [Log quality check results]
#
# =============================================================================
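
The output checks and the comparison with a previous run can be written as plain Python over a few collected numbers, as in this sketch. The function names, the 10% default tolerance, and the messages are hypothetical.

```python
def within_threshold(current, previous, tolerance=0.10):
    """True when `current` differs from `previous` by at most `tolerance` (relative)."""
    if previous == 0:
        return current == 0
    return abs(current - previous) / abs(previous) <= tolerance


def check_output(row_count, distinct_keys, previous_row_count=None, tolerance=0.10):
    """Return a list of quality issues found in the output metrics."""
    issues = []
    if row_count == 0:
        issues.append("output is empty")
    if distinct_keys != row_count:
        issues.append("duplicate keys in output")
    if previous_row_count is not None and not within_threshold(
        row_count, previous_row_count, tolerance
    ):
        issues.append("row count variance above threshold")
    return issues
```

Here `row_count` and `distinct_keys` would come from two small queries on the output DataFrame, so only those two numbers leave Snowflake.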

# =============================================================================
# STEP 7: WRITE OUTPUT (with monitoring)
# =============================================================================
#
# [Start write telemetry span]
#
# [Write to destination]
#   - Best practice: Write to Snowflake table for fastest path
#   - Best practice: Use appropriate write mode
#   - Best practice: Partition large outputs by date
#   - Best practice: Use compression (snappy or gzip)
#
# [Verify write success]
#   - Confirm row count matches expected
#   - Check for write errors
#
# [Record write metrics]
#   - Rows written
#   - Write duration
#   - Destination location
#
# [End write telemetry span]
#
# =============================================================================

# =============================================================================
# STEP 8: CLEANUP & SUMMARY
# =============================================================================
#
# [Release resources]
#   - Unpersist any cached DataFrames
#   - Clear temporary tables
#   - Best practice: Always clean up even on failure (try/finally)
#
# [Calculate pipeline summary]
#   - Total execution time
#   - Rows processed (input to output)
#   - Data quality score
#   - Warnings or issues encountered
#
# [Log final telemetry]
#   - Pipeline completion status (success/failure)
#   - Summary metrics
#   - End timestamp
#
# =============================================================================
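
A minimal sketch of the cleanup-and-summary step. It shows the `try`/`finally` shape that releases cached DataFrames even when a step fails. `run_pipeline`, `steps`, and the summary keys are hypothetical. `unpersist()` is the standard DataFrame method.

```python
import logging
import time

logger = logging.getLogger("pipeline")


def run_pipeline(steps, cached_frames):
    """Run zero-argument `steps` in order and always release `cached_frames`."""
    started = time.perf_counter()
    summary = {"status": "success", "steps_completed": 0}
    try:
        for step in steps:
            step()
            summary["steps_completed"] += 1
    except Exception as exc:
        summary["status"] = "failure"
        summary["error"] = repr(exc)
        raise
    finally:
        # Cleanup runs on success and on failure alike.
        for frame in cached_frames:
            frame.unpersist()
        summary["seconds"] = time.perf_counter() - started
        logger.info("pipeline summary: %s", summary)
    return summary
```

The exception is re-raised after the summary is logged, so a scheduler still sees the failure.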

# =============================================================================
# BEST PRACTICES CHECKLIST (enforced throughout)
# =============================================================================
#
# PERFORMANCE:
#    - Use SQL functions instead of Python UDFs
#    - Broadcast small dimension tables in joins
#    - Filter early to reduce data volume
#    - Cache only when DataFrames are reused
#    - Aggregate early when possible
#
# RELIABILITY:
#    - Define schemas explicitly (don't infer)
#    - Validate data at input and output
#    - Handle errors gracefully with clear logging
#    - Use idempotent operations where possible
#    - Clean up resources in finally blocks
#
# OBSERVABILITY:
#    - Add telemetry spans around major operations
#    - Log row counts at each stage
#    - Record timing for performance analysis
#    - Track data quality metrics
#    - Include pipeline run ID for tracing
#
# MAINTAINABILITY:
#    - Use configuration variables (not hardcoded values)
#    - Break pipeline into clear stages
#    - Add comments explaining business logic
#    - Document expected inputs and outputs
#
# =============================================================================

### Performance Considerations

Follow these best practices to get optimal performance from Snowpark Connect.

1. Use SQL Functions Over UDFs: Python UDFs require data to be transferred to the client, processed, and sent back, which can be 10-100x slower than native operations!
2. Broadcast Joins for Small Tables: When joining a large table with a small dimension table, use `broadcast()` to optimize the join.
3. Cache Frequently Accessed DataFrames: Caching creates temporary tables in Snowflake for faster repeated access. Remember to `unpersist()` when done!
4. Minimize Data Movement: Process data in Snowflake and only transfer final results. Avoid `collect()` on large datasets!
5. Partition Awareness: Filter on partitioned columns to enable partition pruning and reduce data scanned.

## Additional Resources

**Official Documentation:**
- [Snowflake Documentation](https://docs.snowflake.com/)
- [PySpark API Reference](https://spark.apache.org/docs/latest/api/python/)