In [1]:
import pyspark
import findspark
import time
import os.path
from itertools import chain
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import MinMaxScaler,StringIndexer,IndexToString, VectorIndexer, VectorAssembler, OneHotEncoder, SQLTransformer
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import StructType,StringType,StructField,IntegerType,DoubleType
from pyspark.sql.utils import AnalysisException
from pyspark.ml.linalg import Vectors
from pyspark.sql import SQLContext, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import concat,translate,lit,col,isnan,count,when,split,explode,ltrim,create_map

In [2]:
# initialise spark
# NOTE(review): findspark.init() exists to put pyspark on sys.path, but the
# first cell already ran `import pyspark`; this only works when pyspark was
# importable anyway. Consider calling findspark.init() before that import.
findspark.init()
sc = pyspark.SparkContext(appName='Classifier')  # entry point for Spark jobs
sql = SQLContext(sparkContext=sc)  # context whose .read is used to load the CSV files

In [3]:
def create_dataframes(directory,schema_train=None,schema_test=None):
    """
    Creates Spark DataFrames from `directory`/train.csv and, when it
    exists, `directory`/test.csv. Both files are read with a header row
    through the module-level `sql` context.

    Inputs: directory - String path to the data folder,
    schema_train - StructType or None (default). When None the column
    types are inferred from the CSV, otherwise this schema is applied.
    schema_test - StructType or None (default). When None, schema_train
    is also used for test.csv.
    NOTE(review): reusing schema_train for test.csv assumes both files
    have the same columns; a test.csv without the label column needs
    its own schema_test.

    Returns: (df_train, df_test) when test.csv exists, otherwise
    df_train on its own

    Raises: ValueError if `directory` or `directory`/train.csv does
    not exist
    """
    if not os.path.exists(directory):
        raise ValueError("%s does not exist" % directory)

    # Type inference is only needed when no explicit schema was supplied
    infer_schema = schema_train is None
    if schema_test is None:
        schema_test = schema_train

    train_path = os.path.join(directory, "train.csv")
    if not os.path.exists(train_path):
        raise ValueError("train.csv not found in %s" % directory)
    df_train = sql.read.csv(train_path,
                            header=True,
                            inferSchema=infer_schema,
                            schema=schema_train)

    test_path = os.path.join(directory, "test.csv")
    if os.path.exists(test_path):
        df_test = sql.read.csv(test_path,
                               header=True,
                               inferSchema=infer_schema,
                               schema=schema_test)
        return df_train, df_test

    return df_train
        
# Load ./data/train.csv and ./data/test.csv with inferred column types
df_train, df_test = create_dataframes(directory='./data')

In [4]:
def combine_train_test(df_train,df_test,label):
    """
    Combine train and test DataFrames into one, tagging each row's origin.

    Both frames get a string column 'Mark' set to 'train' or 'test', so
    the combined frame can be split back apart later. If `label` is not
    a column of df_test, a dummy label column filled with 0 is added so
    that both frames have the same columns.

    inputs: df_train, df_test - Spark DataFrames,
    label - String naming the label column

    returns: DataFrame with df_train's rows followed by df_test's rows,
    with columns in df_train's order

    raises: ValueError if the two frames end up with a different number
    of columns
    """
    def has_column(df, column):
        # Resolving a column that does not exist raises AnalysisException
        try:
            df[column]
            return True
        except AnalysisException:
            return False

    #Mark dataframes
    df_train = df_train.withColumn('Mark', lit('train'))
    df_test = df_test.withColumn('Mark', lit('test'))

    if not has_column(df_test, label):
        #add dummy label column to dataframe
        df_test = df_test.withColumn(label, lit(0))

    if len(df_train.columns) != len(df_test.columns):
        raise ValueError("input dataframes of different shape")

    # union() matches columns by position, so put test's columns in
    # train's order first to avoid mislabelling when stacking the frames
    return df_train.union(df_test.select(df_train.columns))

df = combine_train_test(df_train,df_test,'Survived')

