# PySpark Cheat Sheet (Advanced)
### Delta Lake on AWS | Apache Tika for Unstructured Data | JSON Parsing

## 🧪 Delta Lake on AWS (Databricks)

### Set Up a Delta Table

In [None]:
# Persist the DataFrame in Delta format, replacing whatever is already
# stored at the target path (mode "overwrite").
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("s3://your-bucket/path/to/delta-table")
)

# Load the same Delta location back; note that this rebinds `df`.
df = (
    spark.read
    .format("delta")
    .load("s3://your-bucket/path/to/delta-table")
)

### Delta Table Management

In [None]:
from delta.tables import DeltaTable

# Handle on the Delta table stored at this path.
deltaTable = DeltaTable.forPath(spark, "s3://your-bucket/path/to/delta-table")

# Update: for rows matching `id = 5`, assign the SQL string literal
# 'inactive' to `status` (hence the nested quotes in the value).
deltaTable.update(condition="id = 5", set={"status": "'inactive'"})

# Delete: remove every row whose status is 'inactive'.
deltaTable.delete("status = 'inactive'")

# Merge (upsert): join target alias "t" to source alias "u" on id;
# matched rows take the source name, unmatched source rows are inserted.
# NOTE: `updatesDF` is not defined in this cell; it must already exist.
(
    deltaTable.alias("t")
    .merge(source=updatesDF.alias("u"), condition="t.id = u.id")
    .whenMatchedUpdate(set={"name": "u.name"})
    .whenNotMatchedInsert(values={"id": "u.id", "name": "u.name"})
    .execute()
)

### Time Travel and Versioning

In [None]:
# Both reads target the same Delta table used elsewhere in this notebook;
# only the snapshot selector differs. (The timestamp read previously loaded
# "s3://your-bucket/path", which is not the table location.)

# Read previous version: snapshot selected by version number.
df_v1 = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .load("s3://your-bucket/path/to/delta-table")
)

# Read by timestamp: snapshot selected by the given date string.
df_time = (
    spark.read.format("delta")
    .option("timestampAsOf", "2024-01-01")
    .load("s3://your-bucket/path/to/delta-table")
)

### Optimize and Vacuum

In [None]:
# Optimize the path-addressed Delta table for performance.
optimize_sql = "OPTIMIZE delta.`s3://your-bucket/path/to/delta-table`"
spark.sql(optimize_sql)

# Remove old data files, keeping a retention window of 168 hours (7 days).
vacuum_sql = "VACUUM delta.`s3://your-bucket/path/to/delta-table` RETAIN 168 HOURS"
spark.sql(vacuum_sql)

## 📄 Parsing Unstructured Data with Apache Tika in PySpark

### Set Up Apache Tika

In [None]:
# Prerequisite (install once in the environment):
# pip install tika

from tika import parser

# Hand a single document (PDF, Word, etc.) to Tika, then pull the
# "content" entry out of the returned mapping.
doc_path = "/path/to/document.pdf"
parsed = parser.from_file(doc_path)
text = parsed["content"]

### Convert Parsed Output to DataFrame

In [None]:
from pyspark.sql import Row

# Example with list of files
files = ["/docs/file1.pdf", "/docs/file2.docx"]

# Parse each file (on the driver, one at a time) into a Row.
# `.get` is used so that a result without a "content" entry yields None
# instead of raising KeyError.
rows = [
    Row(filename=f, content=parser.from_file(f).get("content"))
    for f in files
]

# Convert to DataFrame with an explicit schema: without it, Spark has to
# infer the column types from the data and fails when every `content`
# value is None.
df_unstructured = spark.createDataFrame(
    rows, schema="filename string, content string"
)
df_unstructured.show(truncate=100)

### Clean and Tokenize Text

In [None]:
# `col` is imported here because this cell uses it and no other cell in the
# notebook brings it into scope.
from pyspark.sql.functions import col, lower, regexp_replace, split

# 1. Lower-case the text and delete every character that is not a letter
#    or whitespace.
# 2. Split the cleaned text on runs of whitespace into a `words` array.
# Raw strings keep `\s` as a regex escape instead of an (invalid) Python
# string escape; the pattern text passed to Spark is unchanged.
df_cleaned = (
    df_unstructured
    .withColumn(
        "content",
        regexp_replace(lower(col("content")), r"[^a-zA-Z\s]", ""),
    )
    .withColumn("words", split(col("content"), r"\s+"))
)

## 🧾 JSON Parsing & Transformation in PySpark

### Read JSON with Nested Schema

In [None]:
# Read the JSON file with the reader's multiLine option enabled, then
# inspect the resulting schema and a preview of the rows (cell values
# truncated to 100 characters).
json_path = "s3://your-bucket/path/data.json"
df_json = spark.read.json(path=json_path, multiLine=True)

df_json.printSchema()
df_json.show(truncate=100)

### Extract and Flatten Nested Fields

In [None]:
# `col` is imported here so the cell runs on a fresh kernel; no other cell
# in the notebook brings it into scope.
from pyspark.sql.functions import col

# Pull nested struct fields up to top-level columns using dotted paths,
# giving each a flat, unambiguous name; `timestamp` is kept as-is.
df_flat = df_json.select(
    col("user.id").alias("user_id"),
    col("user.name").alias("user_name"),
    col("event.type").alias("event_type"),
    col("timestamp"),
)

### Explode Arrays in JSON

In [None]:
# `col` is imported alongside `explode`: this cell uses both, and nothing
# else in the notebook brings `col` into scope.
from pyspark.sql.functions import col, explode

# Produce one output row per element of the `tags` array, exposing the
# element as a new `tag` column, then show it next to the nested user name.
df_exploded = df_json.withColumn("tag", explode(col("tags")))
df_exploded.select("tag", "user.name").show()