## 1. Introduction

This notebook shows how to connect Jupyter notebooks to a Spark Cluster, read a local CSV and store it to Hadoop as partitioned parquet files. Is also shows how to ustilise MlLIB to train ML model by Spark.

## 2. Connection to Spark Cluster

To connect to the Spark cluster, create a SparkSession object with the following params:

+ **appName:** application name displayed at the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, same used by Spark Workers;
+ **spark.executor.memory:** must be less than or equals to docker compose SPARK_WORKER_MEMORY config.

In [2]:
import numpy as np
import pyspark
from pyspark.sql import SparkSession
import logging

spark = SparkSession.\
        builder.\
        appName("pyspark-bank").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

# Get the SparkContext
sc = spark.sparkContext

logger = logging.getLogger("py4j")
logger.setLevel(logging.ERROR)
logging.basicConfig(level=logging.ERROR)
sc.setLogLevel('ERROR')

## 3. Data Preparation
We will now load data from a local CSV and store it to Hadoop partitioned by column.
Afterward you can access Hadoop UI to explore the saved parquet files.
Access Hadoop UI on 'http://bigdata:9870' (Utilities -> Browse the files system )

In [3]:
import pandas as pd
import numpy as np
from pyspark.sql.types import *
from pyspark.sql import functions as F
import os
import time

### 3.1. Load

In [4]:
bank_df = pd.read_csv('./data_bank/bank-additional-full.csv', sep=';')
bank_df.head()

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,...,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


In [5]:
#Create PySpark DataFrame from Pandas
bank_spark_df = spark.createDataFrame(bank_df)

  for column, series in pdf.iteritems():


### 3.2. Set number of partitions

In [6]:
number_of_partitions = bank_spark_df.rdd.getNumPartitions()
print("Number of partitions:", number_of_partitions)

Number of partitions: 2


In [7]:
bank_spark_df = bank_spark_df.repartition(10)
bank_spark_df.cache()
bank_spark_df.unpersist();

In [8]:
number_of_partitions = bank_spark_df.rdd.getNumPartitions()
print("Number of partitions:", number_of_partitions)

Number of partitions: 10


### 3.3. Remove dots in the column names

In [9]:
bank_spark_df = bank_spark_df.select([F.col("`" + col + "`").alias(col.replace('.', '_')) 
                          for col in bank_spark_df.columns])
print("Bank Dataframe created with schema : ")
bank_spark_df.printSchema()

Bank Dataframe created with schema : 
root
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- campaign: long (nullable = true)
 |-- pdays: long (nullable = true)
 |-- previous: long (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp_var_rate: double (nullable = true)
 |-- cons_price_idx: double (nullable = true)
 |-- cons_conf_idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr_employed: double (nullable = true)
 |-- y: string (nullable = true)



### 3.4. Check for missing values
Check for missing values and handle them appropriately (e.g. fill missing values with mean, median, or mode).

In [10]:
bank_spark_df = bank_spark_df.na.replace("unknown", None)
bank_spark_df = bank_spark_df.dropna()

### 3.5. One Hot Encoding Categorical data type columns

In [12]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categorical_columns = [bank_spark_df.columns[i] for i in list(range(1, 10)) + [14]]

indexer_output_names = [name + "__indexed" for name in categorical_columns]
encoder_output_names = [name + "__vec" for name in categorical_columns]

indexer = StringIndexer(inputCols=categorical_columns, outputCols=indexer_output_names)
onehoter = OneHotEncoder(inputCols=indexer_output_names, outputCols=encoder_output_names)

indexed_df = indexer.fit(bank_spark_df).transform(bank_spark_df)
one_hoted_df = onehoter.fit(indexed_df).transform(indexed_df)

label_indexer = StringIndexer(inputCol='y', outputCol='label')
one_hoted_df = label_indexer.fit(one_hoted_df).transform(one_hoted_df)

one_hoted_df = one_hoted_df.select([F.col(col) for col in one_hoted_df.columns 
                                          if "__vec" in col or col not in categorical_columns and "__indexed" not in col])
processed_spark_df = one_hoted_df.select([F.col(col) for col in one_hoted_df.columns if col != "y"])

### 3.6. Train & Test split

In [13]:
from pyspark.ml.feature import VectorAssembler

input_cols = processed_spark_df.columns

seed = 42
data_train, data_test = processed_spark_df.randomSplit([0.7, 0.3], seed=seed)
# Use the VectorAssembler to combine the feature columns into a single vector column
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')

# Use the assembler to transform the dataset
data_train = assembler.transform(data_train)
data_test = assembler.transform(data_test)

## 4. Model selection
Choose an appropriate classification model (e.g. Logistic Regression, K-Nearest Neighbors, Decision Trees, Random Forest, Support Vector Machines, Neural Networks, etc.) based on the size and characteristics of the dataset and the problem you are trying to solve.
It's a good idea to use cross-validation techniques to get an accurate estimate of the model performance.

### 4.1. Logistic Regression

In [14]:
from pyspark.ml.classification import LogisticRegression

In [15]:
# Create a logistic regression object
logr = LogisticRegression(featuresCol='features', labelCol='label')

### 4.2. Decision Tree Classifier

In [16]:
from pyspark.ml.classification import DecisionTreeClassifier

In [17]:
# Train a decision tree classifier on the training data
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

### 4.3. Random Forest Classifier

In [18]:
from pyspark.ml.classification import RandomForestClassifier

In [19]:
# Create the Random Forest Classifier
rf = RandomForestClassifier(featuresCol='features', labelCol='label')

## 5. Model Training
Train the model using the training data.

In [22]:
# Fit the models on the training data
lr_model = logr.fit(data_train)
dt_model = dt.fit(data_train)
rf_model = rf.fit(data_train)

                                                                                

## 6. Model Evaluation
Evaluate the performance of the model on the testing data using appropriate metrics such as accuracy, precision, recall, F1-score, confusion matrix, etc.

### 6.1. Use the model to predict the labels for the test data

In [24]:
lr_predictions = lr_model.transform(data_test)
dt_predictions = dt_model.transform(data_test)
rf_predictions = rf_model.transform(data_test)

### 6.2. Evaluate the predictions

In [25]:
def prec_recall(predictions):
    TN = predictions.filter('prediction = 0 AND label = prediction').count()
    TP = predictions.filter('prediction = 1 AND label = prediction').count()
    FN = predictions.filter('prediction = 0 AND label = 1').count()
    FP = predictions.filter('prediction = 1 AND label = 0').count()
    
    # Calculate precision and recall
    try:
        precision = TP / (TP + FP)
    except ZeroDivisionError:
        precision = 0
    try:
        recall = TP / (TP + FN)
    except ZeroDivisionError:
        recall = 0

    return precision, recall


lr_prec_recall = prec_recall(lr_predictions)
dt_prec_recall = prec_recall(dt_predictions)
rf_prec_recall = prec_recall(rf_predictions)

                                                                                

In [26]:
print('Linear Regression:')
print('precision = {:.2f}\nrecall   = {:.2f}'.format(*lr_prec_recall))
print('Decision tree classifier:')
print('precision = {:.2f}\nrecall   = {:.2f}'.format(*dt_prec_recall))
print('Random Forest classifier:')
print('precision = {:.2f}\nrecall   = {:.2f}'.format(*rf_prec_recall))

Linear Regression:
precision = 1.00
recall   = 1.00
Decision tree classifier:
precision = 1.00
recall   = 1.00
Random Forest classifier:
precision = 1.00
recall   = 0.99
