# Pyspark

Install PySpark using the code below if it is not already installed.

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


# 1. Importing the Libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# For scaling the data
from pyspark.ml.feature import StandardScaler

# For PCA
from pyspark.ml.feature import PCA

# for Clustering
from pyspark.mllib.clustering import KMeans
from pyspark.ml.clustering import KMeans

# for dataframe operations
from pyspark.sql.functions import when, lit, round, col, ceil
from pyspark.sql.types import FloatType

# for evaluating the clustering results
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from numpy import array
from math import sqrt

# for balancing the data
from pyspark.sql.functions import col, explode, array, lit

# import Pipeline
from pyspark.ml import Pipeline

# import LogisticRegression classifier
from pyspark.ml.classification import LogisticRegression

# Import model evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql import SQLContext

# importing the python related libraries for plotting
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


## creating the spark Context

In [3]:
# Create (or reuse) a Spark session running locally on all available cores,
# and keep a handle on its SparkContext.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ML Implementation")
    .getOrCreate()
)
sc = spark.sparkContext

Exception: Java gateway process exited before sending its port number

We will first import the cleaned dataset exported from the EDA step of R environment.

# 2. Loading the data

In [None]:
# Load the cleaned dataset from CSV (first row is the header) and let Spark
# infer the column types, then print the resulting schema.
# NOTE(review): this is an absolute path on one specific machine, while the
# binary-classification section later reads
# "../input/kickstartercleandata/dataClean.csv" — confirm both refer to the same
# file and make the location configurable so the notebook runs end to end.
ks_df = spark.read.csv("/Users/umasid/My_Workspace/PysparkProj-MultiClass-Classification/dataClean.csv", header=True, inferSchema=True)
ks_df.printSchema()

In [None]:
ks_df.show()

As we don't need some of the columns like launched, deadline and _c0 we will drop them along with the dates

In [None]:
# selecting only the necessary columns
ks_df = ks_df.select(['category', 'main_category','currency','backers','country','usd_pledged','usd_goal','launch_gap','state'])
ks_df.show()

In [None]:
# making a list of columns 
all_cols = ks_df.columns
all_cols

In [None]:
# looking at the modified schema
ks_df.printSchema()

In [None]:
# listing the categorical columns
cat_cols = ['category','main_category','currency','country','state']
cat_cols

In [None]:
# listing the category column names after indexing.
cat_cols_indexed = ['category_indexed','main_category_indexed','currency_indexed','country_indexed', 'state_indexed']
cat_cols_indexed

## Creating a dataframe with numerical columns.

We are going to perform PCA and then cluster on the principal components. As PCA is applied only to continuous variables, we need to subset the numerical columns.

In [None]:
# selecting only the necessary columns
ks_df_num = ks_df.select(['backers','usd_pledged','usd_goal','launch_gap'])
ks_df_num.show()

# 3. Scale the dataframe with numerical columns

In [None]:
# Creating the dense vector of all the input features using vector assembler
vector_assembler1 = VectorAssembler(inputCols=['backers','usd_pledged','usd_goal','launch_gap'], outputCol='features')

In [None]:
# transforming the data
ks_df_scaled1 = vector_assembler1.transform(ks_df)
ks_df_scaled1.show(2)

In [None]:
# Applying scaling on the features vector.
standard_scaler1 = StandardScaler(inputCol='features', outputCol='scaled_features')
ks_df_scaled1 = standard_scaler1.fit(ks_df_scaled1).transform(ks_df_scaled1)
ks_df_scaled1.show(5)

## Performing PCA

We will first reduce the number of dimensions to two using the principal component analysis 

In [None]:
#Applying PCA
pca = PCA(k=2, inputCol='scaled_features', outputCol='pca')
model = pca.fit(ks_df_scaled1)

In [None]:
# transform
ks_df_pca = model.transform(ks_df_scaled1)

In [None]:
# looking at the datatype of the object ks_df_pca
type(ks_df_pca)

In [None]:
# looking at the important columns
ks_df_pca.select(['features','scaled_features', 'pca', 'state']).show(5)

In [None]:
# printing the pca vector
ks_df_pca.select(['pca']).show(truncate=False)

