In [1]:
# First let's create our PySpark instance
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Classification").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark
# Click the hyperlinked "Spark UI" link to view details about your Spark session

You are working with 1 core(s)


In [2]:
# Read in functions we will need
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:

df = spark.read.csv('f:\\datasets\\Toddler Autism dataset July 2018.csv',inferSchema=True,header=True)
df.limit(6).toPandas()

Unnamed: 0,Case_No,A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,Age_Mons,Qchat-10-Score,Sex,Ethnicity,Jaundice,Family_mem_with_ASD,Who completed the test,classASDTraits
0,1,0,0,0,0,0,0,1,1,0,1,28,3,f,middle eastern,yes,no,family member,No
1,2,1,1,0,0,0,1,1,0,0,0,36,4,m,White European,yes,no,family member,Yes
2,3,1,0,0,0,0,0,1,1,0,1,36,4,m,middle eastern,yes,no,family member,Yes
3,4,1,1,1,1,1,1,1,1,1,1,24,10,m,Hispanic,no,no,family member,Yes
4,5,1,1,0,1,1,1,1,1,1,1,20,9,f,White European,no,yes,family member,Yes
5,6,1,1,0,0,1,1,1,1,1,1,21,8,m,black,no,no,family member,Yes


In [4]:
df.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- A1: integer (nullable = true)
 |-- A2: integer (nullable = true)
 |-- A3: integer (nullable = true)
 |-- A4: integer (nullable = true)
 |-- A5: integer (nullable = true)
 |-- A6: integer (nullable = true)
 |-- A7: integer (nullable = true)
 |-- A8: integer (nullable = true)
 |-- A9: integer (nullable = true)
 |-- A10: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Who completed the test: string (nullable = true)
 |-- classASDTraits: string (nullable = true)



In [5]:
#How many classes do we have?
df.groupBy("classASDTraits").count().show()

+--------------+-----+
|classASDTraits|count|
+--------------+-----+
|            No|  326|
|           Yes|  728|
+--------------+-----+



In [6]:
# Declare values you will need

# col_list = ["A1","A2","A3","A4","A5","A6","A7","A8","A9","A10","Age_Mons","Qchat-10-Score","Sex","Ethnicity","Jaundice","Family_mem_with_ASD","Who completed the test"]
# input_columns = col_list

input_columns = df.columns # Collect the column names as a list
input_columns = input_columns[1:-1] # keep only relevant columns: from column 1 to 

dependent_var = 'classASDTraits'

print(input_columns)
print(dependent_var)

['A1', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'A9', 'A10', 'Age_Mons', 'Qchat-10-Score', 'Sex', 'Ethnicity', 'Jaundice', 'Family_mem_with_ASD', 'Who completed the test']
classASDTraits


In [7]:


renamed=df.withColumn("label_str",df[dependent_var].cast(StringType()))
indexer=StringIndexer(inputCol='label_str',outputCol="label")
indexed=indexer.fit(renamed).transform(renamed)
indexed.show()

+-------+---+---+---+---+---+---+---+---+---+---+--------+--------------+---+--------------+--------+-------------------+----------------------+--------------+---------+-----+
|Case_No| A1| A2| A3| A4| A5| A6| A7| A8| A9|A10|Age_Mons|Qchat-10-Score|Sex|     Ethnicity|Jaundice|Family_mem_with_ASD|Who completed the test|classASDTraits|label_str|label|
+-------+---+---+---+---+---+---+---+---+---+---+--------+--------------+---+--------------+--------+-------------------+----------------------+--------------+---------+-----+
|      1|  0|  0|  0|  0|  0|  0|  1|  1|  0|  1|      28|             3|  f|middle eastern|     yes|                 no|         family member|            No|       No|  1.0|
|      2|  1|  1|  0|  0|  0|  1|  1|  0|  0|  0|      36|             4|  m|White European|     yes|                 no|         family member|           Yes|      Yes|  0.0|
|      3|  1|  0|  0|  0|  0|  0|  1|  1|  0|  1|      36|             4|  m|middle eastern|     yes|                 no

In [8]:
#indexed.show(5)
indexed.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- A1: integer (nullable = true)
 |-- A2: integer (nullable = true)
 |-- A3: integer (nullable = true)
 |-- A4: integer (nullable = true)
 |-- A5: integer (nullable = true)
 |-- A6: integer (nullable = true)
 |-- A7: integer (nullable = true)
 |-- A8: integer (nullable = true)
 |-- A9: integer (nullable = true)
 |-- A10: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Who completed the test: string (nullable = true)
 |-- classASDTraits: string (nullable = true)
 |-- label_str: string (nullable = true)
 |-- label: double (nullable = false)



In [9]:

numeric_inputs = []
string_inputs = []
for column in input_columns:
    
    if str(indexed.schema[column].dataType) == 'StringType':
       
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
        
        indexed = indexer.fit(indexed).transform(indexed)
       
        new_col_name = column+"_num"
        
        string_inputs.append(new_col_name)
    else:
       
        numeric_inputs.append(column)
        
print('numeric_inputs' , numeric_inputs)
print('String_inputs' , string_inputs)

