In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("Reading and Parsing JSON Files/Data").getOrCreate()
)

In [2]:
# Read Single line JSON file

df_single = spark.read.format("json").load("data/input/order_singleline.json")

In [3]:
df_single.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [4]:
df_single.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [7]:
# Read Multi line JSON file

df_multi = spark.read.format("json").option("multiline", True).load("data/input/order_multiline.json") # for multiline --> option("multiline", True)

In [8]:
df_multi.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [9]:
df_multi.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [10]:
df = spark.read.format("text").load("data/input/order_singleline.json")

In [11]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [13]:
df.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"order_id":"O101","customer_id":"C001","order_line_items":[{"item_id":"I001","qty":6,"amount":102.45},{"item_id":"I003","qty":2,"amount":2.01}],"contact":[9000010000,9000010001]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [14]:
# With Schema

# define schema to view the selected columns defined under schema
_schema = "customer_id string, order_id string, contact array<long>"

df_schema = spark.read.format("json").schema(_schema).load("data/input/order_singleline.json")

In [15]:
df_schema.show()

+-----------+--------+--------------------+
|customer_id|order_id|             contact|
+-----------+--------+--------------------+
|       C001|    O101|[9000010000, 9000...|
+-----------+--------+--------------------+



In [19]:
# specify a complex schema
    
# note that contact array was of type 'long', now it's changed to 'string'
_schema = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"
df_schema_new = spark.read.format("json").schema(_schema).load("data/input/order_singleline.json")

In [20]:
df_schema_new.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [21]:
df_schema_new.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [24]:
# Function from_json to read from a column

_schema = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"

from pyspark.sql.functions import from_json

# adding column 'parsed' with value of 'value' column of 'df' dataframe
df_expanded = df.withColumn("parsed", from_json(df.value, _schema))

In [25]:
df_expanded.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- contact: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_id: string (nullable = true)
 |    |-- order_line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: double (nullable = true)
 |    |    |    |-- item_id: string (nullable = true)
 |    |    |    |-- qty: long (nullable = true)



In [26]:
df_expanded.show()

+--------------------+--------------------+
|               value|              parsed|
+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|
+--------------------+--------------------+



In [27]:
# Function to_json to parse a JSON string
from pyspark.sql.functions import to_json

df_unparsed = df_expanded.withColumn("unparsed", to_json(df_expanded.parsed))

In [28]:
df_unparsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- contact: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_id: string (nullable = true)
 |    |-- order_line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: double (nullable = true)
 |    |    |    |-- item_id: string (nullable = true)
 |    |    |    |-- qty: long (nullable = true)
 |-- unparsed: string (nullable = true)



In [29]:
df_unparsed.select("unparsed").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|unparsed                                                                                                                                                                               |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"contact":["9000010000","9000010001"],"customer_id":"C001","order_id":"O101","order_line_items":[{"amount":102.45,"item_id":"I001","qty":6},{"amount":2.01,"item_id":"I003","qty":2}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [36]:
# Get values from Parsed JSON

# observe 'parsed.*', it means it wil select all the key under 'parsed' key
df_1 = df_expanded.select("parsed.*")

In [37]:
df_1.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [38]:
from pyspark.sql.functions import explode

df_2 = df_1.withColumn("expanded_line_items", explode("order_line_items"))

In [39]:
df_2.show()

+--------------------+-----------+--------+--------------------+-------------------+
|             contact|customer_id|order_id|    order_line_items|expanded_line_items|
+--------------------+-----------+--------+--------------------+-------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|  {102.45, I001, 6}|
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|    {2.01, I003, 2}|
+--------------------+-----------+--------+--------------------+-------------------+



In [47]:
# df_3 = df_2.select("contact", "customer_id", "order_id", "expanded_line_items.amount") --> to select particular value under key 'expanded_line_items'
# df_3 = df_2.select("contact", "customer_id", "order_id", "expanded_line_items.item_id")
# df_3 = df_2.select("contact", "customer_id", "order_id", "expanded_line_items.qty")

df_3 = df_2.select("contact", "customer_id", "order_id", "expanded_line_items.*")

In [48]:
df_3.show()

+--------------------+-----------+--------+------+-------+---+
|             contact|customer_id|order_id|amount|item_id|qty|
+--------------------+-----------+--------+------+-------+---+
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|
+--------------------+-----------+--------+------+-------+---+



In [49]:
# Explode Array fields
# Explode function is one of the most frequently asked in interviews

df_final = df_3.withColumn("contact_expanded", explode("contact"))

In [50]:
df_final.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- item_id: string (nullable = true)
 |-- qty: long (nullable = true)
 |-- contact_expanded: string (nullable = true)



In [51]:
df_final.show()

+--------------------+-----------+--------+------+-------+---+----------------+
|             contact|customer_id|order_id|amount|item_id|qty|contact_expanded|
+--------------------+-----------+--------+------+-------+---+----------------+
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|      9000010000|
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|      9000010001|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|      9000010000|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|      9000010001|
+--------------------+-----------+--------+------+-------+---+----------------+



In [53]:
df_final.drop("contact").show()  # 'contact' col is exploded to 'contact_expanded', so dropped as values are same

+-----------+--------+------+-------+---+----------------+
|customer_id|order_id|amount|item_id|qty|contact_expanded|
+-----------+--------+------+-------+---+----------------+
|       C001|    O101|102.45|   I001|  6|      9000010000|
|       C001|    O101|102.45|   I001|  6|      9000010001|
|       C001|    O101|  2.01|   I003|  2|      9000010000|
|       C001|    O101|  2.01|   I003|  2|      9000010001|
+-----------+--------+------+-------+---+----------------+



In [54]:
# key concepts
# expand the json
# flatten the json

In [55]:
spark.stop()