# PySpark Read and Write JSON file into GCS Bucket in GCP

1. PySpark SQL provides ***read.json("path")*** to read a single-line or multiline JSON file into a PySpark DataFrame, and ***write.json("path")*** to write a DataFrame out as JSON.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a SparkSession from the builder
spark = SparkSession.builder.master("local") \
                    .appName('Cloud and AI Analytics') \
                    .getOrCreate()

### Simple JSON

In [2]:
path = "gs://bqdemogcp-pde/json_input/gcsdata.json"  #gcsdata.json
bucket_name = "bqdemogcp-pde"

In [3]:
# Read a JSON file into a DataFrame.
# The JSON source infers the schema automatically, so no extra options are needed.
df = spark.read.format("json") \
    .load(path)


In [4]:
df.show()

+-----------+------------+-----+-----------+-------+
|       City|RecordNumber|State|ZipCodeType|Zipcode|
+-----------+------------+-----+-----------+-------+
|PARC PARQUE|           1|   PR|   STANDARD|    704|
+-----------+------------+-----+-----------+-------+



### Reading files with a user-specified custom schema

In [5]:
path1 = "gs://bqdemogcp-pde/jsondata.json"   #jsondata.json

In [6]:
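# nullable=False is requested for "schema", but Spark reports columns read from
# file-based sources as nullable, as the printSchema() output shows.
custom_schema = StructType([
    StructField("schema", StringType(), False),
    StructField("id", StringType(), True),
    StructField("app", StringType(), True),
    StructField("description", StringType(), True)])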

In [7]:
df = spark.read.format("json") \
    .schema(custom_schema) \
    .load(path1)

In [8]:
df.show()

+------+---+---+-----------+
|schema| id|app|description|
+------+---+---+-----------+
|     a|  1|foo|       test|
|     b|  2|bar|      test2|
+------+---+---+-----------+



In [9]:
df.printSchema()

root
 |-- schema: string (nullable = true)
 |-- id: string (nullable = true)
 |-- app: string (nullable = true)
 |-- description: string (nullable = true)



### Specifying the data source by its fully qualified name

When you use the format() method, you can also specify the data source by its fully qualified name, as below.

In [10]:
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json').option("mode", "PERMISSIVE").load(path1)
df.printSchema()
df.show()

root
 |-- app: string (nullable = true)
 |-- description: string (nullable = true)
 |-- id: string (nullable = true)
 |-- schema: string (nullable = true)

+---+-----------+---+------+
|app|description| id|schema|
+---+-----------+---+------+
|foo|       test|  1|     a|
|bar|      test2|  2|     b|
+---+-----------+---+------+



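### Read a multiline JSON file

1. The PySpark JSON data source provides several options for reading files. Use the multiLine option to read JSON files whose records are spread across multiple lines.
2. By default, the multiLine option is set to false, so Spark expects one complete JSON record per line. Lines that do not parse on their own, such as the [ and ] of a top-level array, are reported in the _corrupt_record column, as the output below shows.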

In [11]:
multiline_path = "gs://bqdemogcp-pde/multilinedata.json"  #multilinedata.json

In [12]:
# Read the file in the default line-delimited mode.
# Add .option("multiLine", True) to parse records that span multiple lines.
multiline_df = spark.read.format("json") \
    .load(multiline_path)

In [13]:
multiline_df.show()

+-------------------+------------+-----+-----------+-------+---------------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|_corrupt_record|
+-------------------+------------+-----+-----------+-------+---------------+
|               null|        null| null|       null|   null|              [|
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|           null|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|           null|
|               null|        null| null|       null|   null|              ]|
+-------------------+------------+-----+-----------+-------+---------------+



### Reading multiple files at a time

1. Using the read.json() method, you can also read multiple JSON files from different GCS paths in one call: pass the fully qualified paths as a list.

In [14]:
# Read multiple files by passing a list of paths
df = spark.read.format("json") \
    .load(["gs://bqdemogcp-pde/json_input/gcsdata.json", "gs://bqdemogcp-pde/json_input/gcsdata1.json"])  #gcsdata.json, gcsdata1.json

In [15]:
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)



In [16]:
df.show()

+-----------+------------+-----+-----------+-------+
|       City|RecordNumber|State|ZipCodeType|Zipcode|
+-----------+------------+-----+-----------+-------+
|PARC PARQUE|           1|   PR|   STANDARD|    704|
|PARC PARQUE|           2|   IN|    PREMIUM|    705|
+-----------+------------+-----+-----------+-------+



### Reading all files in a directory

1. Read all JSON files from a directory into a DataFrame by passing the GCS directory path, or a glob pattern such as *.json, to the json() method.

In [17]:
# Read all JSON files from a folder using a glob pattern
df = spark.read.format("json") \
    .load("gs://bqdemogcp-pde/json_input/*.json")  #json_input - folder

In [18]:
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)



In [19]:
df.show()

+-----------+------------+-----+-----------+-------+
|       City|RecordNumber|State|ZipCodeType|Zipcode|
+-----------+------------+-----+-----------+-------+
|PARC PARQUE|           1|   PR|   STANDARD|    704|
|PARC PARQUE|           2|   IN|    PREMIUM|    705|
+-----------+------------+-----+-----------+-------+



### Write JSON data to GCS bucket

In [20]:
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

In [21]:
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])

In [22]:
pysparkDF = spark.createDataFrame(data=data2,schema=schema)

In [23]:
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [24]:
print("gs://{}/{}/employee.json".format(bucket_name, "json_output"))

gs://bqdemogcp-pde/json_output/employee.json


In [25]:
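# Write to the GCS bucket. Spark creates a directory named employee.json that
# holds the part file(s); repartition(1) yields a single part file.
pysparkDF.repartition(1).write.mode("overwrite").format('json').save("gs://{}/{}/employee.json".format(bucket_name, "json_output"))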

In [None]:
#df.write.mode('Overwrite').json("gs://{}/{}/data.json".format(bucket_name, "json_output"))