In [2]:
import findspark
findspark.init()

import pyspark 
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("titanicClassification").getOrCreate()

In [49]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [50]:
df = spark.read.csv('d:\\titanic.csv',inferSchema=True,header=True)
df.limit(6).toPandas()

Unnamed: 0,Survived,Pclass,Name,Sex,Age,Siblings_Spouses Aboard,Parents/Children Aboard,Fare
0,0,3,Mr. Owen Harris Braund,male,22.0,1,0,7.25
1,1,1,Mrs. John Bradley (Florence Briggs Thayer) Cum...,female,38.0,1,0,71.2833
2,1,3,Miss. Laina Heikkinen,female,26.0,0,0,7.925
3,1,1,Mrs. Jacques Heath (Lily May Peel) Futrelle,female,35.0,1,0,53.1
4,0,3,Mr. William Henry Allen,male,35.0,0,0,8.05
5,0,3,Mr. James Moran,male,27.0,0,0,8.4583


In [51]:
df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Siblings_Spouses Aboard: integer (nullable = true)
 |-- Parents/Children Aboard: integer (nullable = true)
 |-- Fare: double (nullable = true)



In [52]:
df.groupBy("Survived",'Age').count().show()

+--------+----+-----+
|Survived| Age|count|
+--------+----+-----+
|       0| 9.0|    6|
|       1|30.0|   10|
|       1|80.0|    1|
|       1|18.0|   13|
|       1| 6.0|    2|
|       0|66.0|    2|
|       0|49.0|    4|
|       1|40.0|    7|
|       0|22.0|   24|
|       0|32.0|   11|
|       1| 7.0|    2|
|       0|16.0|   13|
|       0| 2.0|    7|
|       0|11.0|    3|
|       0|41.0|    6|
|       0|15.0|    1|
|       0|55.0|    2|
|       1|62.0|    2|
|       0|51.0|    5|
|       1|14.0|    3|
+--------+----+-----+
only showing top 20 rows



In [53]:
input_columns=df.columns
input_columns=input_columns[4:6]
dependent_var='Survived'
print(input_columns)
print(dependent_var)

['Age', 'Siblings_Spouses Aboard']
Survived


In [54]:
renamed=df.withColumn("label_str",df[dependent_var].cast('string'))
indexer=StringIndexer(inputCol='label_str',outputCol="label")
indexed=indexer.fit(renamed).transform(renamed)
indexed.show()

+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+---------+-----+
|Survived|Pclass|                Name|   Sex| Age|Siblings_Spouses Aboard|Parents/Children Aboard|   Fare|label_str|label|
+--------+------+--------------------+------+----+-----------------------+-----------------------+-------+---------+-----+
|       0|     3|Mr. Owen Harris B...|  male|22.0|                      1|                      0|   7.25|        0|  0.0|
|       1|     1|Mrs. John Bradley...|female|38.0|                      1|                      0|71.2833|        1|  1.0|
|       1|     3|Miss. Laina Heikk...|female|26.0|                      0|                      0|  7.925|        1|  1.0|
|       1|     1|Mrs. Jacques Heat...|female|35.0|                      1|                      0|   53.1|        1|  1.0|
|       0|     3|Mr. William Henry...|  male|35.0|                      0|                      0|   8.05|        0|  0.0|
|       0|     3

In [55]:
indexed.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Siblings_Spouses Aboard: integer (nullable = true)
 |-- Parents/Children Aboard: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- label_str: string (nullable = true)
 |-- label: double (nullable = false)



In [56]:
numeric_inputs=[]
string_inputs=[]
for column in input_columns:
    if str(indexed.schema[column].dataType)=='StringType':
        indexer=StringIndexer(inputCol=column,outputCol=column+"_num")
        indexed=indexer.fit(indexed).transform(indexed)
        new_col_name=column+"_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)
print('numeric_inputs' , numeric_inputs)
print('String_inputs' , string_inputs)