In [5]:
#missing values by column
def get_missing(df):
    """
    Prints the number of null values in each column of df.

    All null counts are computed in a single aggregation (one Spark job)
    instead of one filtered count() per column.

    inputs: df - Spark DataFrame

    returns: None
    """
    if not df.columns:
        return None

    # count(when(cond, 1)) counts only the rows where cond holds:
    # when() yields null otherwise, and count() skips nulls
    null_counts = df.select([
        F.count(when(df[column].isNull(), 1)).alias(column)
        for column in df.columns
    ]).first()

    for column, missing in zip(df.columns, null_counts):
        print("Missing values for %s : %s" % (column, missing))

    return None

In [6]:
def remove_missing_columns(df,thresh=0.05,ignore=None):
    """
    Removes every column whose fraction of null values is greater
    than thresh.

    inputs: df - Spark DataFrame,
    thresh - float (defaults to 0.05), the largest tolerated fraction
    (0-1) of null rows in a column,
    ignore - list of column names that are never dropped
    (defaults to None, meaning no exemptions)

    returns: DataFrame. Side effect: cache() is called on df itself.
    """
    # None rather than a mutable [] default, which would be shared
    # between calls
    if ignore is None:
        ignore = []

    x = df.cache()

    candidates = [column for column in x.columns if column not in ignore]
    # Hoisted out of the loop: the row count is the same for every column
    total = x.count()
    if not candidates or total == 0:
        return x

    # One aggregation computes the null count of every candidate column
    # instead of one filtered count() job per column
    null_counts = x.select([
        F.count(when(x[column].isNull(), 1)).alias(column)
        for column in candidates
    ]).first()

    for column, missing in zip(candidates, null_counts):
        # float() keeps this a true division under Python 2 as well
        if missing and float(missing) / total > thresh:
            x = x.drop(column)

    return x

df = remove_missing_columns(df,thresh=0.50,ignore=['Age','Fare'])

In [7]:
#fill missing values with the mean
def fill_null_with_mean(df):
    """
    Replaces null values column by column.

    String columns are filled with their most frequent non-null value
    (mode; ties are broken by the smallest value). Every other column is
    filled with its mean. Columns without nulls are left untouched, and
    a column whose statistic is itself null (e.g. all values null) is
    skipped.

    input: df - Spark DataFrame
    returns: Spark DataFrame. Side effect: cache() is called on df itself.
    """
    x = df.cache()

    for field in df.schema.fields:
        name = field.name
        if df.where(df[name].isNull()).count() == 0:
            continue

        # isinstance instead of comparing str(dataType): that text is not
        # stable across Spark releases (newer ones render "StringType()")
        if isinstance(field.dataType, StringType):
            # Exclude nulls first, otherwise null itself can be the "mode"
            top = (df.where(df[name].isNotNull())
                     .groupBy(name).count()
                     .orderBy(F.desc("count"), F.col(name))
                     .first())
            fill_value = top[0] if top is not None else None
        else:
            fill_value = df.groupBy().mean(name).first()[0]

        # na.fill rejects None as a replacement value
        if fill_value is not None:
            x = x.na.fill({name: fill_value})
    return x

df = fill_null_with_mean(df)

The cleaning method above could be improved considerably. Missing values could be imputed with something better than the column mean (or the mode, for string columns), but for this notebook I wanted something quick.

In [8]:
#remove spaces from names
def _delete_spaces(s):
    """Return s with every space removed; None (a SQL null) passes through."""
    # Python UDFs receive None for null cells, and None.replace would raise
    return None if s is None else s.replace(" ", "")

spaceDeleteUDF = F.udf(_delete_spaces, StringType())
df = df.withColumn('Name', spaceDeleteUDF(df["Name"]))

In [9]:
#Title cleanse
# NOTE(review): assumes Name looks like "Surname,Title.Rest" once spaces
# have been removed - confirm on new data
df = df.withColumn('Surname',F.trim(split('Name',',')[0]))
df = df.withColumn('name_split',F.trim(split('Name',',')[1]))
df = df.withColumn('Title',F.trim(split('name_split','\\.')[0]))

