In [1]:
# Location of the training CSV. The original absolute path stays the default so
# existing runs are unaffected; set TITANIC_TRAIN_PATH to run on another machine.
import os
train_path = os.environ.get('TITANIC_TRAIN_PATH', r'C:\Users\Zavli\Desktop\train.csv')

In [2]:
# Location of the test CSV. The original absolute path stays the default so
# existing runs are unaffected; set TITANIC_TEST_PATH to run on another machine.
import os
test_path = os.environ.get('TITANIC_TEST_PATH', r'C:\Users\Zavli\Desktop\test.csv')

In [3]:
from pyspark import  SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

In [4]:
# `sc` is never created in this notebook, so obtain it explicitly: getOrCreate()
# returns the session the kernel already started (if any) or builds a new one.
# Having an active SparkSession is also what makes RDD.toDF available later on.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# Read the training CSV as an RDD with one string per line (header included).
train_rdd = sc.textFile(train_path)

In [5]:
# Read the test CSV as an RDD with one string per line (header row included).
test_rdd = sc.textFile(test_path)

In [6]:
# Peek at the first five raw lines of the training file; the first is the CSV header.
train_rdd.take(5)

['PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
 '2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C',
 '3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S',
 '4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S']

In [7]:
# Peek at the first five raw lines of the test file; note it has no Survived column.
test_rdd.take(5)

['PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 '892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q',
 '893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S',
 '894,2,"Myles, Mr. Thomas Francis",male,62,0,0,240276,9.6875,,Q',
 '895,3,"Wirz, Mr. Albert",male,27,0,0,315154,8.6625,,S']

In [8]:
def parseTrain(rdd):
    """Parse the raw training CSV lines into a DataFrame of string columns.

    The first line of ``rdd`` is used as the header and every line equal to it
    is dropped from the data. The quoted ``Name`` field ("Last, rest") is split
    at its first comma into a ``FirstName`` column and a ``Name`` column, so the
    result has one more column than the header.

    Args:
        rdd: RDD of raw text lines, header line first.

    Returns:
        The result of ``toDF`` on the parsed rows, with the header names plus
        ``FirstName`` inserted before ``Name``.
    """
    # Position of the Name field in the training header.
    name_pos = 3

    # Extract the header (first row) and drop it from the data.
    header = rdd.first()
    body = rdd.filter(lambda r: r != header)

    def parseRow(row):
        """Split one CSV line into a tuple of strings, honouring quoted fields."""
        import csv  # imported here so the function is self-contained on executors

        # csv.reader keeps commas inside quoted fields together; stripping the
        # remaining double quotes matches the quote-free values used before.
        fields = [f.replace('"', '') for f in next(csv.reader([row]), [])]
        if len(fields) <= name_pos:
            # Degenerate line without a Name field: keep the plain split.
            return tuple(row.replace('"', '').split(","))

        # Only the first comma separates FirstName from the rest of the name;
        # any further commas stay inside Name instead of shifting columns.
        first_name, _, rest = fields[name_pos].partition(',')
        return tuple(fields[:name_pos] + [first_name, rest] + fields[name_pos + 1:])

    rdd_parsed = body.map(parseRow)

    colnames = header.split(",")
    colnames.insert(name_pos, 'FirstName')

    return rdd_parsed.toDF(colnames)

def parseTest(rdd):
    """Parse the raw test CSV lines into a DataFrame of string columns.

    The first line of ``rdd`` is used as the header and every line equal to it
    is dropped from the data. The quoted ``Name`` field ("Last, rest") is split
    at its first comma into a ``FirstName`` column and a ``Name`` column, so the
    result has one more column than the header.

    Args:
        rdd: RDD of raw text lines, header line first.

    Returns:
        The result of ``toDF`` on the parsed rows, with the header names plus
        ``FirstName`` inserted before ``Name``.
    """
    # Position of the Name field in the test header (there is no Survived column).
    name_pos = 2

    header = rdd.first()
    body = rdd.filter(lambda r: r != header)

    def parseRow(row):
        """Split one CSV line into a tuple of strings, honouring quoted fields."""
        import csv  # imported here so the function is self-contained on executors

        # csv.reader keeps commas inside quoted fields together; stripping the
        # remaining double quotes matches the quote-free values used before.
        fields = [f.replace('"', '') for f in next(csv.reader([row]), [])]
        if len(fields) <= name_pos:
            # Degenerate line without a Name field: keep the plain split.
            return tuple(row.replace('"', '').split(","))

        # Only the first comma separates FirstName from the rest of the name;
        # any further commas stay inside Name instead of shifting columns.
        first_name, _, rest = fields[name_pos].partition(',')
        return tuple(fields[:name_pos] + [first_name, rest] + fields[name_pos + 1:])

    rdd_parsed = body.map(parseRow)

    colnames = header.split(",")
    colnames.insert(name_pos, 'FirstName')

    return rdd_parsed.toDF(colnames)

