In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("ComplexJSON").getOrCreate()

data = [
    {
        "deviceId": "D001",
        "location": {"city": "Bangalore", "country": "India"},
        "metrics": [
            {"type": "temperature", "value": 30},
            {"type": "humidity", "value": 60}
        ],
        "status": "active",
        "eventTime": "2024-01-01 10:00:00"
    },
    {
        "deviceId": "D002",
        "location": {"city": "Chennai", "country": "India"},
        "metrics": [
            {"type": "temperature", "value": 35},
            {"type": "humidity", "value": 55}
        ],
        "status": "inactive",
        "eventTime": "2024-01-01 11:00:00"
    }
]

df = spark.createDataFrame(data)
df.show(truncate=False)
df.printSchema()


+--------+-------------------+-------------------------------------+---------------------------------------------------------------------+--------+
|deviceId|eventTime          |location                             |metrics                                                              |status  |
+--------+-------------------+-------------------------------------+---------------------------------------------------------------------+--------+
|D001    |2024-01-01 10:00:00|{city -> Bangalore, country -> India}|[{type -> temperature, value -> 30}, {type -> humidity, value -> 60}]|active  |
|D002    |2024-01-01 11:00:00|{city -> Chennai, country -> India}  |[{type -> temperature, value -> 35}, {type -> humidity, value -> 55}]|inactive|
+--------+-------------------+-------------------------------------+---------------------------------------------------------------------+--------+

