## JSON

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, get_json_object, schema_of_json, col
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create Spark session
spark = SparkSession.builder.appName("PySpark JSON Functions").getOrCreate()

# Sample data
data = [
    ("1", '{"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}}'),
    ("2", '{"name":"Bob","age":25,"address":{"city":"Los Angeles","zip":"90001"}}')
]

df = spark.createDataFrame(data, ["id", "person_json"])

# Show initial DataFrame
df.show(truncate=False)


+---+----------------------------------------------------------------------+
|id |person_json                                                           |
+---+----------------------------------------------------------------------+
|1  |{"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}} |
|2  |{"name":"Bob","age":25,"address":{"city":"Los Angeles","zip":"90001"}}|
+---+----------------------------------------------------------------------+



In [2]:
from pyspark.sql.functions import lit

In [3]:
sample = '{"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}}'
json_schema = schema_of_json(lit(sample))
json_schema


Column<'schema_of_json({"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}})'>

In [4]:
df.select(json_schema).show()

+-------------------------------------------------------------------------------------+
|schema_of_json({"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}})|
+-------------------------------------------------------------------------------------+
|                                                                 STRUCT<address: S...|
|                                                                 STRUCT<address: S...|
+-------------------------------------------------------------------------------------+



In [5]:
df.select(json_schema).first()

Row(schema_of_json({"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}})='STRUCT<address: STRUCT<city: STRING, zip: STRING>, age: BIGINT, name: STRING>')

In [6]:
df = df.withColumn('person', from_json('person_json', json_schema))
df.show()

+---+--------------------+--------------------+
| id|         person_json|              person|
+---+--------------------+--------------------+
|  1|{"name":"Alice","...|{{New York, 10001...|
|  2|{"name":"Bob","ag...|{{Los Angeles, 90...|
+---+--------------------+--------------------+



In [7]:
df.select('person.address.city').show()

+-----------+
|       city|
+-----------+
|   New York|
|Los Angeles|
+-----------+



In [8]:
df.select('person.*').show()

+--------------------+---+-----+
|             address|age| name|
+--------------------+---+-----+
|   {New York, 10001}| 30|Alice|
|{Los Angeles, 90001}| 25|  Bob|
+--------------------+---+-----+



In [9]:
df.select('person.address.*').show()

+-----------+-----+
|       city|  zip|
+-----------+-----+
|   New York|10001|
|Los Angeles|90001|
+-----------+-----+



In [10]:
df.select(get_json_object('person_json', '$.address.city')).show()

+--------------------------------------------+
|get_json_object(person_json, $.address.city)|
+--------------------------------------------+
|                                    New York|
|                                 Los Angeles|
+--------------------------------------------+



In [11]:
df.select(to_json('person')).show()

+--------------------+
|     to_json(person)|
+--------------------+
|{"address":{"city...|
|{"address":{"city...|
+--------------------+



In [14]:
from pyspark.sql.functions import json_tuple

# Form rows with selected fields from json column
df.select(json_tuple('person_json', 'name', 'age')).collect()

[Row(c0='Alice', c1='30'), Row(c0='Bob', c1='25')]

In [None]:
person = {"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}}
person["address"]['city']

'New York'

In [None]:
parsed_df = df.withColumn("person", from_json(col("person_json"), json_schema))
parsed_df.show()

+---+--------------------+--------------------+
| id|         person_json|              person|
+---+--------------------+--------------------+
|  1|{"name":"Alice","...|{{New York, 10001...|
|  2|{"name":"Bob","ag...|{{Los Angeles, 90...|
+---+--------------------+--------------------+



In [None]:
parsed_df.select("person.address.*").show(truncate=False)

+-----------+-----+
|city       |zip  |
+-----------+-----+
|New York   |10001|
|Los Angeles|90001|
+-----------+-----+



In [None]:
df.withColumn("city", get_json_object(col("person_json"), "$.address.city")) \
  .withColumn("age", get_json_object(col("person_json"), "$.age")) \
  .show(truncate=False)


+---+----------------------------------------------------------------------+-----------+---+
|id |person_json                                                           |city       |age|
+---+----------------------------------------------------------------------+-----------+---+
|1  |{"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}} |New York   |30 |
|2  |{"name":"Bob","age":25,"address":{"city":"Los Angeles","zip":"90001"}}|Los Angeles|25 |
+---+----------------------------------------------------------------------+-----------+---+



In [None]:
df.select(get_json_object('person_json', '$.address.city').alias('city')).show()

+-----------+
|       city|
+-----------+
|   New York|
|Los Angeles|
+-----------+



In [None]:
df.withColumn("city", get_json_object("person_json", "$.address.city")) \
  .withColumn("age", get_json_object("person_json", "$.age")) \
  .show(truncate=False)


+---+----------------------------------------------------------------------+-----------+---+
|id |person_json                                                           |city       |age|
+---+----------------------------------------------------------------------+-----------+---+
|1  |{"name":"Alice","age":30,"address":{"city":"New York","zip":"10001"}} |New York   |30 |
|2  |{"name":"Bob","age":25,"address":{"city":"Los Angeles","zip":"90001"}}|Los Angeles|25 |
+---+----------------------------------------------------------------------+-----------+---+



In [None]:
df.select(get_json_object(df.person_json, '$.address.city').alias('city')).show()

+-----------+
|       city|
+-----------+
|   New York|
|Los Angeles|
+-----------+



In [None]:
from pyspark.sql.functions import explode

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

data = [("Alice", "apple,banana,orange"), ("Bob", "grape,kiwi")]
df = spark.createDataFrame(data, ["name", "fruits_string"])

df_exploded = df.withColumn("fruits_array", split(df["fruits_string"], ","))
df_exploded = df_exploded.drop("fruits_string").withColumn("fruit", explode("fruits_array"))

df_exploded.show()


+-----+--------------------+------+
| name|        fruits_array| fruit|
+-----+--------------------+------+
|Alice|[apple, banana, o...| apple|
|Alice|[apple, banana, o...|banana|
|Alice|[apple, banana, o...|orange|
|  Bob|       [grape, kiwi]| grape|
|  Bob|       [grape, kiwi]|  kiwi|
+-----+--------------------+------+



In [None]:
from pyspark.sql.functions import struct

# Create a new struct and convert to JSON
structured_df = df.withColumn("new_json", to_json(struct(lit("Eve").alias("name"), lit(40).alias("age"))))
structured_df.select("id", "new_json").show(truncate=False)

+---+-----------------------+
|id |new_json               |
+---+-----------------------+
|1  |{"name":"Eve","age":40}|
|2  |{"name":"Eve","age":40}|
+---+-----------------------+

