# The Apache Spark Scala API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data with the Spark Scala API.

It is meant to run on this [Docker cluster](https://github.com/datainsightat/bigdata_development_environment.git).

## 2. The Spark Cluster

### 2.1. Get Spark

Let's start by importing Apache Spark from the Maven repository (mind the Spark **version**).

In [7]:
import $ivy.`org.apache.spark::spark-sql:3.2.0`;

Downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.12/3.2.0/spark-sql_2.12-3.2.0.pom
Downloaded https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.12/3.2.0/spark-sql_2.12-3.2.0.pom
Downloading https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.12/3.2.0/spark-parent_2.12-3.2.0.pom
Downloaded https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.12/3.2.0/spark-parent_2.12-3.2.0.pom
Downloading https://repo1.maven.org/maven2/org/apache/orc/orc-core/1.6.11/orc-core-1.6.11.pom
Downloading https://repo1.maven.org/maven2/org/apache/orc/orc-mapreduce/1.6.11/orc-mapreduce-1.6.11.pom
Downloading https://repo1.maven.org/maven2/org/apache/hive/hive-storage-api/2.7.2/hive-storage-api-2.7.2.pom
Downloading https://repo1.maven.org/maven2/org/apache/xbean/xbean-asm9-shaded/4.20/xbean-asm9-shaded-4.20.pom
Downloading https://repo1.maven.org/maven2/org/rocksdb/rocksdbjni/6.20.3/rocksdbjni-6.20.3.pom
Downloading https://repo1.maven.org/maven2/org/apache/

[32mimport [39m[36m$ivy.$                                  ;[39m

We disable Spark's internal logging so that the cell output stays focused on the API.

In [8]:
import org.apache.log4j.{Level, Logger};
Logger.getLogger("org").setLevel(Level.OFF);

[32mimport [39m[36morg.apache.log4j.{Level, Logger};
[39m

### 2.2. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` setting in the Docker Compose configuration;
+ **spark.cores.max:** maximum number of CPU cores the application may use across the cluster.

In [11]:
import org.apache.spark.sql._

val spark = SparkSession.
            builder().
            appName("scala-spark-notebook").
            master("spark://spark:7077").
            config("spark.executor.memory", "512m").
            config("spark.cores.max", "1").
            //enableHiveSupport().
            getOrCreate()

[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@2b8338f8

Further configuration options for the SparkSession object in standalone mode can be set with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSession.html).
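
As a minimal sketch of how the **config** method can be used, the cell below sets one extra option and reads it back. The session name `confSpark`, the `local[1]` master and the chosen option `spark.sql.shuffle.partitions` are assumptions made for illustration; they are not part of the cluster setup above.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session for illustration only.
// If a session already exists, getOrCreate() returns that session
// and applies the runtime SQL setting below to it.
val confSpark = SparkSession.
                builder().
                appName("config-demo").
                master("local[1]").
                config("spark.sql.shuffle.partitions", "4").
                getOrCreate()

// Runtime configuration values are read back as strings.
val shufflePartitions = confSpark.conf.get("spark.sql.shuffle.partitions")
println(s"spark.sql.shuffle.partitions = $shufflePartitions")
```

Reading a value back with `conf.get` is a quick way to check that an option was actually picked up by the session.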

## 3. The Data

### 3.1. Introduction

We will use the Spark Scala API to read, process and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html).

### 3.2. Read

Let's read some UK macroeconomic data ([source](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)) from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark dataframe.

In [None]:
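// UK macroeconomic dataset; the columns selected in section 3.3 come from this file.
//val data = spark.read.format("csv").option("sep", ",").option("header", "true").load("data/uk-macroeconomic-data.csv")
// Sample file stored on the cluster's HDFS.
val data = spark.read.format("csv").option("sep", ",").option("header", "true").load("hdfs://hive:54310/examples/bank_prospects.csv")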

Let's then display some dataframe metadata: the number of rows, the number of columns, and the schema (column names and types).

In [None]:
data.count

In [7]:
data.columns.size

[36mres6[39m: [32mInt[39m = [32m5[39m

In [10]:
data.printSchema

root
 |-- _c0: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- id: string (nullable = true)
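
Every column in the schema above is a string because the CSV reader does not infer types unless asked to. The sketch below contrasts the default behaviour with the `inferSchema` option. It parses a few made-up CSV lines held in memory instead of a file; the session name `schemaSpark`, the `local[1]` master and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in this notebook getOrCreate() would
// simply return the session created in section 2.2.
val schemaSpark = SparkSession.
                  builder().
                  appName("schema-demo").
                  master("local[1]").
                  getOrCreate()
import schemaSpark.implicits._

// Made-up CSV lines standing in for a file on disk or HDFS.
val csvLines = Seq(
  "year,population,unemployment_rate",
  "1855,23241,3.73",
  "1856,23466,3.52"
).toDS()

// Default: every column is read as a string.
val asStrings = schemaSpark.read.option("header", "true").csv(csvLines)

// With inferSchema, Spark scans the data to pick column types.
val inferred = schemaSpark.read.
               option("header", "true").
               option("inferSchema", "true").
               csv(csvLines)

asStrings.printSchema()
inferred.printSchema()
```

Schema inference needs an extra pass over the data, so for large files an explicit schema may be the better choice.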



### 3.3. Process

In this example, we will extract the UK's population and unemployment rate throughout the years. Let's start by selecting the relevant columns.

In [8]:
var unemployment = data.select("Description", "Population (GB+NI)", "Unemployment rate")

In [9]:
unemployment.show(10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



We successfully selected the desired columns, but two problems show up:
+ The first row holds each column's unit of measurement rather than data;
+ Many years have no population or unemployment data.

Let's remove the first row: we isolate the units row, then drop it from the dataframe with a left anti join.

In [10]:
val cols_description = unemployment.filter(unemployment("Description") === "Units")

[36mcols_description[39m: [32mDataset[39m[[32mRow[39m] = [Description: string, Population (GB+NI): string ... 1 more field]

In [11]:
cols_description.show()

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
+-----------+------------------+-----------------+



In [12]:
unemployment = unemployment.join(cols_description, unemployment("Description") === cols_description("Description"), "left_anti")

In [13]:
unemployment.show(10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
|       1218|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows
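
The left anti join above keeps every row that has no match in `cols_description`. For a single known value, a plain filter is a possible alternative. The sketch below uses a small made-up dataframe with the same column names; `filterSpark`, `sampleRows` and the `local[1]` master are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session and hand-written sample rows.
val filterSpark = SparkSession.
                  builder().
                  appName("filter-demo").
                  master("local[1]").
                  getOrCreate()
import filterSpark.implicits._

val sampleRows = Seq(
  ("Units", "000s", "%"),
  ("1855", "23241", "3.73"),
  ("1856", "23466", "3.52")
).toDF("Description", "Population (GB+NI)", "Unemployment rate")

// Keep the rows whose Description differs from the literal "Units".
val withoutUnits = sampleRows.filter(col("Description") =!= "Units")
withoutUnits.show()
```

One difference to keep in mind: a row with a null `Description` is dropped by this filter, because the comparison evaluates to null, whereas the left anti join would keep it.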



Nice! Now, let's drop the dataframe rows with missing data and rename its columns.

In [14]:
unemployment = unemployment.na.drop()
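
Called without arguments, `na.drop()` removes every row that contains at least one null. The sketch below compares it with two other variants on made-up rows; `naSpark`, `gaps` and the `local[1]` master are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session and hand-written sample rows.
val naSpark = SparkSession.
              builder().
              appName("na-drop-demo").
              master("local[1]").
              getOrCreate()
import naSpark.implicits._

val gaps = Seq[(String, String, String)](
  ("1209", null, null),
  ("1855", "23241", "3.73"),
  ("1856", "23466", null)
).toDF("year", "population", "unemployment_rate")

// Drop rows with a null in any column (the default).
val completeRows = gaps.na.drop()
// Drop rows only when every column is null.
val nonEmptyRows = gaps.na.drop("all")
// Drop rows with a null in the listed columns only.
val withPopulation = gaps.na.drop(Seq("population"))

println(s"complete: ${completeRows.count()}, non-empty: ${nonEmptyRows.count()}, with population: ${withPopulation.count()}")
```

Restricting the check to a subset of columns can help when only some measurements are required downstream.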

In [15]:
unemployment = unemployment.
                withColumnRenamed("Description", "year").
                withColumnRenamed("Population (GB+NI)", "population").
                withColumnRenamed("Unemployment rate", "unemployment_rate")

In [16]:
unemployment.show(10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows
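
Since the CSV was read without type inference, these columns are still strings. A possible follow-up step, sketched below on made-up rows, casts them to numeric types so they can be used in arithmetic and aggregations. The names `castSpark`, `rawRows` and `typedRows` are assumptions, and the target types (`int`, `double`) are a choice made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session and hand-written sample rows.
val castSpark = SparkSession.
                builder().
                appName("cast-demo").
                master("local[1]").
                getOrCreate()
import castSpark.implicits._

val rawRows = Seq(
  ("1855", "23241", "3.73"),
  ("1856", "23466", "3.52")
).toDF("year", "population", "unemployment_rate")

// Replace each string column with a typed version of itself.
val typedRows = rawRows.
                withColumn("year", col("year").cast("int")).
                withColumn("population", col("population").cast("int")).
                withColumn("unemployment_rate", col("unemployment_rate").cast("double"))

typedRows.printSchema()
```

Values that cannot be parsed become null after the cast, so it is worth checking for new nulls afterwards.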



### 3.4. Write

Lastly, we persist the unemployment data into the cluster's simulated **HDFS**.

In [17]:
unemployment.repartition(1).write.format("csv").mode("overwrite").option("sep", ",").option("header", "true").save("data/uk-macroeconomic-unemployment-data.csv")
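
To check what was written, the output can be read back. The sketch below does a round trip through a temporary directory. It assumes a standalone session running in local mode, so that the driver and the executors share the same file system; `writeSpark`, `sampleOut` and the temporary output path are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local-mode session, not the cluster session from section 2.2.
val writeSpark = SparkSession.
                 builder().
                 appName("write-demo").
                 master("local[1]").
                 getOrCreate()
import writeSpark.implicits._

val sampleOut = Seq(
  ("1855", "23241", "3.73"),
  ("1856", "23466", "3.52")
).toDF("year", "population", "unemployment_rate")

// Hypothetical output location inside a fresh temporary directory.
val outPath = Files.createTempDirectory("unemployment-demo").resolve("out.csv").toString

sampleOut.repartition(1).write.format("csv").mode("overwrite").option("header", "true").save(outPath)

// The path given to save() is a directory holding part-* files,
// so reading it back loads all parts at once.
val roundTrip = writeSpark.read.option("header", "true").csv(outPath)
roundTrip.show()
```

Even with `repartition(1)`, the result is a directory containing a single part file rather than one file named `out.csv`.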