In [1]:
# 1. Install required packages
!pip install -q faker pyspark

# 2. Imports
import json, random
from faker import Faker
from datetime import datetime, timedelta

# 3. Settings for the large nested JSON Lines dataset
# Destination file; one JSON document is written per line.
OUTPUT_JSONL = "/content/big_movies_dataset.jsonl"
# Number of movie records to generate -- raise or lower it to change the scale.
NUM_RECORDS = 10_000
# Faker instance that supplies the fake titles, names, countries, jobs, etc.
fake = Faker()

def random_date_str():
    """Return a random calendar date from the last 50*365 days.

    The date is picked uniformly (whole-day steps) between ``50 * 365`` days
    before the current local time and the current day, inclusive, and is
    formatted as ``YYYY-MM-DD``.
    """
    window_days = 50 * 365
    earliest = datetime.now() - timedelta(days=window_days)
    offset = timedelta(days=random.randint(0, window_days))
    picked = earliest + offset
    return picked.strftime("%Y-%m-%d")

# Genre labels; each movie gets a random sample of 1-3 of them.
# "Sci-Fi" is spelled with a plain ASCII hyphen-minus: a typographic
# non-breaking hyphen (U+2011) looks identical but would never match a
# filter such as genre = 'Sci-Fi'.
GENRES = [
    "Action",
    "Comedy",
    "Drama",
    "Horror",
    "Sci-Fi",
    "Romance",
    "Thriller",
    "Documentary",
]
# Rating providers; each name becomes a key of the per-movie "ratings" object.
RATING_SRCS = ["IMDb", "Rotten Tomatoes", "Metacritic"]

def _distinct_language_codes(count):
    """Return `count` distinct Faker language codes, in the order drawn."""
    # A dict is used as an insertion-ordered set so repeated draws collapse.
    codes = {}
    while len(codes) < count:
        codes[fake.language_code()] = None
    return list(codes)


# Write NUM_RECORDS synthetic movie documents, one JSON object per line.
# The encoding is pinned so the file does not depend on the platform default.
with open(OUTPUT_JSONL, "w", encoding="utf-8") as f:
    for i in range(NUM_RECORDS):
        # Draw nominations first and cap wins by it: a movie cannot win more
        # awards than it was nominated for.
        nominations = random.randint(0, 20)
        wins = random.randint(0, min(10, nominations))

        rec = {
            "movie_id": i + 1,  # 1-based sequential id
            # fake.sentence() ends with a period; strip it for a title
            "title": fake.sentence(nb_words=3).rstrip("."),
            "release_date": random_date_str(),
            "duration_minutes": random.randint(80, 180),
            # 1-3 distinct genres
            "genres": random.sample(GENRES, k=random.randint(1, 3)),
            "director": {
                "name": fake.name(),
                "birthdate": random_date_str(),
                "nationality": fake.country(),
            },
            # 2-5 cast members, each a small struct
            "cast": [
                {
                    "actor_id": fake.uuid4(),
                    "name": fake.name(),
                    "role": fake.job(),
                }
                for _ in range(random.randint(2, 5))
            ],
            # One score in [1, 10], rounded to one decimal, per rating source
            "ratings": {
                src: round(random.uniform(1, 10), 1) for src in RATING_SRCS
            },
            "box_office": {
                "budget_usd": random.randint(1_000_000, 200_000_000),
                "gross_usd": random.randint(1_000_000, 800_000_000),
            },
            "metadata": {
                "awards": {
                    "nominations": nominations,
                    "wins": wins,
                },
                # Two *different* language codes (sampling from a list of
                # random draws could return the same code twice).
                "languages": _distinct_language_codes(2),
                # 1-3 production companies
                "production_companies": [
                    {
                        "company_id": fake.uuid4(),
                        "name": fake.company(),
                        "founded": random_date_str(),
                    }
                    for _ in range(random.randint(1, 3))
                ],
            },
        }
        f.write(json.dumps(rec) + "\n")
print(f"➡️ Generated {NUM_RECORDS} records at {OUTPUT_JSONL}")

# 4. Start a Spark session
from pyspark.sql import SparkSession

