# Spark ML - Classification

## Prepare the Spark session

In [None]:
# Make the local Spark installation importable before touching pyspark
import findspark

findspark.init()

# Spark classes needed to build the session
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Name the application and run on a local master ('local[*]'), then create
# the session or reuse one that already exists
conf = SparkConf().setAppName('mds-session').setMaster('local[*]')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Package import

In [None]:
# Import required packages
import pandas as pd
from sklearn.metrics import roc_curve
from plotnine import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Read a sample CSV

In [None]:
# Load the housing CSV: comma-separated, first row is the header, and the
# column types are inferred by Spark
data = spark.read.csv('./data/housing.csv', sep=',', header=True, inferSchema=True)

# The target column is cast to Double before modelling
data = data.withColumn('chas', data['chas'].cast('Double'))

## Target vs. features

In [None]:
# Column to predict; every other column of the dataset is used as an input
target = 'chas'
features = [name for name in data.columns if name != target]

## Vectorize inputs

In [None]:
# Pack all feature columns into a single vector column named 'features'
assembler = VectorAssembler(outputCol='features', inputCols=features)
vectorized_data = assembler.transform(data)

# Preview the first rows, including the new vector column
vectorized_data.show(n=5)

## Split train-test

In [None]:
# Split train test (70-30)
# The weights are proportions, so the resulting sizes are only approximately
# 70/30. A fixed seed keeps the split, and every metric computed from it,
# reproducible from one run of the notebook to the next.
train, test = vectorized_data.randomSplit([0.7, 0.3], seed=42)
print(f'Train size: {train.count()}')
print(f'Test size: {test.count()}')

## Logistic regression

### Create the model

In [None]:
# Settings for the logistic regression classifier
lr_params = dict(
    # Input and output columns
    featuresCol='features',
    labelCol=target,
    predictionCol='prediction',
    probabilityCol='probability',
    # Regularization strength and elastic-net mixing, both set to 0
    regParam=0,
    elasticNetParam=0,
    # Fit an intercept term; features are not standardized before fitting
    fitIntercept=True,
    standardization=False,
    # Probability threshold used to turn probabilities into class predictions
    threshold=0.05,
)

# Create the logistic regression classifier
lr = LogisticRegression(**lr_params)

### Train and analyze the model

In [None]:
# Fit the logistic regression classifier on the training split
trained_lr = lr.fit(train)

In [None]:
# Collect the fitted coefficients, one per feature, followed by the intercept
coefs = [*trained_lr.coefficients.toArray(), trained_lr.intercept]

# Show each feature name (plus 'Intercept') next to its fitted value
coefficient_table = pd.DataFrame({
    'features': [*features, 'Intercept'],
    'coefficients': coefs,
})
coefficient_table

### Evaluate the model

In [None]:
# Score both splits with the fitted logistic regression (train first, then test)
train_preds, test_preds = (trained_lr.transform(split) for split in (train, test))

# Evaluator that scores the 'probability' column against the target label
evaluator = BinaryClassificationEvaluator(labelCol=target, rawPredictionCol='probability')

In [None]:
# Parameter overrides selecting which metric the evaluator computes
roc_metric = {evaluator.metricName: 'areaUnderROC'}
pr_metric = {evaluator.metricName: 'areaUnderPR'}

# Area under the ROC and precision-recall curves on the training split
train_AUC = evaluator.evaluate(train_preds, roc_metric)
train_AUPR = evaluator.evaluate(train_preds, pr_metric)

# The same two metrics on the test split
test_AUC = evaluator.evaluate(test_preds, roc_metric)
test_AUPR = evaluator.evaluate(test_preds, pr_metric)

# Report train and test values side by side
print(f'AUC (Train/Test): {train_AUC} / {test_AUC}')
print(f'AUPR (Train/Test): {train_AUPR} / {test_AUPR}')

