In [1]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("sparkml").master("local").getOrCreate()

In [215]:
train = sparkSession.read.csv("titanic/train.csv", header=True, inferSchema=True).repartition(60)
test = sparkSession.read.csv("titanic/test.csv", header=True, inferSchema=True).repartition(60)

In [216]:
train.show(20)

+-----------+--------+------+--------------------+------+----+-----+-----+-----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|           Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------------+--------+-----+--------+
|        421|       0|     3|Gheorgheff, Mr. S...|  male|null|    0|    0|           349254|  7.8958| null|       C|
|        284|       1|     3|Dorking, Mr. Edwa...|  male|19.0|    0|    0|       A/5. 10482|    8.05| null|       S|
|        558|       0|     1| Robbins, Mr. Victor|  male|null|    0|    0|         PC 17757| 227.525| null|       C|
|        407|       0|     3|Widegren, Mr. Car...|  male|51.0|    0|    0|           347064|    7.75| null|       S|
|        327|       0|     3|Nysveen, Mr. Joha...|  male|61.0|    0|    0|           345364|  6.2375| null|       S|
|        183|       0|     3|Asplund, Master. ...|  male| 9.0|  

In [217]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [218]:
test.show(5)

+-----------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|       1201|     3|Hansen, Mrs. Clau...|female|45.0|    1|    0|            350026|14.1083| null|       S|
|       1301|     3|Peacock, Miss. Tr...|female| 3.0|    1|    1|SOTON/O.Q. 3101315| 13.775| null|       S|
|       1155|     3|Klasen, Miss. Ger...|female| 1.0|    1|    1|            350405|12.1833| null|       S|
|        937|     3|Peltomaki, Mr. Ni...|  male|25.0|    0|    0| STON/O 2. 3101291|  7.925| null|       S|
|       1237|     3|Abelseth, Miss. K...|female|16.0|    0|    0|            348125|   7.65| null|       S|
+-----------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
only showing top 5 rows



In [219]:
test.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [220]:
from pyspark.ml.feature import StringIndexer
cat_cols=['Sex','Embarked']
train.fillna( { 'Embarked':'S'} )
test.fillna( { 'Embarked':'S'} )

cat_cols_index={'Sex':'Sex_Index','Embarked':'Embarked_Index'}

cat_indexers = [ StringIndexer(inputCol=col, outputCol=cat_cols_index[col])
                 for col in cat_cols ]

In [221]:
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, avg
from pyspark.sql.functions import isnan, when, count


def fill_with_mean(df, include=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in include
    ))
    return df.na.fill(stats.first().asDict())

train = fill_with_mean(train, ['Age'])
test =fill_with_mean(test, ['Age'])
train = train.filter(train.Embarked.isNotNull())
train.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train.columns]).show()
train.show(5)

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|  687|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

+-----------+--------+------+--------------------+----+-----------------+-----+-----+----------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex|              Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+-----------------+-----+-----+----------+-------+-----+--------+
|        421|       0|     3|Gheorgheff, Mr. S...|male|29.69911764705882|    0|    0|    349254| 7.8958| null|       C|
|        284|       1|     3|Dorking, Mr. Edwa...|male|             19.0|    0|    0|A/5. 10482|   8.05| null

In [222]:
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(inputCols=[ "Pclass", "Sex_Index", "Age", "Fare", "SibSp", "Parch", "Embarked_Index"], outputCol='features')

In [223]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

In [224]:
rf = RandomForestClassifier(labelCol='Survived',featuresCol='scaledfeatures', numTrees=100,maxDepth=7)

In [225]:
standardScaler = StandardScaler(inputCol="features", outputCol="scaledfeatures")

In [226]:
(trainingData, testData) = train.randomSplit([0.8, 0.2], seed = 42)

In [227]:
pipeline = Pipeline(stages=[ *cat_indexers, vector_assembler, standardScaler, rf ] )

In [228]:
model = pipeline.fit(trainingData)

In [229]:
model.stages[4].featureImportances

SparseVector(7, {0: 0.1302, 1: 0.4036, 2: 0.1556, 3: 0.1645, 4: 0.0639, 5: 0.0398, 6: 0.0424})

In [230]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [231]:
predictions = model.transform(testData)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Survived")
accuracy = evaluator.evaluate(predictions)
accuracy

0.8313486005089058

In [232]:
test = test.filter(test.Fare.isNotNull())

pred = model.transform(test)
pred = pred.select(['PassengerId', 'Pclass', "Name", "Sex", "Age", "Ticket","parch", "SibSp", "Fare", "Cabin", "Embarked", "prediction"])



In [236]:
#pred.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in pred.columns]).show()
pred.write.csv("Output_Prediction_test_Spark")

In [264]:
log = LogisticRegression(maxIter=100, regParam=0.003, elasticNetParam=0, labelCol='Survived',featuresCol='scaledfeatures')

In [265]:
pipeline2 = Pipeline(stages=[ *cat_indexers, vector_assembler, standardScaler, log ] )

In [266]:
model2 = pipeline2.fit(trainingData)

In [267]:
predictions2 = model2.transform(testData)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Survived")
accuracy = evaluator.evaluate(predictions2)
accuracy

0.7846310432569974

In [269]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [291]:
MLP =  MultilayerPerceptronClassifier(maxIter=1000, layers=[7, 7, 5, 3, 2], seed=1234, labelCol='Survived',featuresCol='scaledfeatures')

In [292]:
pipeline3 = Pipeline(stages=[ *cat_indexers, vector_assembler, standardScaler, MLP ] )

In [293]:
model3 = pipeline3.fit(trainingData)

In [294]:
predictions3 = model3.transform(testData)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Survived")
accuracy = evaluator.evaluate(predictions3)
accuracy

0.7769974554707378