In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col,isnull, mean, udf
from pyspark.context import SparkContext as sc

In [2]:
spark=SparkSession.builder.appName('titanic').getOrCreate()

In [3]:
data = spark.read.csv('train.csv',header=True,inferSchema=True)


In [4]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [6]:
data.count()

891

In [7]:
data.filter(data["Cabin"].isNull()).count()
#data.filter(isnull(col("Cabin"))).count()

687

In [8]:
{c :data.filter(isnull(col(c))).count() for c in data.columns}

{'PassengerId': 0,
 'Survived': 0,
 'Pclass': 0,
 'Name': 0,
 'Sex': 0,
 'Age': 177,
 'SibSp': 0,
 'Parch': 0,
 'Ticket': 0,
 'Fare': 0,
 'Cabin': 687,
 'Embarked': 2}

In [9]:
data = data.drop('Cabin')
data = data.drop('PassengerId')
data = data.drop('Ticket')

In [10]:
data = data.withColumn('Sex', when(data.Sex == 'male', 0).otherwise(1))

In [11]:
data.show()

+--------+------+--------------------+---+----+-----+-----+-------+--------+
|Survived|Pclass|                Name|Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+--------------------+---+----+-----+-----+-------+--------+
|       0|     3|Braund, Mr. Owen ...|  0|22.0|    1|    0|   7.25|       S|
|       1|     1|Cumings, Mrs. Joh...|  1|38.0|    1|    0|71.2833|       C|
|       1|     3|Heikkinen, Miss. ...|  1|26.0|    0|    0|  7.925|       S|
|       1|     1|Futrelle, Mrs. Ja...|  1|35.0|    1|    0|   53.1|       S|
|       0|     3|Allen, Mr. Willia...|  0|35.0|    0|    0|   8.05|       S|
|       0|     3|    Moran, Mr. James|  0|null|    0|    0| 8.4583|       Q|
|       0|     1|McCarthy, Mr. Tim...|  0|54.0|    0|    0|51.8625|       S|
|       0|     3|Palsson, Master. ...|  0| 2.0|    3|    1| 21.075|       S|
|       1|     3|Johnson, Mrs. Osc...|  1|27.0|    0|    2|11.1333|       S|
|       1|     2|Nasser, Mrs. Nich...|  1|14.0|    1|    0|30.0708|       C|

In [12]:
data.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [13]:
mean_age = int(data.agg({'Age': 'mean'}).first()[0])

In [14]:
data = data.fillna({'Embarked': 'S', 'Age' : mean_age })

In [15]:
data = data.withColumn('Embarked', when(data.Embarked == 'S', 0) .when(data.Embarked == 'C', 1) .when(data.Embarked == 'Q', 2))

In [16]:
data.show()

+--------+------+--------------------+---+----+-----+-----+-------+--------+
|Survived|Pclass|                Name|Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+--------------------+---+----+-----+-----+-------+--------+
|       0|     3|Braund, Mr. Owen ...|  0|22.0|    1|    0|   7.25|       0|
|       1|     1|Cumings, Mrs. Joh...|  1|38.0|    1|    0|71.2833|       1|
|       1|     3|Heikkinen, Miss. ...|  1|26.0|    0|    0|  7.925|       0|
|       1|     1|Futrelle, Mrs. Ja...|  1|35.0|    1|    0|   53.1|       0|
|       0|     3|Allen, Mr. Willia...|  0|35.0|    0|    0|   8.05|       0|
|       0|     3|    Moran, Mr. James|  0|29.0|    0|    0| 8.4583|       2|
|       0|     1|McCarthy, Mr. Tim...|  0|54.0|    0|    0|51.8625|       0|
|       0|     3|Palsson, Master. ...|  0| 2.0|    3|    1| 21.075|       0|
|       1|     3|Johnson, Mrs. Osc...|  1|27.0|    0|    2|11.1333|       0|
|       1|     2|Nasser, Mrs. Nich...|  1|14.0|    1|    0|30.0708|       1|

