In [0]:
# Read JSON file into DataFrame
df = spark.read.option("multiline", "true").json('dbfs:/FileStore/movie_sample.json')

In [0]:
# Show the first few rows of the DataFrame
# df.show(truncate=False)

# Print the schema of the DataFrame
df.printSchema()

root
 |-- box_office: struct (nullable = true)
 |    |-- budget: long (nullable = true)
 |    |-- gross: long (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dob: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- director: struct (nullable = true)
 |    |-- dob: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [0]:
# Flatten the DataFrame by selecting nested fields explicitly
flattened_df = df.select(
    "box_office.budget",
    "box_office.gross",
    "cast",
    "director.dob",
    "director.name",
    "genre",
    "title",
    "year"
)
flattened_df.printSchema()
display(flattened_df)

root
 |-- budget: long (nullable = true)
 |-- gross: long (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dob: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



budget,gross,cast,dob,name,genre,title,year
70000000,2300000000,"List(List(1965-03-14, Aamir Khan, Mahavir Singh Phogat), List(1992-01-11, Fatima Sana Shaikh, Geeta Phogat))",1973-05-26,Nitesh Tiwari,"Biography, Drama, Sport",Dangal,2016
180000000,650000000,"List(List(1979-10-23, Prabhas, Shivudu / Mahendra Baahubali), List(1984-12-14, Rana Daggubati, Bhallaladeva))",1973-10-10,S.S. Rajamouli,"Action, Drama",Baahubali: The Beginning,2015
55000000,460000000,"List(List(1965-03-14, Aamir Khan, Rancho), List(1970-06-01, R. Madhavan, Farhan Qureshi), List(1979-04-28, Sharman Joshi, Raju Rastogi))",1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009


In [0]:
from pyspark.sql.functions import explode

# Explode the array column
exploded_df = flattened_df.withColumn("cast", explode(flattened_df["cast"]))

# Remove column
exploded_df = exploded_df.drop("budget", "gross")

# Rename columns
exploded_df = exploded_df.withColumnRenamed("dob", "director_dob").withColumnRenamed("name", "director_name")

exploded_df.printSchema()

display(exploded_df)

root
 |-- cast: struct (nullable = true)
 |    |-- dob: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- director_dob: string (nullable = true)
 |-- director_name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



cast,director_dob,director_name,genre,title,year
"List(1965-03-14, Aamir Khan, Mahavir Singh Phogat)",1973-05-26,Nitesh Tiwari,"Biography, Drama, Sport",Dangal,2016
"List(1992-01-11, Fatima Sana Shaikh, Geeta Phogat)",1973-05-26,Nitesh Tiwari,"Biography, Drama, Sport",Dangal,2016
"List(1979-10-23, Prabhas, Shivudu / Mahendra Baahubali)",1973-10-10,S.S. Rajamouli,"Action, Drama",Baahubali: The Beginning,2015
"List(1984-12-14, Rana Daggubati, Bhallaladeva)",1973-10-10,S.S. Rajamouli,"Action, Drama",Baahubali: The Beginning,2015
"List(1965-03-14, Aamir Khan, Rancho)",1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009
"List(1970-06-01, R. Madhavan, Farhan Qureshi)",1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009
"List(1979-04-28, Sharman Joshi, Raju Rastogi)",1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009


In [0]:
# Select the individual columns from the parsed data
df_final = exploded_df.select(
    exploded_df["director_dob"],
    exploded_df["director_name"],
    exploded_df["genre"],
    exploded_df["title"],
    exploded_df["year"],
    exploded_df["cast.dob"].alias("date_of_birth"),
    exploded_df["cast.name"].alias("actor_name"),
    exploded_df["cast.role"].alias("actor_role")
)

display(df_final)

director_dob,director_name,genre,title,year,date_of_birth,actor_name,actor_role
1973-05-26,Nitesh Tiwari,"Biography, Drama, Sport",Dangal,2016,1965-03-14,Aamir Khan,Mahavir Singh Phogat
1973-05-26,Nitesh Tiwari,"Biography, Drama, Sport",Dangal,2016,1992-01-11,Fatima Sana Shaikh,Geeta Phogat
1973-10-10,S.S. Rajamouli,"Action, Drama",Baahubali: The Beginning,2015,1979-10-23,Prabhas,Shivudu / Mahendra Baahubali
1973-10-10,S.S. Rajamouli,"Action, Drama",Baahubali: The Beginning,2015,1984-12-14,Rana Daggubati,Bhallaladeva
1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009,1965-03-14,Aamir Khan,Rancho
1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009,1970-06-01,R. Madhavan,Farhan Qureshi
1962-11-20,Rajkumar Hirani,"Comedy, Drama",3 Idiots,2009,1979-04-28,Sharman Joshi,Raju Rastogi


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType

# Sample data
data = [(1, '{"dob": "1965-03-14", "name": "Aamir Khan", "role": "Mahavir Singh Phogat"}'),
        (2, '{"dob": "1992-01-11", "name": "Fatima Sana Shaikh", "role": "Geeta Phogat"}')]

# Create DataFrame
df = spark.createDataFrame(data, ["id", "json_data"])

# Show the resulting DataFrame
df.show()

df.printSchema()

# Define the schema for the JSON data
schema = StructType([
    StructField("dob", StringType(), True),
    StructField("name", StringType(), True),
    StructField("role", StringType(), True)
])

# Parse the JSON column into separate columns
df_parsed = df.withColumn("parsed_data", from_json("json_data", schema))

# Select the individual columns from the parsed data
df_final = df_parsed.select("id", "parsed_data.dob", "parsed_data.name", "parsed_data.role")

# Show the resulting DataFrame
df_final.show()


+---+--------------------+
| id|           json_data|
+---+--------------------+
|  1|{"dob": "1965-03-...|
|  2|{"dob": "1992-01-...|
+---+--------------------+

root
 |-- id: long (nullable = true)
 |-- json_data: string (nullable = true)

+---+----------+------------------+--------------------+
| id|       dob|              name|                role|
+---+----------+------------------+--------------------+
|  1|1965-03-14|        Aamir Khan|Mahavir Singh Phogat|
|  2|1992-01-11|Fatima Sana Shaikh|        Geeta Phogat|
+---+----------+------------------+--------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sample data
data = [(1, {"dob": "1965-03-14", "name": "Aamir Khan", "role": "Mahavir Singh Phogat"}),
        (2, {"dob": "1992-01-11", "name": "Fatima Sana Shaikh", "role": "Geeta Phogat"})]

# Define schema for the data
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("cast", StructType([
        StructField("dob", StringType(), True),
        StructField("name", StringType(), True),
        StructField("role", StringType(), True)
    ]), True)
])

# Create DataFrame from the sample data
df = spark.createDataFrame(data, schema)
df.printSchema()

# Flatten the structure
flattened_df = df.select(
    "id",
    "cast.dob",
    "cast.name",
    "cast.role"
)

# Show the flattened DataFrame
flattened_df.show()

root
 |-- id: integer (nullable = true)
 |-- cast: struct (nullable = true)
 |    |-- dob: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)

+---+----------+------------------+--------------------+
| id|       dob|              name|                role|
+---+----------+------------------+--------------------+
|  1|1965-03-14|        Aamir Khan|Mahavir Singh Phogat|
|  2|1992-01-11|Fatima Sana Shaikh|        Geeta Phogat|
+---+----------+------------------+--------------------+

