# The Apache Spark Scala API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data with the Spark Scala API.

## 2. The Spark Cluster

### 2.1. Get Spark

Let's start by importing Apache Spark from the Maven repository (mind the Spark **version**, which should match the one running on the cluster).

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.0.0`;


We disable Spark's internal logs so that we can focus on its API.

In [2]:
import org.apache.log4j.{Level, Logger};
Logger.getLogger("org").setLevel(Level.OFF);


### 2.2. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the Docker Compose SPARK_WORKER_MEMORY setting.

In [3]:
import org.apache.spark.sql._

val spark = SparkSession.
            builder().
            appName("scala-spark-notebook").
            master("spark://spark-master:7077").
            config("spark.executor.memory", "512m").
            getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


spark: SparkSession = org.apache.spark.sql.SparkSession@390c4e73

More settings for a SparkSession in standalone mode can be added with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSession.html).
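
As a rough sketch of how further settings could be chained onto the builder, the cell below adds two standard Spark properties, `spark.executor.cores` and `spark.cores.max`. The values are illustrative assumptions, not taken from this cluster's configuration, and the name `tunedSpark` is hypothetical. It is meant to replace the connection cell above rather than follow it, since `getOrCreate()` returns the already running session when one exists.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical values: adjust them to the resources of your own workers.
// spark.executor.cores = cores used by each executor
// spark.cores.max      = total cores the application may take from the cluster
val tunedSpark = SparkSession.
    builder().
    appName("scala-spark-notebook").
    master("spark://spark-master:7077").
    config("spark.executor.memory", "512m").
    config("spark.executor.cores", "1").
    config("spark.cores.max", "2").
    getOrCreate()

// Read a setting back from the running context to confirm what was applied
tunedSpark.sparkContext.getConf.get("spark.executor.memory")
```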

## 3. The Data

### 3.1. Introduction

We will use the Spark Scala API to read, process and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html).

### 3.2. Read

Let's read some UK macroeconomic data ([source](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)) from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark dataframe.

In [None]:
val data = spark.read.format("csv").option("sep", ",").option("header", "true").load("data/uk-macroeconomic-data.csv")

Next, let's display some dataframe metadata: the number of rows, the number of columns, and the schema (column names and types).

In [None]:
data.count

In [None]:
data.columns.size

In [None]:
data.printSchema

### 3.3. Process

In this example, we will extract the UK's population and unemployment rate throughout the years. Let's start by selecting the relevant columns.

In [None]:
var unemployment = data.select("Description", "Population (GB+NI)", "Unemployment rate")

In [None]:
unemployment.show(10)

We successfully selected the desired columns, but two problems show up:
+ The first row contains the unit of measurement of each column rather than data;
+ Many years are missing population or unemployment data.

Let's remove the first row by isolating it and then anti-joining it against the dataframe.

In [None]:
val cols_description = unemployment.filter(unemployment("Description") === "Units")

In [None]:
cols_description.show()

In [None]:
unemployment = unemployment.join(cols_description, unemployment("Description") === cols_description("Description"), "left_anti")

In [None]:
unemployment.show(10)
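
As an aside, the same row could probably be removed with a plain filter instead of an anti join. The sketch below uses made-up toy rows rather than the real dataset, and the names `filterSession`, `toyRaw` and `withoutUnits` are hypothetical. The local master is only an assumption so the sketch can run on its own; inside this notebook `getOrCreate()` would reuse the existing session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local master, used only when no session is running yet
val filterSession = SparkSession.
    builder().
    master("local[*]").
    appName("filter-sketch").
    getOrCreate()
import filterSession.implicits._

// Toy rows standing in for the real data (all values are made up)
val toyRaw = Seq(
  ("Units", "unit A", "unit B"),
  ("1855", "100", "3.5"),
  ("1856", "101", "4.0")
).toDF("Description", "Population (GB+NI)", "Unemployment rate")

// Keep every row whose Description differs from the literal "Units"
val withoutUnits = toyRaw.filter(toyRaw("Description") =!= "Units")
withoutUnits.show()
```

One difference to keep in mind: the filter also drops rows whose `Description` is null, because the comparison evaluates to null, whereas the anti join keeps them.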

Nice! Now, let's drop the rows with missing data and rename the columns.

In [None]:
unemployment = unemployment.na.drop()

In [None]:
unemployment = unemployment.
                withColumnRenamed("Description", "year").
                withColumnRenamed("Population (GB+NI)", "population").
                withColumnRenamed("Unemployment rate", "unemployment_rate")

In [None]:
unemployment.show(10)
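
Because the CSV was read without schema inference, every column is still a string at this point. If numeric types are wanted, one possible follow-up is to cast the renamed columns. This is a sketch on made-up toy rows, not a step from the original pipeline; the names `castSession`, `toyClean` and `typed` and the chosen target types are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local master, used only when no session is running yet
val castSession = SparkSession.
    builder().
    master("local[*]").
    appName("cast-sketch").
    getOrCreate()
import castSession.implicits._

// Toy rows shaped like the cleaned dataframe (all values are made up)
val toyClean = Seq(
  ("1855", "100", "3.5"),
  ("1856", "101", "4.0")
).toDF("year", "population", "unemployment_rate")

// Replace each string column with a typed version of itself
val typed = toyClean.
    withColumn("year", col("year").cast("int")).
    withColumn("population", col("population").cast("double")).
    withColumn("unemployment_rate", col("unemployment_rate").cast("double"))

typed.printSchema
```

With Spark 3.0's default settings, a value that cannot be parsed becomes null after the cast, so it is worth checking for new nulls afterwards.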

### 3.4. Write

Lastly, we persist the unemployment data into the cluster's simulated **HDFS**.

In [None]:
unemployment.repartition(1).write.format("csv").mode("overwrite").option("sep", ",").option("header", "true").save("data/uk-macroeconomic-unemployment-data.csv")
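
Note that `save` creates a directory with the given name and writes the rows into `part-*` files inside it, so the `.csv` path above ends up being a folder. The sketch below illustrates this on made-up toy rows written to a temporary directory and then read back. It assumes a standalone run with a local master; the names `writeSession`, `toyOut`, `outDir`, `partFiles` and `readBack` are hypothetical. On the cluster, the path would have to be visible to every worker, like the shared `data/` directory used above.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local master, so driver and executors share one filesystem
val writeSession = SparkSession.
    builder().
    master("local[*]").
    appName("write-sketch").
    getOrCreate()
import writeSession.implicits._

// Toy rows shaped like the cleaned dataframe (all values are made up)
val toyOut = Seq(
  ("1855", "100", "3.5"),
  ("1856", "101", "4.0")
).toDF("year", "population", "unemployment_rate")

// Hypothetical output location inside a fresh temporary directory
val outDir = Files.createTempDirectory("spark-write-sketch").resolve("toy.csv").toString

toyOut.repartition(1).write.format("csv").mode("overwrite").option("header", "true").save(outDir)

// The target is a directory; list the part file(s) Spark wrote into it
val partFiles = new File(outDir).listFiles.map(_.getName).filter(_.startsWith("part-"))

// Loading the directory reads all of its part files back into one dataframe
val readBack = writeSession.read.format("csv").option("header", "true").load(outDir)
readBack.show()
```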