# Introduction to Spark

## Import Libraries

In [1]:
from os.path import abspath
from pyspark.sql import SparkSession, HiveContext
import pyspark.sql.functions as F
import pandas as pd

## Spark Session

**Spark session** is the unified entry point for Spark applications since Spark 2.0. Instead of separate Spark, Hive, and SQL contexts, all of them are now encapsulated in a single Spark session.

**Resources:**
 * [A tale of Spark Session and Spark Context](https://medium.com/@achilleus/spark-session-10d0d66d1d24)

Create a Spark session with `SparkSession.builder`:
 * `config("spark.sql.warehouse.dir", warehouse_location)` - `warehouse_location` is the default location for managed databases and tables
 * `config('spark.driver.extraJavaOptions','-Dderby.system.home=../data/tmp')` - sets the directory where `metastore_db` and `derby.log` are created

In [2]:
warehouse_location = abspath('../data/spark-warehouse')

In [None]:
spark = SparkSession \
         .builder \
         .config("spark.sql.warehouse.dir", warehouse_location) \
         .config('spark.driver.extraJavaOptions','-Dderby.system.home=../data/tmp') \
         .enableHiveSupport() \
         .getOrCreate()

### Multiple Spark Sessions

Creating multiple Spark sessions can cause issues, so it is best practice to use the `getOrCreate()` method. It returns the existing Spark session if there is already one in the environment, or creates a new one otherwise. Let's test this by requesting another Spark session:

In [None]:
spark_2 = SparkSession.builder.enableHiveSupport().getOrCreate()

Now we can verify that both variables refer to the same Spark session:

In [None]:
print(spark)
print(spark_2)

Check Spark version:

In [None]:
spark.version

Note that the Spark context (like the other contexts) is accessible from the Spark session object `spark`:

In [None]:
sc = spark.sparkContext

In [None]:
sc

Another example: access Spark configuration parameters:

In [None]:
spark.sparkContext.getConf().getAll()  # public accessor instead of the private _conf attribute

## Data in Spark

### Read from a File to Spark Data Frame 

We can read data into Spark data frames, for example, from a `csv` file.

In [None]:
df_iris = spark.read.csv("../data/raw/iris.csv", header=True, inferSchema=True)

Note that the data above is the famous _Iris_ data set by Fisher:
 * [_Iris_ Data Set at Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/iris)
 * [_Iris_ flower data set](https://en.wikipedia.org/wiki/Iris_flower_data_set)
 * [R. A. Fisher (1936). "The use of multiple measurements in taxonomic problems". Annals of Eugenics. 7 (2): 179–188](https://onlinelibrary.wiley.com/doi/abs/10.1111/j.1469-1809.1936.tb02137.x)

In [None]:
df_iris

In [None]:
df_iris.show(5)

### Save Spark Data Frame to Table

Let's save the data frame `df_iris` to a Hive table:

In [None]:
df_iris.write.saveAsTable("iris_tb")

We can list the **tables** available in the Spark cluster with the `catalog.listTables()` method:

In [None]:
print(spark.catalog.listTables())

We can list the **databases** available in the Spark cluster with the `catalog.listDatabases()` method:

In [None]:
print(spark.catalog.listDatabases())

Note that the database is located in the `spark-warehouse` directory, which we defined above in the Spark configuration; the saved table `iris_tb` is stored there as well.

We can also register a Spark data frame as a **TEMPORARY** table to make it available in other contexts as well (for example, in the SQL context), but only within the specific Spark session that was used to create the data frame.

There are two methods:
 * `createTempView()` - the lifetime of this temporary table is tied to the `SparkSession` that was used to create the `DataFrame`. It throws `TempTableAlreadyExistsException` if the view name already exists in the catalog.
 * `createOrReplaceTempView()` - similar to the above, but it safely creates a new temporary table if none exists or replaces the existing one. This is the recommended method.

In [None]:
df_iris.createOrReplaceTempView("iris_temp")

Let's examine the catalog again and see that the new table `iris_temp` is there and listed as temporary:

In [None]:
print(spark.catalog.listTables())

### Read from Table to Spark Data Frame 

We can read the **entire** table into a data frame using the `table()` method as follows:

In [None]:
df_iris_2 = spark.table("iris_tb")

In [None]:
df_iris_2.show(10)

Or we can run a **SQL query** on the table and read the results into a data frame. The `sql()` method runs queries as follows:

In [None]:
query = "SELECT * FROM iris_tb WHERE class_iris = 'Iris-versicolor' LIMIT 10"

In [None]:
flowers10 = spark.sql(query)

In [None]:
flowers10.show()

### Convert Spark Data Frame to Pandas Data Frame

If the resulting Spark data frame has a manageable size, it can be converted to a **Pandas** data frame with the `toPandas()` method as follows:

In [None]:
pdf_flowers10 = flowers10.toPandas()

In [None]:
pdf_flowers10.info()

In [None]:
pdf_flowers10.describe()

### Convert Pandas Data Frame to Spark Data Frame

We can convert a Pandas data frame to a Spark data frame by passing it to the `createDataFrame()` method as follows:

In [None]:
flowers10_tmp = spark.createDataFrame(pdf_flowers10)

In [None]:
flowers10_tmp

In [None]:
flowers10_tmp.show()

## Manipulating Data in Spark

### Creating a New Column in a Data Frame

Method `withColumn()` performs column-wise operations. It takes two arguments:
 * `colName` - a string containing the name of the new column
 * `col` - a column expression

It returns a new DataFrame with the new column added. Note that data frames in Spark are **immutable**, i.e. they can't be changed in place, but we can reassign the resulting data frame to the original variable:

In [None]:
df_iris = df_iris.withColumn("sepal_area_cm2", df_iris.sepal_length_cm * df_iris.sepal_width_cm)

In [None]:
df_iris.show(10)

Similarly, we can create a new column of boolean values based on a condition as follows:

In [None]:
df_iris = df_iris.withColumn("sepal_length_big", df_iris.sepal_length_cm > 6.0)

In [None]:
df_iris.show(10)

### Renaming a Column in a Data Frame

A column can be renamed using the `withColumnRenamed()` method:

In [None]:
df_iris = df_iris.withColumnRenamed("sepal_area_cm2", "sepal_area_cm_squared")

In [None]:
df_iris.show(5)

### Filtering Records in a Data Frame

Method `filter()` selects only the **rows** of a Spark data frame that satisfy a given condition. This method accepts one argument, the condition, which can be given as:
 * a SQL expression
 * or a Spark column of boolean values.

The following is an example of a SQL expression used for filtering. Note that the expression must be a **string** and must not contain the data frame name (use `"sepal_length_cm > 6.0"`, not `"df_iris.sepal_length_cm > 6.0"`):

In [None]:
df_iris.filter("sepal_length_cm > 6.0").show(10)

This is an example of a Spark column of boolean values used for filtering. Note that it does contain the name of the data frame (use `df_iris.sepal_length_cm > 6.0`, not `sepal_length_cm > 6.0`) and it is **not** a string:

In [None]:
df_iris.filter(df_iris.sepal_length_cm > 6.0).show(10)

Note that the Spark column used in the filter can be defined separately as follows:

In [None]:
filter_long_sepal = df_iris.sepal_length_cm > 6.0

In [None]:
df_iris.filter(filter_long_sepal).show(10)

### Selecting Columns from a Data Frame

Method `select()` selects **columns** from a Spark data frame. This method accepts multiple arguments, each identifying a column as:
 * a string name
 * or a column object.

The following is an example of a **string** column name used for selection (use `"sepal_length_cm"`, not `"df_iris.sepal_length_cm"`):

In [None]:
df_iris.select("sepal_length_cm", "sepal_width_cm").show(5)

The following is an example of a **column object** used for selection (use `df_iris.sepal_length_cm`, not `sepal_length_cm`):

In [None]:
df_iris.select(df_iris.sepal_length_cm, df_iris.sepal_width_cm).show(5)

We can mix both types of arguments:

In [None]:
df_iris.select("sepal_length_cm", df_iris.sepal_width_cm).show(5)

The same `select()` method can also be used to apply **column-wise operations**. These work **only** with column objects, as shown below (a string such as `select("sepal_length_cm*10")` would **not** work because `select()` treats it as a column name):

In [None]:
df_iris.select(df_iris.sepal_length_cm*10, df_iris.sepal_width_cm*10).show(5)

Additionally, we can use the `alias()` method to name the computed columns as follows:

In [None]:
df_iris.select( (df_iris.sepal_length_cm*10).alias("sepal_length_mm"), (df_iris.sepal_width_cm*10).alias("sepal_width_mm") ).show(5)

Conveniently, the arguments of the `select()` method can be defined separately and then plugged in:

In [None]:
sepal_length_mm = (df_iris.sepal_length_cm*10).alias("sepal_length_mm")
sepal_width_mm = (df_iris.sepal_width_cm*10).alias("sepal_width_mm")
df_iris.select(sepal_length_mm, sepal_width_mm).show(5)

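Notice that in the code above we use `df_iris.select(sepal_length_mm, sepal_width_mm)`, not `df_iris.select(df_iris.sepal_length_mm, df_iris.sepal_width_mm)`: `sepal_length_mm` and `sepal_width_mm` are Python variables holding column expressions, not columns of `df_iris`.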

As noted above, combining selection **and** operations on columns in `select()` works with column objects only. To use SQL strings instead, we have to use the `selectExpr()` method:

In [None]:
df_iris.selectExpr("sepal_length_cm*10", "sepal_width_cm*10").show(5)

We can also rename the new columns with the SQL keyword `AS`, as we did above with the `alias()` method. The keyword is optional, which is why the second expression below omits it:

In [None]:
df_iris.selectExpr("sepal_length_cm*10 AS sepal_length_mm", "sepal_width_cm*10 sepal_width_mm").show(5)

Again, let's use both methods next to each other to demonstrate that results are the same:

In [None]:
df_iris.select( (df_iris.sepal_length_cm*10).alias("sepal_length_mm"), (df_iris.sepal_width_cm*10).alias("sepal_width_mm") ).show(5)

In [None]:
df_iris.selectExpr("sepal_length_cm*10 AS sepal_length_mm", "sepal_width_cm*10 sepal_width_mm").show(5)

### Difference between `withColumn` and `select`

Method `select()` creates a new data frame with only the columns specified as its arguments.

Method `withColumn()` creates a new data frame with **all** columns of the original data frame plus the new column specified by its two arguments.

In [None]:
df_iris.select( (df_iris.sepal_length_cm*10).alias("sepal_length_mm") ).show(5)

In [None]:
df_iris.withColumn("sepal_length_mm", df_iris.sepal_length_cm * 10.0).show(5)

### Aggregating Records in a Data Frame

Aggregation methods follow the `groupBy()` method, which creates a `GroupedData` object from a Spark data frame.

List of some aggregation methods:
 * `min()`
 * `max()`
 * `avg()`
 * `sum()`
 * `count()`

Aggregation methods are used with **string** column names; column objects don't work (for example, use `min("sepal_length_cm")`, not `min(df_iris.sepal_length_cm)`).

Minimum sepal length:

In [None]:
df_iris.groupBy().min("sepal_length_cm").show()

Maximum sepal length:

In [None]:
df_iris.groupBy().max("sepal_length_cm").show()

Average sepal length:

In [None]:
df_iris.groupBy().avg("sepal_length_cm").show()

Sum of all sepal lengths in the table:

In [None]:
df_iris.groupBy().sum("sepal_length_cm").show()

Counts of records in the table:

In [None]:
df_iris.groupBy().count().show()

The `groupBy()` method accepts the names of one or more columns as arguments. For example, we can group records by *Iris* class (there are 3 of them: *virginica*, *setosa* and *versicolor*) and calculate the average in each class separately:

In [None]:
df_iris.groupBy("class_iris").avg("sepal_length_cm").show()

Arguments to `groupBy()` can be column name strings (such as `"class_iris"` used above) or column objects, such as `df_iris.class_iris` used in the example below:

In [None]:
df_iris.groupBy(df_iris.class_iris).avg("sepal_length_cm").show()

Another interesting application of the `groupBy()` method is combining it with the `count()` method to return the number of records for each *Iris* class:

In [None]:
df_iris.groupBy(df_iris.class_iris).count().show()

Finally, any aggregate function from the `pyspark.sql.functions` module can be used with the `groupBy()` and `agg()` methods. For example, let's group by *Iris* class and calculate the standard deviation:

In [None]:
df_iris.groupBy("class_iris").agg( F.stddev("sepal_length_cm") ).show()

Note that we imported `pyspark.sql.functions` as `F` at the beginning of this notebook.

### Joining Data Frames

Let's create a data frame from `iris_class.csv` file that contains *Iris* classes and corresponding English names:

In [None]:
df_iris_class = spark.read.csv("../data/raw/iris_class.csv", header=True, inferSchema=True)

In [None]:
df_iris_class.show()

Method `join()` creates a new data frame that combines information from two data frames using a column as a key. It is called on the first data frame and accepts 3 arguments:
 * the second data frame
 * `on` - name of the column to join on (it should have the same name in both data frames; use `withColumnRenamed` to rename if needed)
 * `how` - the type of join; we use `leftouter` in the example below

In [None]:
df_iris.join(df_iris_class, on="class_iris", how="leftouter").show(5)