# Introduction to Spark

### Importing the necessary libraries

In [1]:
import os
import re
import json

import warnings
from pprint import pprint

# DML
import numpy as np
import pandas as pd

# Spark
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer

# TODO: TBD

### Setup and configuration

In [2]:
# Build (or fetch the already-running) SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName('Test0')
    .getOrCreate()
)

spark

## DQL

In [80]:
filepath1 = r"./datasets/Sample1.csv"

# The first line holds the column names; inferSchema lets Spark pick column types
# (here: Name as string, the rest as integers).
df = spark.read.csv(filepath1, header=True, inferSchema=True)

df.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|  Himmler|  34|         8| 54000|
|     NULL|  23|         3| 16000|
|  Hermann|  30|         1| 12000|
|     Hans|NULL|      NULL| 22500|
|Hellstrom|  32|         7| 45000|
|     Eric|  35|         1| 20000|
|     Hugo|  45|        10| 67000|
|   Joseph|  27|         9|  NULL|
|     NULL|  32|         6|  NULL|
+---------+----+----------+------+



In [81]:
df.__class__  # the concrete PySpark DataFrame class

pyspark.sql.classic.dataframe.DataFrame

### Fetching records

In [82]:
# head(5) returns the first five Rows; each unpacks positionally into the four columns.
for record in df.head(5):
    name, age, experience, salary = record
    print(", ".join(map(str, (name, age, experience, salary))))

Himmler, 34, 8, 54000
None, 23, 3, 16000
Hermann, 30, 1, 12000
Hans, None, None, 22500
Hellstrom, 32, 7, 45000


### Visualizing schema

In [83]:
# Print the column names, types and nullability as a tree.
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [84]:
list(df.columns)  # column names in schema order

['Name', 'Age', 'Experience', 'Salary']

### Selecting columns

In [85]:
# Show every column on its own, one single-column table at a time.
for col_name in df.columns:
    single_column = df.select(col_name)
    single_column.show()

+---------+
|     Name|
+---------+
|  Himmler|
|     NULL|
|  Hermann|
|     Hans|
|Hellstrom|
|     Eric|
|     Hugo|
|   Joseph|
|     NULL|
+---------+

+----+
| Age|
+----+
|  34|
|  23|
|  30|
|NULL|
|  32|
|  35|
|  45|
|  27|
|  32|
+----+

+----------+
|Experience|
+----------+
|         8|
|         3|
|         1|
|      NULL|
|         7|
|         1|
|        10|
|         9|
|         6|
+----------+

+------+
|Salary|
+------+
| 54000|
| 16000|
| 12000|
| 22500|
| 45000|
| 20000|
| 67000|
|  NULL|
|  NULL|
+------+



In [86]:
# (column name, type name) pairs for every column.
df.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [87]:
# count / mean / stddev / min / max per column; NULLs are excluded from the counts.
df.describe().show()

+-------+------+----------------+------------------+-----------------+
|summary|  Name|             Age|        Experience|           Salary|
+-------+------+----------------+------------------+-----------------+
|  count|     7|               8|                 8|                7|
|   mean|  NULL|           32.25|             5.625|33785.71428571428|
| stddev|  NULL|6.45312770235156|3.5431019500674026|21392.86708005175|
|    min|  Eric|              23|                 1|            12000|
|    max|Joseph|              45|                10|            67000|
+-------+------+----------------+------------------+-----------------+



## DML

### Adding columns with operations
> Adding **2 years** of experience to the experience column values

In [88]:
# Column arithmetic is applied row-wise; a NULL Experience stays NULL.
experience_plus_two = df['Experience'] + 2
df_exp_2 = df.withColumn('Experience After 2 Years', experience_plus_two)
df_exp_2.show()

+---------+----+----------+------+------------------------+
|     Name| Age|Experience|Salary|Experience After 2 Years|
+---------+----+----------+------+------------------------+
|  Himmler|  34|         8| 54000|                      10|
|     NULL|  23|         3| 16000|                       5|
|  Hermann|  30|         1| 12000|                       3|
|     Hans|NULL|      NULL| 22500|                    NULL|
|Hellstrom|  32|         7| 45000|                       9|
|     Eric|  35|         1| 20000|                       3|
|     Hugo|  45|        10| 67000|                      12|
|   Joseph|  27|         9|  NULL|                      11|
|     NULL|  32|         6|  NULL|                       8|
+---------+----+----------+------+------------------------+



