# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data using the Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** the application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** the Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the docker compose SPARK_WORKER_MEMORY setting;
+ **spark.jars.packages:** defines the Delta package version used by the notebook;
+ **spark.sql.extensions:** enables the Delta extension in Spark;
+ **spark.sql.catalog.spark_catalog:** sets the Spark catalog to the Delta Lake catalog.
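
The memory constraint in the list above can be checked before the session is created. The following is a minimal sketch, assuming size strings with binary (1024-based) suffixes such as 512m or 1g. The helper names parse_memory and executor_fits_worker and the 1g worker value are hypothetical; they are not part of Spark or of this project's docker compose file.

```python
import re

# Binary multipliers for size suffixes such as "512m" or "1g".
_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}


def parse_memory(value: str) -> int:
    """Convert a size string like '512m' or '1g' to a number of bytes."""
    match = re.fullmatch(r"(\d+)\s*([kmgt])b?", value.strip().lower())
    if match is None:
        raise ValueError(f"unrecognized memory string: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNITS[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """Return True when the executor memory does not exceed the worker memory."""
    return parse_memory(executor_memory) <= parse_memory(worker_memory)


# "1g" is a hypothetical SPARK_WORKER_MEMORY value used only for illustration.
print(executor_fits_worker("512m", "1g"))  # True
print(executor_fits_worker("2g", "1g"))    # False
```

Strings without a unit suffix are rejected here for simplicity; this sketch does not try to reproduce every format that Spark itself accepts.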

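After creating the session, we configure InteractiveShell to display the output of every expression in a cell and load jupyterlab_sql_editor to render query results as tables.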

In [1]:
from delta import DeltaTable, configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Parentheses let the chained builder calls span several lines.
builder = (
    SparkSession.builder.appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Add the Delta Lake package for the installed delta-spark version, then start the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'
%load_ext jupyterlab_sql_editor.ipython_magic.sparksql
%config SparkSql.cacheTTL=3600

More settings for a SparkSession object in standalone mode can be added using the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

## 3. The Data

### 3.1. Introduction

We will use the Spark Python API to read, process and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read

Let's read some UK macroeconomic data ([source](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)) from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark dataframe.

In [None]:
data = spark.read.csv(path="data/uk-macroeconomic-data.csv", sep=",", header=True)

Let's then display some dataframe metadata: the number of rows, the number of columns, and the schema (column names and types).

In [None]:
data.count()

In [None]:
len(data.columns)

In [None]:
data.printSchema()

### 3.3. Process

In this example, we will get the UK's population and unemployment rate throughout the years. Let's start by selecting the relevant columns.

In [None]:
unemployment = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [None]:
unemployment.show(n=10)

We successfully selected the desired columns, but two problems remain:

+ The first row holds the unit of measurement of each column instead of data;
+ Many years are missing population and unemployment data.

Let's remove the first row by isolating it and then anti-joining it against the dataframe.

In [None]:
cols_description = unemployment.filter(unemployment['Description'] == 'Units')

In [None]:
cols_description.show()

In [None]:
unemployment = unemployment.join(other=cols_description, on=['Description'], how='left_anti')
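
A left anti join keeps only the rows of the left dataframe whose join key has no match in the right one. The plain-Python sketch below mimics that behaviour on a few rows so the effect is easy to see without a cluster. The function left_anti_join and the row values are hypothetical placeholders, not values from the dataset and not the Spark implementation.

```python
def left_anti_join(left, right, key):
    """Return the rows of `left` whose `key` value does not appear in `right`."""
    # Null keys never match in a SQL join, so None is left out of the lookup set.
    right_keys = {row[key] for row in right if row[key] is not None}
    return [row for row in left if row[key] not in right_keys]


# Placeholder rows for illustration only.
rows = [
    {"Description": "Units", "Unemployment rate": "%"},
    {"Description": "1900", "Unemployment rate": "1.0"},
    {"Description": "1901", "Unemployment rate": "2.0"},
]

# Same two steps as the notebook: isolate the units row, then anti-join it away.
units_rows = [row for row in rows if row["Description"] == "Units"]
data_rows = left_anti_join(rows, units_rows, "Description")

print([row["Description"] for row in data_rows])  # ['1900', '1901']
```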

In [None]:
unemployment.show(n=10)

Nice! Now, let's drop the dataframe rows with missing data and rename the columns.

In [None]:
unemployment = unemployment.dropna()

In [None]:
unemployment = (
    unemployment
    .withColumnRenamed("Description", "year")
    .withColumnRenamed("Population (GB+NI)", "population")
    .withColumnRenamed("Unemployment rate", "unemployment_rate")
)

In [None]:
unemployment.show(n=10)

### 3.4. Write

Lastly, we persist the unemployment data into the cluster's simulated **HDFS**.

In [None]:
unemployment.repartition(1).write.csv(path="data/uk-macroeconomic-unemployment-data.csv", sep=",", header=True, mode="overwrite")


In [None]:
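temp_table_name = "temp_test"
permanent_table_name = "test"
# Path where the Delta table is expected, relative to the working directory.
load_path = "spark-warehouse/test"

# Register the dataframe as a temporary view for SQL queries in this session.
unemployment.createOrReplaceTempView(temp_table_name)

# Save the dataframe as a Delta table unless one already exists at load_path.
if not DeltaTable.isDeltaTable(spark, load_path):
    unemployment.write.format("delta").saveAsTable(permanent_table_name)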

In [None]:
read_format = 'delta'

table = spark.read.format(read_format).load(load_path)
table.show(20)

In [None]:
unemployment.createOrReplaceTempView(permanent_table_name)
spark.sql("select * from test").show(10)

In [None]:
%%sparksql --output html --limit 3
SELECT 
    t1.year
FROM
    test AS t1

In [None]:
%%sparksql --output html --limit 20
SELECT 
    *
FROM
    test AS t1