### Przykładowe problemy związane ze skalowalnością zadań ML:

Ograniczenie CPU: Dane mieszczą się w pamięci RAM, ale proces uczenia trwa za długo. Np. W przypadku konieczności sprawdzenia wielu kombinacji parametrów modelu, wielu modeli, itd.


Ograniczenia pamięci: Dane są na tyle duże, że nie mieszczą się w pamięci RAM.


#### Pipeline

![](https://github.com/kornisch/ds-notebooks/blob/main/img/ml-Pipeline.png?raw=1)

#### Pipeline Model

![](https://github.com/kornisch/ds-notebooks/blob/main/img/ml-PipelineModel.png?raw=1)

### Potok przetwarzania ML

* <b>DataFrame</b>: interfejs API ML używa DataFrame, w którym można przechowywać różne typy danych. Na przykład DataFrame może mieć różne kolumny przechowujące tekst, wektory cech, prawdziwe etykiety i prognozy.

* <b>Transformer</b>: Transformator to algorytm, który może przekształcić jedną ramkę danych w inną ramkę danych. Na przykład model ML to transformator, który przekształca ramkę danych z funkcjami w ramkę danych z prognozami. Innym przykładem transformatora jest StringIndexer, który koduje zmienne tekstowe jako zmienne całkowito liczbowe.


* <b>Estimator</b>: Estymator to algorytm, który który można zaaplikować do DataFrame w celu wytworzenia transformatora. Np. Algorytm uczenia się jest estymatorem, który trenuje się na DataFrame i tworzy model.


* <b>Pipeline</b>: Potok przetwarzania łączy wiele transformatorów i estymatorów razem, aby określić przepływ pracy ML.


* <b>Parametr</b>: Wszystkie transformatory i estymatory mają teraz wspólny interfejs API do określania parametrów.

### Wczytanie danych

In [None]:
import os
user_name = os.environ.get('USER')
print(user_name)

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

In [None]:
gs_path = f'gs://bdg-lab-{user_name}/survey/2020/survey_results_public.csv'

In [None]:
db_name = user_name.replace('-','_')

In [None]:
spark.sql(f'DROP DATABASE IF EXISTS {db_name} CASCADE')
spark.sql(f'CREATE DATABASE {db_name}')
spark.sql(f'USE {db_name}')

In [None]:
table_name = "survey_2020"

In [None]:
spark.sql(f'DROP TABLE IF EXISTS {table_name}')

spark.sql(f'CREATE TABLE IF NOT EXISTS {table_name} \
          USING csv \
          OPTIONS (HEADER true, INFERSCHEMA true, NULLVALUE "NA") \
          LOCATION "{gs_path}"')

In [None]:
spark.sql(f'describe {table_name}').show(100)

### Przygotowanie danych do analizy

W ramach zadania chcemy stworzyć klasyfikator, który będzie przewidywać czy respondent zarabia więcej niż 60000 USD rocznie.

In [None]:
spark_df= spark.sql(f'SELECT *, CAST((convertedComp > 60000) AS STRING) AS compAboveAvg \
                    FROM {table_name} where convertedComp IS NOT NULL ')
spark_df.limit(5).toPandas()

<B>Dążymy do tego, żeby przygotować jeden wektor cech oraz jedną kolumnę z oznaczeniami.</B>

Kodujemy kolumny tekstowe na numeryczne oraz kodujemy wartości liczbowe na reprezentacje onehotencoder. Następnie dokonujemy asemblacji do jednego wektora.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
# chcemy przewidziec compAboveAvg
y = 'compAboveAvg'
# na podstawie:
feature_columns = ['OpSys', 'EdLevel', 'MainBranch' , 'Country', 'JobSeek', 'YearsCode']

In [None]:
#Zaczynamy od transformatora StringIndexer, zamieniajacego wartosci 'string' na liczbe
# dla cech, ktore zostana wykorzystane do predykcji

##### najpierw pokazujemy prosta petle z FOR, a potem zrefactorujmy do list comprehension
stringindexer_stages_1 = []
for c in feature_columns:
    stringindexer_stages_1.append (StringIndexer(inputCol=c, outputCol='stringindexed_' + c).setHandleInvalid("keep"))

# i dla zmiennej objaśnianej
stringindexer_stages_1.append(StringIndexer(inputCol=y, outputCol='label').setHandleInvalid("keep"))


<b>handleInvalid</b> = How to handle invalid data during transform(). Options are 'keep' (invalid data presented as an extra categorical feature) or error (throw an error).

In [None]:
# Refactoring do list comprehension
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='stringindexed_' + c).setHandleInvalid("keep") for c in feature_columns]

# i dla zmiennej objaśnianej
stringindexer_stages += [StringIndexer(inputCol=y, outputCol='label').setHandleInvalid("keep")]
stringindexer_stages

In [None]:
# Po wykonaniu takiej transformacji do DF zostaje dodane  7 nowych kolumn z prefixem "stringindexed_"
Pipeline(stages=stringindexer_stages).fit(spark_df).transform(spark_df).toPandas()

In [None]:
onehotencoder_stages = [OneHotEncoder(inputCol='stringindexed_' + c, outputCol='onehot_' + c) for c in feature_columns]