Now we have completed the PCA and reduced the data to two dimensions. We will now apply clustering using these principal components and try to visualize the clusters formed.

# K-Means Clustering

We will use the K-Means Clustering algorithm to cluster the data

**Plan for Clustering**

1. Find the ideal K-value, which represents the number of clusters.
2. Clustering the data using the observed best k-value
3. Evaluating the cluster results


**Finding out the best value for K(number of clusters)**

In [None]:
# Silhouette evaluator used by the k-search loop in the next cell: it scores the
# 'prediction' column against the 'pca' vectors using squared Euclidean distance.
# NOTE(review): the name `eval` shadows Python's builtin eval(); it is left
# unchanged here because the following cell calls eval.evaluate(...).
eval = ClusteringEvaluator(predictionCol='prediction', featuresCol='pca', metricName='silhouette', distanceMeasure='squaredEuclidean')

In [None]:
# Fit K-Means for k = 2..10 on the PCA vectors and record the silhouette score
# of each fit, so the best number of clusters can be picked afterwards.
silhouette_score = []
print("""
Silhoutte Scores for K Mean Clustering
=======================================
Model\tScore\t
=====\t=====\t
""")
for k in range(2, 11):
    # A fixed seed makes the random centroid initialisation, and therefore the
    # scores, reproducible across runs.
    kmeans_algo = KMeans(featuresCol='pca', k=k, seed=100)
    kmeans_fit = kmeans_algo.fit(ks_df_pca)
    output = kmeans_fit.transform(ks_df_pca)
    score = eval.evaluate(output)
    silhouette_score.append(score)
    # Format inside the f-string: the import cell rebinds the name `round` to
    # pyspark.sql.functions.round, so round(score, 2) would not yield a float.
    print(f"K{k}\t{score:.4f}\t")

In [None]:
# Plot the silhouette score for each k tried above (k = 2..10); the highest
# point marks the best number of clusters. matplotlib is already imported in
# the import cell at the top, so it is not re-imported here.
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.plot(range(2, 11), silhouette_score)
ax.set_xlabel('K')
ax.set_ylabel('Score')
ax.set_title('Silhouette score by number of clusters');

From the above silhouette scores it is clear that k=2 gives the best score. As this is a plot of silhouette score vs. K, we need to take the global maximum. So, we will consider the best number of clusters to be 2.

Now, lets cluster the data with k=2

**Train and Evaluate**

In [None]:
# Clustering the data using k=2 (the value selected from the silhouette scores).
# The seed is fixed so the centroid initialisation is reproducible.
kmeans = KMeans(featuresCol='pca', k=2, seed=100)
model = kmeans.fit(ks_df_pca)
ks_df_cls = model.transform(ks_df_pca)

