In [1]:
import datalabframework as dlf

In [2]:
# Display the project root path reported by datalabframework.
dlf.project.rootpath()

'/home/natbusa/Projects/dsp-titanic/src'

#### Init Spark

In [3]:
# Look up the engine registered under the name 'spark' and take its context.
# Both `engine` and `spark` are module-level names that later cells rely on.
engine = dlf.engines.get('spark')
spark = engine.context()

In [4]:
# Display the engine context name and the Spark version as '<name>:<version>'.
'{}:{}'.format(engine.info['context'], spark.sparkSession.version)

'spark:2.3.1'

In [5]:
#EXPORT 
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.sql.functions import col, lit

from pyspark.sql.window import *
from pyspark.sql.functions import count, row_number, desc
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import numpy as np

#create a lookup table from spark dataframe
def collect_lut(df, discreteCols=[]):
    """Build a lookup table (LUT) of value frequencies for discrete columns.

    For every column in ``discreteCols`` the distinct values of ``df`` are
    counted, and each value gets a 0-based ``index`` ranking it by descending
    count within its column (the most frequent value gets index 0).

    Args:
        df: Spark DataFrame holding the columns to index.
        discreteCols: names of the columns to include in the LUT. The default
            list is only iterated, never mutated.

    Returns:
        Spark DataFrame with columns ``name`` (source column name, string),
        ``value`` (the value rendered with Python ``str``), ``count`` (long)
        and ``index`` (0-based rank per ``name``). An empty ``discreteCols``
        yields an empty DataFrame with the same columns.
    """
    lut_columns = ['name', 'value', 'count']

    # Collect one small pandas frame per column and concatenate once at the
    # end; growing a frame with DataFrame.append is quadratic and that method
    # no longer exists in pandas >= 2.0.
    frames = []
    for colname in discreteCols:
        # No Spark-side sort here: the final ordering is defined by the
        # window below, so sorting before collect() would be wasted work.
        occurrences = (
            df.groupBy(col(colname).alias('value'))
              .agg(count(colname).alias('count'))
        )
        rows = [row.asDict() for row in occurrences.collect()]
        pdf = pd.DataFrame(rows, columns=lut_columns, dtype=object)
        pdf['name'] = colname
        # The original called .apply(np.int32) and dropped the result; assign
        # the cast. int64 matches the long type Spark's count() produces.
        pdf['count'] = pdf['count'].astype(np.int64)
        # Values are stringified with Python's str() so that lookups done
        # with str(x) on the encoding side match these keys.
        pdf['value'] = pdf['value'].astype(str)
        frames.append(pdf)

    if frames:
        lut = pd.concat(frames, ignore_index=True)
    else:
        lut = pd.DataFrame(columns=lut_columns)

    # Use the session that owns `df` rather than a notebook-global `spark`,
    # so the function keeps working once exported to a module.
    # `sparkSession` exists on newer PySpark DataFrames, `sql_ctx` on 2.x.
    session = getattr(df, 'sparkSession', None) or df.sql_ctx

    # An explicit schema keeps the column types stable and lets an empty LUT
    # be created (schema inference fails on an empty pandas frame).
    d = session.createDataFrame(lut, schema='name string, value string, count long')

    # Tie-break on `value`: ordering by count alone makes the index of
    # equally frequent values depend on partitioning, i.e. not reproducible.
    ranking = Window.partitionBy('name').orderBy(desc('count'), 'value')
    return d.withColumn('index', row_number().over(ranking) - 1)

# # encoding columns
# from pyspark import SparkContext
# sc = SparkContext._active_spark_context

# #reproducible labeling with default for unknown values
# lutdf = lut.toPandas().set_index(['name','value'])[['index']]
# b_lutdf = sc.broadcast(lutdf)

# # Use pandas_udf to define a Pandas UDF
# @pandas_udf('integer', PandasUDFType.SCALAR)
# def encode_colum(v, c):
#     d = b_lutdf.value.loc[c[0]]['index'].to_dict()
#     r = v.apply(lambda x: d.get(str(x),0))
#     return r

# # cols = ['Sex','SibSp', 'Embarked']
# # for c in cols:
# #     df = df.withColumn(c+'_L', encode_colum(col(c), lit(c)))
# # df.show()

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, DoubleType

