## Creating Custom PySpark Estimators and Transformers
This tutorial explains how to set up custom PySpark estimators and models for use inside PySpark pipelines. In general, two classes are needed: an Estimator class and a Transformer class (the fitted model). Four classes are used to accomplish this:

+ HasVocabList, a mixin class for storing a list of strings (persisted in the Transformer)

+ InputValidation, which tests that the required columns are present

+ Estimator, which takes a DataFrame input and returns a Model
    * fits the vocabList
    * has a `_fit` method that returns the Transformer
+ Transformer, which takes a DataFrame input and returns a transformed DataFrame
    * is initialized by the `Estimator.fit()` method
    * has a persistent vocabList
    * has a `_transform` method (called by `transform()`) that returns a new DataFrame

### Test Example:
As an example, we create an Estimator and a Model that take a DataFrame input and add a binary indicator column (0s and 1s) recording whether each value appears in a learned list of strings.

+ Estimator
    + takes a DataFrame as input
    + collects every unique value in inputCol and stores it as vocabList
    + param: inputCol, the input column name (string)
    + param: featuresCol, the output column name (string)
    + returns a Transformer

The inputCol and featuresCol params come from PySpark's shared parameter mixins (`HasInputCol` and `HasFeaturesCol` in `pyspark.ml.param.shared`). Storing a custom list of strings, however, requires a new class.
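
The fit/transform contract can be made concrete before any Spark code appears. The following is a minimal Spark-free sketch. It is an analogue, not the PySpark API: plain lists of dicts stand in for DataFrames, and the class names `VocabListEstimatorSketch` and `VocabListModelSketch` are hypothetical.

```python
class VocabListModelSketch:
    """Hypothetical stand-in for the fitted Transformer (no Spark involved)."""

    def __init__(self, inputCol, featuresCol, vocabList):
        self.inputCol = inputCol
        self.featuresCol = featuresCol
        self.vocabList = list(vocabList)

    def transform(self, rows):
        # Build new rows with an extra 0/1 column; the input rows are left unchanged.
        return [dict(row, **{self.featuresCol: 1 if row[self.inputCol] in self.vocabList else 0})
                for row in rows]


class VocabListEstimatorSketch:
    """Hypothetical stand-in for the Estimator: fit() learns a vocabulary."""

    def __init__(self, inputCol, featuresCol):
        self.inputCol = inputCol
        self.featuresCol = featuresCol

    def fit(self, rows):
        # Every distinct value seen in inputCol becomes part of the vocabulary.
        vocab = sorted({row[self.inputCol] for row in rows})
        # fit() returns a separate model object that carries the learned state.
        return VocabListModelSketch(self.inputCol, self.featuresCol, vocab)


train = [{"pet": "cat"}, {"pet": "dog"}, {"pet": "fish"}]
new = train + [{"pet": "frog"}]

model = VocabListEstimatorSketch(inputCol="pet", featuresCol="SeenBefore").fit(train)
print(model.vocabList)                                      # ['cat', 'dog', 'fish']
print([row["SeenBefore"] for row in model.transform(new)])  # [1, 1, 1, 0]
```

The PySpark classes below follow the same pattern. `fit()` returns a separate model object that carries the learned vocabulary, and that model is what gets applied to new data and saved.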





#### Logging Explanation
Here a logger is set up for debugging purposes. Use `logger.setLevel(logging.DEBUG)` to change the level and get more verbose output.

In [12]:
# imports and logging setup
from pyspark import keyword_only
from pyspark.ml import Estimator, Transformer
from pyspark.ml.param.shared import HasInputCol, HasFeaturesCol
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable, MLWritable, MLReadable
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

# set up logging (only once, so re-running this cell does not add duplicate handlers)
import logging

try:
    logger.debug('logger is up')
except NameError:
    name = 'pysparkCustomEstimator'
    formatter = logging.Formatter(fmt='%(asctime)s -  %(name)s - %(levelname)s  - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger = logging.getLogger(name)
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
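
`logging.getLogger` returns the same logger object for a given name, so the verbosity can be changed from any later cell. The following is a minimal sketch that assumes the logger name `'pysparkCustomEstimator'` used in the cell above.

```python
import logging

# getLogger returns the same object for the same name, so this is the logger
# configured above (or a fresh, handler-less one if that cell has not run).
logger = logging.getLogger("pysparkCustomEstimator")

# DEBUG shows messages such as 'inputCol: ... found in df columns' and 'input Validated'.
logger.setLevel(logging.DEBUG)
print(logger.isEnabledFor(logging.DEBUG))  # True

# WARNING keeps only warnings, such as the duplicate-column message.
logger.setLevel(logging.WARNING)
print(logger.isEnabledFor(logging.DEBUG))  # False
```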


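#### Create a Class to Store the VocabList
+ This creates a `vocabList` Param and a `getVocabList` method used to read it back
+ Calling `super(HasVocabList, self).__init__()` first runs the parent constructors (ultimately `Params.__init__`), which assign the `uid` that the new `Param` is attached to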

In [13]:
class HasVocabList(Params):
    '''
    Mixin for PySpark estimators and transformers (holds a list of strings).
    '''
    def __init__(self):
        super(HasVocabList, self).__init__()
        self.vocabList = Param(self, "vocabList",  "list of strings ",
                           typeConverter=TypeConverters.toListString)

    def getVocabList(self):
        return self.getOrDefault(self.vocabList)
    

#### Creating an Input Validator
This mixin provides a `validate` method that checks whether the expected inputCol is in the DataFrame being passed in. It logs a warning if the DataFrame has duplicate column names. It also checks whether featuresCol already exists in the DataFrame: since featuresCol is created in the transform method, `validate` raises a `KeyError` if it is already there.

In [14]:
class InputValidation():
    def __init__(self):
        super(InputValidation, self).__init__()
    def validate(self, df):
        # the input column must be present
        if self.getInputCol() in df.columns:
            logger.debug('inputCol: {} found in df columns'.format(self.getInputCol()))
        else:
            raise KeyError('inputCol: {0} not found in df columns {1}'.format(self.getInputCol(), df.columns))
        # duplicate column names make column references ambiguous, so warn about them
        if len(df.columns) != len(set(df.columns)):
            logger.warning('duplicate columns found in df, may cause unstable behavior')
        # the output column must not exist yet, since transform creates it
        if self.getFeaturesCol() in df.columns:
            raise KeyError('data frame already contains featuresCol {}'.format(self.getFeaturesCol()))
        logger.debug('input Validated')
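
The three rules that `validate` applies can be checked without a Spark session. `validate_columns` below is a hypothetical standalone restatement that takes a plain list of column names instead of a DataFrame. It is not part of the classes in this tutorial.

```python
import logging

logger = logging.getLogger("pysparkCustomEstimator")


def validate_columns(columns, input_col, features_col):
    """Hypothetical standalone version of InputValidation.validate."""
    # Rule 1: the input column must be present.
    if input_col not in columns:
        raise KeyError("inputCol: {0} not found in df columns {1}".format(input_col, columns))
    # Rule 2: duplicate names only trigger a warning.
    if len(columns) != len(set(columns)):
        logger.warning("duplicate columns found in df, may cause unstable behavior")
    # Rule 3: the output column must not exist yet.
    if features_col in columns:
        raise KeyError("data frame already contains featuresCol {}".format(features_col))
    return True


print(validate_columns(["pet"], "pet", "SeenBefore"))  # True
for cols in (["animal"], ["pet", "SeenBefore"]):
    try:
        validate_columns(cols, "pet", "SeenBefore")
    except KeyError as err:
        print("rejected:", err)
```

Because rule 2 only warns, a column list with duplicate names still passes as long as rules 1 and 3 hold.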

#### Creating the Transformer
The Transformer takes a DataFrame input, uses the vocab list, and returns a DataFrame with a new column of data in it.
+ When defining the Transformer class, inherit from the Transformer class and from the classes of the parameters needed
    + in this case we use our HasVocabList class and HasInputCol, HasFeaturesCol from PySpark
+ the keyword_only decorator saves the keyword arguments actually passed in `_input_kwargs`
+ `__init__` and `setParams` store all the keyword args as the instance's params
+ `_transform` takes a DataFrame and returns a DataFrame
    + it uses a udf (user-defined function) to apply the `_look_up` helper function across the rows of the Spark DataFrame
+ `_look_up` is a helper function that takes a string and returns 1 or 0 depending on whether that string is in the list
+ the `validate` method (from InputValidation) ensures the needed columns are present and warns about duplicate columns


In [15]:
class VocabListTransformer(Transformer, HasInputCol, HasFeaturesCol, HasVocabList, InputValidation,  
                           DefaultParamsReadable, DefaultParamsWritable, MLReadable, MLWritable):

    @keyword_only
    def __init__(self,  inputCol=None, featuresCol=None, vocabList=None):
        super(VocabListTransformer, self).__init__()
        logger.debug('VocabTransformerClass Initialized')
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, featuresCol=None, vocabList=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    
    def _look_up(self, x):
        if x in self.getVocabList():
            return 1
        else:
            return 0

    def _transform(self, df):
        logger.debug('VocabTransformerClass transform method called')
        self.validate(df)
        # creates an on-the-fly udf that applies the _look_up helper to every row of the data frame
        transform_udf = udf(lambda x: self._look_up(x), IntegerType())
        # returns a new data frame with an extra column holding the look-up result for each row
        df = df.withColumn(self.getFeaturesCol(), transform_udf(self.getInputCol()))
        return df 
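
Here is one design note on `_transform`. The lambda calls `self._look_up`, so Spark has to serialize the whole transformer to ship it to the executors, and each lookup scans a Python list. A common alternative builds the per-row function from a local set, so the closure carries only the vocabulary. `make_lookup` below is a hypothetical helper, shown in plain Python so it runs without Spark. It assumes hashable (scalar) column values such as strings.

```python
def make_lookup(vocab_list):
    """Hypothetical helper: returns the per-row function a UDF would wrap."""
    # A frozenset gives constant-time membership tests, and the returned
    # closure references only this set rather than the whole transformer.
    vocab = frozenset(vocab_list)

    def look_up(x):
        # Same contract as _look_up: 1 if seen during fit, else 0 (None -> 0).
        return 1 if x in vocab else 0

    return look_up


look_up = make_lookup(["cat", "dog", "fish"])
print([look_up(pet) for pet in ["cat", "dog", "fish", "frog", None]])  # [1, 1, 1, 0, 0]
```

Inside `_transform` this would become `udf(make_lookup(self.getVocabList()), IntegerType())`. To avoid Python UDFs entirely, the built-in `Column.isin` can express the same membership test, with one difference: it yields null rather than 0 for null inputs.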


        

#### Creating the Estimator
The Estimator uses inheritance to gain its params, and uses the standard getter interface (`getInputCol()`, `getFeaturesCol()`, `getVocabList()`) to read them back.

+ When defining the class, inherit from Estimator and from the classes of the parameters needed
    + in this case we use our HasVocabList class and HasInputCol, HasFeaturesCol from PySpark
+ the keyword_only decorator saves the keyword arguments actually passed in `_input_kwargs`
+ `_fit` calls `validate` (from InputValidation) to ensure the needed columns are present, collects the vocabulary, and returns a `VocabListTransformer` carrying inputCol, featuresCol and vocabList


In [16]:
class VocabListEstimator(Estimator, HasInputCol, HasFeaturesCol, HasVocabList,  InputValidation):

    @keyword_only
    def __init__(self, inputCol=None, featuresCol=None):
        super(VocabListEstimator, self).__init__()
        logger.debug('VocabEstimatorClass Initialized')
        kwargs = self._input_kwargs
        self.setParams(**kwargs)  

    @keyword_only
    def setParams(self, inputCol=None, featuresCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _fit(self, df):
        logger.debug('VocabEstimatorClass fit method called')
        self.validate(df)
        self._get_vocab(df)
        # returns the VocabListTransformer, with all the params from the estimator
        return VocabListTransformer(inputCol=self.getInputCol(),
                                    featuresCol=self.getFeaturesCol(),
                                    vocabList=self.getVocabList())

    def _get_vocab(self, df):
        # collect the distinct values of inputCol and store them as strings
        rows = df.select(self.getInputCol()).distinct().collect()
        vocabList = [str(row[0]) for row in rows]
        self._set(vocabList=vocabList)
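
`_get_vocab` converts every distinct value with `str()`, which has two consequences. First, a null in the input column is stored in the vocabulary as the literal string `'None'`, and a null row at transform time still maps to 0. Second, for a non-string column the lookup never matches, because an integer `1` is not equal to `'1'`. The sketch below shows this without Spark. Plain tuples stand in for the `Row` objects returned by `collect()` (Row is a tuple subclass). The null-skipping variant is a hypothetical alternative, not what the class does.

```python
# Plain tuples stand in for the Row objects returned by collect().
collected = [("cat",), ("dog",), (None,)]

# What _get_vocab keeps: the single field of each row, converted with str().
vocab_as_written = [str(row[0]) for row in collected]

# Hypothetical variant: skip nulls instead of storing the string 'None'.
vocab_without_nulls = [str(row[0]) for row in collected if row[0] is not None]

print(vocab_as_written)          # ['cat', 'dog', 'None']
print(vocab_without_nulls)       # ['cat', 'dog']
# A null row at transform time is not a member of the stored list.
print(None in vocab_as_written)  # False
# Stringified integers never match the original integer values.
print(1 in [str(1)])             # False
```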

#### Using and Testing the Methods
Fire up a local Spark session!


In [17]:
spark = SparkSession.builder.getOrCreate()       

#### Create Some Test Data

In [18]:
# create a dataframe for fitting
df = spark.createDataFrame(pd.DataFrame({'pet':['cat', 'dog', 'fish']}))
# create a dataframe for testing, with a new unseen category.
df_new = spark.createDataFrame(pd.DataFrame({'pet':['cat', 'dog', 'fish', 'frog']}))
df.show()


+----+
| pet|
+----+
| cat|
| dog|
|fish|
+----+



#### Estimator / Transformer Usage
+ The Estimator is first fit, which returns a Transformer object (including the fitted vocab list)
+ The Transformer is then used to transform unseen data

In [19]:
# Initializes the Estimator
estimator = VocabListEstimator(inputCol='pet', featuresCol='SeenBefore')
# Fits the Estimator and returns the model (a VocabListTransformer)
trans = estimator.fit(df)
# transforms new and unseen data 
df_transformed = trans.transform(df_new)
df_transformed.show()

+----+----------+
| pet|SeenBefore|
+----+----------+
| cat|         1|
| dog|         1|
|fish|         1|
|frog|         0|
+----+----------+



#### Persistence and Integration into a PySpark Pipeline
To use the fitted transformer inside a PySpark pipeline, initialize a `PipelineModel` with a list of stages that includes the transformer object.


In [20]:
stages = [trans]
pipe = PipelineModel(stages=stages)
pipe.transform(df_new)

DataFrame[pet: string, SeenBefore: int]

#### Saving and Loading the Transformer
Since DefaultParamsReadable, DefaultParamsWritable, MLReadable and MLWritable were inherited in the definition of the transformer class, the transformer can be saved and loaded, including as a stage of a PySpark pipeline.
+ to be loadable, the transformer class must be set as an attribute on `__main__`
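
The `__main__` step is needed because, when saving, PySpark's default params writer records a Python stage's class as `<module>.<ClassName>`. For a class defined in a notebook, that is `__main__.VocabListTransformer`. On load, the reader imports that module and looks the class up by attribute. `resolve_class` below is a hypothetical, simplified re-creation of that lookup, not PySpark's own code. `DemoStage` is a hypothetical stand-in class.

```python
import __main__
from collections import OrderedDict


def resolve_class(qualified_name):
    """Hypothetical, simplified version of the loader's class lookup."""
    parts = qualified_name.split(".")
    # __import__('a.b') returns the top-level package 'a' ...
    obj = __import__(".".join(parts[:-1]))
    # ... so walk the remaining dotted parts as attributes.
    for part in parts[1:]:
        obj = getattr(obj, part)
    return obj


class DemoStage(object):
    """Hypothetical stand-in for a custom stage class."""


# Equivalent of the setattr step in the next cell: make the class
# reachable as an attribute of __main__.
setattr(__main__, "DemoStage", DemoStage)

print(resolve_class("__main__.DemoStage") is DemoStage)         # True
print(resolve_class("collections.OrderedDict") is OrderedDict)  # True
```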


In [23]:
# sets the class on main, so that it can be loaded 
m = __import__("__main__")
setattr(m, 'VocabListTransformer',VocabListTransformer)
        
path = '_testPipeline.pipe'
# save the pipeline, including the transformer stage
pipe.write().overwrite().save(path)
# load the pipeline back
pipe_loaded = PipelineModel.load(path)
# test the loaded transformer
pipe_loaded.transform(df_new).show()

+----+----------+
| pet|SeenBefore|
+----+----------+
| cat|         1|
| dog|         1|
|fish|         1|
|frog|         0|
+----+----------+