### Dropping columns
> Dropping the **`Experience After 2 Years`** column

In [89]:
# Remove the derived column added in the previous step.
derived_column = 'Experience After 2 Years'
df_exp_2 = df_exp_2.drop(derived_column)
df_exp_2.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|  Himmler|  34|         8| 54000|
|     NULL|  23|         3| 16000|
|  Hermann|  30|         1| 12000|
|     Hans|NULL|      NULL| 22500|
|Hellstrom|  32|         7| 45000|
|     Eric|  35|         1| 20000|
|     Hugo|  45|        10| 67000|
|   Joseph|  27|         9|  NULL|
|     NULL|  32|         6|  NULL|
+---------+----+----------+------+



### Renaming columns

In [90]:
# Rename a single column; all other columns are left untouched.
old_name, new_name = 'Name', 'First Name'
df_exp_2 = df_exp_2.withColumnRenamed(old_name, new_name)
df_exp_2.show()

+----------+----+----------+------+
|First Name| Age|Experience|Salary|
+----------+----+----------+------+
|   Himmler|  34|         8| 54000|
|      NULL|  23|         3| 16000|
|   Hermann|  30|         1| 12000|
|      Hans|NULL|      NULL| 22500|
| Hellstrom|  32|         7| 45000|
|      Eric|  35|         1| 20000|
|      Hugo|  45|        10| 67000|
|    Joseph|  27|         9|  NULL|
|      NULL|  32|         6|  NULL|
+----------+----+----------+------+



### Handling missing values

In [91]:
df.dropna(how='any').show()  # drop every row that contains at least one NULL

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|  Himmler| 34|         8| 54000|
|  Hermann| 30|         1| 12000|
|Hellstrom| 32|         7| 45000|
|     Eric| 35|         1| 20000|
|     Hugo| 45|        10| 67000|
+---------+---+----------+------+



### Dropping options
- `how`: drop a row when `"any"` or `"all"` of its values are `NULL`
- `thresh`: keep a row only if it has at least `thresh` **non-null** values (when given, `thresh` overrides `how`)
- `subset`: choose which columns are checked for null values

In [96]:
# Keep rows with at least 3 non-null values. When `thresh` is given it overrides
# `how`, so passing how='any' alongside it has no effect.
df.na.drop(thresh=3).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|  Himmler| 34|         8| 54000|
|     NULL| 23|         3| 16000|
|  Hermann| 30|         1| 12000|
|Hellstrom| 32|         7| 45000|
|     Eric| 35|         1| 20000|
|     Hugo| 45|        10| 67000|
|   Joseph| 27|         9|  NULL|
+---------+---+----------+------+



* **`how="any"`** will drop row(s) if **ANY** of their column(s) are `NULL`

* **`how="all"`** will drop row(s) if **ALL** of their column(s) are `NULL`

### Imputation strategies

#### 1. Using **constant** fill values chosen per column (matching each column's inferred type)

In [93]:
# Per-column fill values; Salary is not listed, so its NULLs are kept.
fill_values = {"Age": 0, "Experience": 0, "Name": "nicht"}
df.fillna(fill_values).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|  Himmler| 34|         8| 54000|
|    nicht| 23|         3| 16000|
|  Hermann| 30|         1| 12000|
|     Hans|  0|         0| 22500|
|Hellstrom| 32|         7| 45000|
|     Eric| 35|         1| 20000|
|     Hugo| 45|        10| 67000|
|   Joseph| 27|         9|  NULL|
|    nicht| 32|         6|  NULL|
+---------+---+----------+------+



#### 2. Using **ML**
- Mindfully pick a strategy, such as `"mean"` or `"median"`

In [98]:
# Numeric columns whose NULLs should be replaced by the column mean.
# Output names are derived from this SAME list so inputs and outputs stay aligned;
# deriving them from df.columns[1:] silently mismatches if the column order/set changes.
impute_cols = ['Age', 'Experience', 'Salary']

