# 🚀 PySpark Quick Start Guide

Welcome to PySpark! This notebook will help you get started with Apache Spark in Python.

## ✅ Installation Verified
PySpark 3.5.3 (Stable) is installed and ready to use!

## 1️⃣ Create a Spark Session

The `SparkSession` is your entry point to Spark's DataFrame and SQL APIs.

In [None]:
# --- KERNEL CHECK ---
import sys
if sys.version_info < (3, 11) or sys.version_info >= (3, 12):
    print(f"\033[91m⚠️ WARNING: You are running Python {sys.version.split()[0]}!\033[0m")
    print("Please select the kernel: 'Python 3.11 (PySpark)' from the top right menu.")
    print("Using the wrong Python version will cause crashes on Windows.")
else:
    print("✅ Correct Python Kernel (3.11) detected!")

import os
from pyspark.sql import SparkSession
import pandas as pd
from pathlib import Path

# --- WINDOWS ENVIRONMENT SETUP ---
# Use the local Python 3.11 environment we created (expected in ./python311_env)
current_dir = os.getcwd()
python_path = os.path.join(current_dir, "python311_env", "python.exe")

os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path
# Machine-specific paths: change these to your own JDK and Hadoop install locations
os.environ['JAVA_HOME'] = r'C:\Users\ksank\.gemini\java\jdk-17.0.9+8'
os.environ['HADOOP_HOME'] = r'C:\Users\ksank\.hadoop'

# --- CRASH FIX: Display Helper ---
def display_df(df, limit=10):
    """Safely display a Spark DataFrame using RDD collection."""
    # Use rdd.collect() as it proved more stable on Windows than direct df.collect()
    rows = df.limit(limit).rdd.collect()
    return pd.DataFrame(rows, columns=df.columns)

# Create Spark session with stability configs
# Use a custom temp directory to avoid Java NIO errors
spark_temp_dir = Path(r"C:\tmp\spark-temp")
spark_temp_dir.mkdir(parents=True, exist_ok=True)

spark = SparkSession.builder \
    .appName("MyFirstSparkApp") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "file:///C:/tmp/hive") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.pyspark.python", python_path) \
    .config("spark.pyspark.driver.python", python_path) \
    .config("spark.python.use.daemon", "false") \
    .config("spark.local.dir", str(spark_temp_dir)) \
    .config("spark.driver.extraJavaOptions", f"-Djava.io.tmpdir={str(spark_temp_dir)}") \
    .config("spark.executor.extraJavaOptions", f"-Djava.io.tmpdir={str(spark_temp_dir)}") \
    .getOrCreate()

print(f"✅ Spark {spark.version} session created!")
print(f"📊 Running on: {spark.sparkContext.master}")
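The kernel check at the top of this cell relies on Python's element-by-element tuple comparison of `sys.version_info`. The sketch below isolates that rule in a hypothetical `is_python_311` helper (not part of the notebook) so you can try it on arbitrary version tuples:

```python
import sys


def is_python_311(version_info=sys.version_info):
    """Return True only for Python 3.11.x (hypothetical helper, same rule as the kernel check)."""
    # Tuples compare item by item: (3, 11, 4) is not < (3, 11) because the first
    # two items are equal and the longer tuple sorts after the shorter one.
    return (3, 11) <= tuple(version_info) < (3, 12)


print(is_python_311((3, 11, 9)))   # True
print(is_python_311((3, 12, 0)))   # False
print(is_python_311((3, 10, 14)))  # False
```

This is equivalent to the notebook's `sys.version_info < (3, 11) or sys.version_info >= (3, 12)` warning condition, just negated.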

### 💡 Important Note for Windows Users
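If `df.show()` crashes on your machine, use the `display_df(df)` helper defined above instead. It fetches at most `limit` rows (10 by default) with `df.limit(limit).rdd.collect()` and returns them as a pandas DataFrame, a path that proved more stable on Windows in this setup.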

## 2️⃣ Create Your First DataFrame

Let's create a simple DataFrame from Python data.

In [None]:
# Sample data
data = [
    ("Alice", 34, "Data Engineer", 95000),
    ("Bob", 45, "Data Scientist", 110000),
    ("Charlie", 28, "Analytics Engineer", 85000),
    ("Diana", 32, "ML Engineer", 105000),
    ("Eve", 29, "Data Analyst", 75000)
]

columns = ["Name", "Age", "Role", "Salary"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show the data
display_df(df)


## 3️⃣ Basic DataFrame Operations

In [None]:
# View schema
print("📋 DataFrame Schema:")
df.printSchema()
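When `createDataFrame` receives plain Python tuples plus a list of column names, it infers each column's type from the values. With this sample data, `printSchema()` should report `Name` and `Role` as `string` and `Age` and `Salary` as `long`, because Python `int` maps to Spark's 64-bit `LongType`. The sketch below is a simplified, hypothetical model of that mapping for a few basic types only; it looks at the first row alone, whereas PySpark merges types across rows and also handles nulls, dates, decimals, and nested data:

```python
# Simplified, hypothetical model of PySpark's type inference for basic Python values.
PYTHON_TO_SPARK = {
    bool: "boolean",  # exact type() lookup, so True/False land here rather than under int
    int: "long",      # Python ints become 64-bit LongType
    float: "double",
    str: "string",
}


def infer_schema(rows, columns):
    """Return (column, spark_type_name) pairs from the first row; KeyError for other types."""
    first = rows[0]
    return [(name, PYTHON_TO_SPARK[type(value)]) for name, value in zip(columns, first)]


data = [
    ("Alice", 34, "Data Engineer", 95000),
    ("Bob", 45, "Data Scientist", 110000),
]

for name, spark_type in infer_schema(data, ["Name", "Age", "Role", "Salary"]):
    print(f" |-- {name}: {spark_type}")
```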

In [None]:
# Select specific columns
print("\n👥 Names and Roles:")
display_df(df.select("Name", "Role"))


In [None]:
# Filter data
print("\n💰 High earners (Salary > 90k):")
display_df(df.filter(df.Salary > 90000))
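`df.filter(df.Salary > 90000)` keeps only the rows for which the predicate is true, much like a list comprehension over the records. A plain-Python cross-check (no Spark needed) of which employees the cell above should return:

```python
data = [
    ("Alice", 34, "Data Engineer", 95000),
    ("Bob", 45, "Data Scientist", 110000),
    ("Charlie", 28, "Analytics Engineer", 85000),
    ("Diana", 32, "ML Engineer", 105000),
    ("Eve", 29, "Data Analyst", 75000),
]

# Same predicate as df.filter(df.Salary > 90000), applied to plain tuples (Salary is index 3)
high_earners = [row for row in data if row[3] > 90000]
print([row[0] for row in high_earners])  # ['Alice', 'Bob', 'Diana']
```

Spark does not promise row order in general, so compare the set of names rather than their order when checking larger outputs.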


In [None]:
# Group by and aggregate
print("\n📊 Average salary by role:")
display_df(df.groupBy("Role").avg("Salary"))
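`groupBy("Role").avg("Salary")` returns one row per distinct role, with the result in a column named `avg(Salary)` holding a double. Every role in the sample data is unique, so each average simply equals that person's salary. A standard-library sketch of the same grouping and averaging, useful for checking the output by hand:

```python
from collections import defaultdict

data = [
    ("Alice", 34, "Data Engineer", 95000),
    ("Bob", 45, "Data Scientist", 110000),
    ("Charlie", 28, "Analytics Engineer", 85000),
    ("Diana", 32, "ML Engineer", 105000),
    ("Eve", 29, "Data Analyst", 75000),
]

# Collect salaries per role, mirroring groupBy("Role")
salaries_by_role = defaultdict(list)
for _name, _age, role, salary in data:
    salaries_by_role[role].append(salary)

# Average each group; true division yields floats, as Spark's avg() yields doubles
avg_by_role = {role: sum(s) / len(s) for role, s in salaries_by_role.items()}
for role, avg in avg_by_role.items():
    print(f"{role:<20} avg(Salary) = {avg}")
```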


## 4️⃣ Using SQL Queries

You can also use SQL syntax with Spark!

In [None]:
# Register DataFrame as a temporary view
df.createOrReplaceTempView("employees")

# Run SQL query
result = spark.sql("""
    SELECT Role, COUNT(*) as count, AVG(Salary) as avg_salary
    FROM employees
    GROUP BY Role
    ORDER BY avg_salary DESC
""")

display_df(result)
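The query is standard SQL, so one way to sanity-check its logic without Spark is to run the identical text against an in-memory SQLite table with Python's built-in `sqlite3` module. SQLite is only a stand-in here: it has its own engine and dialect, and the two differ in types and functions, but this simple aggregate should produce the same rows:

```python
import sqlite3

data = [
    ("Alice", 34, "Data Engineer", 95000),
    ("Bob", 45, "Data Scientist", 110000),
    ("Charlie", 28, "Analytics Engineer", 85000),
    ("Diana", 32, "ML Engineer", 105000),
    ("Eve", 29, "Data Analyst", 75000),
]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("CREATE TABLE employees (Name TEXT, Age INTEGER, Role TEXT, Salary INTEGER)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?, ?)", data)

# Same query text as the spark.sql(...) cell above
rows = conn.execute("""
    SELECT Role, COUNT(*) as count, AVG(Salary) as avg_salary
    FROM employees
    GROUP BY Role
    ORDER BY avg_salary DESC
""").fetchall()
conn.close()

for role, count, avg_salary in rows:
    print(role, count, avg_salary)
```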


## 5️⃣ Working with Transformations

Add new columns and transform data.

In [None]:
from pyspark.sql.functions import col, when

# Add a salary category column
df_with_category = df.withColumn(
    "Salary_Category",
    when(col("Salary") >= 100000, "High")
    .when(col("Salary") >= 80000, "Medium")
    .otherwise("Entry")
)

display_df(df_with_category)
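Chained `when(...)` calls are evaluated in order, like SQL `CASE WHEN`: the first condition that matches wins, and `otherwise(...)` supplies the fallback. That is why a 105,000 salary is labeled `High` rather than also matching the `Medium` branch. A plain-Python sketch of the same branching, using a hypothetical `salary_category` helper:

```python
def salary_category(salary):
    """Mirror of when(...).when(...).otherwise(...): the first matching branch wins."""
    if salary >= 100000:
        return "High"
    if salary >= 80000:
        return "Medium"
    return "Entry"  # the otherwise(...) fallback


data = [
    ("Alice", 34, "Data Engineer", 95000),
    ("Bob", 45, "Data Scientist", 110000),
    ("Charlie", 28, "Analytics Engineer", 85000),
    ("Diana", 32, "ML Engineer", 105000),
    ("Eve", 29, "Data Analyst", 75000),
]

categories = {name: salary_category(salary) for name, _age, _role, salary in data}
print(categories)
```

One difference to keep in mind: if you leave off `.otherwise(...)` in Spark, rows that match no branch get `null` instead of a default label.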


## 6️⃣ Reading and Writing Data

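Spark can read and write many formats, including CSV, JSON, and Parquet. The cells below save the DataFrame in two formats and read the Parquet copy back.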

In [None]:
# Save as CSV
df.write.mode("overwrite").csv("output/employees.csv", header=True)
print("✅ Data saved to CSV")

# Save as Parquet (columnar format, more efficient)
df.write.mode("overwrite").parquet("output/employees.parquet")
print("✅ Data saved to Parquet")
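`output/employees.csv` and `output/employees.parquet` are directories, not single files: Spark writes one `part-*` file per non-empty partition plus a `_SUCCESS` marker. With `header=True`, each CSV part file carries its own header row. If you need to read the CSV output without Spark, one option is to concatenate the part files and skip each file's header. The sketch below uses a hypothetical `read_spark_csv_dir` helper and assumes the default `part-*.csv` file naming:

```python
import csv
from pathlib import Path


def read_spark_csv_dir(directory):
    """Read every part-*.csv file in a Spark CSV output directory into a list of dicts.

    Hypothetical helper; assumes header=True was used, so each part file starts with a header.
    Hidden checksum files (.part-*.crc) and the _SUCCESS marker do not match the pattern.
    """
    records = []
    for part in sorted(Path(directory).glob("part-*.csv")):
        with part.open(newline="", encoding="utf-8") as f:
            records.extend(csv.DictReader(f))  # DictReader consumes this file's header row
    return records


# Usage, after the write cell has run:
# rows = read_spark_csv_dir("output/employees.csv")
```

Values come back as strings because CSV carries no type information. Parquet stores the schema alongside the data, which is one reason `spark.read.parquet` restores the original column types.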

In [None]:
# Read back from Parquet
df_from_parquet = spark.read.parquet("output/employees.parquet")
print("📂 Data loaded from Parquet:")
display_df(df_from_parquet)


## 7️⃣ Performance: Caching

Cache DataFrames that you'll reuse multiple times.

In [None]:
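# Mark the DataFrame for caching (lazy: nothing is stored until an action runs)
df.cache()
print("✅ DataFrame marked for caching")

# The first action computes df and stores it in memory
print("Total rows:", df.count())
# Later actions on df read the cached data instead of recomputing it
print("Employees over 30:", df.filter(df.Age > 30).count())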

## 🛑 Stop the Spark Session

Always stop your Spark session when done to free up resources.

In [None]:
spark.stop()
print("✅ Spark session stopped")

---

## 📚 Next Steps

Now that you have PySpark working, you can:

1. **Learn more transformations**: `select()`, `filter()`, `groupBy()`, `join()`, `orderBy()`
2. **Explore PySpark SQL functions**: `pyspark.sql.functions`
3. **Work with real datasets**: Load CSV, JSON, Parquet files
4. **Optimize performance**: Partitioning, broadcasting, caching
5. **Machine Learning**: Use `pyspark.ml`, the DataFrame-based MLlib API

### Useful Resources:
- [PySpark Documentation](https://spark.apache.org/docs/latest/api/python/)
- [PySpark SQL Functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)
- [Spark SQL Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)

Happy Sparking! 🚀