In [2]:
import datalabframework as dlf
# Call the project's rootpath() helper; its return value is not used in this cell.
dlf.project.rootpath()
# Last expression of the cell, so its return value is what the notebook displays
# (presumably the dict shown in the output below — confirm against datalabframework).
dlf.project.test()

{'a': 1, 'b':2}



#### Init Spark

In [2]:
# Look up the engine registered under the name 'spark' and obtain its context object.
# Both `engine` and `spark` are used by later cells.
engine = dlf.engines.get('spark')
spark = engine.context()

In [3]:
# Display "<engine.info['context']>:<Spark session version>" as the cell output.
'{}:{}'.format(engine.info['context'], spark.sparkSession.version)

'spark:2.3.1'

In [20]:
#EXPORT

import pandas as pd
from pyspark.sql.functions import isnan, when, count, col, lit, countDistinct

from pyspark.context import SparkContext
from pyspark.sql import SQLContext

def describe_all(df):
    """Summarise every column of a Spark DataFrame in a single Spark DataFrame.

    The result has a leading ``summary`` column followed by the columns of
    ``df``. Its rows are, in order: ``nan`` (count of NaN values), ``isnull``
    (count of null values), ``type`` (string form of each column's data
    type), ``distinct`` (distinct-value count), then the rows produced by
    ``df.describe()``. Every cell is converted to a string before the Spark
    DataFrame is created.

    Args:
        df: the Spark DataFrame to summarise.

    Returns:
        A Spark DataFrame built from the concatenated pandas summaries, with
        columns ``['summary'] + df.columns``.
    """
    sql_context = SQLContext(SparkContext.getOrCreate())

    columns = df.columns
    cols = ['summary'] + columns

    def labelled(exprs, label):
        # Evaluate one aggregate row on Spark, tag it, and bring it to pandas.
        return df.select(exprs).withColumn('summary', lit(label)).toPandas()

    # NOTE(review): isnan is applied to every column regardless of its type —
    # confirm Spark accepts it for all column types present in `df`.
    df_nan = labelled([count(when(isnan(c), c)).alias(c) for c in columns], 'nan')
    df_null = labelled([count(when(col(c).isNull(), c)).alias(c) for c in columns], 'isnull')
    df_distinct = labelled([countDistinct(c).alias(c) for c in columns], 'distinct')

    # Drops the last four characters of each type's string form
    # (presumably a trailing "Type" suffix — TODO confirm for the Spark version in use).
    type_row = ['type'] + [str(field.dataType)[:-4] for field in df.schema]
    df_type = pd.DataFrame(data=[type_row], columns=cols)

    # `sort` must be a real boolean: the string 'false' is truthy, which is the
    # opposite of what was intended. ignore_index=True renumbers the rows
    # directly, so a column that happens to be called 'index' cannot collide
    # with a temporary index column.
    summary = pd.concat(
        [df_nan, df_null, df_type, df_distinct, df.describe().toPandas()],
        sort=False,
        ignore_index=True,
    ).astype(str)

    # Column order after concat is not relied upon; select explicitly.
    return sql_context.createDataFrame(summary[cols])


In [4]:
#EXPORT
import json

from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import *
from pyspark.sql.functions import col, count
from pyspark.sql.functions import pandas_udf, PandasUDFType

class HasLookupTable(Params):
    """Params mixin that adds a ``lookup_table`` and a ``default_value`` param.

    Setters return whatever ``_set`` returns, so they can be chained;
    getters read the value through ``getOrDefault``. No default is declared
    for either param in this class.
    """

    # Dictionary (string -> integer); no type converter is attached.
    lookup_table = Param(Params._dummy(), "lookup_table", "lookup_table")

    # Integer value, converted with TypeConverters.toInt.
    default_value = Param(
        Params._dummy(),
        "default_value",
        "default_value",
        typeConverter=TypeConverters.toInt,
    )

    def __init__(self):
        super(HasLookupTable, self).__init__()

    # --- lookup_table -----------------------------------------------------

    def getLookupTable(self):
        """Return the current value of the ``lookup_table`` param."""
        table_param = self.lookup_table
        return self.getOrDefault(table_param)

    def setLookupTable(self, value):
        """Store ``value`` as the ``lookup_table`` param."""
        return self._set(lookup_table=value)

    # --- default_value ----------------------------------------------------

    def getDefaultValue(self):
        """Return the current value of the ``default_value`` param."""
        default_param = self.default_value
        return self.getOrDefault(default_param)

    def setDefaultValue(self, value):
        """Store ``value`` as the ``default_value`` param."""
        return self._set(default_value=value)