# Collapse the raw titles into a handful of groups
title_dictionary = {
    "Capt":       "Officer",
    "Col":        "Officer",
    "Major":      "Officer",
    "Jonkheer":   "Sir",
    "Don":        "Sir",
    "Sir" :       "Sir",
    "Dr":         "Mr",
    "Rev":        "Mr",
    "theCountess":"Lady",
    "Dona":       "Lady",
    "Mme":        "Mrs",
    "Mlle":       "Miss",
    "Ms":         "Mrs",
    "Mr" :        "Mr",
    "Mrs" :       "Mrs",
    "Miss" :      "Miss",
    "Master" :    "Master",
    "Lady" :      "Lady"
}

# Spark map literal {raw title -> group} built from the dictionary's pairs
mapping_expr = create_map([lit(x) for x in chain(*title_dictionary.items())])

# Look each title up in the map. A title missing from the dictionary would
# come back as null, so fall back to the raw title instead of introducing
# nulls that later indexing stages may reject
df = df.withColumn("Title", F.coalesce(mapping_expr[col("Title")], col("Title")))

In [10]:
# Mother: the string 'True' for rows with Sex == 'female', Age > 18 and
# Parch > 0; the string 'False' for every other row (including null Age)
is_mother = (col('Sex') == 'female') & (col('Age') > 18) & (col('Parch') > 0)
df = df.withColumn('Mother', when(is_mother, 'True').otherwise('False'))

# Family_size: SibSp + Parch + 1 (the passenger themself)
df = df.withColumn('Family_size', col('SibSp') + col('Parch') + 1)

# Family: 'Family' when Family_size exceeds 2, else 'No_Family'
df = df.withColumn(
    'Family',
    when(col('Family_size') > 2, 'Family').otherwise('No_Family')
)

In [11]:
# Drop the raw text columns and the intermediate name-parsing columns
df = df.drop(*['Ticket', 'Surname', 'Name', 'name_split'])

In [12]:
def split_on_column_types(df):
    """
    Split df's column names into string columns and all other columns.

    inputs: df - Spark DataFrame

    returns: (categorical, numeric) - two lists of column names in schema
    order. `categorical` holds the StringType columns; every other column
    (whatever its type) goes to `numeric`.
    """
    categorical = []
    numeric = []

    # `field`, not `col`, so the imported pyspark `col` is not shadowed
    for field in df.schema.fields:
        # isinstance instead of comparing str(dataType): that text is not
        # stable across Spark releases (newer ones render "StringType()")
        if isinstance(field.dataType, StringType):
            categorical.append(field.name)
        else:
            numeric.append(field.name)

    return categorical, numeric

#categorical,numeric = split_on_column_types(train)
#indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in categorical]
#encoders = [OneHotEncoder(inputCol=column+"_index",outputCol=column+"_vec") for column in categorical]

In [13]:
# Use the generic name 'label' for the target column
df = df.withColumnRenamed(existing='Survived', new='label')