In [17]:
def pronoun(name):
    
    if('Miss.' in name):
        return 1
    if('Ms.' in name):
        return 2
    if('Mrs.' in name):
        return 3
    if('Mr.' in name):
        return 3
    if('Rev.' in name):
        return 4
    if('Dr.' in name):
        return 5
    if('Capt.' in name):
        return 6
    if('Master.' in name):
        return 7
    if('Major.' in name):
        return 8
    if('Col.' in name):
        return 9
    else:
        return 0


In [18]:
from pyspark.sql.types import IntegerType

udfpronoun = udf(pronoun, IntegerType())

In [19]:
data = data.withColumn("Pronoun", udfpronoun("Name"))

In [20]:
data = data.drop('Name')

In [21]:
data = data.withColumn("Age", data["Age"].cast(IntegerType()))

In [22]:
data.show()

+--------+------+---+---+-----+-----+-------+--------+-------+
|Survived|Pclass|Sex|Age|SibSp|Parch|   Fare|Embarked|Pronoun|
+--------+------+---+---+-----+-----+-------+--------+-------+
|       0|     3|  0| 22|    1|    0|   7.25|       0|      3|
|       1|     1|  1| 38|    1|    0|71.2833|       1|      3|
|       1|     3|  1| 26|    0|    0|  7.925|       0|      1|
|       1|     1|  1| 35|    1|    0|   53.1|       0|      3|
|       0|     3|  0| 35|    0|    0|   8.05|       0|      3|
|       0|     3|  0| 29|    0|    0| 8.4583|       2|      3|
|       0|     1|  0| 54|    0|    0|51.8625|       0|      3|
|       0|     3|  0|  2|    3|    1| 21.075|       0|      7|
|       1|     3|  1| 27|    0|    2|11.1333|       0|      3|
|       1|     2|  1| 14|    1|    0|30.0708|       1|      3|
|       1|     3|  1|  4|    1|    1|   16.7|       0|      1|
|       1|     1|  1| 58|    0|    0|  26.55|       0|      1|
|       0|     3|  0| 20|    0|    0|   8.05|       0| 

In [23]:
data.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: integer (nullable = false)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: integer (nullable = true)
 |-- Pronoun: integer (nullable = true)



In [24]:
from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(     
    inputCols=['Pclass', 'Sex','Age', 'SibSp', 'Parch', 'Fare', 'Embarked', 'Pronoun'],
    outputCol="Ind"
)

In [25]:
data.show()

+--------+------+---+---+-----+-----+-------+--------+-------+
|Survived|Pclass|Sex|Age|SibSp|Parch|   Fare|Embarked|Pronoun|
+--------+------+---+---+-----+-----+-------+--------+-------+
|       0|     3|  0| 22|    1|    0|   7.25|       0|      3|
|       1|     1|  1| 38|    1|    0|71.2833|       1|      3|
|       1|     3|  1| 26|    0|    0|  7.925|       0|      1|
|       1|     1|  1| 35|    1|    0|   53.1|       0|      3|
|       0|     3|  0| 35|    0|    0|   8.05|       0|      3|
|       0|     3|  0| 29|    0|    0| 8.4583|       2|      3|
|       0|     1|  0| 54|    0|    0|51.8625|       0|      3|
|       0|     3|  0|  2|    3|    1| 21.075|       0|      7|
|       1|     3|  1| 27|    0|    2|11.1333|       0|      3|
|       1|     2|  1| 14|    1|    0|30.0708|       1|      3|
|       1|     3|  1|  4|    1|    1|   16.7|       0|      1|
|       1|     1|  1| 58|    0|    0|  26.55|       0|      1|
|       0|     3|  0| 20|    0|    0|   8.05|       0| 

In [26]:
output = featureassembler.transform(data)

In [27]:
output = output.select("Ind", "Survived")

In [28]:
train_data , test_data = output.randomSplit([0.75,0.25])

In [29]:
from pyspark.ml.classification import RandomForestClassifier

classific = RandomForestClassifier(featuresCol='Ind', labelCol='Survived')


In [30]:
classific = classific.fit(train_data)

In [31]:
pred_results = classific.evaluate(test_data)

In [32]:
pred_results.accuracy

0.8620689655172413

In [36]:
pred_results.areaUnderROC

0.9049829278702516