In [None]:
# Silhouette score of the clustered dataframe, measured on the 'pca' vectors with
# squared Euclidean distance. A dedicated variable name is used so that Python's
# builtin eval() is not shadowed by this cell.
cluster_evaluator = ClusteringEvaluator(featuresCol='pca', metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = cluster_evaluator.evaluate(ks_df_cls)
print(f"Silhouette with squared euclidean distance: {silhouette}")

In [None]:
# Looking at the cluster centers
centers = model.clusterCenters()
print('Cluster Centers:')
for center in centers:
    print(center)

In [None]:
ks_df_cls.show()

In [None]:
# Column names for splitting the pca vector
column_names = ['pc1', 'pc2']
column_names

In [None]:
# Selecting only pca and state colummns for plotting.
ks_df_sub = ks_df_pca.select('pca','state')
ks_df_sub.show(5, truncate=False)


In [None]:
#Splitting the vactor pca in to pc0 and pc1
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    """Convert an ML vector column into an array-of-doubles column.

    Args:
        col: a Column holding pyspark.ml vectors (here the 'pca' column).

    Returns:
        A Column of type array<double> with the same values, so individual
        components can be selected by index.

    Uses Spark's built-in vector_to_array (available from Spark 3.0) instead of a
    Python UDF, so rows are not serialised to a Python worker one by one.
    """
    # Imported locally so this cell does not depend on an extra top-level import.
    from pyspark.ml.functions import vector_to_array

    return vector_to_array(col)

# Turn the 'pca' vector into an array column "pc", then keep `state` together
# with one scalar column per principal component (pc[0] and pc[1]).
component_cols = [col("pc")[idx] for idx in range(2)]
ks_df_sub1 = ks_df_sub.withColumn("pc", to_array(col("pca"))).select(["state"] + component_cols)

In [None]:
ks_df_sub1.show(5, truncate=False)

In [None]:
# Scatter plot of the two principal components, one point per row.
# toPandas() brings the whole Spark dataframe into a pandas dataframe for seaborn.
# NOTE(review): points are coloured by the project `state`, not by the K-Means
# `prediction` column — confirm this is the intended view of the "clusters".
ks_df_sub2 = ks_df_sub1.toPandas()
sns.scatterplot(x='pc[0]', y='pc[1]', data=ks_df_sub2, hue='state')

As the clusters seem to be inseparable, let's look at the silhouette coefficient of the clusters to understand what is going on.

In [None]:
# Evaluate the silhouette score of the clustered dataframe again, this time with
# an evaluator built entirely from default settings.
# NOTE(review): no featuresCol is passed here, unlike the earlier evaluator that
# used featuresCol='pca'; ClusteringEvaluator presumably falls back to its default
# features column — confirm this is the feature space the score should be
# measured in, since the clusters were fitted on 'pca'.
eval = ClusteringEvaluator()
silhouette1 = eval.evaluate(ks_df_cls)
print(f"Silhouette with squared euclidean distance: {silhouette1}")

A silhouette coefficient close to zero indicates that the clusters are inseparable or barely separable; in other words, the distance between the clusters is insignificant. Since the score here is less than 0.4, we can conclude that the clusters are identifiable but the distance between them is not significant.

# Machine Learning Implementation.

In the machine learning implementation we are going to use the multinomial Logistic Regression algorithm. As our data has four different classes of unequal size, we need to balance the dataset before fitting the model; below, this is done by computing the ratio of each class to the majority class and oversampling the minority classes accordingly.

**Plan for Machine Learning Implementation**
1. One-hot encode the categorical columns
2. Scale the data
3. Calculate the weights of the classes
4. Apply the multinomial Logistic Regression algorithm

In [None]:
# Looking into the dataframe we need to model
ks_df.show(5)

## One-Hot Encoding the categorical columns.

As there are many categorical columns, let's encode them using the one-hot encoder

In [None]:
# Creating the String indexer and fitting the data to it.
indexer = StringIndexer(inputCols=cat_cols, outputCols=cat_cols_indexed)
ks_df = indexer.fit(ks_df).transform(ks_df)
ks_df.show()

In [None]:
# Indexed categorical feature columns that are fed to the one-hot encoder
# ('state_indexed' is not included in this list).
cat_cols_indexed_be = ['category_indexed','main_category_indexed','currency_indexed','country_indexed']

# Names of the encoded output columns: each input name with an "_O" suffix,
# derived from the input list so the two lists cannot drift apart.
cat_cols_indexed_ae = [f"{name}_O" for name in cat_cols_indexed_be]
cat_cols_indexed_ae

In [None]:
# One Hot encoding implementation using the indexed columns
encoder = OneHotEncoder(inputCols=cat_cols_indexed_be, outputCols=cat_cols_indexed_ae)
model =encoder.fit(ks_df)
ks_df = model.transform(ks_df)
ks_df.show()

Now let's drop the indexed columns (cat_cols_indexed) from our dataframe ks_df.

In [None]:
# selecting only the necessary columns
ks_df = ks_df.select(['category', 'main_category','currency','backers','country','usd_pledged','usd_goal','launch_gap','state', 'category_indexed_O','main_category_indexed_O','currency_indexed_O','country_indexed_O'])
ks_df.show()

As the numerical columns have different ranges of values, we need to scale the data in order to avoid bias while training the machine learning model.

### Scaling the data

In [None]:
# making a list of columns 
inputcols = ['backers','usd_pledged','usd_goal','launch_gap','category_indexed_O','main_category_indexed_O','currency_indexed_O','country_indexed_O']
inputcols

In [None]:
# Creating the dense vector of all the input features using vector assembler
vector_assembler = VectorAssembler(inputCols=inputcols, outputCol='features')

In [None]:
# transforming the data
ks_df_scaled = vector_assembler.transform(ks_df)
ks_df_scaled.show(2)

We will use the standardscaler from the pyspark.ml.feature

In [None]:
# Applying scaling on the features vector.
standard_scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
ks_df_scaled = standard_scaler.fit(ks_df_scaled).transform(ks_df_scaled)
ks_df_scaled.show(5)

In [None]:
# Slicing the dataframe to have only the scaled_features and labels.
ks_df_ss = ks_df_scaled.selectExpr("scaled_features as features", "state as state")
ks_df_ss.show(10)

## Inspecting the Balance of the dataset

As we are dealing with a multi-class classification, we need to look at the balance of the dataset and try to balance it if it is imbalanced. 

In [None]:
ks_df_ss.groupBy('state').count().orderBy(col('count').desc()).show()

As we have an imbalanced dataset, let's try to balance it using the oversampling technique.

# Balancing the data using Oversampling

In [None]:
# Split the scaled data by state and work out how many times each of the other
# classes fits into the 'failed' class; these ratios drive the oversampling below.
failed_df = ks_df_ss.filter(col("state") == 'failed')
successful_df = ks_df_ss.filter(col("state") == 'successful')
canceled_df = ks_df_ss.filter(col("state") == 'canceled')
suspended_df = ks_df_ss.filter(col("state") == 'suspended')

# Count every class exactly once: each .count() launches a Spark job, so the
# 'failed' count is reused for all three ratios instead of being recomputed.
failed_count = failed_df.count()
successful_count = successful_df.count()
canceled_count = canceled_df.count()
suspended_count = suspended_df.count()

# int() truncates towards zero, so a true ratio of e.g. 1.76 is reported as 1.
ratio_fai_suc = int(failed_count / successful_count)
ratio_fai_can = int(failed_count / canceled_count)
ratio_fai_sus = int(failed_count / suspended_count)

print("ratio_fai_suc: {}".format(ratio_fai_suc))
print("ratio_fai_can: {}".format(ratio_fai_can))
print("ratio_fai_sus: {}".format(ratio_fai_sus))

There are almost twice as many 'failed' records as 'successful' records. Because `int()` truncates the result, the failed-to-successful ratio printed above is 1, so we need to inspect the actual floating-point ratio before truncation and round it up instead, in order to balance the data better.

In [None]:
# Inspecting the actual ration in float
ratio_fai_suc1 = float(failed_df.count()/successful_df.count())
print(ratio_fai_suc1)

As the ratio is 1.76, which rounds up to 2, we will use ratio_fai_suc+1 as the oversampling factor for the 'successful' class to balance the data better.

## Oversampling the data

In [None]:
def _oversample(class_df, times):
    """Return `class_df` with every row repeated `times` times.

    Each row receives an array column holding `times` literals; explode() then
    emits one copy of the row per literal and the helper column is dropped.
    `times` is clamped to at least 1: an empty array explodes to zero rows, which
    would silently remove the whole class when its ratio truncates to 0.
    """
    copies = max(int(times), 1)
    return (
        class_df
        .withColumn("dummy", explode(array([lit(x) for x in range(copies)])))
        .drop('dummy')
    )


# duplicate the minority rows in Successful state (ratio rounded up, hence +1)
os_suc_df = _oversample(successful_df, ratio_fai_suc + 1)
# combine both oversampled successful rows and previous majority rows
# (union() replaces the deprecated alias unionAll(); duplicates are kept)
failed_succ_df = failed_df.union(os_suc_df)

# duplicate the minority rows in Canceled state
os_can_df = _oversample(canceled_df, ratio_fai_can)
# combine both oversampled canceled rows and previous majority rows
failed_succ_can_df = failed_succ_df.union(os_can_df)

# duplicate the minority rows in Suspended state
os_sus_df = _oversample(suspended_df, ratio_fai_sus)
# combine both oversampled suspended rows and previous majority rows
ks_df_os = failed_succ_can_df.union(os_sus_df)


ks_df_os.show()

In [None]:
# inspecting the balance of the data after oversampling.
ks_df_os.groupBy('state').count().orderBy(col('count').desc()).show()

Now the data looks quite balanced with an acceptable variation among the counts.

## Indexing the State column

In [None]:
# Creating the String indexer and fitting state column to it
indexer = StringIndexer(inputCol='state', outputCol='state_indexed')
ks_df_sliced = indexer.fit(ks_df_os).transform(ks_df_os)
ks_df_sliced.show()

In [None]:
ks_df_sliced.filter(ks_df_sliced.state_indexed==0).show(1)
ks_df_sliced.filter(ks_df_sliced.state_indexed==1).show(1)
ks_df_sliced.filter(ks_df_sliced.state_indexed==2).show(1)
ks_df_sliced.filter(ks_df_sliced.state_indexed==3).show(1)

From the above step it is clear that we have labels setup as below.

0 --> successful

1 --> failed

2 --> suspended

3 --> canceled

In [None]:
# Slicing the dataframe to have only the scaled_features and labels.
ks_df_sliced = ks_df_sliced.selectExpr("features as features", "state_indexed as labels")
ks_df_sliced.show(10)

In [None]:
#Checking the datatypes
ks_df_sliced.printSchema()

The weight column is read as string. we need to change it to float

## Splitting the dataset into train and test sets

Now, we will split the dataset in to train and test sets randomly.

In [None]:
# Randomly split the labelled rows into train and test sets with weights
# 0.75 / 0.25 (seed fixed at 100) and report the size of each part.
# NOTE(review): ks_df_sliced is derived from the oversampled frame ks_df_os, so
# copies of the same minority row can land in both train and test, which would
# inflate the test metrics — consider splitting before oversampling.
train, test = ks_df_sliced.randomSplit([0.75, 0.25], seed = 100)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [None]:
test.groupBy('labels').count().orderBy(col('count').desc()).show()

## Logistic Regression Classifier

In [None]:
# configuring and training the Logistic Regression classifier using the training data
lr = LogisticRegression(featuresCol = 'features', labelCol = 'labels', maxIter=10)
#lr.setWeightCol("weight")
lrModel = lr.fit(train)


In [None]:
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

# Performance Metrics

In [None]:
# Getting the training summary.
trainingSummary = lrModel.summary

In [None]:
# Objective value recorded for each training iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for loss in objectiveHistory:
    print(loss)

# Per-label training metrics: one printed section per metric, where the
# position in each sequence is used as the label number.
per_label_sections = (
    ("False positive rate by label:", trainingSummary.falsePositiveRateByLabel),
    ("True positive rate by label:", trainingSummary.truePositiveRateByLabel),
    ("Precision by label:", trainingSummary.precisionByLabel),
    ("Recall by label:", trainingSummary.recallByLabel),
    ("F-measure by label:", trainingSummary.fMeasureByLabel()),
)
for heading, values in per_label_sections:
    print(heading)
    for label_idx, value in enumerate(values):
        print("label %d: %s" % (label_idx, value))

# Overall accuracy plus the summary's weighted aggregate metrics
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
overall_report = "Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
print(overall_report % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

# Testing with the test dataset

In [None]:
predictions = lrModel.transform(test)
predictions.show(5)

In [None]:
# F1 score of the model on the test predictions. MulticlassClassificationEvaluator
# reports F1 unless another metric is requested (it does not compute area under
# ROC), so the metric is named explicitly and the printed label matches it.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='labels', metricName='f1')
print('Test F1 score', evaluator.evaluate(predictions))

In [None]:
predictions.show(20)

In [None]:
predictions.groupBy('labels', 'prediction').count().show()

In [None]:
# compute TN, TP, FN, and FP
predictions.groupBy('labels', 'prediction').count().show()

In [None]:
# Build the (prediction, label) pairs that MulticlassMetrics consumes, casting
# the labels to float. The confusion matrix does not depend on row order, so the
# rows are not sorted (a global sort would only add a shuffle), and a single
# select of the two columns is enough.
preds_and_labels = (
    predictions
    .select(['prediction', 'labels'])
    .withColumn('labels', col('labels').cast(FloatType()))
)

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

In [None]:
# Per-class test metrics. Class ids follow the state_indexed mapping listed
# earlier in the notebook: 0 = successful, 1 = failed, 2 = suspended, 3 = canceled.
class_names = {0.0: 'Successful', 1.0: 'Failed', 2.0: 'Suspended', 3.0: 'Canceled'}

for class_id, class_name in class_names.items():
    print(f'\n--------------Class-{int(class_id)} {class_name} Projects----------------')
    print('True Positive Rate :', metrics.truePositiveRate(class_id))
    print('False Positive Rate:', metrics.falsePositiveRate(class_id))
    print('Precision          :', metrics.precision(class_id))
    print('recall             :', metrics.recall(class_id))
    print('f-measure          :', metrics.fMeasure(class_id))

# Overall Accuracy
print('\n Overall Accuracy:',metrics.accuracy)

The model performed excellently in predicting the 'successful' state projects. It did reasonably well in predicting the 'failed' and 'suspended' states but, due to the low f-measure values, the results for these states can be interpreted as below par. The model performed poorly in predicting the 'canceled' state.

According to the About section of the Kickstarter platform, the 'successful' and 'failed' project states are the important ones, and the other two states arise from many other reasons outside the data. So, let's try to model the data as a binary classification using only the 'successful' and 'failed' states.

# Binary Classification Using only 'Successful' and 'Failed' state projects.

In [None]:
# Reload the cleaned dataset from CSV (first row is the header) with inferred
# column types for the binary-classification section, then print its schema.
# NOTE(review): this relative path differs from the absolute path used for the
# first load of dataClean.csv at the top of the notebook — confirm both refer to
# the same file and read it from a single configurable location.
ks_df1 = spark.read.csv("../input/kickstartercleandata/dataClean.csv", header=True, inferSchema=True)
ks_df1.printSchema()

In [None]:
ks_df1.show(10)

As we don't need some of the columns like launched, deadline and _c0 we will drop them along with the dates

In [None]:
# selecting only the necessary columns
ks_df1 = ks_df1.select(['category', 'main_category','currency','backers','country','usd_pledged','usd_goal','launch_gap','state'])
ks_df1.show()

In [None]:
# looking at the modified schema
ks_df1.printSchema()

# Dropping the 'Canceled' and 'Suspended' state records

In [None]:
# Keep only the rows whose state is one of the two outcomes modelled in this section
wanted_states = ['successful', 'failed']
ks_df1 = ks_df1.filter(col('state').isin(wanted_states))
ks_df1.show(10)

In [None]:
ks_df1.groupBy('state').count().show()

Now, we are only left with two states of records.

In [None]:
# making a list of columns 
all_cols = ks_df1.columns
all_cols

In [None]:
# listing the categorical columns
cat_cols = ['category','main_category','currency','country','state']
cat_cols

In [None]:
# listing the category column names after indexing.
cat_cols_indexed = ['category_indexed','main_category_indexed','currency_indexed','country_indexed', 'state_indexed']
cat_cols_indexed

# Machine Learning Implementation for Binary Classification

In this section we apply the Logistic Regression algorithm to the two remaining classes, 'successful' and 'failed'. As these two classes are still imbalanced, we balance the dataset by oversampling the minority class before training the model.

Plan for Machine Learning Implementation

1. One-hot encode the categorical columns
2. Scale the data
3. Calculate the weights of the classes
4. Apply the multinomial Logistic Regression algorithm

## One-Hot Encoding the categorical columns.

As there are many categorical columns, let's encode them using the one-hot encoder

In [None]:
# Creating the String indexer and fitting the data to it.
indexer = StringIndexer(inputCols=cat_cols, outputCols=cat_cols_indexed)
ks_df1 = indexer.fit(ks_df1).transform(ks_df1)
ks_df1.show()

In [None]:
# Indexed categorical feature columns that are fed to the one-hot encoder
# ('state_indexed' is not included in this list).
cat_cols_indexed_be = ['category_indexed','main_category_indexed','currency_indexed','country_indexed']

# Names of the encoded output columns: each input name with an "_O" suffix,
# derived from the input list so the two lists cannot drift apart.
cat_cols_indexed_ae = [f"{name}_O" for name in cat_cols_indexed_be]
cat_cols_indexed_ae

In [None]:
# One Hot encoding implementation using the indexed columns
encoder = OneHotEncoder(inputCols=cat_cols_indexed_be, outputCols=cat_cols_indexed_ae)
model =encoder.fit(ks_df1)
ks_df1 = model.transform(ks_df1)
ks_df1.show()

Now let's drop the indexed columns (cat_cols_indexed) from our dataframe ks_df1.

In [None]:
# selecting only the necessary columns
ks_df1 = ks_df1.select(['category', 'main_category','currency','backers','country','usd_pledged','usd_goal','launch_gap','state', 'category_indexed_O','main_category_indexed_O','currency_indexed_O','country_indexed_O'])
ks_df1.show()

As the numerical columns have different ranges of values, we need to scale the data in order to avoid bias while training the machine learning model.

## Scaling the data

In [None]:
# making a list of columns 
inputcols = ['backers','usd_pledged','usd_goal','launch_gap','category_indexed_O','main_category_indexed_O','currency_indexed_O','country_indexed_O']
inputcols

In [None]:
# Creating the dense vector of all the input features using vector assembler
vector_assembler = VectorAssembler(inputCols=inputcols, outputCol='features')

In [None]:
# transforming the data
ks_df_scaled = vector_assembler.transform(ks_df1)
ks_df_scaled.show(2)

We will use the standardscaler from the pyspark.ml.feature

In [None]:
# Applying scaling on the features vector.
standard_scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
ks_df_scaled = standard_scaler.fit(ks_df_scaled).transform(ks_df_scaled)
ks_df_scaled.show(5)

In [None]:
# Slicing the dataframe to have only the scaled_features and labels.
ks_df_ss = ks_df_scaled.selectExpr("scaled_features as features", "state as state")
ks_df_ss.show(10)

## Inspecting the Balance of the dataset

Even though we are now dealing with a binary classification, we still need to look at the balance of the dataset and try to balance it if it is imbalanced.

In [None]:
ks_df_ss.groupBy('state').count().orderBy(col('count').desc()).show()

From the above counts, it is evident that the data is imbalanced, so we need to use a balancing technique before applying machine learning.
We will use the oversampling technique so that we do not lose any data, unlike with the undersampling technique.

# Balancing the data using Oversampling

In [None]:
# Calculating the ratio of weights to oversample
failed_df = ks_df_ss.filter(col("state") == 'failed')
successful_df = ks_df_ss.filter(col("state") == 'successful')

ratio_fai_suc = int(failed_df.count()/successful_df.count())

print("ratio_fai_suc: {}".format(ratio_fai_suc))

There are almost twice as many 'failed' records as 'successful' records. Because `int()` truncates the result, the failed-to-successful ratio printed above is 1, so we need to inspect the actual floating-point ratio before truncation and round it up instead, in order to balance the data better.

In [None]:
# Inspecting the actual ration in float
ratio_fai_suc1 = float(failed_df.count()/successful_df.count())
print(ratio_fai_suc1)

As the ratio is 1.76, which rounds up to 2, we will use ratio_fai_suc+1 as the oversampling factor for the 'successful' class to balance the data better.

## Oversampling the data

In [None]:
# Duplicate the minority rows in successful state: every row gets an array of
# (ratio_fai_suc + 1) literals, which explode() turns into one row per literal;
# the helper column is dropped again afterwards.
repeat_times = int(ratio_fai_suc + 1)
os_suc_df = successful_df.withColumn("dummy", explode(array([lit(x) for x in range(repeat_times)]))).drop('dummy')

# Combine the oversampled minority rows with the majority rows. union() replaces
# the deprecated alias unionAll(); like it, it keeps duplicate rows.
ks_df_os = failed_df.union(os_suc_df)

ks_df_os.show(5)

In [None]:
# inspecting the balance of the data after oversampling.
ks_df_os.groupBy('state').count().orderBy(col('count').desc()).show()

In [None]:
ks_df_os.show(5)

In [None]:
# Creating the String indexer and fitting state column to it
indexer = StringIndexer(inputCol='state', outputCol='state_indexed')
ks_df_sliced = indexer.fit(ks_df_os).transform(ks_df_os)
ks_df_sliced.show()

In [None]:
ks_df_sliced.filter(ks_df_sliced.state_indexed==0).show(1)
ks_df_sliced.filter(ks_df_sliced.state_indexed==1).show(1)

From the above step it is clear that we have labels setup as below.

0 --> successful

1 --> failed

In [None]:
# Slicing the dataframe to have only the scaled_features and labels.
ks_df_sliced = ks_df_sliced.selectExpr("features as features", "state_indexed as labels")
ks_df_sliced.show(10)

In [None]:
#Checking the datatypes
ks_df_sliced.printSchema()

# Splitting the dataset into train and test sets

Now, we will split the dataset in to train and test sets randomly.

In [None]:
train, test = ks_df_sliced.randomSplit([0.75, 0.25], seed = 100)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

# Logistic Regression Classifier

In [None]:
# configuring and training the Logistic Regression classifier using the training data
lr = LogisticRegression(featuresCol = 'features', labelCol = 'labels', maxIter=10)
#lr.setWeightCol("weight")
lrModel = lr.fit(train)

In [None]:
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

In [None]:
# Getting the training summary.
trainingSummary = lrModel.summary

In [None]:
# Training summary of the fitted Logistic Regression model
trainingSummary = lrModel.summary
# Plot the ROC curve from the summary: false positive rate on the x axis and
# true positive rate on the y axis, with the axis labels matching the data.
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set Area Under ROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

In [None]:
# Objective value recorded for each training iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for loss in objectiveHistory:
    print(loss)

# Per-label training metrics: heading -> sequence of values, where the position
# in each sequence is used as the label number.
by_label = {
    "False positive rate by label:": trainingSummary.falsePositiveRateByLabel,
    "True positive rate by label:": trainingSummary.truePositiveRateByLabel,
    "Precision by label:": trainingSummary.precisionByLabel,
    "Recall by label:": trainingSummary.recallByLabel,
    "F-measure by label:": trainingSummary.fMeasureByLabel(),
}
for heading, values in by_label.items():
    print(heading)
    for label_idx, value in enumerate(values):
        print("label %d: %s" % (label_idx, value))

# Overall accuracy plus the summary's weighted aggregate metrics
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
overall_values = (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall)
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s" % overall_values)

# Testing with the test dataset

In [None]:
predictions = lrModel.transform(test)
predictions.show(5)

In [None]:
# F1 score of the model on the test predictions. MulticlassClassificationEvaluator
# reports F1 unless another metric is requested (it does not compute area under
# ROC), so the metric is named explicitly and the printed label matches it.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='labels', metricName='f1')
print('Test F1 score', evaluator.evaluate(predictions))

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol='labels')
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [None]:
# Confusion-matrix cells for the binary model, treating label 1 as the positive
# class (TP = predicted 1 and correct, TN = predicted 0 and correct).
confusion_df = predictions.groupBy('labels', 'prediction').count()
confusion_df.show()

# Collect the few grouped counts once and derive all four cells from them,
# instead of running a separate filtered count over `predictions` per cell.
cell_counts = {
    (row['labels'], row['prediction']): row['count']
    for row in confusion_df.collect()
}


def _cell_total(predicted, correct):
    """Sum the counts of cells with prediction `predicted` that are correct (or not)."""
    return sum(
        n for (label, pred), n in cell_counts.items()
        if pred == predicted and (label == pred) == correct
    )


TN = _cell_total(0, True)
TP = _cell_total(1, True)
FN = _cell_total(0, False)
FP = _cell_total(1, False)

# Accuracy, precision, recall and F1-score. Each ratio falls back to 0.0 when its
# denominator is zero (e.g. the model never predicts the positive class) instead
# of raising ZeroDivisionError.
total = TN + TP + FN + FP
accuracy = (TN + TP) / total if total else 0.0
precision = TP / (TP + FP) if (TP + FP) else 0.0
recall = TP / (TP + FN) if (TP + FN) else 0.0
F = 2 * (precision*recall) / (precision + recall) if (precision + recall) else 0.0
print('precision:', precision)
print('recall   :', recall)
print('accuracy :', accuracy)
print('F1 score :', F)

So, with the above metrics, we can conclude that the data can be modeled with very good accuracy when we only use 'successful' and 'failed' state project records.