| JSON function | Description |
|---|---|
| from_json() | Converts a JSON string into a StructType or MapType column. |
| to_json() | Converts a MapType or StructType column to a JSON string. |
| json_tuple() | Extracts fields from a JSON string and returns them as new columns. |
| get_json_object() | Extracts a JSON element from a JSON string based on the specified JSON path. |
| schema_of_json() | Creates a schema string from a JSON string. |

##  Create DataFrame with Column containing JSON String


In [1]:
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
df=spark.createDataFrame([(1, jsonString)],["id","value"])
df.show(truncate=False)



+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+



In [16]:
#Convert JSON string column to Map type
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)

root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+



In [6]:
# to_json
from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)

+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+
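Note that Zipcode went in as the number 704 and came back as the string "704": the map was declared as MapType(StringType(),StringType()), so every value is carried as a string. The same effect can be sketched in plain Python with the standard json module. This is only an analogy for the type change on a flat object, not how Spark implements from_json() or to_json(), and the helper name to_string_map is made up for illustration.

```python
import json

json_string = '{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}'

def to_string_map(raw):
    """Parse a flat JSON object and keep every value as a string, like a map<string,string>."""
    parsed = json.loads(raw)
    return {key: str(value) for key, value in parsed.items()}

string_map = to_string_map(json_string)

# compact separators mimic the JSON text shown in the to_json() output above
round_trip = json.dumps(string_map, separators=(",", ":"))
print(round_trip)
```

If the numeric type matters, a StructType schema with an integer field for Zipcode would be the usual alternative to a string-valued map.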



In [8]:
# get_json_object()

from pyspark.sql.functions import get_json_object

df.select("id", get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")).show(truncate=False)

+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+
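To make the path syntax concrete, here is a rough plain-Python sketch of what a path such as $.ZipCodeType selects. It assumes only dotted object keys (no array indexes or wildcards), the function name get_by_path is hypothetical, and it is not Spark's implementation; get_json_object() itself returns a string column and null when the path is not found.

```python
import json

def get_by_path(raw, path):
    """Resolve a simple '$.a.b' style path; return None if any key is missing."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = json.loads(raw)
    for key in [part for part in path[1:].split(".") if part]:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

json_string = '{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}'
print(get_by_path(json_string, "$.ZipCodeType"))
```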



In [9]:
# schema_of_json()

# Use schema_of_json() to create schema string from JSON string column.


from pyspark.sql.functions import schema_of_json, lit

schemaStr = spark.range(1).select(schema_of_json(lit("""{"Zipcode": 704, "ZipCodeType": "STANDARD", "City": "PARC PARQUE", "State": "PR"}""")))\
    .collect()[0][0]

print(schemaStr)



STRUCT<City: STRING, State: STRING, ZipCodeType: STRING, Zipcode: BIGINT>
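The schema string above lists the fields alphabetically and maps the integer to BIGINT and the text values to STRING. A toy version of that mapping, for flat objects holding only strings and integers, might look like the sketch below. The function name infer_ddl is hypothetical, and the sketch only mirrors the output shown here; Spark's real inference covers many more types and options.

```python
import json

def infer_ddl(raw):
    """Build a STRUCT<...> string for a flat JSON object (strings and integers only)."""
    type_names = {str: "STRING", int: "BIGINT"}
    parsed = json.loads(raw)
    fields = []
    for name in sorted(parsed):
        value = parsed[name]
        # type() is used instead of isinstance() so that bool is not mistaken for int
        if type(value) not in type_names:
            raise TypeError(f"unsupported value type for {name!r}")
        fields.append(f"{name}: {type_names[type(value)]}")
    return "STRUCT<" + ", ".join(fields) + ">"

sample = '{"Zipcode": 704, "ZipCodeType": "STANDARD", "City": "PARC PARQUE", "State": "PR"}'
schema_sketch = infer_ddl(sample)
print(schema_sketch)
```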


## Complete Example of PySpark JSON Functions

In [10]:
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
df=spark.createDataFrame([(1, jsonString)],["id","value"])
df.show(truncate=False)

#Convert JSON string column to Map type
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)

from pyspark.sql.functions import to_json,col
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)

from pyspark.sql.functions import json_tuple
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)

from pyspark.sql.functions import get_json_object
df.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)

from pyspark.sql.functions import schema_of_json,lit
schemaStr=spark.range(1) \
    .select(schema_of_json(lit("""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""))) \
    .collect()[0][0]
print(schemaStr)

+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+

root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+

+---+---------------------------------

## Reading a JSON file

In [17]:
df = spark.read.load(
    'example.json', 
    format='json',
    multiLine=True, 
    schema=None)

df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------+-----------------------------------------------+-------+
|Components                                                                                                                                                                                                                                                                                                                                                                                                

In [22]:
from pyspark.sql.functions import explode, col

df.withColumn("components",explode(col('components'))).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------+-----------------------------------------------+-------+
|components                                                                                                                                                                                                                                                                                                                                                        |Name          |ToolVendor|ToolVersion                                    |Version|
+-------------------------------------------------------------------------------------------------------------------------

In [27]:
df.printSchema()

root
 |-- Components: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Copyright: string (nullable = true)
 |    |    |-- Data: string (nullable = true)
 |    |    |-- Groups: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- Hash: struct (nullable = true)
 |    |    |    |-- Algorithm: string (nullable = true)
 |    |    |    |-- Content: string (nullable = true)
 |    |    |-- IsRoot: boolean (nullable = true)
 |    |    |-- Kind: string (nullable = true)
 |    |    |-- License: struct (nullable = true)
 |    |    |    |-- Id: string (nullable = true)
 |    |    |    |-- Name: string (nullable = true)
 |    |    |    |-- Url: string (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Purl: string (nullable = true)
 |    |    |-- UUID: string (nullable = true)
 |    |    |-- Version: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- ToolVendor: string (nullab

In [28]:
df.select('components').show()

+--------------------+
|          components|
+--------------------+
|[{NULL, 1.0.0, [P...|
+--------------------+



### Conclusion

To read the components of a JSON file, explode the Components array and select the nested keys you need, as below.

In [43]:
df_exploded = df.withColumn('Components', explode(col('Components')))

df_exploded.select(
    col("Components.Name"),
    col("Components.Data"),
    col("Components.UUID"),
    col("Components.Purl")
).show()

+---------------+------+--------------------+--------------------+
|           Name|  Data|                UUID|                Purl|
+---------------+------+--------------------+--------------------+
| PySparkExample| 1.0.0|fd90c2b1-4450-4c7...|pkg:covenant/dotn...|
|Newtonsoft.Json|13.0.2|e2701915-6fcc-4a4...|pkg:nuget/Newtons...|
+---------------+------+--------------------+--------------------+
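The reason the select above returns two rows is that explode() emits one row per element of the Components array and copies the remaining columns alongside it. A plain-Python sketch of that behaviour follows; the function name explode_rows and the top-level Name value are hypothetical, and only the component names and Data values are taken from the output above.

```python
def explode_rows(rows, column):
    """Return one output row per element of row[column], copying the other fields."""
    exploded = []
    for row in rows:
        for element in row[column]:
            new_row = dict(row)
            new_row[column] = element
            exploded.append(new_row)
    return exploded

rows = [{
    "Name": "demo-document",  # hypothetical top-level value
    "Components": [
        {"Name": "PySparkExample", "Data": "1.0.0"},
        {"Name": "Newtonsoft.Json", "Data": "13.0.2"},
    ],
}]

exploded = explode_rows(rows, "Components")
names = [row["Components"]["Name"] for row in exploded]
print(names)
```

As in this sketch, a row whose array is empty produces no output rows.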



In [46]:
df_single_partition = df.coalesce(1)

df_single_partition.write.mode('overwrite').json('example2.json')

## Reading with pandas

In [29]:
import pandas as pd 

j_df = pd.read_json('example.json')
print(j_df['Components'])

0    {'Data': '1.0.0', 'UUID': 'fd90c2b1-4450-4c72-...
1    {'Data': '13.0.2', 'UUID': 'e2701915-6fcc-4a4c...
Name: Components, dtype: object


In [32]:
from IPython.display import display
display(j_df['Components'])


0    {'Data': '1.0.0', 'UUID': 'fd90c2b1-4450-4c72-...
1    {'Data': '13.0.2', 'UUID': 'e2701915-6fcc-4a4c...
Name: Components, dtype: object

In [33]:
components_df = j_df['Components'].apply(pd.Series)
components_df

| | Data | UUID | Purl | Name | Version | Groups | IsRoot | Kind | Copyright | Hash | License |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0.0 | fd90c2b1-4450-4c72-b3f2-f395ba161749 | pkg:covenant/dotnet/PySparkExample@1.0.0 | PySparkExample | 1.0.0 | [PySparkExample.sln] | True | NaN | NaN | NaN | NaN |
| 1 | 13.0.2 | e2701915-6fcc-4a4c-aa5e-f068b43bebe4 | pkg:nuget/Newtonsoft.Json@13.0.2 | Newtonsoft.Json | 13.0.2 | NaN | NaN | Library | Copyright © James Newton-King 2008 | {'Algorithm': 'SHA512', 'Content': 'D743AE673B... | {'Id': 'MIT', 'Name': 'MIT License', 'Url': 'h... |


### Writing the data to MongoDB



In [48]:
# df to dict 
data = components_df.to_dict(orient='records')
data

[{'Data': '1.0.0',
  'UUID': 'fd90c2b1-4450-4c72-b3f2-f395ba161749',
  'Purl': 'pkg:covenant/dotnet/PySparkExample@1.0.0',
  'Name': 'PySparkExample',
  'Version': '1.0.0',
  'Groups': ['PySparkExample.sln'],
  'IsRoot': True,
  'Kind': nan,
  'Copyright': nan,
  'Hash': nan,
  'License': nan},
 {'Data': '13.0.2',
  'UUID': 'e2701915-6fcc-4a4c-aa5e-f068b43bebe4',
  'Purl': 'pkg:nuget/Newtonsoft.Json@13.0.2',
  'Name': 'Newtonsoft.Json',
  'Version': '13.0.2',
  'Groups': nan,
  'IsRoot': nan,
  'Kind': 'Library',
  'Copyright': 'Copyright © James Newton-King 2008',
  'Hash': {'Algorithm': 'SHA512',
   'Content': 'D743AE673BAC17FDBF53C05983DBA2FFDB99D7E6AF8CF5FE008D57AA30B6C6CA615D672C4140EEC516E529EB6AD5ACF29C20B5CC059C86F98C80865652ACDDE1'},
  'License': {'Id': 'MIT',
   'Name': 'MIT License',
   'Url': 'https://licenses.nuget.org/MIT'}}]

In [49]:
# test the connection with plain Python (pymongo) first

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')

db = client['trial']
collection = db['collection1']
collection.insert_many(data)
client.close()
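The records produced by to_dict() still carry float NaN for every field a component lacks, so those NaN values are stored in MongoDB as numbers next to real strings, booleans and sub-documents in other records. One possible cleanup, sketched here under the assumption that a missing field is better left out of the document altogether, is to drop NaN-valued keys before calling insert_many(). The helper name drop_nan_fields and the sample records are illustrative only.

```python
import math

def drop_nan_fields(records):
    """Return copies of the records without fields whose value is a float NaN."""
    cleaned = []
    for record in records:
        cleaned.append({
            key: value
            for key, value in record.items()
            if not (isinstance(value, float) and math.isnan(value))
        })
    return cleaned

# shortened, illustrative records in the same shape as the data above
sample = [
    {"Name": "PySparkExample", "IsRoot": True, "Kind": float("nan")},
    {"Name": "Newtonsoft.Json", "IsRoot": float("nan"), "Kind": "Library"},
]

cleaned = drop_nan_fields(sample)
print(cleaned)
```

With the real data this would be used as collection.insert_many(drop_nan_fields(data)).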

### Trying to write data using PySpark

In [1]:
from pyspark.sql import SparkSession

mongo_spark_package = "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"

spark = SparkSession.builder \
    .appName("MongoDBIntegration2") \
    .config("spark.jars.packages", mongo_spark_package) \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/trial.collection1") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/trial.collection1") \
    .getOrCreate()

# df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()
# df.show()


In [2]:
# first test that reading works, then write
df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()
df.show()


24/08/29 14:05:56 WARN MongoInferSchema: Field 'Copyright' contains conflicting types converting to StringType
24/08/29 14:05:56 WARN MongoInferSchema: Field 'Groups' contains conflicting types converting to StringType
24/08/29 14:05:56 WARN MongoInferSchema: Field 'Hash' contains conflicting types converting to StringType
24/08/29 14:05:56 WARN MongoInferSchema: Field 'IsRoot' contains conflicting types converting to StringType
24/08/29 14:05:56 WARN MongoInferSchema: Field 'Kind' contains conflicting types converting to StringType
24/08/29 14:05:56 WARN MongoInferSchema: Field 'License' contains conflicting types converting to StringType

+--------------------+------+--------------------+--------------------+------+-------+--------------------+---------------+--------------------+--------------------+-------+--------------------+
|           Copyright|  Data|              Groups|                Hash|IsRoot|   Kind|             License|           Name|                Purl|                UUID|Version|                 _id|
+--------------------+------+--------------------+--------------------+------+-------+--------------------+---------------+--------------------+--------------------+-------+--------------------+
|                 NaN| 1.0.0|["PySparkExample....|                 NaN|  true|    NaN|                 NaN| PySparkExample|pkg:covenant/dotn...|fd90c2b1-4450-4c7...|  1.0.0|{66d02b587fce1965...|
|Copyright © James...|13.0.2|                 NaN|{"Algorithm": "SH...|   NaN|Library|{"Id": "MIT", "Na...|Newtonsoft.Json|pkg:nuget/Newtons...|e2701915-6fcc-4a4...| 13.0.2|{66d02b587fce1965...|
+--------------------+---

                                                                                



In [3]:
df = spark.read.load(
    'example.json', 
    format='json',
    multiLine=True, 
    schema=None)

df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+----------+-----------------------------------------------+-------+
|Components                                                                                                                                                                                                                                                                                                                                                                                                

In [5]:
# successfully wrote the data to MongoDB
df.write.format('mongo').mode("append").save()

In [55]:
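from pyspark.sql.functions import explode, col

df_exploded = df.withColumn('Components', explode(col('Components')))

# keep the DataFrame in df_component; show() returns None, so call it separately
df_component = df_exploded.select(
    col("Components.Name"),
    col("Components.Data"),
    col("Components.UUID"),
    col("Components.Purl")
)
df_component.show()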

+---------------+------+--------------------+--------------------+
|           Name|  Data|                UUID|                Purl|
+---------------+------+--------------------+--------------------+
| PySparkExample| 1.0.0|fd90c2b1-4450-4c7...|pkg:covenant/dotn...|
|Newtonsoft.Json|13.0.2|e2701915-6fcc-4a4...|pkg:nuget/Newtons...|
+---------------+------+--------------------+--------------------+



In [None]:
# df = spark.read.format("json").load("example.json")
# df.show()

# not working; will work on it later




In [19]:
spark.read.format('json').load('example.json').show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
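A likely cause, given that the same file loads earlier with multiLine=True: by default Spark's JSON reader expects one complete JSON document per line (JSON Lines), so a pretty-printed file yields lines that cannot be parsed on their own and only the _corrupt_record column is produced. The plain-Python sketch below shows the difference on a small hypothetical document. It illustrates the idea and is not Spark's parser; the function name count_parsable_lines is made up.

```python
import json

# a small pretty-printed document (hypothetical content)
pretty = '{\n  "Name": "demo",\n  "Tags": [\n    "a",\n    "b"\n  ]\n}'

def count_parsable_lines(text):
    """Count lines that are complete JSON objects on their own, as a line-based reader needs."""
    parsable = 0
    for line in text.splitlines():
        try:
            if isinstance(json.loads(line), dict):
                parsable += 1
        except json.JSONDecodeError:
            pass
    return parsable

whole_document = json.loads(pretty)              # parsing the whole text at once works
pretty_lines = count_parsable_lines(pretty)      # no single line is a complete object
compact_lines = count_parsable_lines(json.dumps(whole_document))  # one object on one line

print(pretty_lines, compact_lines)
```

So the two usual ways out are passing multiLine=True, as in the earlier read cell, or storing the file with one object per line.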

## Writing a JSON file

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# create a SparkSession
spark = SparkSession.builder.appName("ReadJSONWithInferredSchema").getOrCreate()

# define the schema for the JSON data
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# read the JSON file with the specified schema
df = spark.read.schema(schema).json("data.json")

# show the DataFrame
df.show()

24/08/16 23:15:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----+---+-----------+
|name|age|       city|
+----+---+-----------+
|John| 30|   New York|
|Jane| 25|Los Angeles|
| Bob| 40|    Chicago|
+----+---+-----------+



In [44]:
from pyspark.sql import SparkSession

# create a SparkSession
spark = SparkSession.builder.appName("WriteJSONFileWithParameters").getOrCreate()

# create a sample DataFrame
data = [("John", 30, "New York"), ("Jane", 25, "Los Angeles"), ("Bob", 40, "Chicago")]
df = spark.createDataFrame(data, ["name", "age", "city"])

# define the output JSON file path
json_file_path = "file.json"

# write the DataFrame as a JSON file with parameters
df.write \
    .option("dateFormat", "MM/dd/yyyy") \
    .option("lineSep", "\r\n") \
    .json(json_file_path)

# show the first 10 rows of the DataFrame
df.show(10)

IndentationError: unexpected indent (1908115939.py, line 16)

In [15]:
from pyspark.sql import SparkSession

# create a SparkSession
spark = SparkSession.builder.appName("WriteJSONFileWithParameters").getOrCreate()

# create a sample DataFrame
data = [("John", 30, "New York"), ("Jane", 25, "Los Angeles"), ("Bob", 40, "Chicago")]
df = spark.createDataFrame(data, ["name", "age", "city"])

# define the output JSON file path
json_file_path = "path/to/output/file.json"

# write the DataFrame as a JSON file with parameters
# mode("overwrite") replaces any existing output; without it, rerunning this
# cell fails with AnalysisException: [PATH_ALREADY_EXISTS]
df.write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .option("dateFormat", "MM/dd/yyyy") \
    .option("lineSep", "\r\n") \
    .json(json_file_path)

# show the first 10 rows of the DataFrame
df.show(10)