# Abstracting Data with DataFrames

## Introduction:

This notebook aims to deepen the understanding of PySpark fundamentals through the data structure called the DataFrame. DataFrames in PySpark take advantage of the developments and improvements from Project Tungsten and the Catalyst Optimiser.

## Project Tungsten:

The following is a direct quote from https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html. It briefly describes the project. 

Project Tungsten will be the largest change to Spark’s execution engine since the project’s inception. It focuses on substantially improving the efficiency of memory and CPU for Spark applications, to push performance closer to the limits of modern hardware. This effort includes three initiatives:

- __Memory Management and Binary Processing__: leveraging application semantics to manage memory explicitly and eliminate the overhead of JVM object model and garbage collection.
- __Cache-aware computation__: algorithms and data structures to exploit memory hierarchy.
- __Code generation__: using code generation to exploit modern compilers and CPUs.

## Catalyst Optimiser:

The following is a direct quote from https://databricks.com/glossary/catalyst-optimizer. It briefly describes the project.

Catalyst is based on functional programming constructs in Scala and designed with these key two purposes:

- Easily add new optimization techniques and features to Spark SQL.
- Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.).


## Breakdown of this Notebook:
- Creating DataFrames
- Accessing underlying RDDs
- Performance optimisations
- Inferring the schema using Reflection
- Specifying the schema programmatically
- Creating a temporary table
- Using SQL to interact with DataFrames
- Overview of the DataFrame transformations and actions

## 1 PySpark Machine Configuration:

The number of cores used by each executor is set by the following code (four in this configuration).

In [1]:
%%configure
{
    "executorCores" : 4
}

In [2]:
from pyspark.sql.types import *

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,,pyspark,idle,,,✔


SparkSession available as 'spark'.


## 2 Set Up the Correct Directory:

In [3]:
import os

# Change the Path:
path = '++++your working directory here++++/Datasets/'
os.chdir(path)
folder_pathway = os.getcwd()

# print(folder_pathway)

## 3 Creating DataFrames:

First, create some sample data and column headers as follows.

In [4]:
sample_data = sc.parallelize(
    [(1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD', 13.75, 9.48, 0.61, 4.02), 
    (2, 'MacBook', 2016, '12"', '8GB', '256GB SSD', 11.04, 7.74, 0.52, 2.03), 
    (3, 'MacBook Air', 2016, '13.3"', '8GB', '128GB SSD', 12.8, 8.94, 0.68, 2.96), 
    (4, 'iMac', 2017, '27"', '64GB', '1TB SSD', 25.6, 8.0, 20.3, 20.8)]
)

col_names = ['Id', 'Model', 'Year', 'ScreenSize', 'RAM', 'HDD', 'W', 'D', 'H', 'Weight']

Create the DataFrame from the sample data:

In [5]:
sample_df = spark.createDataFrame(sample_data, col_names)

In [6]:
# Inspect:
sample_df.take(2)

[Row(Id=1, Model='MacBook Pro', Year=2015, ScreenSize='15"', RAM='16GB', HDD='512GB SSD', W=13.75, D=9.48, H=0.61, Weight=4.02), Row(Id=2, Model='MacBook', Year=2016, ScreenSize='12"', RAM='8GB', HDD='256GB SSD', W=11.04, D=7.74, H=0.52, Weight=2.03)]

#### As can be seen, unlike an RDD of plain tuples, a DataFrame is a collection of Row(...) objects, and each Row(...) object consists of named data.

## 3.1 To have a look at the DataFrame Data:

This can be done by using the .show() method.

In [7]:
sample_df.show()

+---+-----------+----+----------+----+---------+-----+----+----+------+
| Id|      Model|Year|ScreenSize| RAM|      HDD|    W|   D|   H|Weight|
+---+-----------+----+----------+----+---------+-----+----+----+------+
|  1|MacBook Pro|2015|       15"|16GB|512GB SSD|13.75|9.48|0.61|  4.02|
|  2|    MacBook|2016|       12"| 8GB|256GB SSD|11.04|7.74|0.52|  2.03|
|  3|MacBook Air|2016|     13.3"| 8GB|128GB SSD| 12.8|8.94|0.68|  2.96|
|  4|       iMac|2017|       27"|64GB|  1TB SSD| 25.6| 8.0|20.3|  20.8|
+---+-----------+----+----------+----+---------+-----+----+----+------+

## 3.2 To have a look at the Schema:

DataFrames have a schema, and the columns of the DataFrame have data types that match those of the original sample_data RDD.

It can be examined like so.

In [8]:
sample_df.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- ScreenSize: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- HDD: string (nullable = true)
 |-- W: double (nullable = true)
 |-- D: double (nullable = true)
 |-- H: double (nullable = true)
 |-- Weight: double (nullable = true)
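
The mapping visible in the schema above (Python int becomes long, str becomes string, float becomes double) can be sketched in plain Python. This is only a rough illustration of the result, not Spark's actual inference code: the lookup table `PYTHON_TO_SPARK_NAME` and the helper `sketch_schema` are hypothetical names made up for this example.

```python
# Hypothetical lookup table, for illustration only (not PySpark internals).
PYTHON_TO_SPARK_NAME = {int: "long", float: "double", str: "string"}


def sketch_schema(first_row, col_names):
    """Pair each column name with the Spark type name of its Python value."""
    return [
        (name, PYTHON_TO_SPARK_NAME[type(value)])
        for name, value in zip(col_names, first_row)
    ]


# First record and column names from the sample data above.
first_row = (1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD', 13.75, 9.48, 0.61, 4.02)
col_names = ['Id', 'Model', 'Year', 'ScreenSize', 'RAM', 'HDD', 'W', 'D', 'H', 'Weight']

schema_sketch = sketch_schema(first_row, col_names)
for name, type_name in schema_sketch:
    print(f" |-- {name}: {type_name}")
```

The printed type names line up with the printSchema() output: the integer columns are long, the measurements are double, and everything else is string.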

## 3.3 Create DataFrame from a JSON file:

source: https://github.com/kavgan/spark-examples/blob/master/sample-data/description.json

In [9]:
sample_data_json_df = ( spark.read.json(folder_pathway + '/Datasets/' + 'description.json') )

In [10]:
sample_data_json_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)

In [11]:
sample_data_json_df.show()

+--------------------+-----------+
|                text|      title|
+--------------------+-----------+
|Data (/ˈdeɪtə/ DA...|       Data|
|Big data is a ter...|   Big Data|
|Natural language ...|        NLP|
|Text mining, also...|Text Mining|
+--------------------+-----------+

## 3.4 Create a DataFrame from CSV file:

Unlike loading from a JSON file, loading a CSV file here requires more parameters. These parameters are "header" and "inferSchema". With header=True, the first line of the file is used for the column names. With inferSchema=True, Spark tries to assign the right data type to each column; without it, every column is read as a string by default.

Source: https://github.com/kavgan/spark-examples/blob/master/sample-data/description.csv
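
To make the two options concrete without needing a Spark session, the sketch below mimics their effect in plain Python with the standard csv module. It is only an analogy: `read_csv_sketch` and `infer_type` are hypothetical helpers written for this example, the tiny CSV text is made up, and Spark's real inference handles many more types. The `_c0`, `_c1`, ... names follow the default column naming Spark uses when no header is read.

```python
import csv
import io


def infer_type(text):
    """Hypothetical helper: try int, then float, otherwise keep the string."""
    for cast in (int, float):
        try:
            return cast(text)
        except ValueError:
            pass
    return text


def read_csv_sketch(raw, header=False, infer_schema=False):
    """Mimic the effect of the 'header' and 'inferSchema' options on CSV text."""
    rows = list(csv.reader(io.StringIO(raw)))
    if header:
        # The first line supplies the column names.
        names, rows = rows[0], rows[1:]
    else:
        # No header: generic names, and the first line stays in the data.
        names = [f"_c{i}" for i in range(len(rows[0]))]
    if infer_schema:
        # Otherwise every cell stays a string.
        rows = [[infer_type(cell) for cell in row] for row in rows]
    return names, rows


# Made-up CSV content for the sketch.
raw = "Model,Year,Weight\nMacBook,2016,2.03\niMac,2017,20.8\n"

print(read_csv_sketch(raw))
print(read_csv_sketch(raw, header=True))
print(read_csv_sketch(raw, header=True, infer_schema=True))
```

Only the last call yields both proper column names and typed values, which is why the notebook passes both options.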

In [12]:
sample_data_csv_df = ( spark.read.csv(folder_pathway + '/Datasets/' + 'description.csv', 
                                       header=True, inferSchema=True) )

In [13]:
sample_data_csv_df.printSchema()

root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)

In [14]:
sample_data_csv_df.show()

+-----------+--------------------+
|      title|                text|
+-----------+--------------------+
|       Data|Data (/ˈdeɪtə/ DA...|
|   Big Data|Big data is a ter...|
|        NLP|                text|
|Text Mining|Text mining, also...|
+-----------+--------------------+

## 4 Accessing the underlying RDDs:

Under the hood, DataFrames continue to use RDDs. This section describes the process of interacting with the underlying RDD of a DataFrame.

## 4.1 Import the Required Libraries:


In [15]:
import pyspark.sql as sql
import pyspark.sql.functions as f

## 4.2 Extract the Size and Type of the HDD:

The HDD size and type will be extracted into separate columns. The minimum volume needed to box each computer will also be calculated from its dimensions.

### To do this:
1 -> Begin by extracting the ".rdd" from the sample_df. \
2 -> Use the ".map()" transformation to add the HDD_size column to the schema. \
3 -> Because this works on the RDD, all the other columns need to be retained. Each row is therefore converted into a dictionary with the ".asDict()" method, which is then unpacked with " ** " into a new "Row()" object.

    NOTE in Python: 
    - A single * placed before a list or tuple in a function call passes each element as a separate positional argument. 
    - A double ** placed before a dictionary passes each key as a keyword parameter name and the matching value as its argument. 
    

4 -> Inside this Row(), the second argument is where the name of the column "HDD_size" is passed and set to the desired value. Here, the ".HDD" column is split and the first element is extracted, as it is the HDD size. \
5 -> The same is done for the "HDD_type" column, using the second element. \
6 -> The third ".map()" adds the Volume information and follows the same method as steps 4 and 5. \
7 -> The ".toDF()" method is used to convert the RDD back into a DataFrame. \
8 -> The ".select()" method is used to pick the relevant columns in the desired order. Additionally, the Volume column is rounded with the "f.round()" function and named "Volume_cuIn" (volume in cubic inches) using the ".alias()" method.
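
The unpacking rules in the note, and the way each step keeps the existing fields while adding one more, can be tried in plain Python first. This is a minimal sketch that uses ordinary dictionaries in place of Row objects; the function `describe` and the variable names are hypothetical and exist only for this illustration.

```python
def describe(model, year):
    """Toy function used to show positional and keyword unpacking."""
    return f"{model} ({year})"


args = ("MacBook", 2016)
kwargs = {"model": "MacBook", "year": 2016}

print(describe(*args))     # each tuple element becomes a positional argument
print(describe(**kwargs))  # each key/value pair becomes a keyword argument

# Same pattern as the map steps: unpack the existing fields, then add a new one.
record = {"HDD": "512GB SSD", "W": 13.75, "D": 9.48, "H": 0.61}
step1 = dict(**record, HDD_size=record["HDD"].split(" ")[0])
step2 = dict(**step1, HDD_type=step1["HDD"].split(" ")[1])
step3 = dict(**step2, Volume=step2["H"] * step2["D"] * step2["W"])

print(step3["HDD_size"], step3["HDD_type"], round(step3["Volume"]))
```

Each step returns a new dictionary rather than modifying the old one, which mirrors how every ".map()" call produces new Row objects.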



In [16]:
transformed_dat_sample = (
    sample_df
    .rdd
    # Add the HDD size, e.g. '512GB' from '512GB SSD':
    .map(lambda row: sql.Row(**row.asDict(), HDD_size=row.HDD.split(' ')[0]))
    # Add the HDD type, e.g. 'SSD' from '512GB SSD':
    .map(lambda row: sql.Row(**row.asDict(), HDD_type=row.HDD.split(' ')[1]))
    # Add the volume (H x D x W):
    .map(lambda row: sql.Row(**row.asDict(), Volume=row.H * row.D * row.W))
    .toDF()
    .select(
        sample_df.columns + ['HDD_size', 'HDD_type', f.round(f.col('Volume')).alias('Volume_cuIn')]
    )
)


In [17]:
# Inspect the RDD under the hood:
transformed_dat_sample.rdd.take(1)

[Row(Id=1, Model='MacBook Pro', Year=2015, ScreenSize='15"', RAM='16GB', HDD='512GB SSD', W=13.75, D=9.48, H=0.61, Weight=4.02, HDD_size='512GB', HDD_type='SSD', Volume_cuIn=80.0)]

#### As can be seen above, taking one element from "transformed_dat_sample.rdd" produces a single-item list whose element is a Row(...) object.

In [18]:
# Inspect the original RDD:
sample_data.take(1)

[(1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD', 13.75, 9.48, 0.61, 4.02)]

#### As can be seen above, "sample_data" also produces a single-item list; however, the item is a plain tuple rather than a Row(...) object.

In [19]:
# Show the whole DataFrame:
transformed_dat_sample.show()

+---+-----------+----+----------+----+---------+-----+----+----+------+--------+--------+-----------+
| Id|      Model|Year|ScreenSize| RAM|      HDD|    W|   D|   H|Weight|HDD_size|HDD_type|Volume_cuIn|
+---+-----------+----+----------+----+---------+-----+----+----+------+--------+--------+-----------+
|  1|MacBook Pro|2015|       15"|16GB|512GB SSD|13.75|9.48|0.61|  4.02|   512GB|     SSD|       80.0|
|  2|    MacBook|2016|       12"| 8GB|256GB SSD|11.04|7.74|0.52|  2.03|   256GB|     SSD|       44.0|
|  3|MacBook Air|2016|     13.3"| 8GB|128GB SSD| 12.8|8.94|0.68|  2.96|   128GB|     SSD|       78.0|
|  4|       iMac|2017|       27"|64GB|  1TB SSD| 25.6| 8.0|20.3|  20.8|     1TB|     SSD|     4157.0|
+---+-----------+----+----------+----+---------+-----+----+----+------+--------+--------+-----------+

## 5 Performance Optimisations on User Defined Functions (UDFs):

DataFrames in PySpark brought performance improvements. However, when User Defined Functions (UDFs) were used, PySpark had to switch constantly between the Python and JVM runtimes, which resulted in a large performance hit.

This was addressed when Spark adopted the improvements from the Apache Arrow project (https://arrow.apache.org). Arrow provides a single in-memory format that all environments can share, which relieves Spark of the constant need to copy and convert objects between them. More details can be found at the link above.

## 5.1 Import the required Libraries:

NOTE: 
- pip install pyarrow
- may require a different version. Try -> pip install pyarrow==0.14.1 

In [20]:
import pyspark.sql.functions as f 
import pandas as pd
from scipy import stats

## 5.2 Demonstration:

### What is happening here?

1 -> Firstly, create a Spark DataFrame with an "id" column ranging from 0 up to (but not including) 1 million, and add a column named "val" filled with random values by using "f.rand()". \
2 -> Cache the DataFrame with the ".cache()" method. \
3 -> Show the contents of the DataFrame with the ".show()" method. \
4 -> Secondly, apply the "f.pandas_udf(...)" decorator. This declares that the function is a vectorised UDF in PySpark. The first parameter, the return type, is set to "double". Note that this can be either a DDL-formatted type string or a pyspark.sql.types.DataType. The second parameter is the function type; to map input pandas.Series to a single output pandas.Series, "PandasUDFType.SCALAR" is used. If the operation works on whole groups of a DataFrame (multiple columns at once), it would be set to "PandasUDFType.GROUPED_MAP". \
5 -> Next, define the "pandas_pdf()" UDF. It takes in a single column as a pandas.Series and returns a pandas.Series holding the values of the normal PDF. \
6 -> Finally, the DataFrame is transformed by applying the UDF to create a new column, and the result is shown with the ".show()" method.
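
For reference, with its default arguments "stats.norm.pdf" evaluates the standard normal density, phi(x) = exp(-x^2 / 2) / sqrt(2 * pi). The sketch below checks that formula using only the Python standard library, so it runs without SciPy or Spark. The function name `standard_normal_pdf` is a hypothetical helper for this illustration, and the sample points are chosen from the [0, 1) range that "f.rand()" produces.

```python
import math
from statistics import NormalDist


def standard_normal_pdf(x):
    """Density of the standard normal distribution (mean 0, std 1) at x."""
    return math.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi)


# The standard library's own implementation, used here as a cross-check.
std_normal = NormalDist(mu=0.0, sigma=1.0)

for x in (0.0, 0.25, 0.5, 0.75):
    print(x, standard_normal_pdf(x), std_normal.pdf(x))
```

At x = 0 the density is 1 / sqrt(2 * pi), approximately 0.3989, and it decreases as x moves away from 0, so the "probability" column is expected to stay below that value.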


In [21]:
# Make a BIG dataFrame:
df_big = (
    spark
    .range(0, 1000000)
    .withColumn('val', f.rand())
)

# Cache it:
df_big.cache()

# Inspect:
df_big.show(3)

+---+-------------------+
| id|                val|
+---+-------------------+
|  0|0.22006021557371036|
|  1| 0.8855370768262674|
|  2|0.03904606825839285|
+---+-------------------+
only showing top 3 rows

In [None]:
# Declaration of a vectorised (pandas) UDF that returns doubles:
@f.pandas_udf('double', f.PandasUDFType.SCALAR)
def pandas_pdf(v):
    # v is a pandas.Series; return the normal PDF of each value:
    return pd.Series(stats.norm.pdf(v))

# Apply the UDF:
(
    df_big
    .withColumn('probability', pandas_pdf(df_big.val))
    .show(5)
)

## 5.3 Compare the performance of the approaches:

#### Test 1 - Vectorised pandas UDF

The "test_pandas_pdf()" function uses pandas_pdf() to compute the PDF of the normal distribution for each value, then performs a count aggregation and prints the result by using the ".show()" method.

In [None]:
def test_pandas_pdf():
    return (
        df_big
        .withColumn('probability', pandas_pdf(df_big.val))
        .agg(f.count(f.col('probability')))
        .show()
    )

# Run and time the function:
%timeit -n 1 test_pandas_pdf()

#### Test 2 - Row-by-Row version with Python to JVM conversion.

The test_pdf() function is similar, but it uses the "pdf()" UDF instead, which computes the values row by row.

In [None]:
# Declaration of a row-by-row UDF that returns doubles:
@f.udf('double')
def pdf(v):
    # v is a single Python float:
    return float(stats.norm.pdf(v))

# Test function:
def test_pdf():
    return(
        df_big
        .withColumn('probability', pdf(df_big.val))
        .agg(f.count(f.col('probability')))
        .show()
    )


# Run and time the function:
%timeit -n 1 test_pdf()
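
One reason the two tests can behave differently is the number of Python function invocations: a row-by-row UDF is called once per value, whereas a scalar pandas UDF is called once per batch of values. The stdlib-only sketch below counts those calls. It is an analogy rather than a Spark measurement: `pdf_row`, `pdf_batch`, the input values and `BATCH_SIZE` are all assumptions made for this illustration, and the batched function still loops in Python where pandas and SciPy would vectorise the work.

```python
from statistics import NormalDist

std_normal = NormalDist()  # standard normal: mean 0, std 1
calls = {"row_by_row": 0, "batched": 0}


def pdf_row(value):
    """Row-by-row style: one Python call per value."""
    calls["row_by_row"] += 1
    return std_normal.pdf(value)


def pdf_batch(values):
    """Batched style: one Python call per batch of values."""
    calls["batched"] += 1
    return [std_normal.pdf(v) for v in values]


values = [i / 10_000 for i in range(10_000)]  # stand-in for the 'val' column
BATCH_SIZE = 2_500                            # arbitrary assumption for this sketch

row_result = [pdf_row(v) for v in values]

batch_result = []
for start in range(0, len(values), BATCH_SIZE):
    batch_result.extend(pdf_batch(values[start:start + BATCH_SIZE]))

print(calls)
print(row_result == batch_result)
```

Both approaches return the same values; only the number of calls across the function boundary differs, which is the overhead the vectorised UDF is meant to reduce.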

#### Observation:



## 6 Inferring the schema utilising Reflection:

