
# Dataframe basics for PySpark

**Colab only code**

In [0]:
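!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # Spark 2.4 runs on Java 8
# Spark 2.4.4 is no longer on the regular download mirrors; old releases are kept on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q pandas
import os
# Tell findspark where Java and the unpacked Spark distribution live
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"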

**Set up the Spark session**

In [0]:
import findspark
import pandas as pd #This imports Pandas (we will be using this extensively later)
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

sc = spark.sparkContext

Let's start with a small subset of the Titanic data from Kaggle, typed in by hand as Python dictionaries. We load it into two pandas dataframes and then convert them into Spark dataframes.

In [0]:
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
         'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}}

data2 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
         'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
         'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3}}

df1_pd = pd.DataFrame(data1, columns=data1.keys())
df2_pd = pd.DataFrame(data2, columns=data2.keys())

Let's look at the contents of the first pandas dataframe, `df1_pd`:

In [0]:
df1_pd

And at the second pandas dataframe, `df2_pd`:

In [0]:
df2_pd

**We now convert the pandas dataframes to Spark dataframes and display the contents of df1**

In [0]:
df1 = spark.createDataFrame(df1_pd)
df2 = spark.createDataFrame(df2_pd)
df1.show()

Let's see the schema Spark inferred from the pandas dtypes:

In [0]:
df1.printSchema()

# Basic dataframe transformations

![Dataframe verbs](https://changhsinlee.com/figure/source/2018-03-04-pyspark-dataframe-basics/dataframe-verbs.png)

**Select**

`select()` takes either a list of column names or the column names as separate arguments (an unpacked list, e.g. `df1.select(*cols1)`).

In [0]:
cols1 = ['PassengerId', 'Name']
df1.select(cols1).show()

**Filter**

Filter with a column expression:

In [0]:
df1.filter(df1.Sex == 'female').show()

Filter with a SQL expression string. Note the double quotes around the whole expression and the single quotes around the string literal `'male'`.

In [0]:
df1.filter("Sex='male'").show()

**Mutate, or creating new columns**

To create a new column in Spark, use `.withColumn()`. Like other transformations, it returns a new dataframe and leaves `df2` itself unchanged.

In [0]:
df2.withColumn('AgeTimesFare', df2.Age*df2.Fare).show()

**Summarize and group by**

To summarize or aggregate a dataframe, first convert it to a *GroupedData* object with `groupby()` (an alias of `groupBy()`), then call aggregation methods on that object.

In [0]:
gdf2 = df2.groupby('Pclass')
gdf2

Note: We take the average of more than one column by passing an unpacked list of column names

In [0]:
avg_cols = ['Age', 'Fare']
gdf2.avg(*avg_cols).show() #Note the *, this is unpacking the list!
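If the `*` is unfamiliar: it unpacks a list into separate positional arguments, which is what `avg()` expects. A plain-Python sketch with a hypothetical function `show_args` that simply returns what it receives:

```python
def show_args(*cols):
    """Hypothetical helper: returns the positional arguments it was given, as a tuple."""
    return cols

avg_cols = ['Age', 'Fare']
print(show_args(*avg_cols))  # ('Age', 'Fare'): two separate arguments
print(show_args(avg_cols))   # (['Age', 'Fare'],): one argument, the whole list
```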

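Note: to call several aggregation functions at once, pass a dictionary that maps each column name to the name of an aggregation function; `'*': 'count'` counts the rows in each group.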

In [0]:
gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'}).show()

These column names look ugly (at least to me), so let's rename them with `toDF()`, which takes the new names in the same order as the existing columns.

In [0]:
(
  gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'})
    .toDF('Pclass', 'counts', 'average_age', 'total_fare')
    .show()
)

**Arrange (sort)**

Use the `.sort()` method (or its alias `.orderBy()`) to sort a dataframe; `ascending=False` sorts in descending order.

In [0]:
df2.sort('Fare', ascending=False).show()

**Joins and unions**

There are two ways to combine dataframes: joins and unions. The idea is the same as joining and unioning tables in SQL.

**Joins**

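Let's join the two Titanic dataframes on the column *PassengerId*. Passing the key as a list of column names performs an inner equi-join and keeps a single *PassengerId* column in the result.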

In [0]:
df1.join(df2, ['PassengerId']).show()

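**Non-equi joins**

Here is an example of a non-equi join, where the join condition is an inequality rather than an equality. Non-equi joins can be very slow: Spark cannot use a hash or sort-merge join for them, so it falls back to a nested-loop or Cartesian-product join that compares every pair of rows. Still, this is an operation Spark supports and older versions of Hive did not.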

In [0]:
df1.join(df2, df1.PassengerId <= df2.PassengerId).show()

**Unions**

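`union()` returns a dataframe containing the rows of both dataframes. Unlike SQL's `UNION`, it keeps duplicate rows (like `UNION ALL`) and matches columns by position, not by name.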

In [0]:
df1.union(df1).show()

# Going deeper

**Explain(), transformations, and actions**

Unlike ordinary Python objects, PySpark dataframes are lazily evaluated. Most operations are *transformations*: they only modify the execution plan that describes how Spark should handle the data, and the plan is not executed until we call an *action*.

For example, here are the plans for df1 and df2 on their own, followed by the plan for joining them on the key *PassengerId* as before:

In [0]:
df1.explain()

In [0]:
df2.explain()

In [0]:
dfj1 = df1.join(df2, ['PassengerId'])
dfj1.explain()

In this case, `join()` is a transformation that lays out a plan for Spark to join the two dataframes, but the plan is not executed until you call an action, such as `.count()`, which has to go through the actual data defined by df1 and df2 in order to return a Python object (here, an integer).

In [0]:
dfj1.count()

# Data persistence: cache() and checkpoint()

**Caching**

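Proper caching is key to high-performance Spark: without it, every action recomputes the dataframe from its source, repeating all the transformations that produced it.

A rule of thumb: **cache a dataframe when it is used multiple times in the script.**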

Keep in mind that `cache()` is lazy too: the dataframe is only stored in the cache when the *first action* runs on it. If you want to make sure the data is cached before moving on (for example, before you save the dataframe), call an action such as `.count()`, which touches every partition.

In [0]:
df1.cache()

This also works, because `.cache()` marks the dataframe itself for caching and then returns that same dataframe, so the reassignment is harmless.

In [0]:
df1 = df1.cache()

To check whether a dataframe is cached, inspect its `storageLevel` property. For dataframes, `cache()` uses the `MEMORY_AND_DISK` storage level.

In [0]:
df1.storageLevel

To **un-cache** a dataframe, use `unpersist()`

In [0]:
df1.unpersist()
df1.storageLevel

**Checkpointing**

`checkpoint()` truncates the execution plan and saves the checkpointed dataframe to files in a checkpoint directory on disk. By default it is eager: it runs a job right away to compute the dataframe and write it out.

It is recommended to cache a dataframe before checkpointing it; otherwise Spark may have to compute it twice, once for the job that triggers the checkpoint and once more to write the checkpoint files.

To use `checkpoint()`, you first need to specify the directory where checkpoint files are saved, by calling `setCheckpointDir()` on the SparkContext obtained from the SparkSession.

In [0]:
sc = spark.sparkContext
sc.setCheckpointDir("checkpointdir")  # relative path: saved to ./checkpointdir under the working directory

For example, let's join df1 to itself twice (three copies of df1 in total) and look at the plan:

In [0]:
df = df1.join(df1, ['PassengerId'])
df.join(df1, ['PassengerId']).explain()

Let's `checkpoint()` after the first `join` to truncate the plan.

In [0]:
df = df1.join(df1, ['PassengerId']).checkpoint()
df.join(df1, ['PassengerId']).explain()

**Partitions and repartition()**

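A common cause of performance problems is having too many partitions: each partition is processed by its own task, and for small data the overhead of scheduling many tiny tasks outweighs the actual work.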

To check the number of partitions, use `.rdd.getNumPartitions()`

In [0]:
df1.rdd.getNumPartitions()

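This dataframe, despite having only 5 rows, has 2 partitions (with `local[*]`, the default follows the number of CPU cores available, so you may see a different number).

That is too many for 5 rows, so let's repartition to a single partition.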

In [0]:
df1_repartitioned = df1.repartition(1)
df1_repartitioned.rdd.getNumPartitions()

Sources:
https://changhsinlee.com/pyspark-dataframe-basics/

More about unpacking: 
https://thispointer.com/python-how-to-unpack-list-tuple-or-dictionary-to-function-arguments-using/