# 2. Flatten Nested JSON Data

**Goal**: Use Spark SQL functions (`explode`, `posexplode`, `col`) to transform the nested JSON hierarchy into a flat, relational table format.

---

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, posexplode

spark = SparkSession.builder \
    .appName("ProjectSpark-Flatten") \
    .getOrCreate()

In [2]:
# Load Raw Data
input_path = "../data/raw/2015-01-01-15.json"
df_raw = spark.read.json(input_path)

# Keep only PushEvents, whose payload contains a 'commits' array
df_push = df_raw.filter(df_raw.type == "PushEvent")
print(f"Push Events to process: {df_push.count()}")

Push Events to process: 5815


## Step 1: Explode and Select
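We explode the `payload.commits` array: each element of the array becomes its own row, with the event-level columns repeated alongside it. Events whose array is null or empty produce no rows.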

In [3]:
df_flat = (
    df_push
    .select(
        col("id").alias("event_id"),
        col("created_at").alias("event_time"),
        col("actor.login").alias("actor_login"),
        col("repo.name").alias("repo_name"),
        col("payload.head").alias("head_sha"),
        # explode creates a new row for each commit in the array
        explode(col("payload.commits")).alias("commit")
    )
)

df_flat.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- actor_login: string (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- head_sha: string (nullable = true)
 |-- commit: struct (nullable = true)
 |    |-- author: struct (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- distinct: boolean (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- sha: string (nullable = true)
 |    |-- url: string (nullable = true)
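
To make the row-multiplying behavior of `explode` concrete, here is a minimal plain-Python model of it, together with `posexplode`, which the notebook imports but does not use. This is a sketch, not Spark code: the `events` records and the helper names `explode_rows` and `posexplode_rows` are hypothetical and exist only for illustration.

```python
from typing import Any, Dict, Iterator, List, Tuple

# Hypothetical sample events, shaped like the fields selected in In [3].
events: List[Dict[str, Any]] = [
    {"event_id": "e1", "commits": [{"sha": "a1"}, {"sha": "a2"}]},
    {"event_id": "e2", "commits": []},    # empty array
    {"event_id": "e3", "commits": None},  # null array
    {"event_id": "e4", "commits": [{"sha": "b1"}]},
]


def explode_rows(
    rows: List[Dict[str, Any]], array_key: str
) -> Iterator[Tuple[Dict[str, Any], Any]]:
    """Yield one (row, element) pair per array element, modeling explode."""
    for row in rows:
        # Null and empty arrays contribute no output rows at all.
        for element in row[array_key] or []:
            yield row, element


def posexplode_rows(
    rows: List[Dict[str, Any]], array_key: str
) -> Iterator[Tuple[Dict[str, Any], int, Any]]:
    """Like explode_rows, but also yield the element's 0-based position."""
    for row in rows:
        for pos, element in enumerate(row[array_key] or []):
            yield row, pos, element


exploded = [
    (row["event_id"], commit["sha"])
    for row, commit in explode_rows(events, "commits")
]
positioned = [
    (row["event_id"], pos, commit["sha"])
    for row, pos, commit in posexplode_rows(events, "commits")
]

print(exploded)    # [('e1', 'a1'), ('e1', 'a2'), ('e4', 'b1')]
print(positioned)  # [('e1', 0, 'a1'), ('e1', 1, 'a2'), ('e4', 0, 'b1')]
```

Events `e2` and `e3` vanish from both results, which matches how `explode` treats empty and null arrays. If those events need to survive as rows with a null commit, Spark provides `explode_outer` for that purpose. The position from `posexplode` is useful when the order of commits within a push matters.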



## Step 2: Flatten Nested Structs
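Next, we pull individual fields out of the exploded `commit` struct using dot notation. In the raw JSON these fields were nested several levels deep (for example, `payload.commits[].author.name`); here each one becomes a top-level column with a descriptive alias.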

In [4]:
df_final = df_flat.select(
    col("event_id"),
    col("event_time"),
    col("actor_login"),
    col("repo_name"),
    col("commit.sha").alias("commit_sha"),
    col("commit.author.name").alias("author_name"),
    col("commit.author.email").alias("author_email"),
    col("commit.message").alias("commit_message")
)

df_final.show(5, truncate=False)

+----------+--------------------+-------------+---------------------------+----------------------------------------+-------------------+-------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_id  |event_time          |actor_login  |repo_name                  |commit_sha                              |author_name        |author_email                                           |commit_message                                                                                                                                                  |
+----------+--------------------+-------------+---------------------------+----------------------------------------+-------------------+-------------------------------------------------------+----------------------------------------------------------------------------------------------------
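
The dot-notation lookup used in Step 2 can be modeled in plain Python as a walk down a nested dictionary. The sketch below is not Spark code: the helper `get_path`, the sample `row`, and the `column_paths` mapping are hypothetical, with the mapping chosen to mirror the aliases in In [4].

```python
from typing import Any, Dict


def get_path(record: Dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as 'commit.author.name'.

    Returns None as soon as an intermediate value is missing.
    """
    value: Any = record
    for key in path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


# Hypothetical row, shaped like one record of df_flat after the explode step.
row = {
    "event_id": "e1",
    "commit": {
        "sha": "a1",
        "message": "Fix typo",
        "author": {"name": "Ada", "email": "ada@example.com"},
    },
}

# Output column name -> dotted source path, mirroring the select in In [4].
column_paths = {
    "event_id": "event_id",
    "commit_sha": "commit.sha",
    "author_name": "commit.author.name",
    "author_email": "commit.author.email",
    "commit_message": "commit.message",
}

flat_row = {name: get_path(row, path) for name, path in column_paths.items()}
print(flat_row)
```

Returning `None` for a missing level is a simplification. In Spark, a null struct also yields a null field, but referencing a field name that does not exist in the schema fails at analysis time rather than returning null.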

## Save Output
Save the flattened table as CSV for easy reporting, and as Parquet to preserve the schema and column types.

In [5]:
output_csv = "../data/processed/github_commits_flat.csv"
output_parquet = "../data/processed/github_commits_flat.parquet"

# Write CSV (coalesce(1) yields a single part file inside the output directory)
# quoteAll=true quotes every field, so commas in commit messages do not split columns
df_final.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .option("quoteAll", "true") \
    .csv(output_csv)

# Write Parquet
df_final.write.mode("overwrite").parquet(output_parquet)

print("Data saved successfully!")

Data saved successfully!
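
Commit messages often contain commas, double quotes, and line breaks, which is why every field is quoted on write. The sketch below illustrates the idea with Python's standard `csv` module rather than Spark's writer; the sample `rows` are hypothetical. One difference to keep in mind: Python's `csv` module escapes an embedded quote by doubling it, while Spark's CSV writer uses a backslash as its default escape character.

```python
import csv
import io

# Hypothetical flattened rows with awkward commit messages.
rows = [
    {"commit_sha": "a1", "commit_message": "Fix parser, add tests"},
    {
        "commit_sha": "a2",
        "commit_message": 'Rename "foo" to "bar"\n\nSee issue tracker.',
    },
]

# newline="" leaves line endings untouched, as the csv module recommends.
buffer = io.StringIO(newline="")
writer = csv.DictWriter(
    buffer,
    fieldnames=["commit_sha", "commit_message"],
    quoting=csv.QUOTE_ALL,  # quote every field, analogous to quoteAll
)
writer.writeheader()
writer.writerows(rows)

# Read the text back and check that nothing was split or lost.
buffer.seek(0)
restored = list(csv.DictReader(buffer))
print(restored == rows)  # True
```

A CSV reader has to be told that a quoted field may span several lines. When reading such a file back with Spark, that likely means enabling the `multiLine` option, which is off by default.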