class LookupIndexer(Transformer, HasInputCol, HasOutputCol):
    """Pipeline transformer meant to index a column through a lookup table.

    Params:
        inputCol: name of the column to index.
        outputCol: name of the column to write.
        lookupTable: lookup table to use; defaults to ``None``.

    NOTE: ``_transform`` is still a placeholder. It writes the constant 42
    into ``outputCol`` and does not consult ``lookupTable`` yet.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, lookupTable=None):
        # Params.__init__ creates the param maps and the uid; it must run
        # before a Param can be declared with `self` as its parent.
        super(LookupIndexer, self).__init__()
        self.lookupTable = Param(self, "lookupTable", "")
        # Default the param this class actually declares (the original
        # referenced a non-existent `stopwords` param, which raises).
        self._setDefault(lookupTable=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, lookupTable=None):
        """Set the params of this transformer from keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setLookupTable(self, value):
        """Set the ``lookupTable`` param and return ``self`` for chaining."""
        self._paramMap[self.lookupTable] = value
        return self

    def getLookupTable(self):
        """Return the ``lookupTable`` param value, or its default (``None``)."""
        return self.getOrDefault(self.lookupTable)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` added (placeholder value 42)."""
        # Fixed method-name typo (was `getlookupTable`). The table is fetched
        # but not used until the real lookup is implemented.
        lookupTable = self.getLookupTable()

        # TODO: the pandas UDF performing the lookup goes here.
        out_col = self.getOutputCol()
        # Resolving the input column makes a missing column fail here.
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, lit(42))

def lookup_indexer(features):
    """Lazily yield one LookupIndexer per feature, writing to '<feature>_I'."""
    for name in features:
        indexed_name = name + '_I'
        yield LookupIndexer(inputCol=name, outputCol=indexed_name)

def string_indexer(features):
    """Lazily yield one StringIndexer per feature, writing to '<feature>_I'."""
    for name in features:
        indexed_name = name + '_I'
        yield StringIndexer(inputCol=name, outputCol=indexed_name)

def onehot(features):
    """Lazily yield one OneHotEncoder per feature, writing to '<feature>_C'."""
    # TODO: reproducible mapping; here the mapping depends on the data provided.
    for name in features:
        encoded_name = name + '_C'
        yield OneHotEncoder(inputCol=name, outputCol=encoded_name)

def featurize(df, idCol=None, labelCol=None, numericContinuosCols=[], numericDiscreteCols=[], stringCols=[], lut=None):
    """Assemble a ``features`` vector column from raw columns.

    Discrete columns (numeric discrete + string) are first mapped to an
    integer index column ``<col>_I``, then one-hot encoded into ``<col>_C``.
    The continuous columns and all ``*_C`` columns are assembled, in that
    order, into a single ``features`` vector.

    Args:
        df: input Spark DataFrame.
        idCol: optional column selected as ``id`` in the result.
        labelCol: optional column selected as ``label`` in the result.
        numericContinuosCols: numeric columns passed through unchanged.
        numericDiscreteCols: numeric columns treated as categorical.
        stringCols: string columns treated as categorical.
        lut: optional Spark DataFrame with columns ``name``, ``value`` and
            ``index``. When given, indices come from it, so the same table
            gives the same indices on any dataset; values absent from the
            table are mapped to index 0. When omitted, indices are fitted on
            ``df`` with StringIndexer and therefore depend on the data.

    Returns:
        Spark DataFrame with the optional ``id`` and ``label`` columns and
        the ``features`` column.

    Raises:
        KeyError: if ``lut`` is given but has no entries for one of the
            discrete columns.
    """
    discrete_cols = list(numericDiscreteCols) + list(stringCols)
    indexed_cols = [c + '_I' for c in discrete_cols]

    # `is not None` instead of truthiness: a DataFrame's truth value is not
    # a meaningful "was a LUT supplied" test (and raises for pandas frames).
    if lut is not None:
        from pyspark import SparkContext
        sc = SparkContext.getOrCreate()

        # Build one {value: index} dict per column once, on the driver,
        # instead of rebuilding it from a DataFrame slice for every batch.
        lut_pdf = lut.toPandas()
        mappings = {
            name: dict(zip(group['value'], group['index']))
            for name, group in lut_pdf.groupby('name')
        }

        # Fail early on the driver rather than inside a worker task.
        missing = [c for c in discrete_cols if c not in mappings]
        if missing:
            raise KeyError('no lookup table entries for columns: {}'.format(missing))

        b_mappings = sc.broadcast(mappings)

        def make_encoder(name):
            # The column name is bound through the closure; reading it from
            # a literal column (c[0]) breaks when a batch has no rows.
            @pandas_udf('integer', PandasUDFType.SCALAR)
            def encode_colum(v):
                d = b_mappings.value[name]
                # LUT keys are strings; unknown values fall back to index 0.
                return v.apply(lambda x: d.get(str(x), 0))
            return encode_colum

        for c in discrete_cols:
            df = df.withColumn(c + '_I', make_encoder(c)(col(c)))

        stages = []
    else:
        # Without a LUT, fit the indices on the data itself. LookupIndexer
        # is not usable here: it is built without a lookup table and its
        # transform is only a placeholder.
        stages = list(string_indexer(discrete_cols))

    encoded_cols = [c + '_C' for c in indexed_cols]
    stages += list(onehot(indexed_cols))
    stages.append(
        VectorAssembler(
            inputCols=list(numericContinuosCols) + encoded_cols,
            outputCol="features",
        )
    )

    # Fit the pipeline on the (possibly pre-indexed) frame.
    model = Pipeline(stages=stages).fit(df)

    # Select id / label only when requested; `features` is always returned.
    columns = []
    if idCol:
        columns.append(col(idCol).alias('id'))
    if labelCol:
        columns.append(col(labelCol).alias('label'))
    columns.append('features')

    return model.transform(df).select(*columns)

### Train set

In [7]:
# Read the dataset registered as '.etl.clean.train' and preview it.
df = engine.read('.etl.clean.train')
df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|   Fare|Embarked|
+-----------+--------+------+--------------------+------+------------------+-----+-----+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|              35.0|    0|    0|   8.05|       S|
|          6|       0|     3|    Moran, Mr. James|  male|28.963899873494455|    0|    0| 8.4583|       Q|
|          7|       0|     1|McCarthy, Mr. Tim

In [8]:
# Build the lookup table from the training data over the numeric discrete
# columns plus the string columns, then preview it. The same `lut` is reused
# for the test set below.
lut = collect_lut(df, ['Pclass','SibSp','Parch']+['Sex', 'Embarked'])
lut.show()

+--------+-----+-----+-----+
|    name|value|count|index|
+--------+-----+-----+-----+
|  Pclass|    3|  491|    0|
|  Pclass|    1|  216|    1|
|  Pclass|    2|  184|    2|
|Embarked|    S|  646|    0|
|Embarked|    C|  168|    1|
|Embarked|    Q|   77|    2|
|   Parch|    0|  678|    0|
|   Parch|    1|  118|    1|
|   Parch|    2|   80|    2|
|   Parch|    3|    5|    3|
|   Parch|    5|    5|    4|
|   Parch|    4|    4|    5|
|   Parch|    6|    1|    6|
|   SibSp|    0|  608|    0|
|   SibSp|    1|  209|    1|
|   SibSp|    2|   28|    2|
|   SibSp|    4|   18|    3|
|   SibSp|    3|   16|    4|
|   SibSp|    8|    7|    5|
|   SibSp|    5|    5|    6|
+--------+-----+-----+-----+
only showing top 20 rows



In [9]:
# Featurize the training set: id = PassengerId, label = Survived,
# continuous = Age/Fare, numeric discrete = Pclass/SibSp/Parch,
# string = Sex/Embarked, indexed through `lut`.
df_features = featurize(df, 'PassengerId', 'Survived', ['Age', 'Fare'], ['Pclass','SibSp','Parch'], ['Sex', 'Embarked'], lut)
df_features.show(truncate=False)

+---+-----+----------------------------------------------------------------+
|id |label|features                                                        |
+---+-----+----------------------------------------------------------------+
|1  |0    |(19,[0,1,2,5,10,16,17],[22.0,7.25,1.0,1.0,1.0,1.0,1.0])         |
|2  |1    |(19,[0,1,3,5,10,18],[38.0,71.2833,1.0,1.0,1.0,1.0])             |
|3  |1    |(19,[0,1,2,4,10,17],[26.0,7.925,1.0,1.0,1.0,1.0])               |
|4  |1    |(19,[0,1,3,5,10,17],[35.0,53.1,1.0,1.0,1.0,1.0])                |
|5  |0    |(19,[0,1,2,4,10,16,17],[35.0,8.05,1.0,1.0,1.0,1.0,1.0])         |
|6  |0    |(19,[0,1,2,4,10,16],[28.963899873494455,8.4583,1.0,1.0,1.0,1.0])|
|7  |0    |(19,[0,1,3,4,10,16,17],[54.0,51.8625,1.0,1.0,1.0,1.0,1.0])      |
|8  |0    |(19,[0,1,2,8,11,16,17],[2.0,21.075,1.0,1.0,1.0,1.0,1.0])        |
|9  |1    |(19,[0,1,2,4,12,17],[27.0,11.1333,1.0,1.0,1.0,1.0])             |
|10 |1    |(19,[0,1,5,10,18],[14.0,30.0708,1.0,1.0,1.0])                   |

In [10]:
# Write the training features under the name 'train', passing mode='overwrite'.
engine.write(df_features,'train', mode='overwrite')

In [11]:
# Read the dataset registered as '.etl.clean.test' and preview it.
# NOTE: this rebinds `df`, which held the training data in the cells above.
df = engine.read('.etl.clean.test')
df.show()

+-----------+------+--------------------+------+----------------+-----+-----+-------+--------+
|PassengerId|Pclass|                Name|   Sex|             Age|SibSp|Parch|   Fare|Embarked|
+-----------+------+--------------------+------+----------------+-----+-----+-------+--------+
|        892|     3|    Kelly, Mr. James|  male|            34.5|    0|    0| 7.8292|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|            47.0|    1|    0|    7.0|       S|
|        894|     2|Myles, Mr. Thomas...|  male|            62.0|    0|    0| 9.6875|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|            27.0|    0|    0| 8.6625|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|            22.0|    1|    1|12.2875|       S|
|        897|     3|Svensson, Mr. Joh...|  male|            14.0|    0|    0|  9.225|       S|
|        898|     3|Connolly, Miss. Kate|female|            30.0|    0|    0| 7.6292|       Q|
|        899|     2|Caldwell, Mr. Alb...|  male|  

In [12]:
# Featurize the test set with the same column roles and the same `lut` as the
# training set. labelCol is None, so the result has no `label` column.
# NOTE: this rebinds `df_features`, which held the training features above.
df_features = featurize(df, 'PassengerId', None, ['Age', 'Fare'], ['Pclass','SibSp','Parch'], ['Sex', 'Embarked'], lut)
df_features.show(truncate=False)

+---+---------------------------------------------------------------------+
|id |features                                                             |
+---+---------------------------------------------------------------------+
|892|(19,[0,1,2,4,10,16],[34.5,7.8292,1.0,1.0,1.0,1.0])                   |
|893|(19,[0,1,2,5,10,17],[47.0,7.0,1.0,1.0,1.0,1.0])                      |
|894|(19,[0,1,4,10,16],[62.0,9.6875,1.0,1.0,1.0])                         |
|895|(19,[0,1,2,4,10,16,17],[27.0,8.6625,1.0,1.0,1.0,1.0,1.0])            |
|896|(19,[0,1,2,5,11,17],[22.0,12.2875,1.0,1.0,1.0,1.0])                  |
|897|(19,[0,1,2,4,10,16,17],[14.0,9.225,1.0,1.0,1.0,1.0,1.0])             |
|898|(19,[0,1,2,4,10],[30.0,7.6292,1.0,1.0,1.0])                          |
|899|(19,[0,1,5,11,16,17],[26.0,29.0,1.0,1.0,1.0,1.0])                    |
|900|(19,[0,1,2,4,10,18],[18.0,7.2292,1.0,1.0,1.0,1.0])                   |
|901|(19,[0,1,2,6,10,16,17],[21.0,24.15,1.0,1.0,1.0,1.0,1.0])             |
|902|(19,[0,

In [13]:
# Write the test features under the name 'test', passing mode='overwrite'.
engine.write(df_features,'test', mode='overwrite')