# Classification in PySpark's MLlib

PySpark offers a good variety of algorithms for classification problems. Because PySpark operates on distributed DataFrames, popular single-machine Python libraries such as scikit-learn cannot be applied to them directly, so we use PySpark's MLlib package for these tasks instead. In this notebook we go over how to prepare the data and then train and test the classification algorithms PySpark offers.

## Algorithms Available

PySpark offers the following algorithms for classification. 

1. Logistic Regression 
2. Naive Bayes
3. One-vs-Rest
4. Linear Support Vector Machine (LinearSVC)
5. Random Forest Classifier
6. GBT Classifier
7. Decision Tree Classifier
8. Multilayer Perceptron Classifier (Neural Network)

In [2]:
#Importing pyspark and creating a session
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification1').getOrCreate()
spark

In [3]:
#Importing required libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

### Data Set Name: Autistic Spectrum Disorder Screening Data for Adult
Autistic Spectrum Disorder (ASD) is a neurodevelopmental condition associated with significant healthcare costs, and early diagnosis can significantly reduce these. Unfortunately, waiting times for an ASD diagnosis are lengthy and procedures are not cost effective. The economic impact of autism and the increase in the number of ASD cases across the world reveal an urgent need for easily implemented and effective screening methods. A time-efficient and accessible ASD screening is therefore urgently needed to help health professionals and to inform individuals whether they should pursue a formal clinical diagnosis.

The rapid growth in the number of ASD cases worldwide necessitates datasets related to behaviour traits. However, such datasets are rare, making it difficult to perform thorough analyses to improve the efficiency, sensitivity, specificity and predictive accuracy of the ASD screening process. Presently, very few autism datasets associated with clinical diagnosis or screening are available, and most of them are genetic in nature. Hence, the dataset authors propose a new dataset related to autism screening of adults that contains 20 features to be utilised for further analysis, especially in determining influential autistic traits and improving the classification of ASD cases. The dataset records ten behavioural features (AQ-10-Adult) plus ten individual characteristics that have proved effective in distinguishing ASD cases from controls in behaviour science.

Note that the file loaded below is the toddler version of this screening data (`Toddler Autism dataset July 2018.csv`, with `Qchat-10-Score` and `Age_Mons` columns), so its columns differ from the adult description above.

### Source: 
https://www.kaggle.com/faizunnabi/autism-screening

In [9]:
path ="datasets-mlib/"
df = spark.read.csv(path+'Toddler Autism dataset July 2018.csv',inferSchema=True,header=True)

In [10]:
#Checking the dataset
df.limit(6).toPandas()

Unnamed: 0,Case_No,A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,Age_Mons,Qchat-10-Score,Sex,Ethnicity,Jaundice,Family_mem_with_ASD,Who completed the test,Class/ASD Traits
0,1,0,0,0,0,0,0,1,1,0,1,28,3,f,middle eastern,yes,no,family member,No
1,2,1,1,0,0,0,1,1,0,0,0,36,4,m,White European,yes,no,family member,Yes
2,3,1,0,0,0,0,0,1,1,0,1,36,4,m,middle eastern,yes,no,family member,Yes
3,4,1,1,1,1,1,1,1,1,1,1,24,10,m,Hispanic,no,no,family member,Yes
4,5,1,1,0,1,1,1,1,1,1,1,20,9,f,White European,no,yes,family member,Yes
5,6,1,1,0,0,1,1,1,1,1,1,21,8,m,black,no,no,family member,Yes


In [11]:
df.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- A1: integer (nullable = true)
 |-- A2: integer (nullable = true)
 |-- A3: integer (nullable = true)
 |-- A4: integer (nullable = true)
 |-- A5: integer (nullable = true)
 |-- A6: integer (nullable = true)
 |-- A7: integer (nullable = true)
 |-- A8: integer (nullable = true)
 |-- A9: integer (nullable = true)
 |-- A10: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Who completed the test: string (nullable = true)
 |-- Class/ASD Traits : string (nullable = true)



In the dataset,
- Independent variables (features): A1 through Who completed the test (Case_No is only a row identifier and is dropped below)
- Dependent variable: Class/ASD Traits 

In [12]:
#Identifying the number of classes in the dependent variable
df.groupBy("Class/ASD Traits ").count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  326|
|              Yes|  728|
+-----------------+-----+



### Formatting data

MLlib requires the input features to be numeric and assembled into a single vector column.

In [13]:
#Separating dependent and independent variables
input_columns = df.columns # Collect the column names as a list
input_columns = input_columns[1:-1] # Keep only the feature columns: drop the first column (Case_No) and the last one (the label)
input_columns

['A1',
 'A2',
 'A3',
 'A4',
 'A5',
 'A6',
 'A7',
 'A8',
 'A9',
 'A10',
 'Age_Mons',
 'Qchat-10-Score',
 'Sex',
 'Ethnicity',
 'Jaundice',
 'Family_mem_with_ASD',
 'Who completed the test']

In [14]:
dependent_var = 'Class/ASD Traits '
dependent_var

'Class/ASD Traits '

In [16]:
#Converting the dependent variable column from Yes/No to 0 and 1
renamed = df.withColumn("label_str", df[dependent_var].cast(StringType())) #Copy the label column as a string column
indexer = StringIndexer(inputCol="label_str", outputCol="label") #PySpark classifiers look for a column named "label" by default

In [20]:
indexed = indexer.fit(renamed).transform(renamed)
indexed.limit(5).toPandas()

Unnamed: 0,Case_No,A1,A2,A3,A4,A5,A6,A7,A8,A9,...,Age_Mons,Qchat-10-Score,Sex,Ethnicity,Jaundice,Family_mem_with_ASD,Who completed the test,Class/ASD Traits,label_str,label
0,1,0,0,0,0,0,0,1,1,0,...,28,3,f,middle eastern,yes,no,family member,No,No,1.0
1,2,1,1,0,0,0,1,1,0,0,...,36,4,m,White European,yes,no,family member,Yes,Yes,0.0
2,3,1,0,0,0,0,0,1,1,0,...,36,4,m,middle eastern,yes,no,family member,Yes,Yes,0.0
3,4,1,1,1,1,1,1,1,1,1,...,24,10,m,Hispanic,no,no,family member,Yes,Yes,0.0
4,5,1,1,0,1,1,1,1,1,1,...,20,9,f,White European,no,yes,family member,Yes,Yes,0.0
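
The output above shows `Yes` mapped to 0.0 and `No` mapped to 1.0, which can look backwards at first. `StringIndexer` orders labels by descending frequency by default, so the most common class receives index 0. The following is a minimal pure-Python sketch of that ordering rule, not the Spark implementation; the helper name `frequency_desc_index` is hypothetical, and the class counts are taken from the `groupBy` output earlier in the notebook.

```python
from collections import Counter

def frequency_desc_index(values):
    """Map each distinct string to a float index, most frequent value first.

    Ties are broken alphabetically in this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Class counts from the groupBy output: 728 "Yes" rows and 326 "No" rows
labels = ["Yes"] * 728 + ["No"] * 326
mapping = frequency_desc_index(labels)
print(mapping)  # {'Yes': 0.0, 'No': 1.0}
```

Keeping this mapping in mind matters when reading per-label metrics later on: label 0 is the `Yes` class here.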


In [21]:
#Converting all the string type data in the input column list to numeric, otherwise pyspark cannot process it
numeric_inputs = []
string_inputs = []
for column in input_columns:
    #Identifying string type variables (isinstance works regardless of how the Spark version prints the type)
    if isinstance(indexed.schema[column].dataType, StringType):
        #Setting up string indexer function 
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num")
        #Calling the indexer function
        indexed = indexer.fit(indexed).transform(indexed)
        #Collecting the new column names
        new_col_name = column+"_num"
        #Appending the column name to string input list
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)

In [22]:
indexed.limit(5).toPandas()

Unnamed: 0,Case_No,A1,A2,A3,A4,A5,A6,A7,A8,A9,...,Family_mem_with_ASD,Who completed the test,Class/ASD Traits,label_str,label,Sex_num,Ethnicity_num,Jaundice_num,Family_mem_with_ASD_num,Who completed the test_num
0,1,0,0,0,0,0,0,1,1,0,...,no,family member,No,No,1.0,1.0,2.0,1.0,0.0,0.0
1,2,1,1,0,0,0,1,1,0,0,...,no,family member,Yes,Yes,0.0,0.0,0.0,1.0,0.0,0.0
2,3,1,0,0,0,0,0,1,1,0,...,no,family member,Yes,Yes,0.0,0.0,2.0,1.0,0.0,0.0
3,4,1,1,1,1,1,1,1,1,1,...,no,family member,Yes,Yes,0.0,0.0,5.0,0.0,0.0,0.0
4,5,1,1,0,1,1,1,1,1,1,...,yes,family member,Yes,Yes,0.0,1.0,0.0,0.0,1.0,0.0


#### Treating for skewness and outliers

Skewness measures how much a distribution of values deviates from symmetry around the mean. A value of zero means the distribution is symmetric. Positive skewness indicates a long right tail (most values are small, with a few large ones), and negative skewness indicates a long left tail (most values are large, with a few small ones).

As a general rule of thumb: 

 - If skewness is **less than -1 or greater than 1**, the distribution is highly skewed. 
 - If skewness is **between -1 and -0.5 or between 0.5 and 1**, the distribution is moderately skewed. 
 - If skewness is **between -0.5 and 0.5**, the distribution is approximately symmetric.
 
A common recommendation for treating skewness is either a log transformation for positive skewed data or an exponential transformation for negatively skewed data.
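
To make the rule of thumb concrete, here is a small pure-Python sketch that computes skewness and labels it with the thresholds above. It assumes the moment-based (population) definition of skewness; the function names and the sample lists are hypothetical and are not part of the dataset.

```python
import math

def population_skewness(values):
    """Third central moment divided by the variance raised to the power 1.5."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((x - mean) ** 2 for x in values) / n
    m3 = sum((x - mean) ** 3 for x in values) / n
    if m2 == 0:
        return float("nan")  # a constant column has no defined skewness
    return m3 / m2 ** 1.5

def describe_skew(skew):
    """Apply the rule-of-thumb thresholds from the text."""
    if skew < -1 or skew > 1:
        return "highly skewed"
    if skew < -0.5 or skew > 0.5:
        return "moderately skewed"
    return "approximately symmetric"

symmetric = [1, 2, 3, 4, 5]
right_tailed = [1, 1, 1, 2, 2, 3, 50]          # one large value creates a right tail
logged = [math.log1p(x) for x in right_tailed]  # log(x + 1) transformation

for name, data in [("symmetric", symmetric), ("right_tailed", right_tailed), ("logged", logged)]:
    skew = population_skewness(data)
    print(name, round(skew, 3), describe_skew(skew))
```

In this toy example the log transformation pulls the large value in and lowers the skewness, although a single transformation does not always bring a column under the threshold.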


**Outliers** <br>
One common way to correct outliers is by flooring and capping which means editing any value that is above or below a certain threshold (99th percentile or 1st percentile) back to the highest/lowest value in that percentile. For example, if the 99th percentile is 96 and there is a value of 1,000, you would change that value to 96. 
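
Flooring and capping can be sketched in a few lines of plain Python. The bounds below are hypothetical stand-ins for the 1st and 99th percentiles; the value 96 mirrors the example in the paragraph above.

```python
def floor_and_cap(values, lower, upper):
    """Clamp every value into the closed interval [lower, upper]."""
    return [min(max(v, lower), upper) for v in values]

# Hypothetical bounds: 1st percentile = 2, 99th percentile = 96
raw = [1, 5, 40, 96, 1000]
clipped = floor_and_cap(raw, lower=2, upper=96)
print(clipped)  # [2, 5, 40, 96, 96]
```

Values inside the bounds are left untouched; only the extremes are pulled back to the nearest bound.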

In [23]:
#Creating a dictionary of quantiles for the numeric columns
#Using the top and bottom 1%, but this can be adjusted if needed
d = {}
for c in numeric_inputs: # 'c' avoids shadowing pyspark.sql.functions.col
    #The last argument is the relative error: increase it to run faster, decrease it for more exact quantiles
    d[c] = indexed.approxQuantile(c,[0.01,0.99],0.25)

#Now checking the skewness of every numeric column
for c in numeric_inputs:
    skew = indexed.agg(skewness(indexed[c])).collect()[0][0]
    #Flooring and capping the column at its 1st and 99th percentiles
    clipped = when(indexed[c] < d[c][0], d[c][0])\
        .when(indexed[c] > d[c][1], d[c][1])\
        .otherwise(indexed[c])
    #If skewness is found, apply the appropriate correction
    if skew > 1: # If right skew: floor, cap and log(x+1)
        indexed = indexed.withColumn(c, log(clipped + 1))
        print(c+" has been treated for positive (right) skewness. (skew =",skew,")")
    elif skew < -1: # If left skew: floor, cap and exp(x)
        indexed = indexed.withColumn(c, exp(clipped))
        print(c+" has been treated for negative (left) skewness. (skew =",skew,")")

Since none of the print statements above were executed, no numeric column exceeded the skewness thresholds, so no transformation was applied to the data.

In [24]:
#Checking for negative values in the dataframe
#Producing a warning if there are negative values, since Naive Bayes cannot be used in that case
#Note: Checking only the numeric input values since anything that is indexed won't have negative values

# Calculating the mins for all columns in the df
minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs]) 
# Creating an array for all mins and select only the input cols
min_array = minimums.select(array(numeric_inputs).alias("mins")) 
# Collecting global min as Python object
df_minimum = min_array.select(array_min(min_array.mins)).collect() 
# Slicing to get the number itself
df_minimum = df_minimum[0][0] 

# If there are ANY negative vals found in the df, print a warning message
if df_minimum < 0:
    print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
else:
    print("No negative values were found in your dataframe.")

No negative values were found in your dataframe.


In [25]:
# Now creating final features list
features_list = numeric_inputs + string_inputs
# Creating vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And calling on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')

In [26]:
output.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(17,[6,7,9,10,11,...|  1.0|
|(17,[0,1,5,6,10,1...|  0.0|
|(17,[0,6,7,9,10,1...|  0.0|
|[1.0,1.0,1.0,1.0,...|  0.0|
|[1.0,1.0,0.0,1.0,...|  0.0|
|[1.0,1.0,0.0,0.0,...|  0.0|
|(17,[0,3,4,5,8,10...|  0.0|
|(17,[1,4,6,7,8,9,...|  0.0|
|(17,[6,9,10,11,13...|  1.0|
|[1.0,1.0,1.0,0.0,...|  0.0|
|[1.0,0.0,0.0,1.0,...|  0.0|
|[1.0,1.0,1.0,1.0,...|  0.0|
|(17,[10,12,13,14]...|  1.0|
|[1.0,1.0,1.0,1.0,...|  0.0|
|(17,[10,13],[18.0...|  1.0|
|(17,[0,1,2,4,6,7,...|  0.0|
|(17,[10,13,15],[3...|  1.0|
|[1.0,1.0,1.0,0.0,...|  0.0|
|(17,[0,4,9,10,11,...|  1.0|
|[1.0,1.0,1.0,0.0,...|  0.0|
+--------------------+-----+
only showing top 20 rows
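
The `features` column above mixes two notations. Rows printed as `(17,[...],[...])` are sparse vectors (size, indices of the non-zero entries, their values), while rows printed as `[1.0,1.0,...]` are dense vectors. Both carry the same kind of information. The sketch below expands a sparse triple into a dense list in plain Python; the helper name and the example values are hypothetical, since the printed rows are truncated.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a dense list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense

# Hypothetical sparse row: 17 slots, non-zero entries at positions 10 and 13
row = sparse_to_dense(17, [10, 13], [18.0, 1.0])
print(row)
```

Spark picks whichever representation is more compact for each row, which is why rows with many zeros show up in sparse form.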



In [28]:
# Creating the min-max scaler object (this step is needed if negative values are present)
#Using the range 0 to 1000

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
# Computing summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(output)

Features scaled to range: [0.000000, 1000.000000]


In [29]:
#Rescaling each feature to range [min, max].
scaled_data = scalerModel.transform(output)
scaled_data

DataFrame[features: vector, label: double, scaledFeatures: vector]
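
For intuition, min-max scaling maps each feature linearly from its observed range onto the target range. The following is a plain-Python sketch of that formula applied to a single value, with the target range 0 to 1000 used above. The function name and the column range 12 to 36 are assumptions for illustration only; the constant-column branch follows the behaviour described in the `MinMaxScaler` documentation.

```python
def min_max_scale(x, col_min, col_max, new_min=0.0, new_max=1000.0):
    """Linearly rescale x from [col_min, col_max] to [new_min, new_max]."""
    if col_max == col_min:
        # A constant column carries no spread; map it to the middle of the range
        return 0.5 * (new_max + new_min)
    return (x - col_min) / (col_max - col_min) * (new_max - new_min) + new_min

# Hypothetical column whose observed values range from 12 to 36
print(min_max_scale(18, 12, 36))  # 250.0
print(min_max_scale(12, 12, 36))  # 0.0
print(min_max_scale(36, 12, 36))  # 1000.0
```

Because the output always lies inside the target range, the scaled features cannot be negative when the range starts at 0.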

In [30]:
final_data = scaled_data.select('label','scaledFeatures')
# Renaming to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(17,[6,7,9,10,11,...|
|  0.0|(17,[0,1,5,6,10,1...|
|  0.0|(17,[0,6,7,9,10,1...|
|  0.0|[1000.0,1000.0,10...|
|  0.0|[1000.0,1000.0,0....|
|  0.0|[1000.0,1000.0,0....|
|  0.0|(17,[0,3,4,5,8,10...|
|  0.0|(17,[1,4,6,7,8,9,...|
|  1.0|(17,[6,9,10,11,13...|
|  0.0|[1000.0,1000.0,10...|
|  0.0|[1000.0,0.0,0.0,1...|
|  0.0|[1000.0,1000.0,10...|
|  1.0|(17,[10,12,13,14]...|
|  0.0|[1000.0,1000.0,10...|
|  1.0|(17,[10,13],[250....|
|  0.0|(17,[0,1,2,4,6,7,...|
|  1.0|(17,[10,13,15],[1...|
|  0.0|[1000.0,1000.0,10...|
|  1.0|(17,[0,4,9,10,11,...|
|  0.0|(17,[0,1,2,4,6,7,...|
+-----+--------------------+
only showing top 20 rows



#### Splitting the data into training and test sets

In [31]:
#Splitting the data into train and test
train, test = final_data.randomSplit([0.7,0.3])

In [32]:
train.count()

751

In [33]:
test.count()

303

##### Reading in dependencies from PySpark

In [34]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [35]:
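#Setting up our evaluation objects
#Note: rawPredictionCol points at the hard 0/1 'prediction' column, so AUC is computed from class labels rather than scores (the evaluator's default column is 'rawPrediction')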
binary_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction')
MC_eval = MulticlassClassificationEvaluator(metricName='accuracy')

### Implementing Logistic Regression
Logistic regression, also known as the logit model, estimates the probability (a number between 0 and 1) of an event occurring, given previous data to learn from. It works with either binary or multinomial (more than two categories) labels and passes a linear combination of the features through the logistic (sigmoid) function to produce that probability.
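
As a minimal sketch of the idea, the snippet below turns a linear score into a probability for the binary case. The feature values, coefficients and intercept are hypothetical and are not taken from the fitted model in this notebook.

```python
import math

def sigmoid(z):
    """Logistic function, written to avoid overflow for large |z|."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def predict_probability(features, coefficients, intercept):
    """Probability of the positive class under a binary logistic model."""
    z = intercept + sum(w * x for w, x in zip(coefficients, features))
    return sigmoid(z)

# Hypothetical two-feature model
p = predict_probability([1.0, 2.0], coefficients=[0.5, -0.25], intercept=0.0)
print(p)  # 0.5, because the linear score is exactly 0
```

A score of 0 sits on the decision boundary; positive scores push the probability toward 1 and negative scores toward 0.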

In [36]:
#Instantiating the logistic regression classifier
classifier = LogisticRegression()

In [37]:
fitModel = classifier.fit(train)

In [38]:
# Evaluation method for binary classification problem
predictionAndLabels = fitModel.transform(test)
auc = binary_eval.evaluate(predictionAndLabels)
print("AUC:",auc)

AUC: 1.0
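
AUC can be read as the probability that a randomly chosen positive example is ranked above a randomly chosen negative one. Since `binary_eval` above is configured with `rawPredictionCol='prediction'`, it ranks rows by hard 0/1 predictions rather than continuous scores. The pure-Python sketch below, using hypothetical labels and scores, shows that the two choices can give different AUC values; it illustrates the pairwise definition and is not Spark's implementation.

```python
def auc(labels, scores):
    """Share of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical true labels and model scores
labels = [0, 0, 1, 1]
scores = [0.1, 0.6, 0.4, 0.9]
hard = [1 if s >= 0.5 else 0 for s in scores]  # thresholded 0/1 predictions

print(auc(labels, scores))  # 0.75
print(auc(labels, hard))    # 0.5
```

When a model separates the classes perfectly, as in the run above, both variants give 1.0, so the distinction only shows up on harder problems.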


#### Cross validation

In [39]:
#Instantiating the classifier
classifier = LogisticRegression()

In [40]:
#Setting up parameter grid for the cross validator
paramGrid = (ParamGridBuilder().addGrid(classifier.maxIter, [10,15,20]).build())

#Setting up cross validator
crossval = CrossValidator(estimator=classifier,
                         estimatorParamMaps=paramGrid,
                         evaluator=MC_eval,
                         numFolds=2)
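
To see how much work this cross validator schedules, the sketch below enumerates the (parameter, fold) combinations for the grid and fold count used above, and shows one simple way to carve rows into folds. It is a plain-Python illustration: the round-robin fold assignment and the helper name are simplifying assumptions, whereas Spark assigns rows to folds randomly.

```python
from itertools import product

def k_fold_indices(n_rows, num_folds):
    """Yield (train_idx, validation_idx) pairs; rows are dealt round-robin."""
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    for k in range(num_folds):
        validation = folds[k]
        train = [i for j, fold in enumerate(folds) if j != k for i in fold]
        yield train, validation

max_iter_grid = [10, 15, 20]  # same values as the parameter grid above
num_folds = 2

# One model is fitted for every (parameter value, fold) combination
fits = list(product(max_iter_grid, range(num_folds)))
print(len(fits))  # 6

for train_idx, validation_idx in k_fold_indices(6, num_folds):
    print(train_idx, validation_idx)
```

After scoring these six fits, the cross validator refits the best parameter setting on the full training data, which is the model exposed as `bestModel`.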

In [41]:
#Fitting the model
fitModel = crossval.fit(train)

In [42]:
#Collecting the best model
BestModel = fitModel.bestModel
print("Intercept:" + str(BestModel.interceptVector))
print("Coefficients" + str(BestModel.coefficientMatrix))

Intercept:[26.84292337550994]
CoefficientsDenseMatrix([[-5.68957493e-03, -5.45318699e-03, -5.16569166e-03,
              -5.87594904e-03, -5.09657030e-03, -5.92895640e-03,
              -6.17518685e-03, -5.54693837e-03, -6.17501559e-03,
              -5.79022842e-03, -6.69887614e-04, -1.58644578e-02,
               8.22293467e-05, -1.15586711e-03, -7.19309787e-05,
               2.25025336e-04, -2.71323550e-03]])


In [44]:
#We can extract the best model from this run like below
LR_BestModel = BestModel
print("Best model:", LR_BestModel)
#Generating the prediction
predictions = fitModel.transform(test)

#Calculating the accuracy
accuracy = (MC_eval.evaluate(predictions))*100
print("Accuracy:",accuracy)

Best model: LogisticRegressionModel: uid=LogisticRegression_63545fb5557c, numClasses=2, numFeatures=17
Accuracy: 100.0


#### Classification Diagnostics

In [48]:
#Loading the summary
trainingSummary = LR_BestModel.summary

In [49]:
#Generating descriptive statistics for the training predictions
trainingSummary.predictions.describe().show()

+-------+------------------+------------------+
|summary|             label|        prediction|
+-------+------------------+------------------+
|  count|               751|               751|
|   mean|0.3249001331557923|0.3249001331557923|
| stddev| 0.468649645271727| 0.468649645271727|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+



In [50]:
#Obtaining the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("Objective History at each iteration")
for objective in objectiveHistory:
    print(objective)

Objective History at each iteration
0.6305080142244923
0.5213027152588892
0.45697304682735296
0.28172682941507166
0.25450563807118987
0.2150518588866581
0.19477127698376415
0.17340394922408264
0.14213392712486164
0.08315770329642269
0.06440491310915293
0.026968891896859117
0.018902946707378104
0.013639940469738451
0.01221273057277904
0.00588014351459365


In [51]:
#For multiclass, we can inspect metrics on a per-label basis
print(" ")
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print(" ")
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print(" ")
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print(" ")
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print(" ")
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

 
False positive rate by label:
label 0: 0.0
label 1: 0.0
 
True positive rate by label:
label 0: 1.0
label 1: 1.0
 
Precision by label:
label 0: 1.0
label 1: 1.0
 
Recall by label:
label 0: 1.0
label 1: 1.0
 
F-measure by label:
label 0: 1.0
label 1: 1.0
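
For reference, the per-label precision, recall and F-measure reported above can be reproduced from raw labels and predictions. The sketch below does this in plain Python on a small hypothetical example with one mistake, so that the numbers are not all 1.0; the function name is an assumption for illustration.

```python
def per_label_metrics(labels, predictions, label):
    """Return (precision, recall, f1) treating `label` as the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == label and p == label)
    fp = sum(1 for y, p in pairs if y != label and p == label)
    fn = sum(1 for y, p in pairs if y == label and p != label)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Hypothetical labels and predictions: one label-0 row is predicted as 1
y_true = [0, 0, 0, 1, 1]
y_pred = [0, 0, 1, 1, 1]

for lbl in (0, 1):
    precision, recall, f1 = per_label_metrics(y_true, y_pred, lbl)
    print("label %d: precision=%.3f recall=%.3f f1=%.3f" % (lbl, precision, recall, f1))
```

Note that these summary metrics in the notebook are computed on the training data, so they describe fit rather than generalisation.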


In [52]:
#Collecting the weighted summary metrics and printing them (includes accuracy)
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(" ")
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

 
Accuracy: 1.0
FPR: 0.0
TPR: 1.0
F-measure: 1.0
Precision: 1.0
Recall: 1.0