Rozbudowujemy pipeline:

Po wykonaniu takiej transformacji (stringIndexer+onehotencoder) do DF zostaje dodane  6 nowych kolumn z prefixem "onehot_".

In [None]:

pa = Pipeline(stages=stringindexer_stages + onehotencoder_stages).fit(spark_df).transform(spark_df).toPandas()

In [None]:
pa.columns

Nowe kolumny zawieraja wartosci typu SparseVector zawierajacy mape bitowa.


In [None]:
from IPython.display import Image
from IPython.core.display import HTML
Image(url= "https://miro.medium.com/max/2400/1*ggtP4a5YaRx6l09KQaYOnw.png")

In [None]:
print("Orginal values:")
print(pa['OpSys'].unique())
print ("---------")
print("StringIndexed values:")
print(pa['stringindexed_OpSys'].unique())
print ("---------")
print("OneHot values:")
print(pa['onehot_OpSys'].unique())

#### <B>Asemblacja</B> - połączenie wszystkich kolumn predykcyjnych do jednej (kolumna features)

In [None]:
extracted_columns = ['onehot_' + c for c in feature_columns]
vectorassembler_stage = VectorAssembler(inputCols=extracted_columns, outputCol='features')

### Połączenie wszystkich krokþw przygotowania danych w jednym potoku przetwarzania (pipeline)

In [None]:
# wybór kolumn do ostatecznej ramki danych
# poza kolumnami features i label (które będą wykorzystywane do budowania modelu)
# zostawiamy m.in. oryginalne kolumn (feature_columns)
final_columns = [y] + feature_columns + extracted_columns + ['features', 'label']
final_columns

In [None]:
transformed_df = Pipeline(stages=stringindexer_stages + \
                          onehotencoder_stages + \
                          [vectorassembler_stage]).fit(spark_df).transform(spark_df).select(final_columns)

transformed_df.limit(5).toPandas()

### Podzial na zbior treningowy/testowy

In [None]:
training, test = transformed_df.randomSplit([0.8, 0.2], seed=1234)

In [None]:
training.count()

### Uczenie modelu - model.fit()

In [None]:
# na poczatek wybierzemy drzewo decyzyjne. Nie musimy podawac zadnych parametrow
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [None]:
simple_model = Pipeline(stages=[dt]).fit(training)

In [None]:
simple_model.stages[0]

### Predykcja - model.transform()

In [None]:
pred_simple = simple_model.transform(test)

In [None]:
show_columns = final_columns + ['prediction', 'rawPrediction', 'probability']
pred_simple.limit(5).select(show_columns).toPandas()

## Ewaluacje

In [None]:
# macierz pomyłek (confusion matrix)
label_and_pred = pred_simple.select('label', 'prediction')
label_and_pred.groupBy('label', 'prediction').count().toPandas()

In [None]:
# Ewaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [None]:
auroc_simple = evaluator.evaluate(pred_simple)
auroc_simple

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_m = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_m.evaluate(pred_simple)
accuracy

## Dodanie hiperparametrów

In [None]:
# Jakie wartości hiperparametru maxDepth mają być przetestowane:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [2,3,4,5,6]).\
    build()

In [None]:
# Walidacja krzyżowa wykonwyana w celu optymalizacji hiperparametrów
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [None]:
# Budowa modelu na podstawie danych treningowych
cv_model = cv.fit(training)

In [None]:
cv_model.bestModel

## Predykcja z nowym modelem

In [None]:
# Jak wygląda predykcja na zbiorze danych testowych?
pred_cv = cv_model.transform(test)
show_columns = final_columns + ['prediction', 'rawPrediction', 'probability']
pred_cv.limit(5).select(show_columns).toPandas()

In [None]:
# Confusion matrix
label_and_pred = pred_cv.select('label', 'prediction')
label_and_pred.groupBy('label', 'prediction').count().toPandas()

In [None]:
auroc_cv = evaluator.evaluate(pred_cv)
auroc_cv

In [None]:
acc_cv = evaluator_m.evaluate(pred_cv)
acc_cv

## Klasyfikacja za pomoca Gradient Boosted Trees

In [None]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
model = gbt.fit(training)

In [None]:
evaluator.evaluate(model.transform(test))

## Zadania:

* Czy mozna jeszcze poprawic jakosc predykcji:
    * a) dodajac cechy
    * b) zmieniajac model
    * c) lepiej dobierajac parametry modelu ?

In [None]:
#Kod w R
#library(data.table)
#srv <- fread("survey_results_public.csv")
#srv$OpSys2 <- srv$OpSys == "Windows"
#library(rpart)
#srv$CompAboveAvg <- CompAboveAvg$ConvertedComp > 60e3
#dt_fit = rpart(CompAboveAvg ~ Age + EdLevel + JobSeek + OpSys + YearsCode , data = srv, method = 'class')
#pred_y = predict(dt_fit, type = 'class')
#table(predict(dt_fit, srv[,c("Age" , "EdLevel", "JobSeek", "OpSys", "YearsCode")], type = "class"), srv$CompAboveAvg)
#srv(cor)