class LookupIndexer(Estimator, HasInputCol, HasPredictionCol):
    """Estimator that learns a value -> integer index table for one column.

    Fitting counts how often each value of the input column occurs and
    numbers the values by descending count, so index 0 belongs to the most
    frequent value. Values are keyed by their ``str()`` form.

    Args:
        inputCol: name of the column to index.
        outputCol: name of the column the fitted model writes to; it is
            stored in the ``predictionCol`` param.
    """

    def __init__(self, inputCol=None, outputCol=None):
        super(LookupIndexer, self).__init__()
        # Only set what the caller actually supplied. Pushing None through the
        # setters would record an explicit None for the param and thereby
        # shadow any default the param declares (e.g. for the prediction column).
        if inputCol is not None:
            self.setInputCol(inputCol)
        if outputCol is not None:
            self.setPredictionCol(outputCol)

    def _fit(self, dataset):
        """Build the lookup table from ``dataset`` and return a LookupIndexerModel."""
        c = self.getInputCol()

        # Most frequent value first. The secondary sort on the value itself makes
        # the index assignment deterministic when several values share a count.
        occurrences = (
            dataset.groupBy(col(c))
            .agg(count(c).alias('count'))
            .sort(col('count').desc(), col(c).asc())
        )
        values = [str(row[c]) for row in occurrences.select(c).collect()]

        lut = {value: index for index, value in enumerate(values)}
        dvalue = 0  # if not found use the most frequent class

        return (LookupIndexerModel()
            .setInputCol(c)
            .setLookupTable(lut)
            .setDefaultValue(dvalue)
            .setPredictionCol(self.getPredictionCol()))

class LookupIndexerModel(Model, HasInputCol, HasPredictionCol, HasLookupTable):
    """Model that maps the input column through the stored lookup table.

    Each value is converted with ``str()`` and looked up in the table; values
    missing from the table receive the configured default value. The result
    is written to the prediction column as an integer column.
    """

    def _transform(self, dataset):
        """Return ``dataset`` with the encoded column added (or replaced)."""
        source_col = self.getInputCol()
        target_col = self.getPredictionCol()
        # The UDF below closes over these two locals only, not over `self`.
        table = self.getLookupTable()
        fallback = self.getDefaultValue()

        def lookup(value):
            # Keys of the table are strings, so normalise before the lookup.
            return table.get(str(value), fallback)

        # Scalar pandas UDF: receives a pandas Series, returns one of equal length.
        @pandas_udf('integer', PandasUDFType.SCALAR)
        def encode_colum(series):
            return series.apply(lookup)

        return dataset.withColumn(target_col, encode_colum(source_col))

In [5]:
#EXPORT 

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler

def featurize(numericContinuosCols=None, numericDiscreteCols=None, stringCols=None):
    """Build an (unfitted) Pipeline that assembles a single ``features`` vector.

    Stages, in order:
      1. one LookupIndexer per discrete/string column, writing ``<col>_I``;
      2. one OneHotEncoder per indexed column, writing ``<col>_I_C``;
      3. a VectorAssembler over the continuous columns followed by the
         one-hot columns, writing ``features``.

    Args:
        numericContinuosCols: list of column names passed to the assembler as-is.
        numericDiscreteCols: list of numeric column names to index and one-hot encode.
        stringCols: list of string column names to index and one-hot encode.
        Each argument may be omitted or None, which means "no columns".

    Returns:
        A ``pyspark.ml.Pipeline`` with the stages described above.
    """
    # None sentinels replace the former mutable list defaults; an explicit
    # None from the caller is treated the same as an omitted argument.
    continuous_cols = [] if numericContinuosCols is None else numericContinuosCols
    discrete_cols = [] if numericDiscreteCols is None else numericDiscreteCols
    string_cols = [] if stringCols is None else stringCols

    # Numeric-discrete and string columns go through the same encoding path.
    categorical_cols = discrete_cols + string_cols
    indexed_cols = [c + '_I' for c in categorical_cols]
    encoded_cols = [c + '_C' for c in indexed_cols]

    stages = [LookupIndexer(inputCol=c, outputCol=c + '_I') for c in categorical_cols]
    stages += [OneHotEncoder(inputCol=c, outputCol=c + '_C') for c in indexed_cols]
    stages.append(VectorAssembler(inputCols=continuous_cols + encoded_cols, outputCol="features"))

    return Pipeline(stages=stages)

### Train set

In [6]:
# Read the dataset registered as '.etl.clean.train' through the engine and print a preview.
df = engine.read('.etl.clean.train')
df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|   Fare|Embarked|
+-----------+--------+------+--------------------+------+------------------+-----+-----+-------+--------+
|        246|       0|     1|Minahan, Dr. Will...|  male|              44.0|    2|    0|   90.0|       Q|
|        346|       1|     2|"Brown, Miss. Ame...|female|              24.0|    0|    0|   13.0|       S|
|        360|       1|     3|"Mockler, Miss. H...|female| 26.24842701587761|    0|    0| 7.8792|       Q|
|        367|       1|     1|Warren, Mrs. Fran...|female|              60.0|    1|    0|  75.25|       C|
|        476|       0|     1|Clifford, Mr. Geo...|  male| 42.21393334298665|    0|    0|   52.0|       S|
|        539|       0|     3|Risien, Mr. Samue...|  male|27.844465003907715|    0|    0|   14.5|       S|
|        599|       0|     3|   Boulos, Mr. Ha

In [7]:
# Age and Fare go to the assembler unchanged; Pclass/SibSp/Parch and Sex/Embarked
# are index-encoded and then one-hot encoded (see featurize above).
pipeline = featurize(['Age', 'Fare'], ['Pclass','SibSp','Parch'], ['Sex', 'Embarked'])
# Fit on the training frame; the fitted model `m` is reused for the test data later on.
m = pipeline.fit(df)