# NOTE: these are integer columns, so the imputed values are integers as well
# (e.g. the Age mean 32.25 shows up as 32 in the output).
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{column}_imputed" for column in impute_cols],
    strategy='mean',
)

In [99]:
# fit() computes the replacement statistic per input column; transform() appends the *_imputed columns.
imputer_model = imputer.fit(df)
imputed_df = imputer_model.transform(df)
imputed_df.show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|  Himmler|  34|         8| 54000|         34|                 8|         54000|
|     NULL|  23|         3| 16000|         23|                 3|         16000|
|  Hermann|  30|         1| 12000|         30|                 1|         12000|
|     Hans|NULL|      NULL| 22500|         32|                 5|         22500|
|Hellstrom|  32|         7| 45000|         32|                 7|         45000|
|     Eric|  35|         1| 20000|         35|                 1|         20000|
|     Hugo|  45|        10| 67000|         45|                10|         67000|
|   Joseph|  27|         9|  NULL|         27|                 9|         33785|
|     NULL|  32|         6|  NULL|         32|                 6|         33785|
+---------+----+----------+------+-----------+------------------+--------------+

### Filters

In [105]:
# filter() also accepts SQL expression strings instead of Column objects.
q1 = "Salary<=20000"  # low earners
q2 = "Salary>=40000"  # high earners

for condition in (q1, q2):
    df.filter(condition).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   NULL| 23|         3| 16000|
|Hermann| 30|         1| 12000|
|   Eric| 35|         1| 20000|
+-------+---+----------+------+

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|  Himmler| 34|         8| 54000|
|Hellstrom| 32|         7| 45000|
|     Hugo| 45|        10| 67000|
+---------+---+----------+------+



#### Joining two filter queries

In [110]:
# Combine Column conditions with `|` (OR); each side must be its own expression.
low_paid = df['Salary'] <= 20000
high_paid = df['Salary'] >= 40000
df.filter(low_paid | high_paid).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|  Himmler| 34|         8| 54000|
|     NULL| 23|         3| 16000|
|  Hermann| 30|         1| 12000|
|Hellstrom| 32|         7| 45000|
|     Eric| 35|         1| 20000|
|     Hugo| 45|        10| 67000|
+---------+---+----------+------+



#### Negating a filter query

In [114]:
# `~` negates a Column condition. Rows with a NULL Salary are still excluded:
# the negated comparison is NULL for them, and filtering keeps only true rows.
at_least_20k = df['Salary'] >= 20000
df.where(~at_least_20k).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   NULL| 23|         3| 16000|
|Hermann| 30|         1| 12000|
+-------+---+----------+------+



### Aggregations

#### New sample dataset
> One that is more suitable for aggregation use cases.

In [28]:
# NOTE(review): this path differs from the ./datasets/ folder used for Sample1 — confirm intended.
filepath2 = r"../ML/Data/Sample2.csv"

df2 = spark.read.csv(filepath2, header=True, inferSchema=True)

df2.show()

+------+----------------+------+
|  Name|      Department|Salary|
+------+----------------+------+
|  Eric|  Data Analytics| 15000|
|  Eric|  Infrastructure|  8000|
|  Karl|Data Warehousing|  9500|
|  Hans|Data Warehousing| 10000|
|  Karl|  Data Analytics| 12500|
|Thomas|  Data Analytics| 12500|
|Thomas|  Infrastructure|  9000|
|Thomas|Data Warehousing|  8000|
|   Max|  Data Analytics| 15000|
|   Max|Data Warehousing|  8000|
+------+----------------+------+



In [29]:
# Demonstrate renaming into a NEW variable. Reassigning `df2` here would break the
# later cells that still group by 'Department' when the notebook is run top to bottom.
df2_renamed = df2.withColumnRenamed('Department', 'Dept')

df2_renamed.show()