In [14]:
def build_pipeline(df,label,ignore=('Mark',)):
    """
    Build a random-forest pipeline and a CrossValidator that tunes it.

    Features are every column of df except `label` and the names in
    `ignore`. String columns are StringIndexed; all other columns are
    used as they are. The label is StringIndexed into label + "_index",
    which both the forest and the evaluator use.

    Inputs: df - spark DataFrame, label - String naming the label column,
    ignore - iterable of column names that must not become features
    (defaults to ('Mark',), the train/test marker column)

    Returns: (pipeline, crossVal) - the unfitted Pipeline and a
    CrossValidator wrapping it
    """
    excluded = set(ignore)
    excluded.add(label)

    categorical = []
    numeric = []
    for field in df.schema.fields:
        if field.name in excluded:
            continue
        # isinstance instead of comparing str(dataType), whose text is not
        # stable across Spark releases
        if isinstance(field.dataType, StringType):
            categorical.append(field.name)
        else:
            numeric.append(field.name)

    indexers = [StringIndexer(inputCol=column, outputCol=column + "_index")
                for column in categorical]
    # Fitted up-front so its labels are available to labelConverter below
    labelIndexer = StringIndexer(inputCol=label, outputCol=label + "_index").fit(df)
    all_columns = [column + "_index" for column in categorical] + numeric

    assembler = VectorAssembler(inputCols=all_columns, outputCol="features")
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

    # maxBins here is only the starting value; paramGrid below overrides it
    rf = RandomForestClassifier(labelCol=label + "_index",
                                featuresCol="scaledFeatures",
                                numTrees=10,
                                maxBins=200)

    #Used to convert predicted values back to their original format
    labelConverter = IndexToString(inputCol="prediction",
                                   outputCol="predictedLabel",
                                   labels=labelIndexer.labels)

    stages = indexers + [labelIndexer, assembler, scaler, rf, labelConverter]
    pipeline = Pipeline(stages=stages)

    paramGrid = (ParamGridBuilder()
                 .addGrid(rf.maxBins, [25, 50, 75])
                 .addGrid(rf.maxDepth, [4, 6, 8])
                 .build())

    # Score against the indexed label the forest is trained on; the
    # evaluator's default labelCol is the raw "label" column
    evaluator = BinaryClassificationEvaluator(labelCol=label + "_index")
    crossVal = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=4)

    return pipeline, crossVal

# PassengerId is an identifier, not a predictive feature: keep it out of the
# model (it stays in the data so predictions can be written per passenger)
pipeline,crossVal = build_pipeline(df,'label',ignore=('Mark','PassengerId'))

In [15]:
def split_into_train_test(df,train_sample_size=0.7,seed=None):
    """
    Splits a dataframe into train and test.

    If df has a 'Mark' column, rows marked 'train' / 'test' go to the
    respective output and 'Mark' is dropped from both. train_sample_size
    and seed are then ignored.
    Otherwise df is split at random with randomSplit: roughly
    train_sample_size of the rows go to train and the rest to test.

    inputs: df - Spark DataFrame,
    train_sample_size - float in [0, 1] (defaults to 0.7),
    seed - int or None (default), forwarded to randomSplit; pass an int
    for a reproducible split

    returns: train - Spark DataFrame, test - Spark DataFrame

    raises: ValueError if a random split is needed and train_sample_size
    is outside [0, 1]
    """
    def has_column(frame, name):
        # Resolving a column that does not exist raises AnalysisException
        try:
            frame[name]
            return True
        except AnalysisException:
            return False

    if has_column(df, 'Mark'):
        train = df.where(df['Mark'] == 'train').drop('Mark')
        test = df.where(df['Mark'] == 'test').drop('Mark')
        return train, test

    if not 0 <= train_sample_size <= 1:
        raise ValueError("train_sample_size out of bounds")
    test_sample_size = 1 - train_sample_size
    train, test = df.randomSplit([train_sample_size, test_sample_size], seed=seed)
    return train, test

train,test = split_into_train_test(df)

In [16]:
# Cross-validate every param-grid combination on the training rows; the
# result is a CrossValidatorModel used for prediction below
cvModel = crossVal.fit(dataset=train)

In [17]:
# Run the cross-validated model over the test rows
pred = cvModel.transform(dataset=test)

In [18]:
# Output frame: PassengerId plus the label that IndexToString mapped back to
# its original value, written under the column name 'Survived'
predictions = pred.select("PassengerId", pred["predictedLabel"].alias("Survived"))
# Built-in CSV writer (replaces the legacy 'com.databricks.spark.csv' format
# name). Spark writes a *directory* named prediction.csv; coalesce(1) leaves
# a single partition, so it holds one part-*.csv file.
predictions.coalesce(1).write.csv('./data/prediction.csv', mode='overwrite', header=True)

In [19]:
# #scale numeric columns
# from pyspark.ml.feature import StandardScaler
# scalers = [StandardScaler(inputCol=column, outputCol=column+"_index"
#                          ,withStd=False,withMean=False
#                          ).fit(df) for column in numeric]


In [20]:
# Shut down the SparkContext created at the top of the notebook; sc (and the
# SQLContext built on it) cannot run further jobs after this call.
sc.stop()