## Data Lab: Spark SQL
> Including a JSON flattening task

In [None]:
import logging
from pyspark.sql import SparkSession

#### Create a Spark Session. Remember: the SparkSession is our entry point for DataFrames and Datasets; RDDs are created through its SparkContext!

In [None]:

def rdd_to_dataframe(data, schema):
    """
    Example: create a Spark RDD from ``data``, load it into a Spark DataFrame and return it.

    The SparkSession is obtained with ``getOrCreate()`` and is deliberately left
    running on success, so the returned DataFrame stays usable and later cells
    can reuse the same session.

    :param data: local Python collection (e.g. a list of tuples) to distribute as an RDD
    :param schema: schema passed to ``spark.createDataFrame`` (e.g. a list of column names)
    :return: Spark DataFrame built from ``data``
    :raises Exception: re-raises any error from Spark after logging it and stopping the session
    """
    # Create (or reuse) a SparkSession
    spark = SparkSession.builder.appName("RDDToDataFrame").getOrCreate()

    try:
        # Create an RDD from the input data, using Spark Context not Session!
        rdd = spark.sparkContext.parallelize(data)

        # Convert RDD to DataFrame and return it, without stopping the SparkSession
        return spark.createDataFrame(rdd, schema)

    except Exception as e:
        # Log the error, stop the SparkSession, and propagate the failure.
        # Returning None here would only surface later as a confusing
        # AttributeError wherever the caller uses the "DataFrame".
        logging.error('Error while transforming RDD to DF: {}'.format(e))
        spark.stop()
        raise


----
Create some sample data

In [None]:
# Sample departments as (department_id, department_name) tuples, ids numbered from 1
dept_data = list(enumerate(["Big Data", "Finance", "Marketing"], start=1))
dept_schema = [
    "department_id",
    "department_name",
]

In [None]:
# Sample employees as (department_id, employee_name, age) tuples, one row per employee
emp_data = [
    (1, "Carlos", 17),
    (1, "Bob", 30),
    (2, "Jasmin", 26),
    (3, "Nishi", 36),
]
emp_schema = [
    "department_id",
    "employee_name",
    "age",
]

---

### Let's now turn the sample data into Spark DataFrames (via Spark RDDs)

In [None]:
# Convert the sample data into Spark DataFrames (each call builds an RDD first, then a DF)
df_emp = rdd_to_dataframe(emp_data, emp_schema)
df_dept = rdd_to_dataframe(dept_data, dept_schema)

In [None]:
# Show the first rows of the departments DataFrame (printSchema() would show the schema)
df_dept.show()

### Use Spark SQL to join 2 datasets

In [None]:
# Do we have a session running? rdd_to_dataframe does not stop its session on success,
# so getOrCreate() hands back that same session; we need a notebook-level `spark` handle
# to call spark.sql() below.
spark = SparkSession.builder.appName("RDDToDataFrame").getOrCreate()

In [None]:
# Register both DataFrames as temporary views so Spark SQL can query them by name
df_emp.createOrReplaceTempView('employees')
df_dept.createOrReplaceTempView('departments')

In [None]:
# Query sample, using Spark SQL: inner join each employee with its department.
# NOTE: `emp.*, dept.*` returns department_id twice (once from each view).
spark.sql('''
            select emp.*, dept.*
            from employees as emp
                inner join departments as dept on (emp.department_id = dept.department_id) 
            ''').show()

In [None]:
# Let's now save the JOINED RESULTSET into a new Temporary View -- NO WHERE CLAUSE.
# Columns are listed explicitly, so department_id appears only once in dept_employees.
spark.sql('''
        select emp.employee_name, emp.age, emp.department_id, dept.department_name
        from employees as emp
            inner join departments as dept on (emp.department_id = dept.department_id)
        ''').createOrReplaceTempView('dept_employees')

---

## Let's try now something a bit more complicated 🚀

### Join a third dataset, but with a different format

In [None]:
# Let's load the data first. The "multiline" option lets a single JSON record span several
# lines (presumably the file is pretty-printed JSON -- confirm against the dataset).
# The path is relative to the notebook's working directory.
df_budgets = spark.read.option("multiline","true").json('datasets/json/department_budgets.json')

In [None]:
# Show the (nested) schema Spark inferred from the JSON file
df_budgets.printSchema()

In [None]:
# We can still query the nested data directly, without flattening it.
# Filter first, then project: selecting only 'offices' before filtering would drop
# department_id from the projection and rely on Spark re-resolving the missing column.
df_budgets.where('department_id == 1').select('offices').show(truncate=False)

### Querying without flattening JSON

In [None]:
# Register the nested (unflattened) DataFrame as a Temporary View for Spark SQL
df_budgets.createOrReplaceTempView('budgets_json')

## This could lead to Spaghetti code!! 🚨

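e.g.
- What if bud.offices has a variable number of items? Hard-coded indexes like `offices[0]`, `offices[1]` and `offices[2]` silently ignore any additional items.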

In [None]:
# Let's join the third dataset, reaching into the nested JSON with array indexes and struct paths.
# Only offices[0..2] are read: any further offices are ignored, and absent ones come back
# as NULL (in Spark's default, non-ANSI mode -- ANSI mode may raise instead).
# nvl() substitutes a placeholder when budget_authorizer[0] has no CTO name/last name.
spark.sql('''
          select  emp.employee_name, 
                  emp.department_id, 
                  bud.budget, 
                  bud.budget_period, 
                  bud.offices[0].cost_center.office as office_1,
                  bud.offices[0].cost_center.budget_status as budget_status_1,
                  bud.offices[1].cost_center.office as office_2,
                  bud.offices[1].cost_center.budget_status as budget_status_2,
                  bud.offices[2].cost_center.office as office_3,
                  bud.offices[2].cost_center.budget_status as budget_status_3,
                  nvl(bud.budget_authorizer[0].cto.name,"no CTO registered")  as cto_name,
                  nvl(bud.budget_authorizer[0].cto.last_name,"no CTO registered") as cto_last_name
          from dept_employees as emp
            inner join budgets_json as bud on (emp.department_id = bud.department_id)
          ''').show()

### Flattening JSON into a columnar format is normally easier, cleaner and more scalable.
- Suggestion: always test and benchmark performance to compare JSON path access vs. flattening

In [None]:
# - Import required libs. These should be on top
import logging
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col

### There are many different approaches to flatten/explode JSON. This is just one example
- Credits: Function adapted from [nmukerje/Pyspark Flatten json](https://bit.ly/43ZegOL) repo in GitHub

In [None]:
def _complex_fields(df):
    """Return ``{column_name: data_type}`` for every top-level ArrayType/StructType column of ``df``."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten_dataframe(df):
    """
    Spark function to flatten nested structs and arrays. Function adapted from GitHub: https://bit.ly/43ZegOL

    The DataFrame is processed one complex top-level column at a time, until none remain:

    * a StructType column is replaced by one column per sub-field, named ``<parent>_<child>``;
    * an ArrayType column is exploded with ``explode_outer``, producing one row per element
      (rows whose array is null/empty are kept, with a null value).

    Exploding arrays repeats the other columns of each row once per element, so the result
    can contain many more rows than the input.

    :param df: Spark dataframe with semi-structured types, such as StructType or ArrayType
    :return: Spark dataframe without StructType/ArrayType columns
    :raises Exception: any error raised by Spark is logged and re-raised
    """
    try:
        # compute Complex Fields (Lists and Structs) in Schema
        complex_fields = _complex_fields(df)

        while complex_fields:
            # take the first remaining complex column
            col_name, data_type = next(iter(complex_fields.items()))

            if isinstance(data_type, StructType):
                # flatten structs: promote every sub-field to a top-level <parent>_<child> column
                expanded = [
                    col(col_name + '.' + field.name).alias(col_name + '_' + field.name)
                    for field in data_type.fields
                ]
                df = df.select("*", *expanded).drop(col_name)

            elif isinstance(data_type, ArrayType):
                # explode arrays: one row per element; explode_outer keeps null/empty arrays
                df = df.withColumn(col_name, explode_outer(col_name))

            # recompute remaining Complex Fields: flattening may expose new nested columns
            complex_fields = _complex_fields(df)

        return df

    except Exception as e:
        # Propagate instead of returning None, which would only fail later at the caller
        logging.error('Error while flattening JSON data: {}'.format(e))
        raise

In [None]:
# Let's now flatten the data: structs become <parent>_<child> columns, arrays become extra rows.
df_budgets_flat = flatten_dataframe(df_budgets) 

### However, think about data duplication when unnesting data! ⚠️
Exploding an array repeats every other column of the row once per array element.

In [None]:
# Show a sample of the flattened rows (note the parent values repeated by the exploded arrays)
df_budgets_flat.show()

In [None]:
# New flattened schema: no StructType/ArrayType columns remain
df_budgets_flat.printSchema()

In [None]:
# Same process: register the flattened DF as a Temporary View
df_budgets_flat.createOrReplaceTempView('budgets_flat')

In [None]:
# Preview the dept_employees view created earlier (no join with the budgets data yet)
spark.sql('''
          select *
          from dept_employees
          ''').show()

In [None]:
# Let's join the third dataset, now using plain flattened columns instead of JSON paths.
# show(n=50): exploded arrays multiply the rows per employee, so more than the default
# 20 rows may be needed to see the whole result.
spark.sql('''
          select emp.department_id, emp.employee_name, emp.department_name, bud.budget, bud.budget_period, bud.offices_cost_center_office, bud.budget_authorizer_cto_name
          from dept_employees as emp
            inner join budgets_flat as bud on (emp.department_id = bud.department_id)
          ''').show(n=50)

### As an exercise, write the output of the 3-dataset join, just as we did in the previous lab (Lab 3)