In [None]:
# Display the confusion matrix in train
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
train_results = train_preds.groupBy(target, 'prediction').count().toPandas()
train_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, train_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display the confusion matrix in test
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
test_results = test_preds.groupBy(target, 'prediction').count().toPandas()
test_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, test_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display train ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
train_values = train_preds.select(target, 'probability').toPandas()
train_values = train_values.assign(probability=train_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(train_values[target], train_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

In [None]:
# Display test ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
test_values = test_preds.select(target, 'probability').toPandas()
test_values = test_values.assign(probability=test_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(test_values[target], test_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

## Decision tree

### Create the model

In [None]:
# Settings for the decision tree classifier
tree_params = dict(
    # Input and output columns
    featuresCol='features',
    labelCol=target,
    predictionCol='prediction',
    probabilityCol='probability',
    # Tree growth limits: maximum depth and minimum rows per node
    maxDepth=5,
    minInstancesPerNode=1,
)

# Create the decision tree classifier
tree = DecisionTreeClassifier(**tree_params)

### Train and analyze the model

In [None]:
# Train the decision tree on the training split
trained_tree = tree.fit(train)

In [None]:
# Pair every feature name with the importance reported by the fitted tree
importance = trained_tree.featureImportances.toArray()
results = pd.DataFrame({'features': features, 'importance': importance})

# Feature names ordered by ascending importance, used as the axis order
ordered_features = results.sort_values('importance')['features']

# Horizontal bar chart of the importances
importance_plot = ggplot(results, aes(x='features', y='importance'))
importance_plot = importance_plot + geom_bar(stat='identity')
importance_plot = importance_plot + coord_flip()
importance_plot = importance_plot + scale_x_discrete(limits=ordered_features)
importance_plot

### Evaluate the model

In [None]:
# Score both splits with the fitted decision tree (train first, then test)
train_preds, test_preds = (trained_tree.transform(split) for split in (train, test))

# Evaluator that scores the 'probability' column against the target label
evaluator = BinaryClassificationEvaluator(labelCol=target, rawPredictionCol='probability')

In [None]:
# Parameter overrides selecting which metric the evaluator computes
roc_metric = {evaluator.metricName: 'areaUnderROC'}
pr_metric = {evaluator.metricName: 'areaUnderPR'}

# Area under the ROC and precision-recall curves on the training split
train_AUC = evaluator.evaluate(train_preds, roc_metric)
train_AUPR = evaluator.evaluate(train_preds, pr_metric)

# The same two metrics on the test split
test_AUC = evaluator.evaluate(test_preds, roc_metric)
test_AUPR = evaluator.evaluate(test_preds, pr_metric)

# Report train and test values side by side
print(f'AUC (Train/Test): {train_AUC} / {test_AUC}')
print(f'AUPR (Train/Test): {train_AUPR} / {test_AUPR}')

In [None]:
# Display the confusion matrix in train
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
train_results = train_preds.groupBy(target, 'prediction').count().toPandas()
train_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, train_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display the confusion matrix in test
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
test_results = test_preds.groupBy(target, 'prediction').count().toPandas()
test_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, test_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display train ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
train_values = train_preds.select(target, 'probability').toPandas()
train_values = train_values.assign(probability=train_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(train_values[target], train_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

In [None]:
# Display test ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
test_values = test_preds.select(target, 'probability').toPandas()
test_values = test_values.assign(probability=test_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(test_values[target], test_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

## Random forest

### Create the model

In [None]:
# Settings for the random forest classifier
rf_params = dict(
    # Input and output columns
    featuresCol='features',
    labelCol=target,
    predictionCol='prediction',
    probabilityCol='probability',
    # Ensemble size, plus per-tree depth and minimum rows per node
    numTrees=100,
    maxDepth=5,
    minInstancesPerNode=1,
)

# Create the random forest classifier
rf = RandomForestClassifier(**rf_params)

### Train and analyze the model

In [None]:
# Fit the random forest classifier on the training split
trained_rf = rf.fit(train)

In [None]:
# Pair every feature name with the importance reported by the fitted forest
importance = trained_rf.featureImportances.toArray()
results = pd.DataFrame({'features': features, 'importance': importance})

# Feature names ordered by ascending importance, used as the axis order
ordered_features = results.sort_values('importance')['features']

# Horizontal bar chart of the importances
importance_plot = ggplot(results, aes(x='features', y='importance'))
importance_plot = importance_plot + geom_bar(stat='identity')
importance_plot = importance_plot + coord_flip()
importance_plot = importance_plot + scale_x_discrete(limits=ordered_features)
importance_plot

### Evaluate the model

In [None]:
# Score both splits with the fitted random forest (train first, then test)
train_preds, test_preds = (trained_rf.transform(split) for split in (train, test))

# Evaluator that scores the 'probability' column against the target label
evaluator = BinaryClassificationEvaluator(labelCol=target, rawPredictionCol='probability')

In [None]:
# Parameter overrides selecting which metric the evaluator computes
roc_metric = {evaluator.metricName: 'areaUnderROC'}
pr_metric = {evaluator.metricName: 'areaUnderPR'}

# Area under the ROC and precision-recall curves on the training split
train_AUC = evaluator.evaluate(train_preds, roc_metric)
train_AUPR = evaluator.evaluate(train_preds, pr_metric)

# The same two metrics on the test split
test_AUC = evaluator.evaluate(test_preds, roc_metric)
test_AUPR = evaluator.evaluate(test_preds, pr_metric)

# Report train and test values side by side
print(f'AUC (Train/Test): {train_AUC} / {test_AUC}')
print(f'AUPR (Train/Test): {train_AUPR} / {test_AUPR}')

In [None]:
# Display the confusion matrix in train
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
train_results = train_preds.groupBy(target, 'prediction').count().toPandas()
train_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, train_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display the confusion matrix in test
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
test_results = test_preds.groupBy(target, 'prediction').count().toPandas()
test_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, test_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display train ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
train_values = train_preds.select(target, 'probability').toPandas()
train_values = train_values.assign(probability=train_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(train_values[target], train_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

In [None]:
# Display test ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
test_values = test_preds.select(target, 'probability').toPandas()
test_values = test_values.assign(probability=test_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(test_values[target], test_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

## Gradient Boosted Tree

### Create the model

In [None]:
# Settings for the gradient-boosted tree classifier
gbt_params = dict(
    # Input and output columns
    featuresCol='features',
    labelCol=target,
    predictionCol='prediction',
    # Number of boosting iterations, plus per-tree depth and minimum rows per node
    maxIter=20,
    maxDepth=8,
    minInstancesPerNode=5,
)

# Create the GBT classifier
gbt = GBTClassifier(**gbt_params)

### Train and analyze the model

In [None]:
# Fit the gradient-boosted tree classifier on the training split
trained_gbt = gbt.fit(train)

In [None]:
# Pair every feature name with the importance reported by the fitted GBT model
importance = trained_gbt.featureImportances.toArray()
results = pd.DataFrame({'features': features, 'importance': importance})

# Feature names ordered by ascending importance, used as the axis order
ordered_features = results.sort_values('importance')['features']

# Horizontal bar chart of the importances
importance_plot = ggplot(results, aes(x='features', y='importance'))
importance_plot = importance_plot + geom_bar(stat='identity')
importance_plot = importance_plot + coord_flip()
importance_plot = importance_plot + scale_x_discrete(limits=ordered_features)
importance_plot

### Evaluate the model

In [None]:
# Score both splits with the fitted GBT model (train first, then test)
train_preds, test_preds = (trained_gbt.transform(split) for split in (train, test))

# Evaluator that scores the 'probability' column against the target label
evaluator = BinaryClassificationEvaluator(labelCol=target, rawPredictionCol='probability')

In [None]:
# Parameter overrides selecting which metric the evaluator computes
roc_metric = {evaluator.metricName: 'areaUnderROC'}
pr_metric = {evaluator.metricName: 'areaUnderPR'}

# Area under the ROC and precision-recall curves on the training split
train_AUC = evaluator.evaluate(train_preds, roc_metric)
train_AUPR = evaluator.evaluate(train_preds, pr_metric)

# The same two metrics on the test split
test_AUC = evaluator.evaluate(test_preds, roc_metric)
test_AUPR = evaluator.evaluate(test_preds, pr_metric)

# Report train and test values side by side
print(f'AUC (Train/Test): {train_AUC} / {test_AUC}')
print(f'AUPR (Train/Test): {train_AUPR} / {test_AUPR}')

In [None]:
# Display the confusion matrix in train
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
train_results = train_preds.groupBy(target, 'prediction').count().toPandas()
train_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, train_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display the confusion matrix in test
# Count rows per (actual, predicted) pair. The label column is taken from
# `target` instead of repeating its name as a literal.
test_results = test_preds.groupBy(target, 'prediction').count().toPandas()
test_results.columns = ['actual', 'prediction', 'frequency']

# Left-join the counts onto every (prediction, actual) combination so that
# pairs with no rows still show up, with a frequency of 0
confusion_matrix = pd.DataFrame({'prediction': [0, 1, 0, 1], 'actual': [0, 0, 1, 1]})
confusion_matrix = pd.merge(
    confusion_matrix, test_results, how='left', on=['prediction', 'actual']
).fillna(0).astype(int)

# Turn the class labels into strings, then pivot: one row per prediction,
# one column per actual class
confusion_matrix = confusion_matrix.assign(
    actual=confusion_matrix.actual.astype(str),
    prediction=confusion_matrix.prediction.astype(str),
)
confusion_matrix.set_index(['prediction', 'actual']).unstack()

In [None]:
# Display train ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
train_values = train_preds.select(target, 'probability').toPandas()
train_values = train_values.assign(probability=train_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(train_values[target], train_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

In [None]:
# Display test ROC curve
# Bring the label and the probability vector to pandas, keeping only element 1
# of each vector (presumably the class-1 probability; verify against the model)
test_values = test_preds.select(target, 'probability').toPandas()
test_values = test_values.assign(probability=test_values.probability.apply(lambda x: x[1]))

# Index the label column through `target` rather than a hard-coded attribute name
fpr, tpr, _ = roc_curve(test_values[target], test_values.probability)
(ggplot(pd.DataFrame({'fpr': fpr, 'tpr': tpr}), aes(x='fpr', y='tpr')) + geom_line())

## Close the Spark session

In [None]:
# Stop the Spark session now that all models have been trained and evaluated
spark.stop()