numeric_inputs ['A1', 'A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'A9', 'A10', 'Age_Mons', 'Qchat-10-Score']
String_inputs ['Sex_num', 'Ethnicity_num', 'Jaundice_num', 'Family_mem_with_ASD_num', 'Who completed the test_num']


In [10]:
# Treat for skewness
# Flooring and capping
# Plus if right skew take the log +1
# if left skew do exp transformation
# This is best practice

# create empty dictionary d
d = {}
# Create a dictionary of quantiles from your numeric cols
# I'm doing the top and bottom 1% but you can adjust if needed
for col in numeric_inputs: 
    d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number

#Now check for skewness for all numeric cols
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
    skew = skew[0][0]
    # If skewness is found,
    # This function will make the appropriate corrections
    if skew > 1: # If right skew, floor, cap and log(x+1)
        indexed = indexed.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1: # If left skew floor, cap and exp(x)
        indexed = indexed.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")
        
print(skew)        

-0.08006349388828135


In [11]:
# Now check for negative values in the dataframe. 
# Produce a warning if there are negative values in the dataframe that Naive Bayes cannot be used. 
# Note: we only need to check the numeric input values since anything that is indexed won't have negative values

# Calculate the mins for all columns in the df
minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs]) 
# Create an array for all mins and select only the input cols
min_array = minimums.select(array(numeric_inputs).alias("mins")) 
# Collect golobal min as Python object
df_minimum = min_array.select(array_min(min_array.mins)).collect() 
# Slice to get the number itself
df_minimum = df_minimum[0][0] 

# If there are ANY negative vals found in the df, print a warning message
if df_minimum < 0:
    print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
else:
    print("No negative values were found in your dataframe.")

No negative values were found in your dataframe.


In [12]:
# Before we correct for negative values that may have been found above, 
# We need to vectorize our df
# becauase the function that we use to make that correction requires a vector. 
# Now create your final features list
features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')
output.show(5,False)

+-----------------------------------------------------------------------+-----+
|features                                                               |label|
+-----------------------------------------------------------------------+-----+
|(17,[6,7,9,10,11,12,13,14],[1.0,1.0,1.0,28.0,3.0,1.0,2.0,1.0])         |1.0  |
|(17,[0,1,5,6,10,11,14],[1.0,1.0,1.0,1.0,36.0,4.0,1.0])                 |0.0  |
|(17,[0,6,7,9,10,11,13,14],[1.0,1.0,1.0,1.0,36.0,4.0,2.0,1.0])          |0.0  |
|[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,24.0,10.0,0.0,5.0,0.0,0.0,0.0]|0.0  |
|[1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,20.0,9.0,1.0,0.0,0.0,1.0,0.0] |0.0  |
+-----------------------------------------------------------------------+-----+
only showing top 5 rows



In [13]:
# Create the mix max scaler object 
# This is what will correct for negative values
# I like to use a high range like 1,000 
#     because I only see one decimal place in the final_data.show() call
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(output)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')
# Rename to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

Features scaled to range: [0.000000, 1000.000000]
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(17,[6,7,9,10,11,...|
|  0.0|(17,[0,1,5,6,10,1...|
|  0.0|(17,[0,6,7,9,10,1...|
|  0.0|[1000.0,1000.0,10...|
|  0.0|[1000.0,1000.0,0....|
|  0.0|[1000.0,1000.0,0....|
|  0.0|(17,[0,3,4,5,8,10...|
|  0.0|(17,[1,4,6,7,8,9,...|
|  1.0|(17,[6,9,10,11,13...|
|  0.0|[1000.0,1000.0,10...|
|  0.0|[1000.0,0.0,0.0,1...|
|  0.0|[1000.0,1000.0,10...|
|  1.0|(17,[10,12,13,14]...|
|  0.0|[1000.0,1000.0,10...|
|  1.0|(17,[10,13],[250....|
|  0.0|(17,[0,1,2,4,6,7,...|
|  1.0|(17,[10,13,15],[1...|
|  0.0|[1000.0,1000.0,10...|
|  1.0|(17,[0,4,9,10,11,...|
|  0.0|(17,[0,1,2,4,6,7,...|
+-----+--------------------+
only showing top 20 rows



In [14]:
train,test = final_data.randomSplit([0.70,0.30])

In [15]:
nbclassifier=NaiveBayes()

In [16]:
nbcModel = nbclassifier.fit(train)

In [17]:
predictions = nbcModel.transform(test)

In [18]:
predictions.printSchema()
predictions.select('label','rawPrediction','probability','prediction').show()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[-25686.734619812...|           [1.0,0.0]|       0.0|
|  0.0|[-25045.161743504...|           [1.0,0.0]|       0.0|
|  0.0|[-23762.380021002...|           [1.0,0.0]|       0.0|
|  0.0|[-23015.083115530...|           [1.0,0.0]|       0.0|
|  0.0|[-26038.772257369...|           [1.0,0.0]|       0.0|
|  0.0|[-22727.718553757...|           [1.0,0.0]|       0.0|
|  0.0|[-23159.876223679...|           [1.0,0.0]|       0.0|
|  0.0|[-21178.577515970...|           [1.0,0.0]|       0.0|
|  0.0|[-25166.148072382...|           [1.0,0.0]|       0.0|
|  0.0|[-25141.095758901...|           [1.0,0.0]|    

In [19]:
evaluator=BinaryClassificationEvaluator();
accuracy = evaluator.evaluate(predictions)
print("Accuracy of Model :" , accuracy)
print("Test Error of Model :" , 1-accuracy)

Accuracy of Model : 0.9940880441686618
Test Error of Model : 0.005911955831338189