+------+----------------+------+
|  Name|            Dept|Salary|
+------+----------------+------+
|  Eric|  Data Analytics| 15000|
|  Eric|  Infrastructure|  8000|
|  Karl|Data Warehousing|  9500|
|  Hans|Data Warehousing| 10000|
|  Karl|  Data Analytics| 12500|
|Thomas|  Data Analytics| 12500|
|Thomas|  Infrastructure|  9000|
|Thomas|Data Warehousing|  8000|
|   Max|  Data Analytics| 15000|
|   Max|Data Warehousing|  8000|
+------+----------------+------+



#### Export to a `.csv` file via pandas
> Only do this with **SMALL** datasets: `toPandas()` collects the entire DataFrame into the driver's memory.

In [None]:
# toPandas() collects the whole DataFrame onto the driver — small data only.
# Write to a separate file: writing to `filepath2` would overwrite the input dataset.
# index=False stops pandas' row index from becoming an extra unnamed column on re-read.
export_path = os.path.splitext(filepath2)[0] + "_export.csv"
df2.toPandas().to_csv(export_path, index=False)

> For large datasets, write with **Spark** itself instead (e.g. `df.write.csv(path)`): it produces a **directory** containing one part file per **partition**.

In [32]:
df2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [33]:
df2.describe().show()

+-------+------+--------------+-----------------+
|summary|  Name|    Department|           Salary|
+-------+------+--------------+-----------------+
|  count|    10|            10|               10|
|   mean|  NULL|          NULL|          10750.0|
| stddev|  NULL|          NULL|2791.355704074038|
|    min|  Eric|Data Analytics|             8000|
|    max|Thomas|Infrastructure|            15000|
+-------+------+--------------+-----------------+



#### Highest **total salary** by `Name`

In [34]:
# Sum only Salary (a bare .sum() aggregates every numeric column) and break ties on
# Name so that limit(1) always returns the same row.
(
    df2.groupby('Name')
    .sum('Salary')
    .withColumnRenamed('sum(Salary)', 'Total Salary')
    .sort(['Total Salary', 'Name'], ascending=[False, True])
    .limit(1)
    .show()
)

+------+------------+
|  Name|Total Salary|
+------+------------+
|Thomas|       29500|
+------+------------+



#### Highest **average salary** by `Name`

In [35]:
# Average (not total) salary per person, so label the column accordingly.
# Eric and Max tie at 11500; sorting by Name as a secondary key makes limit(1)
# pick the alphabetically first of the tied names instead of an arbitrary one.
(
    df2.groupby('Name')
    .avg('Salary')
    .withColumnRenamed('avg(Salary)', 'Average Salary')
    .sort(['Average Salary', 'Name'], ascending=[False, True])
    .limit(1)
    .show()
)

+----+------------+
|Name|Total Salary|
+----+------------+
|Eric|     11500.0|
+----+------------+



#### Highest **total salary** by `Department`

In [36]:
# Sum only Salary per department; the secondary sort on Department makes limit(1)
# deterministic should two departments ever share the same total.
(
    df2.groupby('Department')
    .sum('Salary')
    .withColumnRenamed('sum(Salary)', 'Total Salary')
    .sort(['Total Salary', 'Department'], ascending=[False, True])
    .limit(1)
    .show()
)

+--------------+------------+
|    Department|Total Salary|
+--------------+------------+
|Data Analytics|       55000|
+--------------+------------+



#### **Number** of employees by `Department`

In [37]:
# One row per department with the number of employee records in it.
employees_per_dept = (
    df2.groupby('Department')
    .count()
    .withColumnRenamed('count', 'Number of Employees')
)
employees_per_dept.show()

+----------------+-------------------+
|      Department|Number of Employees|
+----------------+-------------------+
|  Data Analytics|                  4|
|  Infrastructure|                  2|
|Data Warehousing|                  4|
+----------------+-------------------+



#### Grand total salary
> Aggregate (`agg`) over all employees, without grouping

In [38]:
# agg() without groupby aggregates over the whole DataFrame: a single-row result.
salary_sum = df2.agg({'Salary': 'sum'})
grand_total = salary_sum.withColumnRenamed('sum(Salary)', 'Grand Total Salary')
grand_total.show()

+------------------+
|Grand Total Salary|
+------------------+
|            107500|
+------------------+



In [31]:
# TODO: Be seein' ya.