In [8]:
# Apply the fitted pipeline and keep only the id, the label and the assembled feature vector.
df_features = m.transform(df).select(col('PassengerId').alias('id'), col('Survived').alias('label'), col('features'))
# truncate=False so the feature vectors are printed in full.
df_features.show(truncate=False)

+---+-----+----------------------------------------------------------------------+
|id |label|features                                                              |
+---+-----+----------------------------------------------------------------------+
|246|0    |(19,[0,1,3,6,10,16],[44.0,90.0,1.0,1.0,1.0,1.0])                      |
|346|1    |(19,[0,1,4,10,17],[24.0,13.0,1.0,1.0,1.0])                            |
|360|1    |(19,[0,1,2,4,10],[26.24842701587761,7.8792,1.0,1.0,1.0])              |
|367|1    |(19,[0,1,3,5,10,18],[60.0,75.25,1.0,1.0,1.0,1.0])                     |
|476|0    |(19,[0,1,3,4,10,16,17],[42.21393334298665,52.0,1.0,1.0,1.0,1.0,1.0])  |
|539|0    |(19,[0,1,2,4,10,16,17],[27.844465003907715,14.5,1.0,1.0,1.0,1.0,1.0]) |
|599|0    |(19,[0,1,2,4,10,16,18],[26.552053955241124,7.225,1.0,1.0,1.0,1.0,1.0])|
|725|1    |(19,[0,1,3,5,10,16,17],[27.0,53.1,1.0,1.0,1.0,1.0,1.0])               |
|855|0    |(19,[0,1,5,10,17],[44.0,26.0,1.0,1.0,1.0])                            |
|861

In [9]:
# Write the training features under the name 'train'
# (mode='overwrite' presumably replaces existing output — confirm against engine.write).
engine.write(df_features,'train', mode='overwrite')

In [10]:
# NOTE: `df` is rebound here — from this cell on it holds the test data, not the training data.
df = engine.read('.etl.clean.test')
df.show()

+-----------+------+--------------------+------+------------------+-----+-----+--------+--------+
|PassengerId|Pclass|                Name|   Sex|               Age|SibSp|Parch|    Fare|Embarked|
+-----------+------+--------------------+------+------------------+-----+-----+--------+--------+
|        911|     3|"Assaf Khalil, Mr...|female|              45.0|    0|    0|   7.225|       C|
|        933|     1|Franklin, Mr. Tho...|  male| 42.21393334298665|    0|    0|   26.55|       S|
|       1042|     1|Earnshaw, Mrs. Bo...|female|              23.0|    0|    1| 83.1583|       C|
|       1131|     1|Douglas, Mrs. Wal...|female|              48.0|    1|    0| 106.425|       C|
|       1140|     2|Hold, Mrs. Stephe...|female|              29.0|    1|    0|    26.0|       S|
|       1185|     1|Dodge, Dr. Washin...|  male|              53.0|    1|    1| 81.8583|       S|
|       1257|     3|Sage, Mrs. John (...|female|22.789618277529406|    1|    9|   69.55|       S|
|       1308|     3|

In [11]:
# Transform the test data with the pipeline model `m` that was fitted on the training data.
# No label is selected: the test frame printed above has no 'Survived' column.
# NOTE: `df_features` is rebound here — it previously held the training features.
df_features = m.transform(df).select(col('PassengerId').alias('id'), col('features'))
df_features.show(truncate=False)

+----+---------------------------------------------------------------------+
|id  |features                                                             |
+----+---------------------------------------------------------------------+
|911 |(19,[0,1,2,4,10,18],[45.0,7.225,1.0,1.0,1.0,1.0])                    |
|933 |(19,[0,1,3,4,10,16,17],[42.21393334298665,26.55,1.0,1.0,1.0,1.0,1.0])|
|1042|(19,[0,1,3,4,11,18],[23.0,83.1583,1.0,1.0,1.0,1.0])                  |
|1131|(19,[0,1,3,5,10,18],[48.0,106.425,1.0,1.0,1.0,1.0])                  |
|1140|(19,[0,1,5,10,17],[29.0,26.0,1.0,1.0,1.0])                           |
|1185|(19,[0,1,3,5,11,16,17],[53.0,81.8583,1.0,1.0,1.0,1.0,1.0])           |
|1257|(19,[0,1,2,5,10,17],[22.789618277529406,69.55,1.0,1.0,1.0,1.0])      |
|1308|(19,[0,1,2,4,10,16,17],[27.844465003907715,8.05,1.0,1.0,1.0,1.0,1.0])|
|1041|(19,[0,1,5,11,16,17],[30.0,26.0,1.0,1.0,1.0,1.0])                    |
|1075|(19,[0,1,2,4,10,16],[27.844465003907715,7.75,1.0,1.0,1.0,1.0])       |

In [12]:
# Write the test features under the name 'test'
# (mode='overwrite' presumably replaces existing output — confirm against engine.write).
engine.write(df_features,'test', mode='overwrite')