# Getting Started with PySpark

This notebook is pre-configured to connect to the **Apache Spark cluster**.
The `spark` (`SparkSession`) and `sc` (`SparkContext`) variables are injected by the PySpark kernel, so you do not need to create them yourself.
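
If the notebook is opened with a kernel other than the PySpark one, `spark` and `sc` are not defined and the cells below fail with a `NameError`. A minimal sketch of an up-front check follows; the helper name `missing_kernel_globals` is hypothetical and not part of PySpark or the kernel.

```python
def missing_kernel_globals(namespace, required=("spark", "sc")):
    """Return the required names that are absent from the given namespace."""
    return [name for name in required if name not in namespace]


# In a notebook cell, pass globals() to inspect the kernel's namespace.
missing = missing_kernel_globals(globals())
if missing:
    print(f"Not injected by the kernel: {', '.join(missing)}")
else:
    print("spark and sc are available")
```

The check only looks at names, so it does not confirm that the session can actually reach the cluster; the cell in section 1 covers that.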

---

## 1. Verify Cluster Connection

In [None]:
# spark and sc are auto-injected by the PySpark kernel
print(f"Spark version: {spark.version}")
print(f"Master:        {sc.master}")
print(f"App ID:        {sc.applicationId}")
print(f"App name:      {spark.sparkContext.appName}")

## 2. Create a DataFrame

In [None]:
data = [
    ("Alice", "Engineering", 85000),
    ("Bob", "Marketing", 72000),
    ("Charlie", "Engineering", 92000),
    ("Diana", "Marketing", 68000),
    ("Eve", "Engineering", 95000),
    ("Frank", "Sales", 78000),
]

df = spark.createDataFrame(data, ["name", "department", "salary"])
df.show()

## 3. DataFrame Operations

In [None]:
# Filter and sort
df.filter(df.salary > 75000).orderBy("salary", ascending=False).show()

In [None]:
# Aggregation by department
from pyspark.sql import functions as F

df.groupBy("department").agg(
    F.count("name").alias("employees"),
    F.avg("salary").alias("avg_salary"),
    F.max("salary").alias("max_salary"),
).orderBy("department").show()
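
As a sanity check on the aggregation, the same per-department figures can be computed in plain Python from the sample rows. This sketch repeats the `data` list from section 2 so that it runs on its own; `expected_by_department` is a hypothetical helper, not a PySpark function.

```python
from collections import defaultdict

# Same sample rows as in section 2.
data = [
    ("Alice", "Engineering", 85000),
    ("Bob", "Marketing", 72000),
    ("Charlie", "Engineering", 92000),
    ("Diana", "Marketing", 68000),
    ("Eve", "Engineering", 95000),
    ("Frank", "Sales", 78000),
]


def expected_by_department(rows):
    """Compute employee count, mean salary, and max salary per department."""
    salaries = defaultdict(list)
    for _name, department, salary in rows:
        salaries[department].append(salary)
    # Sort by department name to match the orderBy("department") in the Spark query.
    return {
        dept: {
            "employees": len(values),
            "avg_salary": sum(values) / len(values),
            "max_salary": max(values),
        }
        for dept, values in sorted(salaries.items())
    }


for dept, stats in expected_by_department(data).items():
    print(dept, stats)
```

For the sample data this gives three employees in Engineering (maximum 95000), two in Marketing (average 70000), and one in Sales.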

## 4. Spark SQL

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("employees")

# Query with Spark SQL
spark.sql("""
    SELECT department,
           COUNT(*) AS headcount,
           ROUND(AVG(salary), 0) AS avg_salary
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
""").show()

## 5. Spark SQL Magic Cells

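Use the `%%sparksql` cell magic to write SQL directly in a cell. The magic is loaded automatically on startup, and the query below reads the `employees` view registered in section 4.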

In [None]:
%%sparksql
SELECT name, department, salary
FROM employees
WHERE salary > 80000
ORDER BY salary DESC

## 6. Hetzner Object Storage (S3)

Read data from and write data to Hetzner Object Storage through `s3a://` paths. The S3 credentials are configured automatically via environment variables, and the bucket name is available in the `HETZNER_S3_BUCKET` environment variable.

In [None]:
import os

bucket = os.environ.get("HETZNER_S3_BUCKET", "")
if bucket:
    path = f"s3a://{bucket}/sample-data/employees"

    # Write sample data as CSV to S3; coalesce(1) yields a single part file
    # inside the employees/ directory
    df.coalesce(1).write.mode("overwrite").option("header", True).csv(path)
    print(f"Written to {path}/")

    # Read it back
    df_s3 = spark.read.csv(path, header=True, inferSchema=True)
    df_s3.show()
else:
    print("Hetzner Object Storage not configured (HETZNER_S3_BUCKET is empty)")
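
When several datasets are written to the same bucket, a small helper can keep the bucket lookup and path construction in one place. This is a sketch under the assumption that `HETZNER_S3_BUCKET` is the only configuration needed to form the path; `s3a_uri` is a hypothetical name.

```python
import os


def s3a_uri(*parts, bucket=None):
    """Build an s3a:// URI under the configured bucket, or return None if unset."""
    if bucket is None:
        bucket = os.environ.get("HETZNER_S3_BUCKET", "")
    if not bucket:
        return None
    # Strip stray slashes so the segments join with exactly one "/" each.
    key = "/".join(p.strip("/") for p in parts if p.strip("/"))
    return f"s3a://{bucket}/{key}" if key else f"s3a://{bucket}"


path = s3a_uri("sample-data", "employees")
print(path or "Hetzner Object Storage not configured")
```

The resulting string can be passed to `df.write.csv(...)` and `spark.read.csv(...)`; a `None` result signals that the bucket variable is empty.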

## 7. Cluster Info

Check the **Spark Master Web UI** for detailed cluster monitoring:
- Workers, running applications, completed jobs
- Available at `https://spark.YOUR_DOMAIN` (replace `YOUR_DOMAIN` with your deployment's domain)

In [None]:
# Cluster resources
print(f"Default parallelism: {sc.defaultParallelism}")
# uiWebUrl points to this application's own Spark UI, not the Master Web UI
print(f"Spark UI: {sc.uiWebUrl}")