In [0]:
# Create the SparkSession for this notebook (getOrCreate returns the active one if present)
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('Databricks Shell').getOrCreate()

# Bare expression so the session summary is rendered as the cell output
spark

In [0]:
# Read single line JSON file (each line of the file is one complete JSON record)

df_single = spark.read.load('/FileStore/tables/order_singleline.json', format='json')

In [0]:
# Inspect the schema Spark derived for the single-line JSON file (no schema was supplied on read)
df_single.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [0]:
# Preview the rows; long cell values are cut off with "..." in this default view
df_single.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [0]:
# Read multiple line JSON file (the multiline option lets one record span several lines)

df_multi = (
    spark.read
    .format('json')
    .options(multiline=True)
    .load('/FileStore/tables/order_multiline.json')
)

In [0]:
# Inspect the schema of the multiline read (same fields as the single-line read above)
df_multi.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [0]:
# Preview the rows loaded from the multiline JSON file
df_multi.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [0]:
# Load the single-line JSON file as plain text so the raw JSON string can be parsed manually
df = spark.read.load('/FileStore/tables/order_singleline.json', format='text')

In [0]:
# The text reader yields a single string column named `value`
df.printSchema()

root
 |-- value: string (nullable = true)



In [0]:
# truncate=False prints the complete raw JSON line instead of a shortened preview
df.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"order_id":"O101","customer_id":"C001","order_line_items":[{"item_id":"I001","qty":6,"amount":102.45},{"item_id":"I003","qty":2,"amount":2.01}],"contact":[9000010000,9000010001]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [0]:
# with schema: supply a DDL string so only the listed fields are read

_schema = "customer_id string, order_id string, contact array<long>"

df_schema = spark.read.load(
    '/FileStore/tables/order_singleline.json',
    format='json',
    schema=_schema,
)

In [0]:
# Only the three columns named in _schema come back; order_line_items is not read
df_schema.show()

