# Data discovery: Spark SQL

In [1]:
import logging
from pyspark.sql import SparkSession

## Create a Spark session. Remember: the SparkSession is our entry point for DataFrames and Datasets, not for RDDs

In [2]:
def rdd_to_dataframe(data, schema):
    """Create a Spark RDD from local data, load it into a DataFrame, and return it.

    Args:
        data: Local collection of rows (e.g. a list of tuples) to parallelize.
        schema: Schema handed to ``createDataFrame`` (e.g. a list of column names).

    Returns:
        The DataFrame built from the RDD. The SparkSession is left running.

    Raises:
        Exception: Any error raised while building the RDD or the DataFrame is
            logged and then re-raised unchanged.
    """
    # Create (or reuse) the SparkSession
    spark = SparkSession.builder.appName("RDDToDataFrame").getOrCreate()

    try:
        # Create an RDD from the input data, using the Spark Context, not the Session!
        rdd = spark.sparkContext.parallelize(data)

        # Convert the RDD to a DataFrame and return it without stopping the session
        return spark.createDataFrame(rdd, schema)

    except Exception as e:
        # Log, then re-raise so the caller never receives a silent None.
        # The session is deliberately NOT stopped here: it comes from
        # getOrCreate(), so stopping it would also break DataFrames that
        # earlier calls already returned.
        logging.error('Error while transforming RDD to DF: {}'.format(e))
        raise


In [3]:
# Sample departments: column names first, then one (id, name) tuple per row
dept_schema = ["department_id", "department_name"]
dept_data = [
    (1, "Big Data"),
    (2, "Finance"),
    (3, "Marketing"),
]

In [4]:
# Sample employees: column names first, then one (department_id, name, age) tuple per row
emp_schema = ["department_id", "employee_name", "age"]
emp_data = [
    (1, "Carlos", 17),
    (1, "Bob", 30),
    (2, "Jasmin", 26),
    (3, "Nishi", 36),
]

---

## Let's now use the Spark RDD as a Spark Dataframe

