# Guidance Practice SDPD2 - 2021

## 1. Introduction

In this notebook we present practical advice on reading nested JSON documents embedded in structured datasets with Spark. In particular, we discuss how to perform the initial load and data conversion of mixed and complex objects included in a dataset.

In this document we will use the [TMDB 5000 movie dataset](https://www.kaggle.com/tmdb/tmdb-movie-metadata) to illustrate data reading and processing examples.

## 2. Initial setup

### 2.1 Optional: configure JVM version

**IMPORTANT**: Execute the following cell only if you need to point to a specific JVM, different from the default one installed in your system.

In [None]:
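# Point PySpark to a specific JVM installation
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["JAVA_HOME"]

# Optional: load external packages programmatically
# (`packages` must be defined first, as a comma-separated list of Maven coordinates)
# os.environ["PYSPARK_SUBMIT_ARGS"] = (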
#     "--packages {0} pyspark-shell".format(packages)
# )
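
A slightly more defensive variant of the cell above is sketched here. It assumes we only want to override `JAVA_HOME` when the target directory actually exists; `set_java_home` is a hypothetical helper, not part of PySpark.

```python
import os

def set_java_home(path):
    """Set JAVA_HOME to `path` if that directory exists; return True when it was set."""
    if os.path.isdir(path):
        os.environ["JAVA_HOME"] = path
        return True
    return False

# Path taken from the cell above; adjust it to the JVM location on your system.
if not set_java_home("/usr/lib/jvm/java-8-openjdk-amd64"):
    print("JVM directory not found; JAVA_HOME left unchanged")
```

With this guard, a wrong path leaves the default JVM configuration untouched.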

### 2.2 Set up SparkSession

We start with the setup of our SparkSession.

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[*]")
    .config("spark.driver.cores", 1)
    .appName("Analysis TMDB-5000")
    .getOrCreate() )
sc = spark.sparkContext

spark

## 3. Data cleaning and preparation

### 3.1 Data loading

**Inspect the raw file contents**

In [2]:
%system head -n 2 data/TMDB5000/tmdb_5000_credits.csv

['movie_id,title,cast,crew',
 '19995,Avatar,"[{""cast_id"": 242, ""character"": ""Jake Sully"", ""credit_id"": ""5602a8a7c3a3685532001c9a"", ""gender"": 2, ""id"": 65731, ""name"": ""Sam Worthington"", ""order"": 0}, {""cast_id"": 3, ""character"": ""Neytiri"", ""credit_id"": ""52fe48009251416c750ac9cb"", ""gender"": 1, ""id"": 8691, ""name"": ""Zoe Saldana"", ""order"": 1}, {""cast_id"": 25, ""character"": ""Dr. Grace Augustine"", ""credit_id"": ""52fe48009251416c750aca39"", ""gender"": 1, ""id"": 10205, ""name"": ""Sigourney Weaver"", ""order"": 2}, {""cast_id"": 4, ""character"": ""Col. Quaritch"", ""credit_id"": ""52fe48009251416c750ac9cf"", ""gender"": 2, ""id"": 32747, ""name"": ""Stephen Lang"", ""order"": 3}, {""cast_id"": 5, ""character"": ""Trudy Chacon"", ""credit_id"": ""52fe48009251416c750ac9d3"", ""gender"": 1, ""id"": 17647, ""name"": ""Michelle Rodriguez"", ""order"": 4}, {""cast_id"": 8, ""character"": ""Selfridge"", ""credit_id"": ""52fe48009251416c750ac9e1"", ""gende

In [3]:
%system head -n 2 data/TMDB5000/tmdb_5000_movies.csv

['budget,genres,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count',
 '237000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""name"": ""Fantasy""}, {""id"": 878, ""name"": ""Science Fiction""}]",http://www.avatarmovie.com/,19995,"[{""id"": 1463, ""name"": ""culture clash""}, {""id"": 2964, ""name"": ""future""}, {""id"": 3386, ""name"": ""space war""}, {""id"": 3388, ""name"": ""space colony""}, {""id"": 3679, ""name"": ""society""}, {""id"": 3801, ""name"": ""space travel""}, {""id"": 9685, ""name"": ""futuristic""}, {""id"": 9840, ""name"": ""romance""}, {""id"": 9882, ""name"": ""space""}, {""id"": 9951, ""name"": ""alien""}, {""id"": 10148, ""name"": ""tribe""}, {""id"": 10158, ""name"": ""alien planet""}, {""id"": 10987, ""name"": ""cgi""}, {""id"": 11399, ""name"": ""marine""}, {"

### 3.2 Reading JSON data in credits file: schema

In [2]:
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.types import (ArrayType, IntegerType, LongType,
                               StringType, StructField, StructType)
from pyspark.sql.functions import col, explode, from_json

from pprint import pprint
# Uncomment the following line to experiment with rapidjson
# You can also try ultrajson, orjson and other alternatives
# import rapidjson
import json

In [3]:
# Define data schema for credits file
creditsSchema = StructType([StructField("movie_id", LongType(), True), 
                           StructField("title", StringType(), True),
                           StructField("cast", StringType(), True),
                           StructField("crew", StringType(), True)])

**Escaping double quotes**: Escaping double quotes by doubling them, as in `'""cast_id""'`, is the standard way to escape this character in a CSV file, according to [RFC 4180](https://www.rfc-editor.org/rfc/rfc4180) (section 2, rule 7):

>If double-quotes are used to enclose fields, then a double-quote
appearing inside a field must be escaped by preceding it with
another double quote.

**For Spark 2.x users**: Unfortunately, **Spark 2.x does not follow this convention by default**, because its default escape character is the backslash (whereas Pandas, for instance, follows the convention by default when reading a CSV file). To solve this, we must set the reader options so that a doubled double quote is treated as an escaped quote.

A change of the default behavior to follow the standard convention is tracked in https://issues.apache.org/jira/browse/SPARK-22236.

Credits: https://stackoverflow.com/questions/40413526/reading-csv-files-with-quoted-fields-containing-embedded-commas.
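
To see what the RFC 4180 rule means in practice, here is a minimal sketch that uses Python's standard `csv` module instead of Spark (a choice made for this example, since `csv` follows the doubled-quote convention by default). The record is a shortened, illustrative version of the first line of the credits file.

```python
import csv
import io
import json

# One CSV record in the style of tmdb_5000_credits.csv (shortened for illustration).
# Inside the quoted field, every literal double quote is written twice ("").
raw = (
    'movie_id,title,cast\n'
    '19995,Avatar,"[{""cast_id"": 242, ""name"": ""Sam Worthington""}]"\n'
)

# csv.reader uses doublequote=True by default, which matches the RFC 4180 rule.
rows = list(csv.reader(io.StringIO(raw)))
header, record = rows[0], rows[1]

# The doubled quotes collapse to single quotes, so the third field is valid JSON.
cast = json.loads(record[2])
print(header)
print(record[2])
print(cast[0]["name"])
```

The commas inside the quoted field are preserved as data, and the unescaped field can be handed directly to a JSON parser.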

### 3.3 Reading JSON data in credits file: reading data

As of Spark 3.0.1, we must still set the `quote` and `escape` options explicitly to read CSV files that follow RFC 4180.

In [4]:
# Tuning options to be compliant with RFC 4180:
# a double quote inside a quoted field is escaped by another double quote
credits = (spark.read.option("header", "true")
#           .option("dateFormat", "yyyy-MM-dd")  # Default
           .option("quote", "\"")   # Character that encloses fields
           .option("escape", "\"")  # Character that escapes a quote inside a field
           .csv("data/TMDB5000/tmdb_5000_credits.csv", schema=creditsSchema)
          )

In [5]:
credits.show(2)

+--------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|
+--------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{"cast_id": 242,...|[{"credit_id": "5...|
|     285|Pirates of the Ca...|[{"cast_id": 4, "...|[{"credit_id": "5...|
+--------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [10]:
credits.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)



### 3.4 Parsing JSON fields

Now, we need to read the content of the JSON columns using the corresponding schema. We can consult several information sources with useful notes about this process:

* https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
* https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html
* https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.from_json

Basically, the function `from_json()` from `pyspark.sql.functions` parses the string content of each cell, interpreting the JSON data according to the schema that we pass as its second argument.
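
Before writing a schema by hand, it can help to look at the keys and value types present in one cell. The sketch below does this with Python's standard `json` module rather than Spark; `summarize_fields` is a hypothetical helper, and the sample string is a shortened version of the Avatar row shown earlier.

```python
import json

# Shortened sample of one "cast" cell after CSV unescaping (values from the Avatar row).
sample = (
    '[{"cast_id": 242, "character": "Jake Sully", '
    '"credit_id": "5602a8a7c3a3685532001c9a", "gender": 2, '
    '"id": 65731, "name": "Sam Worthington", "order": 0}]'
)

def summarize_fields(json_text):
    """Return {field name: sorted list of Python type names} over all items in a JSON array."""
    summary = {}
    for item in json.loads(json_text):
        for key, value in item.items():
            summary.setdefault(key, set()).add(type(value).__name__)
    return {key: sorted(types) for key, types in summary.items()}

fields = summarize_fields(sample)
for name, types in fields.items():
    print(name, types)
```

Fields reported as `int` are natural candidates for `IntegerType()`, and `str` fields for `StringType()`. Note that the schema used in this notebook declares `gender` as `StringType()`, even though the raw value is a number.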

In [12]:
# Setup schema for each JSON column
cast_json_schema = ArrayType(StructType([StructField("cast_id", IntegerType()),
                                         StructField("character", StringType()),
                                         StructField("credit_id", StringType()),
                                         StructField("gender", StringType()),
                                         StructField("id", IntegerType()),
                                         StructField("name", StringType()),
                                         StructField("order", IntegerType())]))

crew_json_schema = ArrayType(StructType([StructField("credit_id", StringType()),
                                         StructField("department", StringType()),
                                         StructField("gender", StringType()),
                                         StructField("id", IntegerType()),
                                         StructField("job", StringType()),
                                         StructField("name", StringType())]))

In [13]:
# Use function from_json and the schemas above to parse the content of JSON columns,
# updating the general schema of the DataFrame and preserving the other columns
credits = (credits.withColumn('cast', from_json(col('cast'), cast_json_schema))
           .withColumn('crew', from_json(col('crew'), crew_json_schema)) )

In [14]:
credits.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: integer (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: integer (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- gender: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- job: string (nullable = true)
 |    |    |-- name: string (nullable = true)



**Important note**: Given the input data structure, each JSON column holds a list (array) of key-value items. Therefore, following the [examples in the Databricks blog](https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html) (cf. section "Selecting a single array or map element"), we must use either the method `getItem()` or bracket indexing `[]` to access a particular element in the list of JSON items.

In [15]:
credits.select('cast').show(5)

+--------------------+
|                cast|
+--------------------+
|[[242, Jake Sully...|
|[[4, Captain Jack...|
|[[1, James Bond, ...|
|[[2, Bruce Wayne ...|
|[[5, John Carter,...|
+--------------------+
only showing top 5 rows



In [16]:
# In all cases, we take the field from the first item in the list of cast info
# using positional argument getItem(0)
credits.select(col('title'),
               col('cast').getItem(0).getItem('cast_id').alias('cast_id'),
               col('cast').getItem(0).getItem('character').alias('character'),
               col('cast').getItem(0).getItem('name').alias('name')).show(5)

+--------------------+-------+--------------------+---------------+
|               title|cast_id|           character|           name|
+--------------------+-------+--------------------+---------------+
|              Avatar|    242|          Jake Sully|Sam Worthington|
|Pirates of the Ca...|      4|Captain Jack Sparrow|    Johnny Depp|
|             Spectre|      1|          James Bond|   Daniel Craig|
|The Dark Knight R...|      2|Bruce Wayne / Batman| Christian Bale|
|         John Carter|      5|         John Carter|  Taylor Kitsch|
+--------------------+-------+--------------------+---------------+
only showing top 5 rows
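
As a plain-Python analogue of `col('cast').getItem(0).getItem('name')`, the following sketch indexes the first cast item and then looks up a key in it. It does not use Spark; `first_cast_field` is a hypothetical helper, and returning `None` for an empty cast list is a choice made for this example.

```python
# Two records shaped like the parsed credits rows
# (values taken from the output above; cast lists shortened).
movies = [
    {"title": "Avatar",
     "cast": [{"cast_id": 242, "character": "Jake Sully", "name": "Sam Worthington"},
              {"cast_id": 3, "character": "Neytiri", "name": "Zoe Saldana"}]},
    {"title": "Spectre",
     "cast": [{"cast_id": 1, "character": "James Bond", "name": "Daniel Craig"}]},
]

def first_cast_field(movie, field):
    """Return `field` from the first cast item, or None when the cast list is empty."""
    cast = movie["cast"]
    return cast[0].get(field) if cast else None

leads = [(m["title"], first_cast_field(m, "name")) for m in movies]
print(leads)
```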



Additionally, if we want to create a new table with one row for each element in an array of items, we can use the convenient function `explode()` from `pyspark.sql.functions`, as explained in the [examples in the Databricks blog](https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html), section "Creating a row for each array or map element".

Let's see this in two steps:

1. How `explode()` works.
2. How to select items from the list in each row after applying `explode()`.

In [17]:
credits.select(col('title'),
               explode(col('cast')).alias('cast')).take(10)

[Row(title='Avatar', cast=Row(cast_id=242, character='Jake Sully', credit_id='5602a8a7c3a3685532001c9a', gender='2', id=65731, name='Sam Worthington', order=0)),
 Row(title='Avatar', cast=Row(cast_id=3, character='Neytiri', credit_id='52fe48009251416c750ac9cb', gender='1', id=8691, name='Zoe Saldana', order=1)),
 Row(title='Avatar', cast=Row(cast_id=25, character='Dr. Grace Augustine', credit_id='52fe48009251416c750aca39', gender='1', id=10205, name='Sigourney Weaver', order=2)),
 Row(title='Avatar', cast=Row(cast_id=4, character='Col. Quaritch', credit_id='52fe48009251416c750ac9cf', gender='2', id=32747, name='Stephen Lang', order=3)),
 Row(title='Avatar', cast=Row(cast_id=5, character='Trudy Chacon', credit_id='52fe48009251416c750ac9d3', gender='1', id=17647, name='Michelle Rodriguez', order=4)),
 Row(title='Avatar', cast=Row(cast_id=8, character='Selfridge', credit_id='52fe48009251416c750ac9e1', gender='2', id=1771, name='Giovanni Ribisi', order=5)),
 Row(title='Avatar', cast=Row(ca

In [18]:
# With show(), the result of applying explode() is evident.
# However, in this view each cast item is displayed as a plain list
# of values. As the previous example shows, each field in a cast
# item actually keeps its key (like a dict)
credits.select(col('title'),
               explode(col('cast')).alias('cast')).show(10)

+------+--------------------+
| title|                cast|
+------+--------------------+
|Avatar|[242, Jake Sully,...|
|Avatar|[3, Neytiri, 52fe...|
|Avatar|[25, Dr. Grace Au...|
|Avatar|[4, Col. Quaritch...|
|Avatar|[5, Trudy Chacon,...|
|Avatar|[8, Selfridge, 52...|
|Avatar|[7, Norm Spellman...|
|Avatar|[9, Moat, 52fe480...|
|Avatar|[11, Eytukan, 52f...|
|Avatar|[10, Tsu'Tey, 52f...|
+------+--------------------+
only showing top 10 rows



In [19]:
# Therefore, we can use getItem() on the table returned
# by explode(). Note that the second select is applied to
# the DataFrame returned by the first select
(credits.select(col('title'),
               explode(col('cast')).alias('cast'))
 .select(col('title'), col('cast').getItem('name').alias('name'))
 .show(10)
)

+------+------------------+
| title|              name|
+------+------------------+
|Avatar|   Sam Worthington|
|Avatar|       Zoe Saldana|
|Avatar|  Sigourney Weaver|
|Avatar|      Stephen Lang|
|Avatar|Michelle Rodriguez|
|Avatar|   Giovanni Ribisi|
|Avatar|  Joel David Moore|
|Avatar|       CCH Pounder|
|Avatar|         Wes Studi|
|Avatar|        Laz Alonso|
+------+------------------+
only showing top 10 rows
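
To make the semantics of `explode()` concrete, the sketch below reproduces the two steps above in plain Python, without Spark: first one output pair per cast item, then a field lookup on each item. `explode_cast` is a hypothetical helper, and the third record is a made-up entry added to show what happens with an empty list.

```python
# Records shaped like the parsed credits rows (cast lists shortened).
movies = [
    {"title": "Avatar",
     "cast": [{"name": "Sam Worthington", "order": 0},
              {"name": "Zoe Saldana", "order": 1}]},
    {"title": "Spectre",
     "cast": [{"name": "Daniel Craig", "order": 0}]},
    {"title": "Hypothetical title", "cast": []},  # made-up record with no cast
]

def explode_cast(records):
    """Yield one (title, cast_item) pair per element of each record's cast list."""
    for record in records:
        for item in record["cast"]:
            yield record["title"], item

# Step 1: one pair per cast member; step 2: pick a field from each item.
exploded = list(explode_cast(movies))
names = [(title, item["name"]) for title, item in exploded]
print(names)
```

The record with an empty cast list contributes no pairs. Spark's `explode()` likewise produces no rows for a null or empty array, whereas `explode_outer()` keeps such rows.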



## 4. Stop SparkSession

In [20]:
spark.stop()