+-----------+--------+--------------------+
|customer_id|order_id|             contact|
+-----------+--------+--------------------+
|       C001|    O101|[9000010000, 9000...|
+-----------+--------+--------------------+



In [0]:
# Reference only: printSchema() output for the order JSON, kept as a guide for
# writing an explicit DDL schema string. It is stored as a comment because the
# raw text is not valid Python and would raise a SyntaxError when the cell runs.
#
# root
#  |-- contact: array (nullable = true)
#  |    |-- element: long (containsNull = true)
#  |-- customer_id: string (nullable = true)
#  |-- order_id: string (nullable = true)
#  |-- order_line_items: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- amount: double (nullable = true)
#  |    |    |-- item_id: string (nullable = true)
#  |    |    |-- qty: long (nullable = true)

In [0]:
# Full schema as a DDL string, assembled from adjacent literals (one top-level field per line)
_schema = (
    "contact array<long>, "
    "customer_id string, "
    "order_id string, "
    "order_line_items array<struct<amount double, item_id string, qty long>>"
)

In [0]:
# Read the single-line JSON file again, applying the schema currently held in _schema
df_schema_new = (
    spark.read
    .schema(_schema)
    .format('json')
    .load('/FileStore/tables/order_singleline.json')
)

In [0]:
# Check that the DataFrame carries the schema that was passed to the reader
df_schema_new.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [0]:
# Preview the rows read with the supplied schema
df_schema_new.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [0]:
# Function from_json parses a JSON string column into a struct that follows the given schema
_schema = (
    "contact array<long>, "
    "customer_id string, "
    "order_id string, "
    "order_line_items array<struct<amount double, item_id string, qty long>>"
)

from pyspark.sql.functions import from_json

# Keep the raw `value` column and add the parsed struct next to it
df_expanded = df.withColumn('parsed', from_json(df['value'], _schema))

In [0]:
# `value` stays a string; `parsed` is a struct whose fields come from the schema given to from_json
df_expanded.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- contact: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_id: string (nullable = true)
 |    |-- order_line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: double (nullable = true)
 |    |    |    |-- item_id: string (nullable = true)
 |    |    |    |-- qty: long (nullable = true)



In [0]:
# Show the raw JSON string next to its parsed struct without truncating either column
df_expanded.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+
|value                                                                                                                                                                              |parsed                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+
|{"order_id":"O101","customer_id":"C001","order_line_items":[{"item_id":"I001","qty":6,"amount":102.45},{"item_id":"I003","qty":2,"amount":2.01}],"contact":[9000010000,9000010001]}|{[9000010000, 9000010001], C001, O101, [{1

In [0]:
# Function to_json serializes a struct column back into a JSON string
from pyspark.sql.functions import to_json

df_unparsed = df_expanded.withColumn(
    'unparsed',
    to_json(df_expanded['parsed']),
)

In [0]:
# `unparsed` is a plain string column produced by to_json from the `parsed` struct
df_unparsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- contact: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_id: string (nullable = true)
 |    |-- order_line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: double (nullable = true)
 |    |    |    |-- item_id: string (nullable = true)
 |    |    |    |-- qty: long (nullable = true)
 |-- unparsed: string (nullable = true)



In [0]:
# Preview all three columns (values are shortened in this default view)
df_unparsed.show()

+--------------------+--------------------+--------------------+
|               value|              parsed|            unparsed|
+--------------------+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|{"contact":[90000...|
+--------------------+--------------------+--------------------+



In [0]:
# Print the full re-serialized JSON; its keys follow the struct's field order, not the input file's key order
df_unparsed.select('unparsed').show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|unparsed                                                                                                                                                                           |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"contact":[9000010000,9000010001],"customer_id":"C001","order_id":"O101","order_line_items":[{"amount":102.45,"item_id":"I001","qty":6},{"amount":2.01,"item_id":"I003","qty":2}]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [0]:
# Preview the raw and parsed columns before extracting values from the parsed JSON

df_expanded.show()

+--------------------+--------------------+
|               value|              parsed|
+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|
+--------------------+--------------------+



In [0]:

# get values from parsed JSON
# 'parsed.*' expands the struct so that each of its fields becomes a top-level column

df_1 = df_expanded.select('parsed.*')

In [0]:
from pyspark.sql.functions import explode

# explode() produces one output row per element of the order_line_items array;
# the original array column is kept alongside the new struct column
df_2 = df_1.withColumn('expanded_line_items', explode(df_1['order_line_items']))

In [0]:
 df_2.show(truncate=False)  # one row per exploded line item, values printed in full

+------------------------+-----------+--------+------------------------------------+-------------------+
|contact                 |customer_id|order_id|order_line_items                    |expanded_line_items|
+------------------------+-----------+--------+------------------------------------+-------------------+
|[9000010000, 9000010001]|C001       |O101    |[{102.45, I001, 6}, {2.01, I003, 2}]|{102.45, I001, 6}  |
|[9000010000, 9000010001]|C001       |O101    |[{102.45, I001, 6}, {2.01, I003, 2}]|{2.01, I003, 2}    |
+------------------------+-----------+--------+------------------------------------+-------------------+



In [0]:
# Keep the order-level columns and flatten the exploded line-item struct into its fields
df_3 = df_2.select(
    'contact',
    'customer_id',
    'order_id',
    'expanded_line_items.*',
)

In [0]:
# The line-item fields (amount, item_id, qty) are now ordinary top-level columns
df_3.show()

+--------------------+-----------+--------+------+-------+---+
|             contact|customer_id|order_id|amount|item_id|qty|
+--------------------+-----------+--------+------+-------+---+
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|
+--------------------+-----------+--------+------+-------+---+



In [0]:
# Explode the contact array too: every line-item row is repeated once per contact number
df_final = df_3.withColumn('contact_expanded', explode(df_3['contact']))

In [0]:
# contact_expanded is a single long per row; the original contact array column is still present
df_final.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- item_id: string (nullable = true)
 |-- qty: long (nullable = true)
 |-- contact_expanded: long (nullable = true)



In [0]:
# Each line item appears once per contact number (2 line items x 2 contacts = 4 rows)
df_final.show()

+--------------------+-----------+--------+------+-------+---+----------------+
|             contact|customer_id|order_id|amount|item_id|qty|contact_expanded|
+--------------------+-----------+--------+------+-------+---+----------------+
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|      9000010000|
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|      9000010001|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|      9000010000|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|      9000010001|
+--------------------+-----------+--------+------+-------+---+----------------+