In [5]:
# Turn each sample dataset into a DataFrame (employees first, then departments)
df_emp, df_dept = (
    rdd_to_dataframe(rows, columns)
    for rows, columns in ((emp_data, emp_schema), (dept_data, dept_schema))
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/20 13:10:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

In [6]:
# Show the department rows (show() prints the data, not the schema)
df_dept.show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|       Big Data|
|            2|        Finance|
|            3|      Marketing|
+-------------+---------------+



## Use Spark SQL to join two datasets

In [7]:
# Get the session handle used by the SQL cells below. This is the same builder
# call (same app name) that rdd_to_dataframe makes.
spark = SparkSession.builder.appName("RDDToDataFrame").getOrCreate()

In [8]:
# Register both DataFrames as temporary views so that Spark SQL can refer to
# them by name ('employees' and 'departments')
df_emp.createOrReplaceTempView('employees')
df_dept.createOrReplaceTempView('departments')

In [9]:
# Sample Spark SQL query: inner-join employees to departments on department_id
# and print the result
employees_with_departments_sql = '''
            select emp.*, dept.*
            from employees as emp
                inner join departments as dept on (emp.department_id = dept.department_id) 
            '''
spark.sql(employees_with_departments_sql).show()

[Stage 8:>                                                          (0 + 8) / 8]

+-------------+-------------+---+-------------+---------------+
|department_id|employee_name|age|department_id|department_name|
+-------------+-------------+---+-------------+---------------+
|            1|       Carlos| 17|            1|       Big Data|
|            1|          Bob| 30|            1|       Big Data|
|            2|       Jasmin| 26|            2|        Finance|
|            3|        Nishi| 36|            3|      Marketing|
+-------------+-------------+---+-------------+---------------+



                                                                                

In [10]:
# Save the joined result set (every row, no WHERE clause) as the temporary
# view 'dept_employees'
dept_employees_sql = '''
        select emp.employee_name, emp.age, emp.department_id, dept.department_name
        from employees as emp
            inner join departments as dept on (emp.department_id = dept.department_id)
        '''
df_dept_employees = spark.sql(dept_employees_sql)
df_dept_employees.createOrReplaceTempView('dept_employees')

---

## Join a third dataset, but with a different format

In [11]:
# Load the department budgets JSON file, with the reader's "multiline" option enabled
budgets_reader = spark.read.option("multiline","true")
df_budgets = budgets_reader.json('datasets/json/department_budgets.json')

In [12]:
# Print the schema of the budgets DataFrame; note the nested array-of-struct columns
df_budgets.printSchema()

root
 |-- budget: long (nullable = true)
 |-- budget_authorizer: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cto: struct (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- office: string (nullable = true)
 |-- budget_period: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- offices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cost_center: struct (nullable = true)
 |    |    |    |-- budget_status: string (nullable = true)
 |    |    |    |-- office: string (nullable = true)



In [13]:
# The nested data can still be queried as-is: select the whole `offices` array
# column for department 1 (truncate=False prints the full cell content)
df_budgets.select('offices').where('department_id == 1').show(truncate=False)

+-------------------------------------------------------------------------+
|offices                                                                  |
+-------------------------------------------------------------------------+
|[{{denied, new york}}, {{approved, mumbai}}, {{approved, san francisco}}]|
+-------------------------------------------------------------------------+



## Querying without flattening JSON

In [14]:
# Register the nested (unflattened) budgets DataFrame as the temporary view 'budgets_json'
df_budgets.createOrReplaceTempView('budgets_json')

In [15]:
# Join the third dataset without flattening it: nested values are reached by
# array index and struct field path (offices[i].cost_center.*), and a missing
# CTO name is replaced by a placeholder through nvl()
employees_with_budgets_sql = '''
          select  emp.employee_name, 
                  emp.department_id, 
                  bud.budget, 
                  bud.budget_period, 
                  bud.offices[0].cost_center.office as office_1,
                  bud.offices[0].cost_center.budget_status as budget_status_1,
                  bud.offices[1].cost_center.office as office_2,
                  bud.offices[1].cost_center.budget_status as budget_status_2,
                  bud.offices[2].cost_center.office as office_3,
                  bud.offices[2].cost_center.budget_status as budget_status_3,
                  nvl(bud.budget_authorizer[0].cto.name,"no CTO registered")  as cto_name,
                  nvl(bud.budget_authorizer[0].cto.last_name,"no CTO registered") as cto_last_name
          from dept_employees as emp
            inner join budgets_json as bud on (emp.department_id = bud.department_id)
          '''
spark.sql(employees_with_budgets_sql).show()

+-------------+-------------+------+-------------+--------+---------------+--------+---------------+-------------+---------------+-----------------+-----------------+
|employee_name|department_id|budget|budget_period|office_1|budget_status_1|office_2|budget_status_2|     office_3|budget_status_3|         cto_name|    cto_last_name|
+-------------+-------------+------+-------------+--------+---------------+--------+---------------+-------------+---------------+-----------------+-----------------+
|       Carlos|            1| 16000|         year|new york|         denied|  mumbai|       approved|san francisco|       approved|no CTO registered|no CTO registered|
|          Bob|            1| 16000|         year|new york|         denied|  mumbai|       approved|san francisco|       approved|no CTO registered|no CTO registered|
|       Jasmin|            2| 23000|         year|    null|           null|    null|           null|         null|           null|              joe|              doe

## Flattening JSON into Columnar format is normally easier, cleaner and more scalable. 
- Suggestion: always test and benchmark performance to compare JSON path access vs. flattening

In [16]:
import logging
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col

## There are tons of different approaches to flatten/explode JSON. This is just an example
- Credits: Function adapted from [nmukerje/Pyspark Flatten json](https://bit.ly/43ZegOL) repo in GitHub

In [17]:
def _complex_fields(df):
    """Map each top-level ArrayType/StructType column name of ``df`` to its data type."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten_dataframe(df):
    """Flatten the nested struct and array columns of a Spark DataFrame.

    Function adapted from GitHub: https://bit.ly/43ZegOL

    The top-level schema is scanned repeatedly until no ArrayType or
    StructType column is left:

    * a struct column is replaced by one column per sub-field, named
      ``<column>_<sub_field>``;
    * an array column is replaced by its elements, one row per element,
      using ``explode_outer``.

    Args:
        df: Spark DataFrame with semi-structured columns (StructType / ArrayType).

    Returns:
        A Spark DataFrame without top-level ArrayType or StructType columns.

    Raises:
        Exception: Any error raised while flattening is logged and re-raised,
            so the caller never receives a silent ``None``.
    """
    try:
        # Complex fields (arrays and structs) currently present in the schema
        complex_fields = _complex_fields(df)

        while complex_fields:
            # Always process the first remaining complex column
            col_name, col_type = next(iter(complex_fields.items()))

            if isinstance(col_type, StructType):
                # Flatten the struct: promote every sub-field to its own column
                expanded = [
                    col(col_name + '.' + sub_field.name).alias(col_name + '_' + sub_field.name)
                    for sub_field in col_type
                ]
                df = df.select("*", *expanded).drop(col_name)

            elif isinstance(col_type, ArrayType):
                # Explode the array: its elements become rows
                df = df.withColumn(col_name, explode_outer(col_name))

            # The schema changed, so recompute the remaining complex fields
            complex_fields = _complex_fields(df)
        return df

    except Exception as e:
        logging.error('Error while flattening JSON data: {}'.format(e))
        raise

In [18]:
# Flatten the nested budgets DataFrame into plain columns
df_budgets_flat = flatten_dataframe(df_budgets) 

## However, think about data duplication when unnesting data: every exploded array element repeats the values of its parent row

In [19]:
# Show a sample of the flattened rows
df_budgets_flat.show()

+------+-------------+-------------+-------------------------------+--------------------------+----------------------------+---------------------------------+--------------------------+
|budget|budget_period|department_id|budget_authorizer_cto_last_name|budget_authorizer_cto_name|budget_authorizer_cto_office|offices_cost_center_budget_status|offices_cost_center_office|
+------+-------------+-------------+-------------------------------+--------------------------+----------------------------+---------------------------------+--------------------------+
| 16000|         year|            1|                           null|                      null|                        null|                           denied|                  new york|
| 16000|         year|            1|                           null|                      null|                        null|                         approved|                    mumbai|
| 16000|         year|            1|                           null|  

In [20]:
# Print the new, flattened schema
df_budgets_flat.printSchema()

root
 |-- budget: long (nullable = true)
 |-- budget_period: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- budget_authorizer_cto_last_name: string (nullable = true)
 |-- budget_authorizer_cto_name: string (nullable = true)
 |-- budget_authorizer_cto_office: string (nullable = true)
 |-- offices_cost_center_budget_status: string (nullable = true)
 |-- offices_cost_center_office: string (nullable = true)



In [21]:
# Same process as before: register the flattened budgets DataFrame as the
# temporary view 'budgets_flat'
df_budgets_flat.createOrReplaceTempView('budgets_flat')

In [22]:
# Before joining the third dataset, look at the rows of the 'dept_employees' view
dept_employees_all_sql = '''
          select *
          from dept_employees
          '''
spark.sql(dept_employees_all_sql).show()

+-------------+---+-------------+---------------+
|employee_name|age|department_id|department_name|
+-------------+---+-------------+---------------+
|       Carlos| 17|            1|       Big Data|
|          Bob| 30|            1|       Big Data|
|       Jasmin| 26|            2|        Finance|
|        Nishi| 36|            3|      Marketing|
+-------------+---+-------------+---------------+



In [23]:
# Join the third dataset through its flattened view and print up to 50 rows
employees_with_flat_budgets_sql = '''
          select emp.department_id, emp.employee_name, emp.department_name, bud.budget, bud.budget_period, bud.offices_cost_center_office, bud.budget_authorizer_cto_name
          from dept_employees as emp
            inner join budgets_flat as bud on (emp.department_id = bud.department_id)
          '''
spark.sql(employees_with_flat_budgets_sql).show(n=50)

23/08/20 13:10:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
+-------------+-------------+---------------+------+-------------+--------------------------+--------------------------+
|department_id|employee_name|department_name|budget|budget_period|offices_cost_center_office|budget_authorizer_cto_name|
+-------------+-------------+---------------+------+-------------+--------------------------+--------------------------+
|            1|       Carlos|       Big Data| 16000|         year|             san francisco|                      null|
|            1|       Carlos|       Big Data| 16000|         year|                    mumbai|                      null|
|            1|       Carlos|       Big Data| 16000|         year|                  new york|                      null|
|    