### Titanic survival by PySpark

#### データ読み込み確認

In [1]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [2]:
spark = SparkSession.builder.appName("Spark ML example on titanic data").getOrCreate()

In [3]:
path_and_file = './data_titanic/train.csv'
df = spark.read.csv(path_and_file,header='True',inferSchema='True')
df.take(1)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')]

In [4]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
df.count()

891

In [6]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [7]:
df.registerTempTable('train_data')

In [8]:
spark.sql('select * from train_data').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

欠損値、異常値の処理

In [9]:
from pyspark.sql.functions import col,mean

In [10]:
for column in df.columns:
    null_data = df.where(col(column).isNull())
    if (null_data.count()!=0):
        print(column,'欠損値件数',null_data.count())

Age 欠損値件数 177
Cabin 欠損値件数 687
Embarked 欠損値件数 2


cabinは保留  
ageは平均で補間  
embarkedは削除  

In [11]:
avg_age = df.select(mean('Age')).collect()[0][0]
avg_age

29.69911764705882

In [12]:
df = df.fillna({'age':avg_age,'Cabin':'Unknow'})

In [13]:
df 

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [14]:
df = df.dropna()

簡単な簡単な簡単な基礎集計

In [15]:
df.describe().toPandas().round(2)
# .show()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,889.0,889.0,889.0,889,889,889.0,889.0,889.0,889,889.0,889,889
1,mean,446.0,0.3824521934758155,2.3115860517435323,,,29.653446370674192,0.5241844769403825,0.3824521934758155,260763.9104704097,32.09668087739029,,
2,stddev,256.9981727771832,0.4862596883147733,0.8346997785705753,,,12.968366309252314,1.103704875596923,0.8067607445174785,472255.95121695305,49.69750431670795,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,Unknow,S


In [16]:
numeric_features = []
for i in df.dtypes:
    if i[1]=='int' or i[1]=='double':
        numeric_features.append(i[0])


In [17]:
df[numeric_features].show()

+-----------+--------+------+-----------------+-----+-----+-------+
|PassengerId|Survived|Pclass|              Age|SibSp|Parch|   Fare|
+-----------+--------+------+-----------------+-----+-----+-------+
|          1|       0|     3|             22.0|    1|    0|   7.25|
|          2|       1|     1|             38.0|    1|    0|71.2833|
|          3|       1|     3|             26.0|    0|    0|  7.925|
|          4|       1|     1|             35.0|    1|    0|   53.1|
|          5|       0|     3|             35.0|    0|    0|   8.05|
|          6|       0|     3|29.69911764705882|    0|    0| 8.4583|
|          7|       0|     1|             54.0|    0|    0|51.8625|
|          8|       0|     3|              2.0|    3|    1| 21.075|
|          9|       1|     3|             27.0|    0|    2|11.1333|
|         10|       1|     2|             14.0|    1|    0|30.0708|
|         11|       1|     3|              4.0|    1|    1|   16.7|
|         12|       1|     1|             58.0| 

In [None]:
plt = plt
for feature in numeric_features:
    plt.scatter(df['Survived'],df[feature])

In [None]:
df = df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")
