# ⚡ Zero Migration from Apache Spark to Snowflake Using Snowpark Connect

## 🚀 **Same PySpark Code, Snowflake's Superior Engine**

---

### 🎯 **What This Notebook Proves**

This notebook demonstrates the **game-changing reality** of **zero-code migration**: the same PySpark code runs either on local Apache Spark or on Snowflake's cloud-native engine.


### 🪄 **The Zero Migration Magic**

**Change just TWO variables** and instantly transform your Spark workloads to run on Snowflake:

```python
# 🎛️ INSTANT TRANSFORMATION!
USE_SNOWPARK_CONNECT = True   # 🚀 Unlock Snowflake's engine (False = local Apache Spark)
CONDA_ENV_NAME = "your-env-name"  # Your existing environment works unchanged
```
### 🏆 **Why Snowpark Connect Transforms Your Business**

#### 💰 **Zero Migration Investment**
- **DataFrame & SQL Compatibility**: Existing PySpark DataFrame and Spark SQL code runs without rewrites
- **Instant ROI**: No rewriting, no retraining, no project delays
- **Risk-Free Transition**: Test workloads without any code modifications

#### 🚀 **Snowflake's Superior Engine Advantages**
- **⚡ Elastic Auto-Scaling**: Automatic compute adjustment from small datasets to petabytes
- **🌐 Multi-Cloud Flexibility**: Run on AWS, Azure, or GCP without vendor lock-in
- **🔒 Enterprise Security**: Built-in governance, compliance, and data protection
- **💸 Pay-per-Second Billing**: Warehouses bill per second (with a 60-second minimum each time they resume) and can auto-suspend when idle, so you pay for compute you use, not idle clusters
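As a rough illustration of per-second billing, the sketch below estimates credits for one resume-to-suspend period of a warehouse. It assumes standard warehouse rates (X-Small = 1 credit/hour, doubling with each size up to X-Large) and the 60-second minimum charged each time a warehouse resumes. `billed_credits` is a hypothetical helper, and current rates for your edition and warehouse type should be checked in Snowflake's pricing documentation.

```python
# Credits per hour for standard warehouses; each size up doubles the rate (assumed rates)
CREDITS_PER_HOUR = {"XS": 1, "S": 2, "M": 4, "L": 8, "XL": 16}
MIN_BILLED_SECONDS = 60  # minimum charged each time a warehouse resumes


def billed_credits(size, running_seconds):
    """Estimate credits for one resume-to-suspend period of a standard warehouse.

    running_seconds covers the whole period the warehouse is running,
    including idle time before auto-suspend kicks in.
    """
    billed_seconds = max(running_seconds, MIN_BILLED_SECONDS)
    return CREDITS_PER_HOUR[size] * billed_seconds / 3600


print(billed_credits("XS", 30))  # a 30 s run is billed as the 60 s minimum
print(billed_credits("M", 90))   # 4 credits/h for 90 s -> 0.1 credits
```

Because idle time before auto-suspend is billed too, a short auto-suspend timeout is what keeps the "not idle clusters" promise in practice.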

---




## 🛠️ **Zero Migration Setup: From Local Spark to Snowflake**


### 📋 **Step 1: Create Your Environment**

#### **Install Snowpark Connect**
```bash
# Create and activate a dedicated Python 3.12 environment
conda create -n snowpark-connect-demo python=3.12
conda activate snowpark-connect-demo

# Your PySpark dependencies
conda install -c conda-forge pyspark matplotlib seaborn jupyter

# 🎯 Add Snowpark Connect for instant enterprise scaling
pip install snowpark-connect
```
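Before opening the notebook, a quick sanity check can confirm that the packages installed above are visible from the active environment. This is a minimal sketch using only the standard library; `installed_versions` is a hypothetical helper, and the names passed to it are the distribution names from the install commands above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {distribution name: installed version, or None if it is missing}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    # Distribution names as used in the conda/pip commands above
    for pkg, ver in installed_versions(["pyspark", "snowpark-connect", "matplotlib", "seaborn"]).items():
        print(f"{pkg:18} {ver or 'NOT INSTALLED'}")
```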

### 📋 **Step 2: Unlock Snowflake's Enterprise Engine (Zero Code Changes Required)**

**Experience the power of enterprise-scale analytics** with the same PySpark code:

1. **🏢 Snowflake Account** with Snowpark Connect enabled  
   *→ Instant access to elastic, multi-cloud compute*
2. **⚡ Snowflake CLI** for seamless authentication  
   *→ Enterprise security with zero complexity*

#### **Install Snowflake CLI:**
```bash
pip install snowflake-cli
```

#### **Configure Connection:**
```bash
# Create connection named 'spark-connect'
snow connection add --connection-name spark-connect \
  --account your-account \
  --user your-username \
  --password your-password \
  --database your-database \
  --schema your-schema \
  --warehouse your-warehouse
```

**📖 Detailed Setup Guide:** 
- [Snowpark Connect Documentation](https://docs.snowflake.com/en/developer-guide/snowpark-connect/snowpark-connect-overview)
- [Snowflake CLI Setup](https://docs.snowflake.com/en/developer-guide/snowflake-cli/connecting/specify-credentials)

### 📋 **Step 3: Configure This Notebook**

Update the variables in the next cell with your settings:
- Set `CONDA_ENV_NAME` to your environment name
- Set `USE_SNOWPARK_CONNECT` to `True` if you want to use Snowflake

---


### Configuration

Update the two variables in the next cell:
- `USE_SNOWPARK_CONNECT`: True for Snowflake, False for local Spark
- `CONDA_ENV_NAME`: your conda environment name

In [None]:
# ===============================================
# 🎛️ CONFIGURATION - UPDATE THESE FOR YOUR SETUP!
# ===============================================

# 🔄 ENGINE SELECTION - Choose your execution engine
USE_SNOWPARK_CONNECT = True  # Set to True for Snowflake, False for Apache Spark

# 🐍 ENVIRONMENT NAME - Update with your conda environment name
CONDA_ENV_NAME = "snowpark-connect-demo"  # Replace with your actual environment name

# ===============================================
# Everything else is identical regardless of your settings! 🪄
# ===============================================

# Auto-detect the conda base directory (works on most macOS/Linux installs)
import os

if 'CONDA_PREFIX' in os.environ:
    # CONDA_PREFIX is <base> in the base env or <base>/envs/<name> in any other env,
    # so strip the '/envs/...' suffix whichever environment is currently active
    conda_base = os.environ['CONDA_PREFIX'].split('/envs/')[0]
elif 'CONDA_EXE' in os.environ:
    # CONDA_EXE points at <base>/bin/conda
    conda_base = os.path.dirname(os.path.dirname(os.environ['CONDA_EXE']))
else:
    # Common conda installation paths
    potential_paths = [
        '/opt/anaconda3',
        '/opt/miniconda3',
        os.path.expanduser('~/anaconda3'),
        os.path.expanduser('~/miniconda3')
    ]
    conda_base = None
    for path in potential_paths:
        if os.path.exists(path):
            conda_base = path
            break

    if conda_base is None:
        conda_base = '/opt/anaconda3'  # Fallback default

# Construct environment path
ENV_PATH = f"{conda_base}/envs/{CONDA_ENV_NAME}"

# Display configuration
print("🎛️ === NOTEBOOK CONFIGURATION ===")
print(f"🔧 Selected Engine: {'❄️  Snowpark Connect (Snowflake)' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark (Local)'}")
print(f"🐍 Conda Environment: {CONDA_ENV_NAME}")
print(f"📁 Environment Path: {ENV_PATH}")
print(f"🌐 Infrastructure: {'☁️  Cloud-native Snowflake' if USE_SNOWPARK_CONNECT else '💻 Local computation'}")

# Verify environment exists
if os.path.exists(ENV_PATH):
    print(f"✅ Environment found: {ENV_PATH}")
else:
    print(f"⚠️  Environment not found at: {ENV_PATH}")
    print("💡 Please update CONDA_ENV_NAME or create the environment using the setup instructions above")

print("\n🚀 Ready to execute identical PySpark code!")
print(f"🪄 Same logic, {'Snowflake' if USE_SNOWPARK_CONNECT else 'local Spark'} execution engine!")


### Environment configuration

The following cell automatically configures Python paths based on your conda environment name. This ensures the notebook works with any properly named conda environment.


In [None]:
# 🌍 ENVIRONMENT CONFIGURATION - Uses your conda environment automatically!
# This configuration works with ANY properly named conda environment

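import glob
import os

# Clear any conflicting environment variables
os.environ.pop('PYTHONPATH', None)

# Set Python paths dynamically based on your environment name
python_executable = f"{ENV_PATH}/bin/python"
# Locate pyspark under whichever python3.x the environment uses (falls back to 3.12)
pyspark_dirs = glob.glob(f"{ENV_PATH}/lib/python3.*/site-packages/pyspark")
spark_home = pyspark_dirs[0] if pyspark_dirs else f"{ENV_PATH}/lib/python3.12/site-packages/pyspark"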

# Configure PySpark environment variables
os.environ['PYSPARK_PYTHON'] = python_executable
os.environ['PYSPARK_DRIVER_PYTHON'] = python_executable  
os.environ['SPARK_HOME'] = spark_home

print("🐍 DYNAMIC Python environment configured:")
print(f"   🎯 Environment: {CONDA_ENV_NAME}")
print(f"   🐍 Driver Python: {os.environ['PYSPARK_DRIVER_PYTHON']}")
print(f"   🐍 Worker Python: {os.environ['PYSPARK_PYTHON']}")
print(f"   🏠 Spark Home: {os.environ['SPARK_HOME']}")
print(f"\n✅ Environment ready for: {'❄️ Snowpark Connect' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}")

# Verification
if os.path.exists(python_executable):
    print(f"✅ Python executable verified: {python_executable}")
else:
    print(f"⚠️  Python executable not found: {python_executable}")
    print(f"💡 Please check your conda environment name and ensure it's created correctly")

print(f"\n🎉 Configuration complete! Ready for {'Snowflake' if USE_SNOWPARK_CONNECT else 'local Spark'} execution!")


## 📚 **Library Imports**

### 🔄 **Identical Imports for Both Engines**

Notice how the imports are **exactly the same** regardless of which engine we're using.
- Same DataFrame operations
- Same SQL functions  
- Same data types
- Same visualization libraries


In [None]:
# 📚 IDENTICAL imports - Same for both Apache Spark and Snowpark Connect!
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, round as spark_round, avg, sum as spark_sum, 
    count, max as spark_max, min as spark_min, 
    year, month, dayofweek, hour,
    udf, lit
)
from pyspark.sql.types import StringType, FloatType, StructType, StructField, DoubleType, IntegerType, TimestampType
from datetime import datetime, timedelta
import random
import matplotlib.pyplot as plt
import seaborn as sns

print("📚 Libraries imported successfully!")
print(f"🎯 Ready for: {'❄️ Snowpark Connect' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'} execution!")


## ⚡ **The Transformation Moment: From Local to Enterprise**

### 🚨 **THE ONLY CHANGE NEEDED FOR ZERO MIGRATION!**

This is where the **enterprise magic happens**! One simple variable change transforms your Spark workload to run on Snowflake:

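| **Engine** | **Session entry point** |
|---|---|
| **🟦 Apache Spark** | `SparkSession.builder...getOrCreate()` |
| **❄️ Snowpark Connect** | `snowpark_connect.start_session()`, then `snowpark_connect.get_session()` |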


### 🏆 **Why Snowpark Connect Changes Everything**
- **💰 Zero Retraining Cost**: Your team's PySpark expertise instantly scales to enterprise
- **⚡ Instant Production**: No architectural changes, no code rewrites, no deployment complexity  
- **🌐 Enterprise Security**: Built-in governance without code modifications
- **📈 Elastic Performance**: Auto-scale from development datasets to production petabytes


In [None]:
# ⚡ ZERO MIGRATION TRANSFORMATION - Watch your Spark code run on Snowflake!

if USE_SNOWPARK_CONNECT:
    # ❄️ ENTERPRISE SNOWPARK CONNECT - THE GAME CHANGER!
    print("❄️ Connecting to Snowflake's cloud-native engine...")
    print("💡 Same PySpark code, Snowflake infrastructure!")
    
    try:
        from snowflake import snowpark_connect
        
        # Run PySpark in Spark Connect (client) mode
        os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"

        # Start the local Snowpark Connect session; Spark operations then execute on Snowflake
        snowpark_connect.start_session()
        spark = snowpark_connect.get_session()
        
        print(f"\n🎉 ENTERPRISE TRANSFORMATION COMPLETE!")
        print(f"⚡ Spark version: {spark.version}")
        print(f"❄️ Execution engine: 🏆 SNOWFLAKE")
        
    except ImportError:
        print("❌ Missing Snowpark Connect package!")
        print("💡 Install Snowpark Connect: pip install snowpark-connect")
        print("📖 Setup guide: https://docs.snowflake.com/en/developer-guide/snowpark-connect/snowpark-connect-overview")
        raise
    except Exception as e:
        print(f"❌ Snowflake connection issue: {e}")
        print("💡 Verify your Snowflake CLI configuration:")
        print("   snow connection test -c spark-connect")
        raise
        
else:
    # 🟦 LOCAL APACHE SPARK - DEVELOPMENT MODE
    print("🟦 === LOCAL DEVELOPMENT MODE ===")
    print("💻 Initializing local Spark session...")
    print(f"⚠️  LIMITED to local resources in environment: {CONDA_ENV_NAME}")
    
    spark = SparkSession.builder \
        .appName("NYC Taxi Analysis - Local Development") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()
    
    print(f"\n✅ Local Spark session created!")
    print(f"⚡ Spark version: {spark.version}")
 

---

# 🚀 **ZERO MIGRATION PROOF**

## 🎯 **From Here Forward: Your Existing PySpark Code Unchanged!**

Everything below this point is **100% identical PySpark code** that runs seamlessly on both local development and Snowflake. 

> **🏆 Enterprise Migration Truth**: PySpark DataFrame and Spark SQL code, like every cell below, runs unchanged on Snowflake's engine.

---

## 📊 **Data Processing**

### 🗽 **Synthetic NYC Taxi Dataset**

We'll generate and process **50,000 synthetic taxi trip records**, modeled on NYC yellow taxi fields, to demonstrate enterprise-grade analytics capabilities.
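The data-generation cell below draws from Python's `random` module. A minimal sketch of the same idea with a dedicated, seeded `random.Random` instance shows why seeding matters for this comparison: both engines then receive identical input rows, so any difference in results comes from the engine rather than the data. `generate_trips` is hypothetical and keeps only three of the seven columns for brevity.

```python
import random
from datetime import datetime, timedelta


def generate_trips(n, seed=42):
    """Generate n (pickup, dropoff, distance_miles) tuples from a seeded RNG."""
    rng = random.Random(seed)  # private generator: unaffected by other code calling random.*
    base_time = datetime(2023, 1, 15, 6, 0, 0)
    trips = []
    for _ in range(n):
        hour_offset = rng.randint(0, 17)
        minute_offset = rng.randint(0, 59)
        pickup = base_time.replace(hour=(base_time.hour + hour_offset) % 24, minute=minute_offset)
        duration_minutes = rng.randint(3, 60)
        speed_mph = rng.uniform(8, 25)
        distance = round(duration_minutes / 60 * speed_mph, 2)
        trips.append((pickup, pickup + timedelta(minutes=duration_minutes), distance))
    return trips


# Same seed -> identical rows on every run, whichever engine consumes them
assert generate_trips(1000) == generate_trips(1000)
```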


In [None]:
# 🏗️ IDENTICAL DATA GENERATION - Works on both engines!
print(f"🏗️ Generating NYC Taxi data using: {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}")

# IDENTICAL SCHEMA - Same structure for both engines
schema = StructType([
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("payment_type", IntegerType(), True)
])

# IDENTICAL DATA GENERATION ALGORITHM
random.seed(42)  # Fixed seed: every run (and both engines) gets the same input rows
sample_data = []
base_time = datetime(2023, 1, 15, 6, 0, 0)

print("📊 Creating 50,000 sample taxi trip records...")
for i in range(50000):  # 50K records
    # Pickup times spread uniformly from 6:00 AM to 11:59 PM
    hour_offset = random.randint(0, 17)
    minute_offset = random.randint(0, 59)
    pickup_time = base_time.replace(hour=(base_time.hour + hour_offset) % 24, minute=minute_offset)
    
    # Realistic trip duration (3-60 minutes)
    trip_duration_minutes = random.randint(3, 60)
    dropoff_time = pickup_time + timedelta(minutes=trip_duration_minutes)
    
    # Realistic distance based on NYC traffic speed (8-25 mph)
    base_speed = random.uniform(8, 25)
    trip_distance = round((trip_duration_minutes / 60) * base_speed, 2)
    
    # NYC taxi fare structure: base + distance + time
    base_fare = 2.50
    distance_fare = trip_distance * random.uniform(2.40, 3.20)  # per mile
    time_fare = trip_duration_minutes * random.uniform(0.30, 0.50)  # per minute
    fare_amount = round(base_fare + distance_fare + time_fare, 2)
    
    # Realistic tipping behavior (0-25% of fare)
    tip_percentage = random.uniform(0, 0.25)
    tip_amount = round(fare_amount * tip_percentage, 2)
    
    sample_data.append((
        pickup_time,
        dropoff_time,
        random.randint(1, 4),  # 1-4 passengers
        trip_distance,
        fare_amount,
        tip_amount,
        random.randint(1, 4)  # payment types: 1=Credit, 2=Cash, 3=No Charge, 4=Dispute
    ))

# IDENTICAL DATAFRAME CREATION
taxi_df = spark.createDataFrame(sample_data, schema)

print(f"✅ Generated realistic taxi dataset!")
print(f"📈 Dataset Statistics (processed by {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}):")
print(f"   📊 Total records: {taxi_df.count():,}")
print(f"   📋 Columns: {len(taxi_df.columns)}")

# IDENTICAL SCHEMA DISPLAY
print(f"\n🏗️ Schema (executed on {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}):")
taxi_df.printSchema()

print(f"\n🎉 Data generation complete using {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'} engine!")


## 🧹 **Data Cleaning and Feature Engineering**

Now we'll implement the **complete data processing pipeline** using PySpark operations:

#### 📋 **Processing Steps:**
1. **🧹 Data Quality Filters** - Remove invalid records and outliers
2. **⚙️ Feature Engineering** - Create derived columns for analysis  
3. **📊 Data Validation** - Ensure realistic values and constraints

### 🎯 **Code Portability Demo**
> These are **standard PySpark DataFrame operations** that work identically on both engines. Your existing data pipelines can switch engines with zero code changes!
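The cleaning and feature-engineering rules in the next cell can be mirrored for a single record in plain Python, which makes the arithmetic explicit: duration is the pickup-to-dropoff gap in minutes, and speed is `distance / (duration / 60)`. This is an illustrative sketch (`derive_and_validate` is hypothetical). One small difference: the PySpark version casts timestamps to whole epoch seconds, while `timedelta.total_seconds()` keeps any fractional seconds.

```python
from datetime import datetime


def derive_and_validate(pickup, dropoff, distance, fare, passengers):
    """Apply the notebook's cleaning rules to one trip.

    Returns (duration_minutes, speed_mph), or None if the trip would be filtered out.
    """
    # Step 1: business-rule filters (distance, fare, passenger count)
    if not (0 < distance < 100 and 0 < fare < 500 and 0 < passengers <= 6):
        return None
    # Step 2: derived duration in minutes
    duration_minutes = (dropoff - pickup).total_seconds() / 60
    # Step 3: duration constraint (checked before dividing, so no division by zero)
    if not (1 < duration_minutes < 300):
        return None
    # Step 4: derived speed rounded to 2 decimals, then the speed constraint
    speed_mph = round(distance / (duration_minutes / 60), 2)
    if not (0 < speed_mph < 80):
        return None
    return duration_minutes, speed_mph


t0 = datetime(2023, 1, 15, 10, 0)
print(derive_and_validate(t0, datetime(2023, 1, 15, 10, 30), 6.0, 20.0, 2))  # (30.0, 12.0)
```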


In [None]:
# 🧹 Same filters and logic for both engines!
print(f"🧹 Applying data cleaning using: {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}")

# business rules
clean_taxi_df = taxi_df.filter(
    (col("trip_distance") > 0) & 
    (col("trip_distance") < 100) &  # Remove unrealistic long trips
    (col("fare_amount") > 0) & 
    (col("fare_amount") < 500) &    # Remove unrealistic high fares
    (col("passenger_count") > 0) & 
    (col("passenger_count") <= 6)   # Max 6 passengers in taxi
)

# derived columns
enriched_taxi_df = clean_taxi_df.withColumn(
    "pickup_hour", hour(col("tpep_pickup_datetime"))
).withColumn(
    "pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))
).withColumn(
    "trip_duration_minutes", 
    (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60
).withColumn(
    "speed_mph", 
    spark_round((col("trip_distance") / (col("trip_duration_minutes") / 60)), 2)
)

# realistic constraints
final_taxi_df = enriched_taxi_df.filter(
    (col("trip_duration_minutes") > 1) & 
    (col("trip_duration_minutes") < 300) &  # Max 5 hours
    (col("speed_mph") > 0) & 
    (col("speed_mph") < 80)  # Reasonable speed limit for NYC
)

# RESULTS SUMMARY
original_count = taxi_df.count()
final_count = final_taxi_df.count()
retention_rate = (final_count / original_count * 100)

print(f"📊 Data cleaning results (processed by {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}):")
print(f"   📥 Original records: {original_count:,}")
print(f"   📤 After cleaning: {final_count:,}")
print(f"   🎯 Data quality: {retention_rate:.1f}% retained")
print(f"   🗑️ Filtered out: {original_count - final_count:,} invalid records")
print(f"\n✅ Data cleaning completed using {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'} engine!")


## 🔧 **User-Defined Functions (UDFs)**


Let's demonstrate how Python **UDFs** run identically on both engines.

#### 🎯 **UDF Examples:**
1. **`classify_trip_length()`** - Categorize trips by distance ranges
2. **`calculate_tip_percentage()`** - Compute tip as percentage of fare

### 🏆 **Enterprise Value**
> Your existing UDF investments are **100% portable**! The same custom business logic, same function signatures, same results - regardless of the execution engine.
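Because UDF bodies are plain Python, their logic can be unit-tested locally before it is wrapped with `udf()`, with no Spark session at all. The sketch below re-declares the two functions with explicit `None` guards; these guards are an addition not present in the notebook's versions, motivated by the fact that Spark passes SQL NULLs to Python UDFs as `None`.

```python
def classify_trip_length(distance):
    """Classify trip length based on distance in miles."""
    if distance is None:  # added guard: SQL NULL arrives as None
        return None
    if distance < 1:
        return "Short"
    elif distance < 5:
        return "Medium"
    elif distance < 15:
        return "Long"
    return "Very Long"


def calculate_tip_percentage(tip_amount, fare_amount):
    """Tip as a percentage of fare; 0.0 when the fare is not positive."""
    if tip_amount is None or fare_amount is None:  # added guard: NULL in, NULL out
        return None
    if fare_amount > 0:
        return round((tip_amount / fare_amount) * 100, 2)
    return 0.0


# Plain-Python checks of the boundaries: no Spark session needed
assert classify_trip_length(0.5) == "Short"
assert classify_trip_length(1) == "Medium"
assert classify_trip_length(5) == "Long"
assert classify_trip_length(15) == "Very Long"
assert calculate_tip_percentage(2.5, 10.0) == 25.0
assert calculate_tip_percentage(1.0, 0) == 0.0
print("UDF logic checks passed")
```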


In [None]:
# 🔧 IDENTICAL UDF DEFINITIONS - Same business logic for both engines!

def classify_trip_length(distance):
    """IDENTICAL FUNCTION - Classify trip length based on distance"""
    if distance < 1:
        return "Short"  # Quick neighborhood trips
    elif distance < 5:
        return "Medium"  # Typical city trips
    elif distance < 15:
        return "Long"    # Cross-borough trips
    else:
        return "Very Long"  # Airport/suburban trips

def calculate_tip_percentage(tip_amount, fare_amount):
    """IDENTICAL FUNCTION - Calculate tip percentage with error handling"""
    if fare_amount > 0:
        return round((tip_amount / fare_amount) * 100, 2)
    return 0.0

# IDENTICAL UDF REGISTRATION - Same API for both engines
classify_trip_udf = udf(classify_trip_length, StringType())
tip_percentage_udf = udf(calculate_tip_percentage, FloatType())

print(f"🔧 IDENTICAL User-Defined Functions registered on {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}:")
print("   📏 classify_trip_length() - Trip categorization logic")
print("   💰 calculate_tip_percentage() - Tipping behavior analysis")
print("   🎯 Both functions use identical business rules!")

# IDENTICAL UDF APPLICATION - Same DataFrame operations
udf_taxi_df = final_taxi_df.withColumn(
    "trip_category", classify_trip_udf(col("trip_distance"))
).withColumn(
    "tip_percentage", tip_percentage_udf(col("tip_amount"), col("fare_amount"))
)

print(f"\n✅ IDENTICAL UDFs applied successfully on {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}!")
print(f"\n📋 Sample with new columns (processed by {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}):")

# Show sample results with new UDF columns
udf_taxi_df.select(
    "trip_distance", "trip_category", "fare_amount", "tip_amount", "tip_percentage"
).show(10)

print("🎉 Notice: Same UDF logic, same results, different execution engine!")


## 🔍 **Spark SQL Analytics**

### 📊 **Advanced Analytics - Engine Agnostic SQL**

Now let's execute **complex SQL analytics** using the same queries on both engines. This demonstrates how your existing SQL investments are fully portable!

#### 🎯 **Analytical Goals:**
1. **📈 Trip Statistics by Category** - Understand distance-based patterns
2. **🕐 Peak Hours Analysis** - Identify demand patterns by hour
3. **💳 Payment Method Analysis** - Revenue and tipping insights

### 💡 **SQL Portability**
> Every SQL query, every aggregation function, every analytical pattern runs **identically** on both engines. Your SQL expertise transfers seamlessly!
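Two of the queries below compute each group's share with `COUNT(*) * 100.0 / SUM(COUNT(*)) OVER()`: the window function with an empty `OVER()` sums the per-group counts across all groups, so the denominator is the grand total. A plain-Python equivalent, useful for spot-checking a result, might look like this (`share_of_trips` is a hypothetical helper):

```python
from collections import Counter


def share_of_trips(categories):
    """Per-category trip counts as a percentage of all trips, rounded to 2 decimals."""
    counts = Counter(categories)   # GROUP BY category ... COUNT(*)
    total = sum(counts.values())   # SUM(COUNT(*)) OVER () -> grand total
    return {cat: round(n * 100.0 / total, 2) for cat, n in counts.items()}


print(share_of_trips(["Short", "Medium", "Medium", "Long"]))
# {'Short': 25.0, 'Medium': 50.0, 'Long': 25.0}
```

Spark SQL's `ROUND` uses HALF_UP rounding, while Python's built-in `round` rounds half to even on binary floats, so a value that lands exactly on a `.xx5` boundary can differ in the last digit between the two.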


In [None]:
# 📊 IDENTICAL SQL SETUP - Same temporary view registration
udf_taxi_df.createOrReplaceTempView("taxi_trips")

print(f"📊 DataFrame registered as 'taxi_trips' view on {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}")
print(f"🔍 Ready for IDENTICAL SQL analytics!")

# 📈 IDENTICAL SQL QUERY 1 - Trip statistics by category
print(f"\n🚀 Executing identical SQL on {'❄️ Snowflake cloud' if USE_SNOWPARK_CONNECT else '🟦 local Spark'}...")

trip_stats_sql = """
SELECT 
    trip_category,
    COUNT(*) as trip_count,
    ROUND(AVG(trip_distance), 2) as avg_distance,
    ROUND(AVG(fare_amount), 2) as avg_fare,
    ROUND(AVG(tip_percentage), 2) as avg_tip_pct,
    ROUND(AVG(speed_mph), 2) as avg_speed,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pct_of_trips
FROM taxi_trips 
GROUP BY trip_category
ORDER BY trip_count DESC
"""

trip_stats_df = spark.sql(trip_stats_sql)
print(f"\n📈 === Trip Statistics by Category ({'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'} Engine) ===")
trip_stats_df.show()

# 🕐 IDENTICAL SQL QUERY 2 - Peak hours analysis  
peak_hours_sql = """
SELECT 
    pickup_hour,
    COUNT(*) as trip_count,
    ROUND(AVG(fare_amount), 2) as avg_fare,
    ROUND(AVG(trip_distance), 2) as avg_distance,
    ROUND(AVG(speed_mph), 2) as avg_speed,
    ROUND(AVG(tip_percentage), 2) as avg_tip_pct
FROM taxi_trips 
GROUP BY pickup_hour
ORDER BY pickup_hour
"""

peak_hours_df = spark.sql(peak_hours_sql)
print(f"\n🕐 === Hourly Trip Patterns ({'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'} Engine) ===")
peak_hours_df.show(24)

# 💳 IDENTICAL SQL QUERY 3 - Payment method analysis
payment_analysis_sql = """
SELECT 
    CASE payment_type
        WHEN 1 THEN 'Credit Card'
        WHEN 2 THEN 'Cash'
        WHEN 3 THEN 'No Charge'
        WHEN 4 THEN 'Dispute'
        ELSE 'Unknown'
    END as payment_method,
    payment_type,
    COUNT(*) as trip_count,
    ROUND(AVG(fare_amount), 2) as avg_fare,
    ROUND(AVG(tip_amount), 2) as avg_tip,
    ROUND(AVG(tip_percentage), 2) as avg_tip_pct,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage_of_trips
FROM taxi_trips 
GROUP BY payment_type
ORDER BY trip_count DESC
"""

payment_analysis_df = spark.sql(payment_analysis_sql)
print(f"\n💳 === Payment Method Analysis ({'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'} Engine) ===")
payment_analysis_df.show()

print(f"\n✅ All SQL analytics completed successfully on {'❄️ Snowflake' if USE_SNOWPARK_CONNECT else '🟦 Apache Spark'}!")


In [None]:
# 📊 IDENTICAL SUMMARY CALCULATIONS - Same aggregations for both engines!
total_trips = udf_taxi_df.count()
avg_fare = udf_taxi_df.select(avg(col("fare_amount"))).collect()[0][0]
avg_distance = udf_taxi_df.select(avg(col("trip_distance"))).collect()[0][0]
total_revenue = udf_taxi_df.select(spark_sum(col("fare_amount"))).collect()[0][0]
avg_tip_pct = udf_taxi_df.select(avg(col("tip_percentage"))).collect()[0][0]
avg_speed = udf_taxi_df.select(avg(col("speed_mph"))).collect()[0][0]

engine_name = "❄️ SNOWFLAKE (Snowpark Connect)" if USE_SNOWPARK_CONNECT else "🟦 APACHE SPARK (Local)"
infrastructure = "☁️ Cloud-native elastic compute" if USE_SNOWPARK_CONNECT else "💻 Local JVM process"
environment_info = f"📁 Environment: {CONDA_ENV_NAME}"

print("=" * 80)
print(f"🚀 === EXECUTION SUMMARY: {engine_name} ===")
print("=" * 80)
print(f"📊 Total Trips Processed: {total_trips:,}")
print(f"💰 Average Fare: ${avg_fare:.2f}")
print(f"📏 Average Distance: {avg_distance:.2f} miles")
print(f"💵 Total Revenue: ${total_revenue:,.2f}")
print(f"🎯 Average Tip: {avg_tip_pct:.1f}%")
print(f"🏃 Average Speed: {avg_speed:.1f} mph")
print(f"⚡ Execution Engine: {engine_name}")
print(f"🏗️ Infrastructure: {infrastructure}")
print(f"{environment_info}")
print(f"🔄 Processing API: IDENTICAL PySpark DataFrame & SQL")
print("=" * 80)



### 🎯 **Complete Pipeline Execution Summary Using Snowpark Connect**


### 🏆 **What We've Proven**
- ✅ **Same Code**: Zero changes required for running Spark code on Snowflake
- ✅ **Same Results**: Identical analytical outcomes
- ✅ **Same API**: PySpark DataFrame and SQL operations
- ✅ **Same Business Logic**: UDFs and custom functions work identically
- ✅ **Same Data Quality**: Processing pipelines transfer seamlessly
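The "Same Results" point can be checked rather than asserted: run the notebook once per engine, save each result as a list of dicts (for example `[row.asDict() for row in trip_stats_df.collect()]`), and compare them. A minimal sketch, assuming rows are matched on a group key (because `ORDER BY trip_count DESC` does not fix the order of ties) and floats are compared with a tolerance (because aggregation order can change the last bits of an average); `results_match` is a hypothetical helper, and the rows in the usage example are toy values, not real output.

```python
import math


def _values_equal(va, vb, rel_tol, abs_tol):
    """Compare two cell values, using a tolerance for floats."""
    if va is None or vb is None:
        return va is vb
    if isinstance(va, float) or isinstance(vb, float):
        return math.isclose(va, vb, rel_tol=rel_tol, abs_tol=abs_tol)
    return va == vb


def results_match(rows_a, rows_b, key, rel_tol=1e-9, abs_tol=1e-6):
    """True if two result sets (lists of dicts) agree row for row, matched on `key`."""
    if len(rows_a) != len(rows_b):
        return False
    rows_b_by_key = {row[key]: row for row in rows_b}
    for row_a in rows_a:
        row_b = rows_b_by_key.get(row_a[key])
        if row_b is None or row_a.keys() != row_b.keys():
            return False
        if not all(_values_equal(row_a[c], row_b[c], rel_tol, abs_tol) for c in row_a):
            return False
    return True


# Toy rows: same groups in a different order, one float differing in a late digit
local_rows = [{"trip_category": "Short", "trip_count": 10, "avg_fare": 12.5},
              {"trip_category": "Long", "trip_count": 10, "avg_fare": 40.25}]
snowflake_rows = [{"trip_category": "Long", "trip_count": 10, "avg_fare": 40.25},
                  {"trip_category": "Short", "trip_count": 10, "avg_fare": 12.5000000001}]
print(results_match(local_rows, snowflake_rows, key="trip_category"))  # True
```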

# 🔗 **Additional Resources**

## 📖 **Setup Guides**
- **Snowpark Connect**: [Official Documentation](https://docs.snowflake.com/en/developer-guide/snowpark-connect/snowpark-connect-overview)
- **Snowflake CLI**: [Setup Guide](https://docs.snowflake.com/en/developer-guide/snowflake-cli/connecting/specify-credentials)

# 🙏 **Thank You for Reading!**