# Builder chain: application name, shuffle-partition setting, then
# getOrCreate() to obtain the session.
spark = (
    SparkSession.builder
    .appName("FlattenNestedJSON")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# 5. Read the JSON Lines file
# No schema is supplied, so Spark infers it from the file contents.
df = spark.read.json(OUTPUT_JSONL)

# 6. Flatten nested columns
from pyspark.sql.functions import col, explode

# Struct fields are promoted to top-level scalar columns and the two arrays
# of structs (cast, metadata.production_companies) are exploded into rows.
#
# NOTE: exploding both arrays yields one row per (cast member, production
# company) pair for every movie, so movie-level values such as budget_usd
# and gross_usd are repeated on each of those rows -- de-duplicate by
# movie_id before aggregating them.
# NOTE: explode() omits rows whose array is null or empty; the generator
# above always writes at least two cast members and one company.
df_flat = (
    df
    # Director struct -> scalar columns
    .withColumn("director_name",        col("director.name"))
    .withColumn("director_birthdate",   col("director.birthdate"))
    .withColumn("director_nationality", col("director.nationality"))
    .drop("director")

    # Ratings struct -> one column per source (backticks quote the field
    # name that contains a space)
    .withColumn("rating_imdb",          col("ratings.IMDb"))
    .withColumn("rating_rt",            col("ratings.`Rotten Tomatoes`"))
    .withColumn("rating_meta",          col("ratings.Metacritic"))
    .drop("ratings")

    # Box-office struct -> scalar columns
    .withColumn("budget_usd",           col("box_office.budget_usd"))
    .withColumn("gross_usd",            col("box_office.gross_usd"))
    .drop("box_office")

    # Awards. DataFrame.drop() only matches top-level column names, so the
    # nested awards struct cannot be dropped on its own; it disappears when
    # the whole "metadata" column is dropped at the end of the chain.
    .withColumn("award_noms",           col("metadata.awards.nominations"))
    .withColumn("award_wins",           col("metadata.awards.wins"))

    # Keep languages array as-is or explode if needed
    .withColumn("languages",            col("metadata.languages"))

    # Explode cast: one row per cast member
    .withColumn("cast_member",          explode(col("cast")))
    .withColumn("actor_id",             col("cast_member.actor_id"))
    .withColumn("actor_name",           col("cast_member.name"))
    .withColumn("actor_role",           col("cast_member.role"))
    .drop("cast_member", "cast")

    # Explode production companies: one row per (cast member, company)
    .withColumn("prod_company",         explode(col("metadata.production_companies")))
    .withColumn("company_id",           col("prod_company.company_id"))
    .withColumn("company_name",         col("prod_company.name"))
    .withColumn("company_founded",      col("prod_company.founded"))
    .drop("prod_company", "metadata")
)

# 7. Persist the flattened DataFrame as Parquet
# "overwrite" mode replaces whatever an earlier run left at this path.
PARQUET_PATH = "/content/movies_flattened_parquet"
df_flat.write.parquet(PARQUET_PATH, mode="overwrite")
print(f"➡️ Flattened data written to Parquet at {PARQUET_PATH}")

# 8. Register as an external Parquet table in Spark SQL
# The table only points at PARQUET_PATH; the data files stay where step 7
# wrote them.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS movies_external
  USING PARQUET
  OPTIONS (path '{PARQUET_PATH}')
""")
# When this cell is re-run the table already exists (IF NOT EXISTS skips the
# CREATE) while the files underneath it have just been overwritten. Refresh
# the table so Spark discards any cached file listing/metadata and the
# query below reads the newly written files.
spark.catalog.refreshTable("movies_external")
print("✅ External Parquet table 'movies_external' is ready in Spark SQL.")

# (Optional) Preview
spark.sql("SELECT * FROM movies_external LIMIT 5").show(truncate=False)


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.9 MB[0m [31m3.9 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.7/1.9 MB[0m [31m10.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m18.6 MB/s[0m eta [36m0:00:00[0m
[?25h➡️ Generated 10000 records at /content/big_movies_dataset.jsonl
➡️ Flattened data written to Parquet at /content/movies_flattened_parquet
✅ External Parquet table 'movies_external' is ready in Spark SQL.
+----------------+--------+--------+------------+----------+-------------+------------------+--------------------------+-----------+---------+-----------+----------+---------+----------+----------+---------+------------------------------------+----------------+-------------------+------------