# Parse both raw RDDs into DataFrames; each gains a FirstName column next to Name.
train_df = parseTrain(train_rdd)
test_df = parseTest(test_rdd)

In [9]:
# Preview the first three parsed training rows.
train_df.show(3)

+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|FirstName|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|   Braund|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|  Cumings| Mrs. John Bradle...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|
+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



In [10]:
## Tag each row with its origin so the two sets can be separated again later
train_df = train_df.withColumn('Mark', lit('train'))
## The test file has no Survived column: add a placeholder 0 plus the origin tag
test_df = test_df.withColumn('Survived', lit(0))
test_df = test_df.withColumn('Mark', lit('test'))
## The union is positional, so put the test columns in the training column order
test_df = test_df.select(train_df.columns)
## Append the test rows to the training rows
df = train_df.unionAll(test_df)

In [11]:
# Print the combined DataFrame's column names and types (all strings at this point).
print(df)

DataFrame[PassengerId: string, Survived: string, Pclass: string, FirstName: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, Mark: string]


In [12]:
# Preview seven test rows, now carrying the placeholder Survived and the Mark columns.
test_df.show(7)

+-----------+--------+------+---------+--------------------+------+----+-----+-----+-------+-------+-----+--------+----+
|PassengerId|Survived|Pclass|FirstName|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|Mark|
+-----------+--------+------+---------+--------------------+------+----+-----+-----+-------+-------+-----+--------+----+
|        892|       0|     3|    Kelly|           Mr. James|  male|34.5|    0|    0| 330911| 7.8292|     |       Q|test|
|        893|       0|     3|   Wilkes| Mrs. James (Elle...|female|  47|    1|    0| 363272|      7|     |       S|test|
|        894|       0|     2|    Myles|  Mr. Thomas Francis|  male|  62|    0|    0| 240276| 9.6875|     |       Q|test|
|        895|       0|     3|     Wirz|          Mr. Albert|  male|  27|    0|    0| 315154| 8.6625|     |       S|test|
|        896|       0|     3| Hirvonen| Mrs. Alexander (...|female|  22|    1|    1|3101298|12.2875|     |       S|test|
|        897|       0|     3| Sv

In [13]:
## Data cleaning / manipulation
## Convert Age, SibSp, Parch, Fare and Survived from string to double
for numeric_name in ('Age', 'SibSp', 'Parch', 'Fare', 'Survived'):
    df = df.withColumn(numeric_name, df[numeric_name].cast("double"))

df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Mark: string (nullable = false)



In [14]:
## Impute missing Age and Fare
numVars = ['Survived','Age','SibSp','Parch','Fare']
def missingAge(df):
    """Replace null ``Age`` and ``Fare`` values with the column means.

    Both means are computed over all rows of ``df`` in a single aggregation.
    A column whose mean is undefined (no non-null values) is left untouched
    instead of passing ``None`` to ``na.fill``.

    Args:
        df: DataFrame with numeric ``Age`` and ``Fare`` columns.

    Returns:
        A DataFrame with the null ``Age``/``Fare`` entries filled.
    """
    # One pass over the data yields both averages, in the order requested.
    mean_age, mean_fare = df.groupBy().mean('Age', 'Fare').first()
    fill_values = {name: value
                   for name, value in (('Age', mean_age), ('Fare', mean_fare))
                   if value is not None}
    if not fill_values:
        return df
    return df.na.fill(fill_values)

df = missingAge(df)

In [15]:
## Helper that derives a title from the Name column
def extract_title(df):
    """Return ``df`` with an added ``Title`` column.

    The title is the text of ``Name`` before its first '.', with surrounding
    whitespace removed.
    """
    def title_of(name):
        return name.split('.')[0].strip()

    title_udf = udf(title_of, StringType())
    return df.withColumn('Title', title_udf(df['Name']))

df = extract_title(df)

In [16]:
def change_category_to_int(df):
    """Add numeric ``*_indexed`` columns for the categorical variables.

    A ``StringIndexer`` is created for each of ``Pclass``, ``Sex`` and
    ``Embarked``; the pipeline fits them on ``df`` and appends
    ``Pclass_indexed``, ``Sex_indexed`` and ``Embarked_indexed``. The original
    columns are kept.

    Args:
        df: DataFrame containing the three categorical columns.

    Returns:
        ``df`` with the three indexed columns appended.
    """
    category = ['Pclass','Sex','Embarked']
    # Hand the unfitted indexers to the pipeline so each is fitted exactly once.
    indexers = [StringIndexer(inputCol=name, outputCol=name + '_indexed')
                for name in category]
    pipeline = Pipeline(stages=indexers)
    return pipeline.fit(df).transform(df)

df = change_category_to_int(df)

In [17]:
def create_vectors(df):
    """Build the modelling frame with mark, label, features and index columns.

    The features are the entries of the module-level ``numVars`` other than
    ``Survived`` followed by the indexed categorical columns, packed into one
    dense vector per row.

    Args:
        df: DataFrame holding ``Mark``, ``Survived``, the numeric variables and
            the ``*_indexed`` categorical columns.

    Returns:
        DataFrame with columns ``mark``, ``label``, ``features`` and ``index``
        (the ``StringIndexer`` encoding of ``label``).
    """
    category = ['Pclass','Sex','Embarked']
    category_index = [name + '_indexed' for name in category]
    # Survived is the label, so it must not appear among the features.
    featuresCol = [name for name in numVars if name != 'Survived'] + category_index
    labelCol = ['Mark', 'Survived']
    row = Row('mark', 'label', 'features')
    df_indexed = df[labelCol + featuresCol]
    # Row layout: 0 = mark, 1 = label, 2.. = feature values.
    # The Row factory has three fields, so it gets exactly three values; the
    # extra VectorUDT() that used to be passed is rejected by PySpark versions
    # that check the number of values.
    lf = df_indexed.rdd.map(lambda r: row(r[0], r[1], Vectors.dense(r[2:]))).toDF()
    # Index the label: the tree-based classifiers are trained on this column.
    lf = StringIndexer(inputCol='label', outputCol='index').fit(lf).transform(lf)
    return lf

In [18]:
lf = create_vectors(df)
validate = 0

def train_model(lf):
    """Split ``lf`` into training and test frames and print their sizes.

    Rows marked 'train' are split 70/30 with a fixed seed; the 30% part is
    stored in the module-level ``validate`` variable as a side effect.

    Returns:
        Tuple ``(train, test)``: the 70% training part and the rows marked 'test'.
    """
    global validate
    marked_train = lf.where(lf.mark == 'train')
    test = lf.where(lf.mark == 'test')
    # Random split of the labelled rows into train / validate
    parts = marked_train.randomSplit([0.7, 0.3], seed=121)
    train = parts[0]
    validate = parts[1]
    for caption, frame in (('Train Data: ', train),
                           ('Validate Data: ', validate),
                           ('Test Data: ', test)):
        print (caption + str(frame.count()))
    return train, test

In [19]:
# Preview five rows of the mark / label / features / index frame.
lf.show(5)

+-----+-----+--------------------+-----+
| mark|label|            features|index|
+-----+-----+--------------------+-----+
|train|  0.0|[22.0,1.0,0.0,7.2...|  0.0|
|train|  1.0|[38.0,1.0,0.0,71....|  1.0|
|train|  1.0|[26.0,0.0,0.0,7.9...|  1.0|
|train|  1.0|[35.0,1.0,0.0,53....|  1.0|
|train|  0.0|[35.0,0.0,0.0,8.0...|  0.0|
+-----+-----+--------------------+-----+
only showing top 5 rows



In [20]:
# Split once and unpack both parts; calling train_model twice repeated the
# split, the count jobs and the printed summary.
train, test = train_model(lf)
# Logistic regression
# regParam: regularization parameter
lr = LogisticRegression(maxIter=100, regParam=0.05, labelCol='index').fit(train)

Train Data: 636
Validate Data: 255
Test Data: 418
Train Data: 636
Validate Data: 255
Test Data: 418


In [21]:
# Evaluate with the area under the ROC curve (the evaluator's default metric
# for binary classification)
def test_model_regression(model, validate=None):
    """Score ``model`` on a validation frame and return the evaluator's metric.

    Args:
        model: fitted model exposing ``transform``.
        validate: frame to score. When omitted, the module-level ``validate``
            is looked up at call time, so the split that exists when the
            function is called is used rather than whatever the name held when
            this cell was defined.

    Returns:
        The value computed by ``BinaryClassificationEvaluator`` with
        ``labelCol='index'``.
    """
    if validate is None:
        # The parameter shadows the global of the same name, hence globals().
        validate = globals()['validate']
    pred = model.transform(validate)
    evaluator = BinaryClassificationEvaluator(labelCol='index')
    return evaluator.evaluate(pred)

print ('AUC ROC: ' + str(test_model_regression(lr)))

AUC ROC: 0.8325267447784005


In [22]:
# Decision tree and random forest
def dtRf():
    """Fit a decision tree and a random forest and print every model's score.

    Both classifiers are trained on the module-level ``train`` frame. They are
    scored, together with the already fitted ``lr``, by
    ``test_model_regression`` and the resulting name -> score dict is printed.
    """
    dt = DecisionTreeClassifier(maxDepth=3, labelCol='index').fit(train)
    rf = RandomForestClassifier(numTrees=100, labelCol='index').fit(train)

    # Report keys; 'DecisionTree' was previously misspelled in the output.
    models = {'LogisticRegression': lr,
              'DecisionTree': dt,
              'RandomForest': rf}

    modelPerf = {name: test_model_regression(model) for name, model in models.items()}

    print (modelPerf)

dtRf()

{'DecistionTree': 0.7700267447784003, 'LogisticRegression': 0.8325267447784005, 'RandomForest': 0.8546867040244526}