numeric_inputs ['Age', 'Siblings_Spouses Aboard']
String_inputs []


In [57]:
# Treat for skewness
d = {}

for col in numeric_inputs: 
    d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) 

for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect() 
    skew = skew[0][0]
   
    if skew > 1:
        indexed = indexed.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1:
        indexed = indexed.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")
        
print(skew)        

Siblings_Spouses Aboard has been treated for positive (right) skewness. (skew =) 3.6805221729276023 )
3.6805221729276023


In [58]:
minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs]) 

min_array = minimums.select(array(numeric_inputs).alias("mins")) 

df_minimum = min_array.select(array_min(min_array.mins)).collect() 

df_minimum = df_minimum[0][0] 


if df_minimum < 0:
    print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
else:
    print("No negative values were found in your dataframe.")

No negative values were found in your dataframe.


In [59]:
features_list = numeric_inputs + string_inputs
assembler = VectorAssembler(inputCols=features_list,outputCol='features')

output = assembler.transform(indexed).select('features','label')
output.show(5,False)

+-------------------------+-----+
|features                 |label|
+-------------------------+-----+
|[22.0,0.6931471805599453]|0.0  |
|[38.0,0.6931471805599453]|1.0  |
|[26.0,0.0]               |1.0  |
|[35.0,0.6931471805599453]|1.0  |
|[35.0,0.0]               |0.0  |
+-------------------------+-----+
only showing top 5 rows



In [60]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))


scalerModel = scaler.fit(output)


scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')

final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

Features scaled to range: [0.000000, 1000.000000]
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[271.173661724051...|
|  1.0|[472.229203317416...|
|  1.0|[321.437547122392...|
|  1.0|[434.531289268660...|
|  0.0|[434.531289268660...|
|  0.0|[334.003518471977...|
|  0.0|[673.284744910781...|
|  0.0|[19.8542347323448...|
|  1.0|[334.003518471977...|
|  1.0|[170.645890927368...|
|  1.0|[44.9861774315154...|
|  1.0|[723.548630309122...|
|  0.0|[246.041719024880...|
|  0.0|[484.795174667001...|
|  0.0|[170.645890927368...|
|  1.0|[685.850716260367...|
|  0.0|[19.8542347323448...|
|  1.0|[283.739633073636...|
|  0.0|[384.267403870319...|
|  1.0|[271.173661724051...|
+-----+--------------------+
only showing top 20 rows



In [61]:
train,test = final_data.randomSplit([0.70,0.30])

In [62]:
naiveclassifier=NaiveBayes()
navieModel = naiveclassifier.fit(train)


In [63]:
predictions = navieModel.transform(test)

In [64]:
predictions.printSchema()
predictions.select('label','rawPrediction','probability','prediction').show()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[-1025.5920404004...|[1.32476714676256...|       1.0|
|  0.0|[-1141.4832440677...|[8.08304234209895...|       1.0|
|  0.0|[-887.32460570308...|[3.21821767156518...|       1.0|
|  0.0|[-1029.1639756994...|[2.23565666338175...|       1.0|
|  0.0|[-1047.0236521941...|[3.06008086683512...|       1.0|
|  0.0|[-908.75621749675...|[7.43376399855233...|       1.0|
|  0.0|[-1054.1675227920...|[8.71493237510997...|       1.0|
|  0.0|[-34.713436485702...|[0.99576661003354...|       0.0|
|  0.0|[-1061.3113933899...|[2.48196206596543...|       1.0|
|  0.0|[-1177.2025970572...|[1.51436458246399...|    

In [65]:
evaluator=BinaryClassificationEvaluator();
accuracy = evaluator.evaluate(predictions)
print("Accuracy of Model :" , accuracy)
print("Test Error of Model :" , 1-accuracy)

Accuracy of Model : 0.4658256880733945
Test Error of Model : 0.5341743119266056
