## Titanic Feature Engineering

In [1]:
%%datalabframework getfilename

In [2]:
import datalabframework as dlf
logger = dlf.log.initLogger(__name__, kafka_topic="datalab", kafka_servers="kafka:9092")

2017-04-30 14:27:34,743 - jovyan - __main__ - INFO - init - {'project': {'rootpath': '/home/jovyan/work/notebooks', 'main': 'main.ipynb'}, 'notebook': {'filepath': '/home/jovyan/work/notebooks/features', 'filename': 'regression.ipynb'}, 'datalab': {'framework': '0.1'}}


In [3]:
# EXPORT

from pyspark.ml.feature import RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import coalesce

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [4]:
# EXPORT 

def regressors_na(df, colnames):
    na = False
    for colname in colnames:
        null_count = df.where(df[colname].isNull()).count()
        na = na or (null_count>0)
    return na

In [5]:
# EXPORT

def inpute_simple(df, colname):
    # which value is the most probable?
    d = df.groupBy(colname).count().toPandas()
    fill_value = d.loc[d['count'].argmax(),colname]
    
    #fill the na
    df = df.fillna(fill_value, 'Embarked')
    
    return df

In [6]:
# EXPORT

def feature_vector(df, idcol, colname, regressors):
    formula = RFormula(formula= colname + ' ~ ' + '+'.join(regressors), 
                   labelCol='label', featuresCol='features')
    
    # to dense feature vector
    df_features = formula.fit(df).transform(df).select(idcol,'features', 'label')
    
    return df_features

In [7]:
# EXPORT

def impute_lr(df, idcol, colname, regressors):
    assert not regressors_na(df, regressors)
    
    # to vector
    df_features = feature_vector(df, idcol, colname, regressors)  
    
    # create lr estimator
    lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Fit the model
    lrModel = lr.fit(df_features.where(df_features.label.isNotNull()))
    
    # Print the coefficients and intercept for linear regression
    #print("Coefficients: %s" % str(lrModel.coefficients))
    #print("Intercept: %s" % str(lrModel.intercept))

    # Summarize the model over the training set and print out some metrics
    #trainingSummary = lrModel.summary
    #print("numIterations: %d" % trainingSummary.totalIterations)
    #print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
    #trainingSummary.residuals.show()
    #print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
    #print("r2: %f" % trainingSummary.r2)
    
    # impute dependent variable
    df_impute = lrModel.transform(df_features)
    
    # join prediction with original dataframe
    df = df.join(df_impute.select('PassengerId','prediction'), 'PassengerId', "leftouter") 
    
    # coalesce null using imputation
    df =  df.withColumn(colname,coalesce(df[colname],df.prediction)).drop('prediction')
    
    return df

In [8]:
# EXPORT

## created user defined function to extract title
def feature_title(df):
    getTitle = udf(lambda name: name.split(',')[1].split('.')[0].strip(),StringType())
    df = df.withColumn('Title', getTitle(df['Name']))
    df = df.drop('Name')
    return df

In [9]:
DATA_ROOT = '/home/jovyan/work/data'
d = {   
    'input_sample' : 1.0,
    'input_source' : DATA_ROOT + '/titanic/data/set/clean/train.parquet',
    
    'output_sample' : 1.0,
    'output_source' : DATA_ROOT + '/titanic/data/set/train/features.parquet'
}
p = dlf.params.config_fromdict(d)

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(dlf.project.filename()).getOrCreate()
spark.version

'2.1.0'

In [11]:
df = spark.read.parquet(p.input_source)
df.printSchema()
df.show(10, truncate=True)

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0| 8.4583|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|51.8625|       S|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.07

In [12]:
for colname in df.columns:
    null_count = df.where(df[colname].isNull()).count()
    print('{} {}'.format(colname, null_count))

PassengerId 0
Survived 0


Pclass 0
Name 0
Sex 0


Age 177
SibSp 0
Parch 0


Fare 0
Embarked 2


In [13]:
df.groupBy('Embarked').count().toPandas()

Unnamed: 0,Embarked,count
0,Q,77
1,,2
2,C,168
3,S,644


In [14]:
df = inpute_simple(df, 'Embarked')
df.groupBy('Embarked').count().toPandas()

Unnamed: 0,Embarked,count
0,Q,77
1,C,168
2,S,646


In [15]:
df = impute_lr(df, 'PassengerId', 'Age', ['Sex', 'Pclass', 'SibSp', 'Parch', 'Embarked', 'Fare']) 
df.show(10)

+-----------+--------+------+--------------------+------+------------------+-----+-----+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|   Fare|Embarked|
+-----------+--------+------+--------------------+------+------------------+-----+-----+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|              35.0|    0|    0|   8.05|       S|
|          6|       0|     3|    Moran, Mr. James|  male|28.467886947074096|    0|    0| 8.4583|       Q|
|          7|       0|     1|McCarthy, Mr. Tim

#### No NA beyond this point :)

In [16]:
assert not regressors_na(df, df.columns)

In [17]:
df = feature_title(df)

# list occurences of the title
df.groupBy('Title').count().toPandas()

Unnamed: 0,Title,count
0,Don,1
1,Miss,182
2,Col,2
3,Rev,6
4,Lady,1
5,Master,40
6,Mme,1
7,Capt,1
8,Mr,517
9,Dr,7


In [18]:
df.show()

+-----------+--------+------+------+------------------+-----+-----+-------+--------+------+
|PassengerId|Survived|Pclass|   Sex|               Age|SibSp|Parch|   Fare|Embarked| Title|
+-----------+--------+------+------+------------------+-----+-----+-------+--------+------+
|          1|       0|     3|  male|              22.0|    1|    0|   7.25|       S|    Mr|
|          2|       1|     1|female|              38.0|    1|    0|71.2833|       C|   Mrs|
|          3|       1|     3|female|              26.0|    0|    0|  7.925|       S|  Miss|
|          4|       1|     1|female|              35.0|    1|    0|   53.1|       S|   Mrs|
|          5|       0|     3|  male|              35.0|    0|    0|   8.05|       S|    Mr|
|          6|       0|     3|  male|28.467886947074096|    0|    0| 8.4583|       Q|    Mr|
|          7|       0|     1|  male|              54.0|    0|    0|51.8625|       S|    Mr|
|          8|       0|     3|  male|               2.0|    3|    1| 21.075|     

#### Save as parquet

In [19]:
df.write.parquet(p.output_source, mode='overwrite')