root
 |-- deviceId: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- location: map (nulla

In [0]:
df1 = df.select(
    "deviceId",
    col("location.city").alias("city"),
    col("location.country").alias("country"),
    "status",
    "eventTime"
)

df1.show()


+--------+---------+-------+--------+-------------------+
|deviceId|     city|country|  status|          eventTime|
+--------+---------+-------+--------+-------------------+
|    D001|Bangalore|  India|  active|2024-01-01 10:00:00|
|    D002|  Chennai|  India|inactive|2024-01-01 11:00:00|
+--------+---------+-------+--------+-------------------+



In [0]:
df2 = df.select(
    "deviceId",
    explode("metrics").alias("metric"),
    "status"
)

df2.show(truncate=False)


+--------+----------------------------------+--------+
|deviceId|metric                            |status  |
+--------+----------------------------------+--------+
|D001    |{type -> temperature, value -> 30}|active  |
|D001    |{type -> humidity, value -> 60}   |active  |
|D002    |{type -> temperature, value -> 35}|inactive|
|D002    |{type -> humidity, value -> 55}   |inactive|
+--------+----------------------------------+--------+



In [0]:
df3 = df2.select(
    "deviceId",
    col("metric.type").alias("metric_type"),
    col("metric.value").alias("metric_value"),
    "status"
)

df3.show()


+--------+-----------+------------+--------+
|deviceId|metric_type|metric_value|  status|
+--------+-----------+------------+--------+
|    D001|temperature|          30|  active|
|    D001|   humidity|          60|  active|
|    D002|temperature|          35|inactive|
|    D002|   humidity|          55|inactive|
+--------+-----------+------------+--------+



AGGREGATION

In [0]:
avg_metric = df3.groupBy("metric_type") \
                .agg(avg("metric_value").alias("avg_value"))

avg_metric.show()


+-----------+---------+
|metric_type|avg_value|
+-----------+---------+
|temperature|     32.5|
|   humidity|     57.5|
+-----------+---------+



In [0]:
max_temp = df3.filter(col("metric_type") == "temperature") \
              .groupBy("deviceId") \
              .agg(max("metric_value").alias("max_temperature"))

max_temp.show()


+--------+---------------+
|deviceId|max_temperature|
+--------+---------------+
|    D001|             30|
|    D002|             35|
+--------+---------------+



Conditional aggregation

In [0]:
status_count = df3.groupBy("status") \
                  .agg(count("*").alias("total_events"))

status_count.show()

+--------+------------+
|  status|total_events|
+--------+------------+
|  active|           2|
|inactive|           2|
+--------+------------+



In [0]:
pivot_df = df3.groupBy("deviceId") \
              .pivot("metric_type") \
              .agg(avg("metric_value"))

pivot_df.show()


+--------+--------+-----------+
|deviceId|humidity|temperature|
+--------+--------+-----------+
|    D001|    60.0|       30.0|
|    D002|    55.0|       35.0|
+--------+--------+-----------+



TIME based aggregation

In [0]:
df_time = df.withColumn(
    "eventTime",
    to_timestamp("eventTime")
)


In [0]:
df_time_exploded = df_time \
    .withColumn("metric", explode("metrics")) \
    .select(
        "deviceId",
        hour("eventTime").alias("hour"),
        col("metric.type").alias("type"),
        col("metric.value").alias("value")
    )

df_time_exploded \
    .filter(col("type") == "temperature") \
    .groupBy("hour") \
    .agg(avg("value").alias("avg_temp")) \
    .show()


+----+--------+
|hour|avg_temp|
+----+--------+
|  10|    30.0|
|  11|    35.0|
+----+--------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

spark = SparkSession.builder.appName("JSON Ops").getOrCreate()


In [0]:
data = [
    {
        "deviceId": "D001",
        "location": {"city": "Bangalore", "country": "India"},
        "metrics": [
            {"type": "temperature", "value": 30},
            {"type": "humidity", "value": 60}
        ]
    }
]

df = spark.createDataFrame(data)


In [0]:
df.show(truncate=False)
df.printSchema()


+--------+-------------------------------------+---------------------------------------------------------------------+
|deviceId|location                             |metrics                                                              |
+--------+-------------------------------------+---------------------------------------------------------------------+
|D001    |{city -> Bangalore, country -> India}|[{type -> temperature, value -> 30}, {type -> humidity, value -> 60}]|
+--------+-------------------------------------+---------------------------------------------------------------------+

root
 |-- deviceId: string (nullable = true)
 |-- location: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- metrics: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



In [0]:
df.select("deviceId", "location.city").show()


+--------+---------+
|deviceId|     city|
+--------+---------+
|    D001|Bangalore|
+--------+---------+



In [0]:
df.select("deviceId", explode("metrics").alias("metric")) \
  .select("deviceId", "metric.type", "metric.value") \
  .show()


+--------+-----------+-----+
|deviceId|       type|value|
+--------+-----------+-----+
|    D001|temperature|   30|
|    D001|   humidity|   60|
+--------+-----------+-----+



In [0]:
json_df = spark.createDataFrame(
    [('{"deviceId":"D002","status":"active"}',)],
    ["json_str"]
)

schema = StructType([
    StructField("deviceId", StringType()),
    StructField("status", StringType())
])

json_df.select(from_json("json_str", schema).alias("data")) \
       .select("data.*") \
       .show()


+--------+------+
|deviceId|status|
+--------+------+
|    D002|active|
+--------+------+



In [0]:
df.write.mode("overwrite").json("output_json")


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-7576252510737378>, line 1[0m
[0;32m----> 1[0m df[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39mjson([38;5;124m"[39m[38;5;124moutput_json[39m[38;5;124m"[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:764[0m, in [0;36mDataFrameWriter.json[0;34m(self, path, mode, compression, dateFormat, timestampFormat, lineSep, encoding, ignoreNullFields)[0m
[1;32m    755[0m [38;5;28mself[39m[38;5;241m.[39mmode(mode)
[1;32m    756[0m [38;5;28mself[39m[38;5;241m.[39m_set_opts(
[1;32m    757[0m     compression[38;5;241m=[39mcompression,
[1;32m    758[0m     dateFormat[38;5;241m=[39mdateFormat,
[0;32m   (...)[0m
[1;32m    762[0m     ignoreNullFields[38;