### Inicjalizacja środowiska

In [2]:
import json

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml import Pipeline, Transformer

from modules.features import (
    MeanFeaturesTransformer,
    MedianFeaturesTransformer,
    NumberOfOccurrencesFeaturesTransformer
)

sc = SparkContext('local[*]', 'PipelineFlow')
sess = SparkSession(sc)
sqlContext = SQLContext(sc)
    

### Wczytywanie plików

In [4]:
def load_features(spark_ctx, files):
    rdd = spark_ctx.wholeTextFiles(files)
    rdd = rdd.map(lambda x: (x[0], x[1]))
    df = rdd.toDF(['file', 'content'])
    return df

def load_posts(spark_ctx, files):
    rdd = spark_ctx.wholeTextFiles(files)
    rdd = rdd.map(lambda x: (x[0], json.loads(x[1])))
    df = rdd.toDF(['file', 'content'])
    return df

### Przykład zastosowania TransformerProxy do automatyzacji ewaluacji

Eksploracja metod wykorzystuje dostarczany przez Sparka mechanizm `CrossValidator`a, który szuka optymalnych wartości w zadanej wcześniej przestrzeni parametrów pipeline'u.

Ponieważ `CrossValidator` może jedynie podmieniać wartości parametrów stage'y , nie zaś same stage, wykorzystujemy obiekty `TransformerProxy` do zasymulowania podmiany stage'y w trakcie cross-walidacji.

`TransformerProxy` posiada pole `transformer` przeznaczone dla 'właściwego' transformera, który będziemy chcieli wypróbować. 

`CrossValidator` otrzyma przestrzeń parametrów, w której zamiast 'zwykłych' parametrów liczbowych będą się znajdować różne implementacje transformerów, przypisane do odpowiednich stage'y.

Tym spososbem będziemy w stanie modyfikować logikę działania pipeline'u bez tworzenia własnych nakładek a jedynie kreatywnie wykorzystując istniejące w Sparku mechanizmy.

### Klasa TransformerProxy

In [8]:
class TransformerProxy(Transformer):

    def __init__(self):
        super(TransformerProxy, self).__init__()
        self.transformer = Param(self, "transformer", "")

    def set_transformer(self, transformer):
        self._paramMap[self.transformer] = transformer
        return self

    def get_transformer(self):
        return self.getOrDefault(self.transformer)

    def _transform(self, dataset):
        return self.get_transformer().transform(dataset)

### Utworzenie instancji transformerów

In [10]:
features = [
        "leaf",
        "has-attribute-class",
    ]

content_column='content'
metrics_column='metric'

mean = MeanFeaturesTransformer(features=features)
mean.setInputCol(content_column).setOutputCol(metrics_column)

median = MedianFeaturesTransformer(features=features)
median.setInputCol(content_column).setOutputCol(metrics_column)

count = NumberOfOccurrencesFeaturesTransformer(features=features)
count.setInputCol(content_column).setOutputCol(metrics_column)
    

#### Przygotowanie przestrzeni parametrów 

Każdy transformer w pipelinie stanowi osobny wymiar w przestrzeni parametrów.
W naszym przypadku grid ma tylko jeden wymiar o długości 2

In [12]:
metric_stage_proxy = TransformerProxy()

param_grid_builder = ParamGridBuilder()
  param_grid_builder.addGrid(metric_stage_proxy.transformer, [mean, median, count])
param_grid = param_grid_builder.build()

### Przygotowanie modyfikowalnego pipeline'u
W tej wersji, wszystkie istniejące wczesniej stage zastępujemy obiektami `TransformerProxy`

In [14]:
parameterized_pipeline = Pipeline(stages=[metric_stage_proxy])

In [15]:
evaluator = RegressionEvaluator(labelCol="metric",
                                predictionCol="evaluation",
                                metricName="rmse")

# CrossValidator will automatically find the best set of parameters
cv = CrossValidator(estimator=parameterized_pipeline,
                   estimatorParamMaps=parameter_grid,
                   evaluator=evaluator,
                   numFolds=1) # numFolds=1 zapewnia, że każdy zestaw metod zostanie przetestowany tylko raz

In [16]:
feature_file = 'external/data/featuresample.json'
loaded_features = load_features(sc, feature_file)

cv_result = cv.fit(loaded_features)

### Wypisanie nazw transformerow wybranych przez CV

In [18]:
transformed_input_dataframe = cv_result.transform(example_dataframe)
transformed_input_dataframe.show()

# print out the name of the best transformer
print("Selected transformers:")
  for stage in model.bestModel.stages:
    best_transformer_param = stage.getParam("transformer")
    best_transformer = stage._paramMap[best_transformer_param]
    print(type(best_transformer).__name__)
