### Inicjalizacja środowiska

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.param import Param
from pyspark.ml.param.shared import HasInputCol, HasOutputCol

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import * 
from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.sql.functions import floor, rand, udf, max
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from post_extractor.modules.features_ import (
    MeanFeaturesTransformer,
    MedianFeaturesTransformer,
    NumberOfOccurrencesFeaturesTransformer
)

sconf = SparkConf()              \
    .setMaster('local[*]')       \
    .setAppName('PipelineFlow')

sc = SparkContext.getOrCreate(sconf)
sess = SparkSession(sc)
sqlContext = SQLContext(sc)
    

### Wczytywanie plików

In [3]:
def load_features(spark_ctx, files):
    rdd = spark_ctx.wholeTextFiles(files)
    rdd = rdd.map(lambda x: (x[0], x[1]))
    df = rdd.toDF(['file', 'content'])
    return df


### Przykład zastosowania TransformerProxy do automatyzacji ewaluacji

`TransformerProxy` jest obiektem który opakowuje inny Transformer. Dzięki temu, możliwe jest stworzenie uniwersalnego pipeline'u (jak na rysunku poniżej) bez specyfikowania od razu konkretnych implementacji poszczególnych etapów. Np. wiemy, że pierwszy etap parsuje plik html dzieląc go na słowa, drugi etap usuwa obrazki a trzeci rozpoznaje i zlicza czasowniki, ale nie wiemy jakie konkretne implementacje będziemy chcieli dostarczyć dla poszczególnych etapów.
 
W szczególności jeśli bedziemy chcieli mieć wiele różnych implementacji dla tego samego etapu opisane podejście będzie użyteczne. Testowanie takiego przepływu będzie odbywało się za pomocą klasy CrossValidator która zostanie opisana później. Na razie wspomnijmy jedynie o tym, że CrossValidator nie potrafi modyfikować pipeline'u poprzez zamianę np. jednego transformera na drugi, potrafi natomiast modyfikować parametry kolejnych etapów przepływu. Dzięki zastosowanemu podejściu CrossValidator będzie w stanie testować kombinację różnych implementacji poszególnych etapów pipeline'u.

![title](pipeline.jpg)

### Klasa TransformerProxy

In [9]:
class DenseVectorTransformer(Transformer, HasInputCol, HasOutputCol):
    def __init__(self):
        super(DenseVectorTransformer, self).__init__()
    def _transform(self, dataset):
        toDenseVector = udf(lambda arr: Vectors.dense(arr), VectorUDT())
        return dataset.withColumn(self.getOutputCol(), toDenseVector(self.getInputCol()))

In [10]:
class TransformerProxy(Transformer):

    def __init__(self):
        super(TransformerProxy, self).__init__()
        self.transformer = Param(self, "transformer", "")

    def set_transformer(self, transformer):
        self._paramMap[self.transformer] = transformer
        return self

    def get_transformer(self):
        return self.getOrDefault(self.transformer)

    def _transform(self, dataset):
        return self.get_transformer().transform(dataset)

### Utworzenie instancji transformerów

W przygotowanym pipeline możemy wykorzystać w pierwszym TransformerProxy trzy różne implementacje: 
- MeanFeaturesTransformer 
- MedianFeaturesTransformer 
- NumerOfOccurrencesFeaturesTransformer 

Możemy je zatem przekazać do abstrakcji ParamGridBuilder'a, który będzie parametrem przekazanym do klasy CrossValidator.   

In [None]:
import pprint
pprint.pprint(MeanFeaturesTransformer.__doc__)

In [None]:
pprint.pprint(MedianFeaturesTransformer.__doc__)

In [None]:
pprint.pprint(NumberOfOccurrencesFeaturesTransformer.__doc__)

In [11]:
features = {
    "features1": [
        "leaf",
        "has-attribute-class",
    ],
    "features2": [
        "contains-adjectives",
        "contains-date"
    ]
}

content_column='content'
metrics_column='metric'

tf = [MeanFeaturesTransformer, MedianFeaturesTransformer, NumberOfOccurrencesFeaturesTransformer]

features_transfomers = {featureSetName : [transformer(features=features, inputCol=content_column, 
                                                       outputCol=metrics_column)
                   for transformer
                   in tf] for featureSetName, features in features.items()}


dv_transformer = DenseVectorTransformer()
dv_transformer.setInputCol(metrics_column).setOutputCol('features')

dt = DecisionTreeClassifier(labelCol='label')

lr = LogisticRegression()


### Przygotowanie przestrzeni parametrów 

Każdy transformer w pipelinie stanowi osobny wymiar w przestrzeni parametrów.
W naszym przypadku grid ma tylko jeden wymiar o długości 3

In [12]:
metric_stage_proxy = TransformerProxy()

param_grids = {}

for featuresSetName, transformersList in features_transfomers.items():
    param_grid_builder = ParamGridBuilder()
    param_grid_builder.addGrid(metric_stage_proxy.transformer, transformersList)
    param_grid = param_grid_builder.build()
    param_grids[featuresSetName] = param_grid

### Przygotowanie modyfikowalnego pipeline'u
W tej wersji, wszystkie istniejące wczesniej stage zastępujemy obiektami `TransformerProxy`

In [13]:
parameterized_pipeline = Pipeline(stages=[metric_stage_proxy, dv_transformer, dt])

In [14]:
feature_file = 'external/data/'
loaded_features = load_features(sc, feature_file)
dataWithLabels = loaded_features.withColumn('label', floor(rand() * 3).cast(DoubleType()))

CrossValidator uwzględniając wszystkie kombinacje dostarczonych parametrów wskazuje który zestaw parametrów cechuje się najlepszymi wynikami.

In [17]:
# CrossValidator will automatically find the best set of parameters
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='label')

cv_results = {} 

for featuresSetName, param_grid in param_grids.items():
    cv = CrossValidator(estimator=parameterized_pipeline,
                       estimatorParamMaps=param_grid,
                       evaluator=evaluator,
                       numFolds=3)
    cv_result = cv.fit(dataWithLabels)
    cv_results[featuresSetName] = cv_result

### Wypisanie nazw transformerow wybranych przez CV

In [13]:
def display_results(cv_results):
    bestModels = {}
    for featureSetName, cv_result in cv_results.items():
        model_metric_pairs = zip(cv_result.avgMetrics, cv_result.getEstimatorParamMaps())
        bestModels[featureSetName] = sorted(model_metric_pairs, key=lambda x: x[0])[0]
 
    model_metric_pairs = sorted(bestModels.items(), key=lambda x: -x[1][0])
    best_model = model_metric_pairs[0]
 
    feature_set_name, best_metric_model_pair = best_model
    metric, model = best_metric_model_pair
    print("Best model transformers (feature set %s): " % feature_set_name)
    for pipeline_param, transformer in model.items():
      print(type(transformer).__name__)


Selected transformers:
MeanFeaturesTransformer


In [None]:
display_results(cv_results)