# Distributed Data Processing with PySpark

## Objectives
- Bridge the gap between local tools (Pandas) and infrastructure-level big data processing (`PySpark`).
- Understand the map-reduce paradigm for processing logs or metrics that won't fit into your local machine's RAM.
- Set up a local Spark session to aggregate and query data using the same code you would run on a cluster for much larger datasets.
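Before touching Spark, the map-reduce paradigm mentioned above can be sketched in plain Python. This toy version uses a handful of hypothetical `(service, log_level)` records. It maps each record to a key-value pair, shuffles (groups) the pairs by key, and reduces each group by summing. Spark runs the same three phases, but it partitions the data across executors instead of holding everything in one list.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical sample records: (service, log_level)
records = [
    ("auth-service", "ERROR"),
    ("billing-api", "WARN"),
    ("auth-service", "ERROR"),
    ("frontend-ui", "INFO"),
    ("billing-api", "WARN"),
    ("auth-service", "WARN"),
]

# Map: emit a ((service, level), 1) pair for every record
mapped = [((service, level), 1) for service, level in records]

# Shuffle: group all values that share the same key
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)

# Reduce: combine each group's values into a single count
counts = {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}

for key, total in sorted(counts.items()):
    print(key, total)
```

The reduce step works here only because addition is associative and commutative. That property is what allows a cluster to combine partial results from different machines in any order.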

## Expected Outcome
- A working local PySpark pipeline that groups log rows and extracts metrics. This notebook uses 500,000 synthetic rows, and the same code scales to millions of rows on a cluster.

## Challenge
- Rewrite a standard Pandas DataFrame `groupby()` aggregation using either PySpark RDDs or Spark DataFrames.
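As a starting point for the challenge, here is one possible Pandas version of the aggregation performed in section 3. It filters to warnings and errors, then counts rows and averages latency per `(service, log_level)`. The small frame `sample_pdf` is hypothetical. The column names match the synthetic data generated later.

```python
import pandas as pd

# Small hypothetical log sample with the same columns used later in the notebook
sample_pdf = pd.DataFrame({
    "service": ["auth-service", "auth-service", "billing-api", "billing-api", "frontend-ui"],
    "log_level": ["ERROR", "ERROR", "WARN", "INFO", "ERROR"],
    "response_time_ms": [120.0, 80.0, 200.0, 50.0, 40.0],
})

# Keep only warnings and errors, then aggregate per (service, log_level)
pandas_metrics = (
    sample_pdf[sample_pdf["log_level"].isin(["ERROR", "WARN"])]
    .groupby(["service", "log_level"], as_index=False)
    .agg(
        total_occurrences=("response_time_ms", "size"),  # row count, like Spark's count("*")
        avg_latency_ms=("response_time_ms", "mean"),
    )
    .sort_values("total_occurrences", ascending=False)
)

print(pandas_metrics)
```

The whole frame lives in one process's memory. That in-memory limit is exactly what the Spark rewrite removes.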

In [None]:
# !pip install pyspark

### 1. Initializing Spark
Unlike Pandas, Spark requires an active `SparkSession` (which wraps the lower-level `SparkContext`) that connects your Python code to the JVM (Java Virtual Machine) backend, where most of the heavy lifting happens.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg

# Set up a local spark session using all available CPU cores (*)
spark = SparkSession.builder \
    .appName("InfraLogAnalysis") \
    .master("local[*]") \
    .getOrCreate()

### 2. Simulating a Gigantic Log File
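We'll generate a synthetic dataset of server events resembling a real-world ELK stack export. At 500,000 rows it still fits comfortably in memory, so we build it in Pandas and hand it to Spark. With genuinely large logs, you would read the files directly with `spark.read` instead.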

In [None]:
import pandas as pd
import numpy as np

# Create 500,000 synthetic log records (seeded so results are reproducible)
num_records = 500_000
rng = np.random.default_rng(seed=42)
sample_services = ['auth-service', 'billing-api', 'frontend-ui', 'database-pg']
sample_levels = ['INFO', 'WARN', 'ERROR', 'DEBUG']

data = {
    # One event every 15 seconds; lowercase "s" avoids the "S" alias deprecated in pandas 2.2
    "timestamp": pd.date_range(start="2025-01-01", periods=num_records, freq="15s"),
    "service": rng.choice(sample_services, size=num_records, p=[0.4, 0.2, 0.3, 0.1]),
    "log_level": rng.choice(sample_levels, size=num_records, p=[0.7, 0.1, 0.05, 0.15]),
    # Right-skewed latencies: gamma with mean = shape * scale = 100 ms
    "response_time_ms": rng.gamma(shape=2.0, scale=50.0, size=num_records)
}

# Build the Pandas DataFrame on the driver, then convert it to a Spark DataFrame
pdf = pd.DataFrame(data)
df = spark.createDataFrame(pdf)

print("Spark DataFrame Schema:")
df.printSchema()

### 3. Distributed Queries (Lazy Evaluation)
In Spark, defining a query (a chain of *transformations* such as `filter` or `groupBy`) doesn't execute it immediately. Spark only records a plan. Execution happens when an *action* such as `.show()`, `.count()`, or `.collect()` is called.

In [None]:
# Count warnings/errors and average their response times per (service, log_level)
# On a cluster the same code runs unchanged: Spark splits the work across executors automatically
service_metrics = df.filter(col("log_level").isin(["ERROR", "WARN"])) \
    .groupBy("service", "log_level") \
    .agg(
        count("*").alias("total_occurrences"),
        avg("response_time_ms").alias("avg_latency_ms")
    ) \
    .orderBy("total_occurrences", ascending=False)

print("Top Services with Warnings and Errors:")
service_metrics.show()

In [None]:
# Stop the Spark session (and its underlying SparkContext) to release JVM memory and CPU
spark.stop()