Pokud pracujete s dvojkovým PySparkem, zakomentujte "UnivariateFeatureSelector"  a "from pyspark.ml.functions import vector_to_array".

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.window import Window

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import ChiSqSelector, UnivariateFeatureSelector

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel

from pyspark.ml.regression import LinearRegression

from pyspark.ml.functions import vector_to_array 

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# PySpark

## Obsah
- [Co je Spark a PySpark](#Co-je-Spark-a-PySpark)
- [Instalace](#Instalace)
- [PySpark sešny](#PySpark-sešny)
- [Práce s daty](#Práce-s-daty)
  - [Výroba tabulkoidního objektu](#Výroba-tabulkoidního-objektu) 
  - [Dataframový přístup](#Dataframový-přístup)
    - [Informační metody](#Informační-metody)
    - [Manipulace se sloupci](#Manipulace-se-sloupci)
    - [PySpark a datumy](#PySpark-a-datumy)
    - [Window funkce](#Window-funkce)
  - [SQL přístup](#SQL-přístup)
  - [Cachování](#Cachování)
- [Machine learning](#Machine-learning)
  - [String indexer](#String-indexer)
  - [Vektorizace](#Vektorizace)
  - [Scaling](#Scaling)
  - [Trénování modelu](#Trénování-modelu)
  - [Vyhodnocení přesnosti modelu](#Vyhodnocení-přesnosti-modelu)
  - [Feature selection](#Feature-selection)
  - [Gridsearch](#Gridsearch)
  - [Pipelina](#Pipelina)
  - [Ukládání modelu](#Ukládání-modelu)
  - [Oversampling v PySparku](#Oversampling-v-PySparku)
  - [Bucketování](#Bucketování)
  - [Imputing](#Imputing)
  - [One-hot encoding](#One-hot-encoding)

## Co je Spark a PySpark
Jeden počítač sice z hlediska výpočetní síly určitou práci zastane, více počítačů najednou je ale mocnějších. Přitom zde nemluvíme pouze o nějakých složitých algoritmech - tento poznatek je možná ještě očividnější při "tupém" procházení velkého množství dat řádek po řádku. Nicméně hromada počítačů/serverů (alias cluster) sama o sobě tolik nezmůže - musí být ošetřena komunikace a rozdělování práce mezi nimi. Tuto úlohu může krom jiných zastávat právě Spark. Alternativně lze cluster počítačů řídit nečím jiným, třeba [YARNem](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html), přičemž na tuto věc se Spark napojí.  
Aplikace běžící ve Sparku se skládá z jednoho driver procesu a několika executor procesů. Driver proces má za úkol spravovat informace o sparkovské aplikaci, komunikovat s uživatelem a řídit a distribuovat práci executor procesů. Executory pak vykonávají práci jim zadanou a výsledky svých výpočtů vrací na driver. V případě našeho povídání, kdy si vše budeme ukazovat na lokálu, budou driver i executory de facto obyčejné procesy. V reálném prostředí by pravda šlo též o obyčejné procesy, ty by ale běžely na odlišných mašinách.  
Spark je napsán ve Scale, což není úplně nejsnazší jazyk na pochopení. Nicméně byla by škoda o sílu tohoto frameworku přijít. Proto vznikl PySpark jako svého druhu pythoní obal nad Sparkem. Z hodně naivního pohledu připomíná PySpark Pandas (balíček pro práci s tabulkami) zkřížený s balíčkem Scikit-learn (balíček pro strojové učení). Velký rozdíl ale tkví ve způsobu vyhodnocování příkazů. V Pandách je vyhodnocování "eager" (asi nemá cenu tento termín překládat - výsledek by čtenáře jen mátl). To znamená, že když spustíme jakoukoli pandí funkci či metodu, provede se okamžitě. Oproti tomu PySpark je lazy. Některé příkazy - tzv. transformace - se při odklepnutí v IDE fakticky nespustí, pouze se přidají do grafu vyhodnocování transformací. Tento graf, resp. operace v něm, odstartuje až když je to potřeba, obvykle kvůli příkazům typu akce. Příkladem transformace mohou být všelikaké úpravy pysparkového dataframu, zatímco příkladem akce by bylo zobrazení tohoto dataframu uživateli. Důvod takovéhoto chování je redukce míry načítání obřích dataframů do paměti a zefektivnění práce s clustery serverů. Člověk musí mít toto chování pořád na paměti, jinak zjistí, že mu aplikace padá. Proč? Jelikož samotné výpočty (obvykle řetěz transformací s obříma datama) probíhá na executor procesech, driver procesy nemají alokovány tolik paměti. Pakliže ale na tyto velká data vypustíme nevhodnou akci (třeba akci "vytiskni celou obří tabulku"), tak všechny executory svá data pošlou na driver a ten to nerozchodí.  

## Instalace
V praxi Pyspark buďto bydlí na Linuxu a je napojen na HDFS, anebo ho uživatel najde v cloudu, dejme tomu v Azure jako Databrics. Nicméně první by pro potřeby tohoto výukového textu znamenalo složitejší instalaci, druhé zase vytažení platební karty. Proto si zde povíme, jak PySpark nainstalovat na obyčejný počítač, kde jako operační systém slouží MS Windows.  
Při instalaci jsem postupoval podle tohoto [návodu](https://sparkbyexamples.com/pyspark/install-pyspark-in-anaconda-jupyter-notebook/). Pokud by odkaz nebyl v budoucnu funkční, shrnu zde hlavní body:  
- instalace [Anacondy](https://www.anaconda.com/)  
- instalace Javy (tu jsem měl nainstalovanou již dříve, nicméně podle návodu by to šlo udělat i přes anacondí repozitář pomocí "conda install openjdk")  
- instalace PySparku pomocí "conda install pyspark"  
- instalace balíčku findspark  

V mém případě toto bohužel nestačilo. Nejprve se při otestování funkčnosti (napsání "pyspark" do anacondí konzole) objevila hláška na způsob
```
Python was not found; run without arguments to install from the Microsoft Store
```
Bylo třeba klepnout na Start a psát (přesněji začít psát) "Manage app execution aliases". Po kliknutí na odpovídající položku ve Startu se v okně, které se objevilo, musely vypnout všechny věci vypadající, že mají nějakou spojitost s Pythonem.  
Při následovném pokusu o spuštění PySparku se v konzoli vypsala zpráva
```
Missing Python executable 'python3', defaulting to 'C:\Users\ThereWouldBeUserName\miniconda3\Scripts\..' for SPARK_HOME environment variable. Please install Python or specify the correct Python executable in PYSPARK_DRIVER_PYTHON or PYSPARK_PYTHON environment variable to detect SPARK_HOME safely.
```
Řešení spočívalo v nastavení proměnné PYSPARK_PYTHON. To se realizuje následujícím příkazem napsaným do konzole
```
set PYSPARK_PYTHON=python
```
Zdůrazněme, že takto nastavená proměnná existuje jen do té doby, dokud není konzole zavřena. Pokud bychom na tento krok zapomenuli a spustili Jupyter Notebook, projeví se to v nekonečném vytváření sparkové sešny (obvykle první věc, kterou budeme dělat po importování balíčků). Nicméně v konzoli, ve které jsme Jupyter pustili, by se objevila výše uvedená "missing python executable" chyba.  
Zmiňme nakonec, že v konzoli jak při použití Jupyteru, tak při spuštění PySpaarku napřímo uvidíme chybu spojenou s hadoopem:
```
java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
```
Ta naši práci neovlivní (žádný Hadoop u sebe nainstalovaný nemáme), takže ji můžeme ignorovat.

## PySpark sešny
Pro komunikaci se sparkovským enginem, bez kterého se neobejdeme, je třeba vytvořit sparkovskou sešnu. To provedeme následujícím kódem (vnější sada závorek tu je jen proto, abychom mohli provádět odřádkování a neměli tak jeden nekonečný řádek):

In [2]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("toy_application")
    .getOrCreate()
)

V tomto minimálním příkladu jsme sešnu pojmenovali (skrze *appName*) a poté jsme zavolali metodu *getOrCreate*. Ta zajistí, že šesna bude singleton. Tj. pokud sešna neexistuje, bude vytvořena, ale pokud už nějaká sešna předtím vznikla, *getOrCreate* vrátí právě tu. Toto chování si demonstrujme na zobrazení spark objektu a na vypsání jeho IDčka.  
Když vložíme do buňky samostojící jméno sešny, dostaneme odkaz na Spark UI - de facto webový inteface, ve kterém můžeme běh sešny kontrolovat. Pod labelem "Version" se skrývá verze Sparku, "Master" nám zase řekne, kde vlastně Spark bydlí. Důležité pro nás v tomto bodě je "AppName", kde vidíme jméno naší aplikace - toy_application.

In [40]:
spark

Co se Spark UI týče, tak v záložce "Jobs" nalezneme obecné informace o sparkovské aplikaci a timelime graf. Ve "stages" se dá proklikat na DAGy (directed acyclic graph) reprezentující operace na sparkovských objektech a jsou zde i podrobné informace o tom, co a kdy se v rámci určité operace vlastně dělo. Ve storage záložce uvidíme, co vše bylo nacachované (viz kapitola v cca polovině výkladu - ale vidět tu je pouze sql cachování, nikoli "normální" pythoní cachování). Enviroments záložka obdahuje informace o všelijakých systémových proměnných. V executors záložce můžeme nalézt informace o executorech (co který executor dělá, kolik má k dispozici corů, kolik má reálně paměti apod.). Nakonec v sql záložce uvidíme exekuční plán sql dotazů.

Vypišme si idčko spark objektu.

In [3]:
id(spark)

1392617635216

Když se nyní pokusíme vytvořit novou sparkovsou sešnu, můžeme se přesvědčit, že nám *getOrCreate* opravdu vrátil sešnu původní.

In [4]:
spark_2 = (
    SparkSession
    .builder
    .appName("toy_application")
    .getOrCreate()
)

id(spark_2)

1392617635216

Dokonce ani snaha o nastavení odlišného appNamu nepovede k vytvoření opravdu nové sešny.

In [5]:
spark_3 = (
    SparkSession
    .builder
    .appName("another_toy_application")
    .getOrCreate()
)

id(spark_3)

1392617635216

Vidíme, že nové jméno ani nedokázalo přepsat jméno staré.

In [6]:
spark_3

Chceme-li z nějakého důvodu vytvořit sešnu novou, musíme na sešně staré zavolat *newSession*.

In [7]:
spark_4 = spark.newSession()
id(spark_4)

1392616115504

Nicméně takto vytvořená sešna není úplně oddělená od sešny původní. Reálně se hodí jen pokud bychom opravdu potřebovali mít dva od sebe oddělené namespacy, tj. pokud bychom potřebovali, aby se pod stejným jménem ve dvou sešnách nacházely jiné objekty. Koneckonců i v dokumentaci se o *newSession* píše, že 
```
Returns a new SparkSession as new session, that has separate SQLConf,
registered temporary views and UDFs, but shared SparkContext and
table cache.
```
Zde se mluví o jakémsi SparkContextu. Pokud bychom na internetu narazili na staré tutoriály (před Spark 2.0), viděli bychom, že se v nich SparkContext používá jako vstup do sparkovského enginu namísto SparkSession. Jaký je mezi těmito entitami rozdíl? SparkContext se musí složitěji (a explicitněji) nastavovat a na celém JVM (Java Virtual Machine alias to, na čem jede Spark) může existovat jen jeden. Nicméně i když ho v našem uživatelském kódu nikde nevytváříme, v pozadí pořád existuje. Když například chceme sešnu ukončit a provoláme na ní metodu *stop*, zastavíme i všechny ostatní sešny - koneckonců zkume si poté, co jsme ukončili sešnu spark_4, otevřít odkazy na Spark UI z předešlých sešen.

In [8]:
spark_4.stop()

Docstring v metodě *stop* totiž zmiňuje právě SparkContext:
```
Stop the underlying :class:`SparkContext`.
```
Jinak co se týče zastavování sešen, v našem případě, kdy pracujeme na lokálu, to potřeba není. Celý Spark se totiž vypne s vypnutím Jupyteru. Nicméně na normálním prostředí už by zastavování sešen po skončení práce bylo životně důležité, aby se neblokovaly hardwarové prostředky.  
Tyto hardwarové prostředky, které bude naše aplikace používat, bychom mohli specifikovat při vytváření sešny explicitně
```
spark_example = (
    SparkSession
    .builder
    .config("spark.executors.cores", "5")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "4G")
    .config("spark.driver.memory","2G")
    .appName("another_toy_application")
    .getOrCreate()
)
```
Možností pro nastavení je vcelku mnoho. Přitom zvolené hodnoty častokrát záleží na hardwarovém vybavení clusteru a na úloze, kterou je třeba řešit. Příklad nastavení pro různá předpokládána zatížení se nalézá v následující tabulce. Co který parametr dělá lze najít v [dokumentaci](https://spark.apache.org/docs/latest/configuration.html).

| Parametr | Nízké zatížení | Střední zatížení | Vysoké zatížení | 
| --- | --- | --- | --- |  
|spark.driver.memory|2G|3G|4G|  
|spark.driver.memoryOverhead|256M|512M|1G|
|spark.dynamicAllocation.enabled|true|true|true|
|spark.dynamicAllocation.maxExecutors|10|20|50|
|spark.executor.memory|4G|8G|12G|
|spark.executor.memoryOverhead|512M|1G|2G|
|spark.sql.shuffle.partitions|40|80|200|

Zmiňme matoucí rozdíl mezi *executors.cores* a *executor.instances*. *Executers/driver.cores* říká, kolik má být v rámci jednoho executoru či drivero alokováno jader. Oproti tomu *executor.instances* řídí, kolik vůbec executorů bude. 

Na každý pád bude spíše než ruční nastavování prostředků častější situace, kdy pro nás někdo z infrastruktury vytvoří uživatelský pool s dedikovanými prostředky, do kterého se přihlásíme variací na
```
spark_example = (
    SparkSession
    .builder
    .config("spark.yaml.queue", "pool_data_scientist")
    .appName("another_toy_application")
    .getOrCreate()
)
```

## Práce s daty
Základní stavební jednotkou dat, se kterým Spark pracuje, je tzv. Resilient Distributed Dataset (RDD). Můžeme si ho představit jako svého druhu tuple, na kterém lze efektivně práci paralelizovat. Zdůrazněme, že RDD má blíž k tuplu než k listu - je immutable, tj. operace na něm proběhlá ho nemění, nýbrž vrací zeditovanou kopii. Na každý pád dnes není moc důvodů s RDD napřímo pracovat. Vznikl totiž datový objekt, se kterým se manipuluje snadněji - DataFrame. Jméno připomíná základní stavební jednotku balíčku Pandas a podobnost není čistě náhodná - opravdu se jedná o reprezentaci tabulkoidního objektu. Naše povídání se tak bude celou dobu a zejména v této kapitole týkat právě tohoto typu objektu. Pro úplnost ještě poznamenejme, že ve Sparku existují tzv. Datasety, které ale nejsou v PySparku implementovány.

### Výroba tabulkoidního objektu

In [9]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("toy_application")
    .getOrCreate()
)

Asi nejpřímočařejším, byť ne úplně využívaným postupem na vytvoření pysparkového dataframu, je jeho konstrukce z dat zadaných v programu v podobě listu. Tento list se skládá z podlistů či z tuplů. Ty v budoucí tabulce budou reprezentovat jednotlivé řádky.

In [10]:
data_in_list = [
    ["Švejk",40,"Praha"], 
    ["Vyskočil",50,"Horní Dolní"]
]

List vložíme jako parametr do funkce *createDataFrame*.

In [11]:
table_from_list = spark.createDataFrame(data_in_list)

Výsledný dataframe si zobrazíme pomocí metody *show* (a ano, *show* je akce). Všimněme si, že jelikož jsme nespecifikovali hlavičku tabulky, objevila se nám posloupnost čísel \_1, \_2 atd.

In [12]:
table_from_list.show()

+--------+---+-----------+
|      _1| _2|         _3|
+--------+---+-----------+
|   Švejk| 40|      Praha|
|Vyskočil| 50|Horní Dolní|
+--------+---+-----------+



Pro vytvoření dataframu s hlavičkou existuje v *createDataFrame* speciální parametr - *schema* s listem obsahujícím jména sloupců.

In [13]:
table_from_list = spark.createDataFrame(
    data_in_list,
    schema=["name", "age", "city"]
)
table_from_list.show()

+--------+---+-----------+
|    name|age|       city|
+--------+---+-----------+
|   Švejk| 40|      Praha|
|Vyskočil| 50|Horní Dolní|
+--------+---+-----------+



Pysparkový dataframe lze vytvořit i z pandího dataframu - zkrátka se do *createDataFrame* namísto listu vloží právě onen pandí dataframe.

In [14]:
import pandas as pd
pandas_dataframe = pd.DataFrame({
    "name": ["Švejk", "Vyskočil"],
    "age": [40, 50],
    "city": ["Praha", "Horní Dolní"]
})
table_from_pandas = spark.createDataFrame(pandas_dataframe)
table_from_pandas.show()

+--------+---+-----------+
|    name|age|       city|
+--------+---+-----------+
|   Švejk| 40|      Praha|
|Vyskočil| 50|Horní Dolní|
+--------+---+-----------+



Pokud v *createDataFrame* uvedeme jména sloupců, mají tyto jména oproti těm v pandím dataframu prioritu.

In [15]:
import pandas as pd
pandas_dataframe = pd.DataFrame({
    "name": ["Švejk", "Vyskočil"],
    "age": [40, 50],
    "city": ["Praha", "Horní Dolní"]
})
table_from_pandas = spark.createDataFrame(
    pandas_dataframe,
    schema=["last_name", "age", "address"]
)
table_from_pandas.show()

+---------+---+-----------+
|last_name|age|    address|
+---------+---+-----------+
|    Švejk| 40|      Praha|
| Vyskočil| 50|Horní Dolní|
+---------+---+-----------+



Pro úplnost zmiňme, že pokud bychom potřebovali konverzi opačným směrem, tj. z PySparku do Pand, použijeme metodu *toPandas*. Jenže bacha - když to uděláme, půjdou všechna data na driver, který to v případě velké tabulky nerozdýchá. 

In [17]:
new_pandas_frame = table_from_pandas.toPandas()
new_pandas_frame

Unnamed: 0,last_name,age,address
0,Švejk,40,Praha
1,Vyskočil,50,Horní Dolní


Načíst data jde pochopitelně i z csv souboru. Na to slouží *read.csv*. Parametry jsou obdobné jaké v pandím *read_csv*. Zdůrazněme ale, že v parametru *path* specifikujícím cestu k souboru musí být string (resp. list stringů), nikoli *pathlib.Path*.

In [7]:
table_from_csv = spark.read.csv(path="pyspark_data/csv/data_for_csv_load.csv", sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Takto to vypadá neproblematicky. Nicméně zkusme na náš dataframe vypustit metodu *printSchema*, který ukáže datové typy sloupců. Uvidíme, že jsou všechny brány jako stringy.

In [8]:
table_from_csv.printSchema()

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)



Takové je chování, které nastává při nastavení parametru *inferSchema* na hodnotu None resp. False. Co dělat, kdybychom chtěli mít rozumné datové typy už od načtení? Když se *inferSchema* položí rovno True, PySpark načte prvních pár řádků ze souboru a z nich datový typ odvodí. Problém nastane v okamžiku, kdy se první řádky liší od řádků následujících a je tak automaticky zvolen nevhodný datový typ. Anebo se "zbytečně" kvůli typům načte celý soubor, což může trvat docela dlouho. Proto bývá vhodné definovat datový typ explicitně. Na to použijeme objekt typu *StructType* reprezentující schéma celého dataframu. Tento *StructType* obsahuje objekty *StructField* obsahující informace o jednotlivých sloupcích. První parametr konstruktoru *StructField* obsahuje jméno sloupce, druhý parametr datový typ. Třetí parametr by měl dle [dokumentace](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.types.StructField.html) obdsahovat informaci, zda pole může být nullable. Nicméně při mých pokusech na Sparku 3 se tento atribut po vytvoření dataframu nezávisle na tom, co jsem do konstuktoru vložil, automaticky nastavil na True (zjištěno prostřednictvím *table_from_csv.schema*). 

In [10]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

file_table_schema = StructType([
    StructField("id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", DoubleType()),
    StructField("gender", StringType())
])

table_from_csv = spark.read.csv(
    path="pyspark_data/csv/data_for_csv_load.csv", sep=",", 
    header=True, schema=file_table_schema
)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



In [11]:
table_from_csv.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)



Když nějaký sloupec dostane špatný datový typ (například "age" vs souboru je ve fromátu 25.0, tj. je to float, ale my mu dáme IntegerType), celý sloupec se naplnít nully (u dvojkového Sparku se nully objevily ve *všech* sloupcích).

In [21]:
file_table_schema = StructType([
    StructField("id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType())
])

table_from_csv = spark.read.csv(
    path="pyspark_data/csv/data_for_csv_load.csv", 
    sep=",", header=True, schema=file_table_schema
)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|null|     M|
|200|      Mary|    Shelly|null|     F|
|300|    Johann|    Geothe|null|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|null|     M|
+---+----------+----------+----+------+



Výše jsme zmínili, že při načítání souborů mužeme uvést nikoli pouze string, ale i list.

In [22]:
csv_files_list = [
    "pyspark_data/csv/data_for_csv_load.csv", 
    "pyspark_data/csv/data_for_csv_load_2.csv"
]
table_from_csv = spark.read.csv(path=csv_files_list, sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
|160|     Isaac|    Asimov|48.0|     M|
+---+----------+----------+----+------+



Nicméně musíme si dát pozor na to, aby soubory měly stejná schémata. V opačném případě se použije schéma jednoho ze souborů. U dat ze souborů ostatních se neexistující sloupce doplní nully a nadbytečné sloupce se useknou. Je otázkou, co určuje schématu dominující soubor - pořadí v listu to není.

In [23]:
csv_files_list = [
    "pyspark_data/csv/data_for_csv_load.csv", 
    "pyspark_data/csv/data_for_csv_load_3.csv"
]
table_from_csv = spark.read.csv(path=csv_files_list, sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
|160|     Isaac|    Asimov|48.0|     M|
+---+----------+----------+----+------+



Údajně by dokonce mělo stačit do pathy vložit adresář obsahující csv soubory a do dataframu by se měl obsah všech souborů načíst. Nicméně na lokálu se mi to nepodařilo. Na druhou stranu na HDFS tato featura funguje.

Uložení dataframu do csv souboru se provede pomocí
```
table_from_csv.write.csv("data_after_saving_to_csv.csv", sep="|")
```
Když se o to ale pokusíme na lokále na Windows, obdržíme chybovou hlášku mluvící o Hadoopu. Proto pokud chceme dataframe uložit na (normální) disk, bude asi nejsnazší ho napřed zkonvertovat do pandí verze pomocí *toPandas* a poté uložit pandí metodou *to_csv*.  

Na HDFS je ale situace odlišná. Při proběhnutí ukládací metody dojde v našem konkrétním příkladě k tomu, že se vytvoří adresář "data_after_saving_to_csv.csv" (ano, včetně souborové přípony). V tomto adresáři nalezneme jednak prázdný soubor SUCCESS, jednak několikero part souborů. Počet těchto souborů odpovídá počtu partitions (data v souboru jsou ta, která byla v příslušné partition). A co že je to ta partition? Jedná se o výsek dat, který je zpracováván jedním nodem, přesněji jedním jádrem na nodu (tj. na nodu může bydlet jedna nebo více partition, avšak jedna partition nemůže být najednou na více nodech). Defaultně je počet partition roven právě počtu jader přes celý cluster nodů. Příliš málo partition vede k tomu, že některé stroje stojí a data jsou skewnutá (např. na jednom nodu je 50 záznamů a na druhém 50 milionů), příliš mnoho partitions zase znamená, že orchestrování celého výpočetního procesu zabere více zdrojů než proces sám.  

Co dělat, pokud bychom chtěli mít jenom jeden soubor namísto několika? Nejdříve si musíme ujasnit, jestli to opravdu chceme. Mít data v jednom souboru znamená mít data na jednom nodu - a na něj se nemusí vejít. Pro změnu počtu partition lze použít buďto dataframovou metodu *repartition*, anebo metodu *coalesce*. *Repartition* se snaží popřesunovat data skrze síť (shuffling) tak, aby byly velikosti jednotlivých partition zhruba stejné. Přitom nový počet partitions může být větší i menší než počet původní. U *coalesce* dává smysl jen počet menší. *Coalesce* se totiž snaží přesunout pouze data, u kterých je to nutné. Pokud na nodu už data byla v nějaké partition původní a na onom nodu má být partition nová, data zůstávají. To samozřejmě může vést k tomu, že na některých nodech bude záznamů více než na jiných, tj. můžeme se setkat se skewem.  

Mějme json o obsahu
```json
{"first_name": "John", "last_name": "Doe"},
{"first_name": "Jane", "last_name": "Doe"}
```
Do dataframu ho načteme příkazem *spark.read.json*:

In [24]:
table_from_json = spark.read.json(path="pyspark_data/json/data_for_json_load.json")
table_from_json.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|      Doe|
+----------+---------+



Nicméně pokud budou záznamy roztaženy na více než jeden řádek, např. takto:
```json
{
  "first_name": "John", 
  "last_name": "Doe"
},
{
  "first_name": "Jane", 
  "last_name": "Doe"
}
```
stejný kód nám spadne s chybovou hláškou:

In [25]:
table_from_json = spark.read.json(path="pyspark_data/json/data_for_json_load_multiline.json")
table_from_json.show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

Při pohledu do dokumentace by se zdálo, že řešením je použití parametru multiLine s hodnotou True. Jenže ouha, v takto vytvořené tabulce je jen jeden záznam.

In [26]:
table_from_json = spark.read.json(
    path="pyspark_data/json/data_for_json_load_multiline.json",
    multiLine=True
)
table_from_json.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
+----------+---------+



Aby Spark správně multiline jsony interpretoval, musí tyto začínat a končit hranatými závorkami, tj. musí vypadat takto:
```json
[{
  "first_name": "John", 
  "last_name": "Doe"
},
{
  "first_name": "Jane", 
  "last_name": "Doe"
}]
```

In [27]:
table_from_json = spark.read.json(
    path="pyspark_data/json/data_for_json_load_multiline_2.json",
    multiLine=True
)
table_from_json.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|      Doe|
+----------+---------+



Hnízděné jsony lze načíst též, ale poté musí následovat zprocesování dataframu do použitelnější podoby.

In [28]:
table_from_json = spark.read.json(
    path="pyspark_data/json/data_for_json_load_complicated.json",
)
table_from_json.show()

+--------------------+----------+---------+
|             address|first_name|last_name|
+--------------------+----------+---------+
|[{Prague, Some st...|      John|      Doe|
|[{Brno, Some stre...|      Jane|      Doe|
+--------------------+----------+---------+



Krátce zmiňme práci s parquet soubory. Parquet je columnar file storage, což fakticky znamená, že se u něj efektivně dají provádět operace na sloupcích. Například pokud bychom měli tabulku ve formátu csv a chtěli spočítat sumu jednoho sloupce, musely by se před oním výpočtem přečíst všechny řádky souboru. U parquetu postačí přečíst odpovídající jednolitou sekci parquet souboru.  
Uložení do parquetu provedeme takto:
```python
table_to_parquet.write.parquet("some_directory_created_by_this_command")
```
Zdůrazněme, že stejně jako výše u csvčka dává smysl použít *coalesce*/*repartition* na redukci počtu souborů - opět se totiž nevytvoří jeden soubor, ale adresář se souborem za každou partition.
Data z parquetu načteme tímto způsobem:
```python
spark.read.parquet("some_directory")
```

Nakonec pokud bychom chtěli dataframe uložit jako hivovsku tabulku, použijeme
```python
pyspark_table_to_hive_table.write.saveAsTable("hive_database_container.hive_table_name")
```
Jen bacha na práva. Mohlo by se totiž stát, že takto vytvořenou tabulku nebudeme právě kvůli nim později schopní smazat...

### Dataframový přístup
#### Informační metody
V této podkapitole se budeme věnovat operacím s dataframy. Víceméně nám půjde o to najít ekvivalenty k pandasím konstrukcím.  
V předcházející podkapitole jsme viděli, že se dataframe (přesněji prvních pár řádků) dá zobrazit pomocí metody *show*.

In [3]:
table_from_csv = spark.read.csv(path="pyspark_data/csv/data_for_csv_load.csv", sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



V případě, že chceme vidět jen prvních N řádků, vložíme toto číslo N jako parametr do *show*. Ten bude tudíž ekvivalentní pandasímu *head*. Defaultní hodnota tohoto parametru nesoucího jméno *n* je 20.

In [30]:
table_from_csv.show(2)

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|100|    Victor|     Hugo|25.0|     M|
|200|      Mary|   Shelly|30.0|     F|
+---+----------+---------+----+------+
only showing top 2 rows



Pakliže je délka řetězce v některém sloupci delší než 20 znaků, je z hlediska toho, co zobrazuje *show*, tento řetězec oříznut (reálně se ale obsah buňky neztrácí).

In [31]:
numbers_list = [
    [number, number*10, "abc"*number] for number in range(100)
]
numbers_frame = spark.createDataFrame(
    numbers_list,
    schema=["numbers_1", "numbers_2", "long_word"]
)
numbers_frame.show()

+---------+---------+--------------------+
|numbers_1|numbers_2|           long_word|
+---------+---------+--------------------+
|        0|        0|                    |
|        1|       10|                 abc|
|        2|       20|              abcabc|
|        3|       30|           abcabcabc|
|        4|       40|        abcabcabcabc|
|        5|       50|     abcabcabcabcabc|
|        6|       60|  abcabcabcabcabcabc|
|        7|       70|abcabcabcabcabcab...|
|        8|       80|abcabcabcabcabcab...|
|        9|       90|abcabcabcabcabcab...|
|       10|      100|abcabcabcabcabcab...|
|       11|      110|abcabcabcabcabcab...|
|       12|      120|abcabcabcabcabcab...|
|       13|      130|abcabcabcabcabcab...|
|       14|      140|abcabcabcabcabcab...|
|       15|      150|abcabcabcabcabcab...|
|       16|      160|abcabcabcabcabcab...|
|       17|      170|abcabcabcabcabcab...|
|       18|      180|abcabcabcabcabcab...|
|       19|      190|abcabcabcabcabcab...|
+---------+

Pokud ořezávání chceme vypnout, přidáme do *show* parametr *truncate* s hodnotou False.

In [32]:
numbers_frame.show(truncate=False)

+---------+---------+---------------------------------------------------------+
|numbers_1|numbers_2|long_word                                                |
+---------+---------+---------------------------------------------------------+
|0        |0        |                                                         |
|1        |10       |abc                                                      |
|2        |20       |abcabc                                                   |
|3        |30       |abcabcabc                                                |
|4        |40       |abcabcabcabc                                             |
|5        |50       |abcabcabcabcabc                                          |
|6        |60       |abcabcabcabcabcabc                                       |
|7        |70       |abcabcabcabcabcabcabc                                    |
|8        |80       |abcabcabcabcabcabcabcabc                                 |
|9        |90       |abcabcabcabcabcabca

Zdůrazněme, že *show* slouží opravdu jen na zobrazení prvních pár řádků. Pokud ale potřebujeme z dataframu prvních několik řádků vybrat, aby se s nimi posléze dály nějaké další operace, musí se použít metoda *limit*:

In [33]:
numbers_frame.limit(5).show()

+---------+---------+------------+
|numbers_1|numbers_2|   long_word|
+---------+---------+------------+
|        0|        0|            |
|        1|       10|         abc|
|        2|       20|      abcabc|
|        3|       30|   abcabcabc|
|        4|       40|abcabcabcabc|
+---------+---------+------------+



Nechat si zobrazit prvních pár řádků bývá užitečné, avšak dobré je dostat se i k nějakým informacím popisujícím celý dataframe. Již jsme si ukázali, že pokud chceme znát schéma dataframu (aka datové typy sloupců), použijeme metodu *printSchema*.

In [34]:
numbers_frame.printSchema()

root
 |-- numbers_1: long (nullable = true)
 |-- numbers_2: long (nullable = true)
 |-- long_word: string (nullable = true)



Shrnující statistiky přináší metoda *summary*. Jelikož vrací dataframe, musí se za její provolání nalepit *show*.

In [35]:
numbers_frame.summary().show()

+-------+------------------+-----------------+--------------------+
|summary|         numbers_1|        numbers_2|           long_word|
+-------+------------------+-----------------+--------------------+
|  count|               100|              100|                 100|
|   mean|              49.5|            495.0|                null|
| stddev|29.011491975882016|290.1149197588202|                null|
|    min|                 0|                0|                    |
|    25%|                24|              240|                null|
|    50%|                49|              490|                null|
|    75%|                74|              740|                null|
|    max|                99|              990|abcabcabcabcabcab...|
+-------+------------------+-----------------+--------------------+



Parametry *summary* jsou chtěné statistiky. Defaultně se ukazuje téměř vše, co *summary* dokáže. Výjimkou je snad jen možnost specifikovat chtěný percentil: 

In [36]:
numbers_frame.summary("10%").show()

+-------+---------+---------+---------+
|summary|numbers_1|numbers_2|long_word|
+-------+---------+---------+---------+
|    10%|        9|       90|     null|
+-------+---------+---------+---------+



Extrémně podobnou funkcí je *describe*. Zde se ale jako parametry neudávají statistiky, ale sloupce, nad kterými se statistiky mají napočítat.

In [37]:
numbers_frame.describe().show()

+-------+------------------+-----------------+--------------------+
|summary|         numbers_1|        numbers_2|           long_word|
+-------+------------------+-----------------+--------------------+
|  count|               100|              100|                 100|
|   mean|              49.5|            495.0|                null|
| stddev|29.011491975882016|290.1149197588202|                null|
|    min|                 0|                0|                    |
|    max|                99|              990|abcabcabcabcabcab...|
+-------+------------------+-----------------+--------------------+



In [38]:
numbers_frame.describe("numbers_2", "long_word").show()

+-------+-----------------+--------------------+
|summary|        numbers_2|           long_word|
+-------+-----------------+--------------------+
|  count|              100|                 100|
|   mean|            495.0|                null|
| stddev|290.1149197588202|                null|
|    min|                0|                    |
|    max|              990|abcabcabcabcabcab...|
+-------+-----------------+--------------------+



Pro získání počtu řádků dataframu se v PySparku nepoužije *len*, nýbrž metoda *count*.

In [39]:
numbers_frame.count()

100

#### Manipulace se sloupci
PySparkové dataframy jsou immutable. To znamená, že je jejich metody nemění, nýbrž po svém doběhnutí vrací nový dataframe. Ukažme si to na metodě přidávající/přepisující sloupec *withColumn*. Jejím prvním parametrem je název nového/přepisovaného sloupce, parametrem druhým pak hodnoty onoho sloupce. Pro jejich vytvoření budeme obvykle potřebovat funkce z *pyspark.sql.functions*. Kupříklad na šáhnutí na jiný sloupec se použije funkce *functions.col*.

In [40]:
from pyspark.sql import functions as f
table_from_csv.withColumn("age_2", f.col("age")+10)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Vidíme že tabulka je pořád stejná. Když chceme její o sloupec bohatší verzi, musíme do proměnné vložit dataframe z výstupu metody. Připomeňme, že *withColumn* je transformace, tudíž aby se něco vůbec udělalo (nejen zobrazilo, ale aby opravdu PySpark udělal něco jiného než si jen transformaci zapsal do sady úkolů), musí se zavolat akce - v našem případě metoda *show*.

In [41]:
table_from_csv = table_from_csv.withColumn("age_2", f.col("age")+10)
table_from_csv.show()

+---+----------+----------+----+------+-----+
| id|first_name| last_name| age|gender|age_2|
+---+----------+----------+----+------+-----+
|100|    Victor|      Hugo|25.0|     M| 35.0|
|200|      Mary|    Shelly|30.0|     F| 40.0|
|300|    Johann|    Geothe|75.0|     M| 85.0|
|400|    Albert|     Camus|null|     M| null|
|500|   William|Shakespear|38.0|     M| 48.0|
+---+----------+----------+----+------+-----+



Co když ale chceme vytvořit nový sloupec za použití dat starého sloupce a určité if/else logiky? Tehdy jako druhý parametr *withColumn* použijeme *functions.when*. Do této funkce vložíme jako první parametr podmínku a jako parametr druhý hodnotu, kterou má sloupec nabýt v případě její platnosti. Do ostatních buněk se vloží null. Pokud ten nechceme, pověsíme za *when* metodu *otherwise*, kde bude jako argument hodnota sloupce v případě nesplnění podmínky.

In [42]:
table_from_csv.withColumn("is_female", f.when(f.col("gender")=="F", 1)).show()

+---+----------+----------+----+------+-----+---------+
| id|first_name| last_name| age|gender|age_2|is_female|
+---+----------+----------+----+------+-----+---------+
|100|    Victor|      Hugo|25.0|     M| 35.0|     null|
|200|      Mary|    Shelly|30.0|     F| 40.0|        1|
|300|    Johann|    Geothe|75.0|     M| 85.0|     null|
|400|    Albert|     Camus|null|     M| null|     null|
|500|   William|Shakespear|38.0|     M| 48.0|     null|
+---+----------+----------+----+------+-----+---------+



In [43]:
table_from_csv.withColumn("is_female", f.when(f.col("gender")=="F", 1).otherwise(0)).show()

+---+----------+----------+----+------+-----+---------+
| id|first_name| last_name| age|gender|age_2|is_female|
+---+----------+----------+----+------+-----+---------+
|100|    Victor|      Hugo|25.0|     M| 35.0|        0|
|200|      Mary|    Shelly|30.0|     F| 40.0|        1|
|300|    Johann|    Geothe|75.0|     M| 85.0|        0|
|400|    Albert|     Camus|null|     M| null|        0|
|500|   William|Shakespear|38.0|     M| 48.0|        0|
+---+----------+----------+----+------+-----+---------+



Wheny za sebou můžeme řetězit a tak se vypočádat i s podmínkou mající více možností (hodnoty podmínek v následujícím příkladu prosím neberte vážně):

In [8]:
table_from_csv.withColumn(
    "age_category",
    f.when(f.col("age")<=25, "young").when(f.col("age")<=50, "middle-aged").otherwise("old")
).show()

+---+----------+----------+----+------+------------+
| id|first_name| last_name| age|gender|age_category|
+---+----------+----------+----+------+------------+
|100|    Victor|      Hugo|25.0|     M|       young|
|200|      Mary|    Shelly|30.0|     F| middle-aged|
|300|    Johann|    Geothe|75.0|     M|         old|
|400|    Albert|     Camus|null|     M|         old|
|500|   William|Shakespear|38.0|     M| middle-aged|
+---+----------+----------+----+------+------------+



Pro vyhození sloupce slouží metoda *drop*:

In [44]:
table_from_csv = table_from_csv.drop("age_2")
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Sloupce přejmenujeme pomocí *withColumnRenamed*, kde prvním parametrem je staré jméno sloupce a parametrem druhým jméno nové:

In [45]:
table_from_csv.withColumnRenamed("last_name", "author_last_name").show()

+---+----------+----------------+----+------+
| id|first_name|author_last_name| age|gender|
+---+----------+----------------+----+------+
|100|    Victor|            Hugo|25.0|     M|
|200|      Mary|          Shelly|30.0|     F|
|300|    Johann|          Geothe|75.0|     M|
|400|    Albert|           Camus|null|     M|
|500|   William|      Shakespear|38.0|     M|
+---+----------+----------------+----+------+



Občas je potřeba provést konverzi datového typu sloupce 

In [46]:
data_types_list = [
    ["zero", "0"],
    ["one", "1"],
    ["two", "2"]
]
data_types_frame = spark.createDataFrame(
    data_types_list,
    schema=["words", "numbers"]
)
data_types_frame.printSchema()

root
 |-- words: string (nullable = true)
 |-- numbers: string (nullable = true)



To se provede s použitím metody *cast*, která obsahuje chtěný datový typ a která je vypuštěna na příslušný sloupec vybraný pomocí *functions.col*. Efektivně starý sloupec přepisujeme sloupcem novým:

In [47]:
from pyspark.sql.types import IntegerType

data_types_frame = data_types_frame.withColumn(
    "numbers",
    f.col("numbers").cast(IntegerType())
)
data_types_frame.printSchema()

root
 |-- words: string (nullable = true)
 |-- numbers: integer (nullable = true)



Pokud bychom chtěli z nějakého důvodu dostat najednou data z celého dataframu, použijeme metodu *collect*. Všimněte si na níže uvedeném příkladu, že tato metoda vrací list řádků.  
Chceme ale opravdu najednou dostat data z celého dataframu? No, obvykle ne. Znamená to totiž, že se z jednotlivých executorů pošlou data na driver, což pro dostatečně velký dataframe vyústí v "OutOfMemory" error.

In [48]:
table_from_csv.collect()

[Row(id='100', first_name='Victor', last_name='Hugo', age='25.0', gender='M'),
 Row(id='200', first_name='Mary', last_name='Shelly', age='30.0', gender='F'),
 Row(id='300', first_name='Johann', last_name='Geothe', age='75.0', gender='M'),
 Row(id='400', first_name='Albert', last_name='Camus', age=None, gender='M'),
 Row(id='500', first_name='William', last_name='Shakespear', age='38.0', gender='M')]

Každopádně pokud už hodláme data získat pomocí collectu, můžeme z nich vyzobnout jednu buňku s pomocí indexů. Takový postup je pak vzdáleně podobný pandasímu *iloc*.

In [49]:
table_from_csv.collect()[4][1]

'William'

Jak tedy provádět vybírání sloupců a řádků správně? Pro výběr sloupců se použije metoda *select*. Zdůrazněme, že narozdíl od Pandas se tu u více sloupců nutně neočekává, že budou v listu, ale mohou jít jeden za druhým jako obyčejné argumenty.

In [50]:
table_from_csv.select("first_name", "last_name").show()

+----------+----------+
|first_name| last_name|
+----------+----------+
|    Victor|      Hugo|
|      Mary|    Shelly|
|    Johann|    Geothe|
|    Albert|     Camus|
|   William|Shakespear|
+----------+----------+



Nicméně *select* funguje i s listem:

In [51]:
table_from_csv.select(["first_name", "last_name"]).show()

+----------+----------+
|first_name| last_name|
+----------+----------+
|    Victor|      Hugo|
|      Mary|    Shelly|
|    Johann|    Geothe|
|    Albert|     Camus|
|   William|Shakespear|
+----------+----------+



Pro vybrání řádků na základě hodnoty se použije metoda *filter* (resp. *where* - jedná se o alias pro *filter*). Jelikož pro potřeby takovéto operace je nutno na sloupce šáhnout, musíme opět použít metodu *col* z *pyspark.sql.functions*:

In [52]:
table_from_csv.filter(f.col("age")>=30).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



In [53]:
table_from_csv.filter(f.col("last_name")=="Shakespear").show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Pro znegování podmínky slouží vlnka.

In [54]:
table_from_csv.filter(~(f.col("last_name")=="Shakespear")).show()

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|100|    Victor|     Hugo|25.0|     M|
|200|      Mary|   Shelly|30.0|     F|
|300|    Johann|   Geothe|75.0|     M|
|400|    Albert|    Camus|null|     M|
+---+----------+---------+----+------+



Pokud chceme provést filtrování nikoli na základě celého textového řetězce, ale pouze jeho části, použijeme metodu *like*. Ta funguje obdobně jako stejnojmenné klíčové slovo v SQL (tj. například procento má stejnou funkčnost):

In [4]:
table_from_csv.filter(
    f.col("last_name").like("%ug%")
).show()

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|100|    Victor|     Hugo|25.0|     M|
+---+----------+---------+----+------+



Pakliže potřebujeme uplatnit složitější podmínku na textové řetězce, aplikujeme regulární výrazy. Konkrétně pattern vkládáme do metody *rlike*:

In [6]:
table_from_csv.filter(
    f.col("last_name").rlike("^\\w{4,5}$")
).show()

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|100|    Victor|     Hugo|25.0|     M|
|400|    Albert|    Camus|null|     M|
+---+----------+---------+----+------+



Co se týče použití více podmínek naráz, je situace stejná jako v pandách. Tj. jednotlivé podmínky se vloží do kulatých závorek a spojí se buďto znakem & (v případě "and" logiky), anebo znakem | (v případě "or" logiky).

In [55]:
table_from_csv.filter(
    (f.col("last_name")=="Shakespear")
    | (f.col("first_name")=="Mary")
).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|200|      Mary|    Shelly|30.0|     F|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



In [56]:
table_from_csv.filter(
    (f.col("gender")=="M")
    & (f.col("age")>=30)
).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Pro vybrání řádků s null hodnotami použijeme metodu *isNull* aplikovanou na f.col("jmeno_sloupce"). Podobný modus operandi má metoda *isNotNull*.

In [57]:
table_from_csv.filter(f.col("age").isNull()).show()

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|400|    Albert|    Camus|null|     M|
+---+----------+---------+----+------+



In [58]:
table_from_csv.filter(f.col("age").isNotNull()).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Co máme dělat, když chceme selectování a filtrování provést najednou? Zkrátka selecty a filtery napíšeme za sebe. Abychom neměli jeden gigantický řádek, celý příkaz obalíme do kulatých závorek.

In [59]:
(
  table_from_csv
  .filter(f.col("last_name")=="Shakespear")
  .select("first_name", "last_name")
  .show()
)

+----------+----------+
|first_name| last_name|
+----------+----------+
|   William|Shakespear|
+----------+----------+



Pro seřazení řádků podle hodnot v určitém sloupci použijeme *sort*.

In [60]:
table_from_csv.sort("age").show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|400|    Albert|     Camus|null|     M|
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|500|   William|Shakespear|38.0|     M|
|300|    Johann|    Geothe|75.0|     M|
+---+----------+----------+----+------+



Pokud chceme sestupné řazení, přidáme do *sort* parametr *ascending* s hodnotou False.

In [61]:
table_from_csv.sort("age", ascending=False).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|100|    Victor|      Hugo|25.0|     M|
|400|    Albert|     Camus|null|     M|
+---+----------+----------+----+------+



Pokud chceme vidět unikátní hodnoty z určitého sloupce, napřed onen sloupec selectem vybereme a následně uplatníme metodu *distinct*.

In [62]:
table_from_csv.select("gender").distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
+------+



Pro vyhození řádků s nully se použije konstrukce *na.drop*:

In [63]:
table_from_csv.na.drop().show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Naopak pro nahrazení nullů nějakou hodotou použijeme *na.fill*. Té předáme parametr *value* říkající, čím se mají nully nahradit, a parametr *subset* specifikující, na kterých sloupcích má nahrazování probíhat.

In [64]:
table_from_csv.na.fill(value="?", subset=["age"]).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|   ?|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Pro agregované statistiky napřed data shlukneme do tříd s pomocí *groupBy*. Jako parametr se uvede jméno sloupce, podle kterého grupování proběhne. Následně do metody *agg* vložíme metody (obvykle z *pyspark.sql.functions*) statistiky napočítávající. Pokud se nám nelíbí defaultní jména takto vyprodukovaných sloupců, použijeme metodu *alias*.

In [65]:
table_from_csv.groupBy("gender").agg(
  f.count("*"),
  f.min("age"),
  f.max("age"),
  f.avg("id").alias("nonsensical_aggregation")
).show()

+------+--------+--------+--------+-----------------------+
|gender|count(1)|min(age)|max(age)|nonsensical_aggregation|
+------+--------+--------+--------+-----------------------+
|     F|       1|    30.0|    30.0|                  200.0|
|     M|       4|    25.0|    75.0|                  325.0|
+------+--------+--------+--------+-----------------------+



Pro najoinování dvou dataframů na sebe použijeme metodu *join* zavolanou na jednom z nich. Proměnná s druhým dataframem tvoří první parametr metody *join*. Druhý parametr,  *on*, specifikuje joinovací sloupec. Pokud se ten v obou tabulkách jmenuje stejně, má formu obyčejného stringu.

In [66]:
books_list = [
    [100, 5],
    [300, 7],
    [400, 9]
]
books_frame = spark.createDataFrame(
    books_list,
    schema=["id", "books_count"]
)

joined_frame = table_from_csv.join(books_frame, on="id")
joined_frame.show()

+---+----------+---------+----+------+-----------+
| id|first_name|last_name| age|gender|books_count|
+---+----------+---------+----+------+-----------+
|100|    Victor|     Hugo|25.0|     M|          5|
|300|    Johann|   Geothe|75.0|     M|          7|
|400|    Albert|    Camus|null|     M|          9|
+---+----------+---------+----+------+-----------+



Pokud se ale joinovací sloupec v dataframech jmenuje odlišně, obsahuje *on* porovnávání.

In [67]:
books_list = [
    [100, 5],
    [300, 7],
    [400, 9]
]
books_frame = spark.createDataFrame(
    books_list,
    schema=["some_id", "books_count"]
)

joined_frame = table_from_csv.join(
    books_frame, 
    on=table_from_csv["id"]==books_frame["some_id"]
)
joined_frame.show()

+---+----------+---------+----+------+-------+-----------+
| id|first_name|last_name| age|gender|some_id|books_count|
+---+----------+---------+----+------+-------+-----------+
|100|    Victor|     Hugo|25.0|     M|    100|          5|
|300|    Johann|   Geothe|75.0|     M|    300|          7|
|400|    Albert|    Camus|null|     M|    400|          9|
+---+----------+---------+----+------+-------+-----------+



Pakliže chceme jiný typ joinu než inner join, musíme to specifikovat do parametru *how*.

In [68]:
books_list = [
    [100, 5],
    [300, 7],
    [400, 9]
]
books_frame = spark.createDataFrame(
    books_list,
    schema=["id", "books_count"]
)

joined_frame = table_from_csv.join(books_frame, on="id", how="left")
joined_frame.show()

+---+----------+----------+----+------+-----------+
| id|first_name| last_name| age|gender|books_count|
+---+----------+----------+----+------+-----------+
|500|   William|Shakespear|38.0|     M|       null|
|100|    Victor|      Hugo|25.0|     M|          5|
|200|      Mary|    Shelly|30.0|     F|       null|
|400|    Albert|     Camus|null|     M|          9|
|300|    Johann|    Geothe|75.0|     M|          7|
+---+----------+----------+----+------+-----------+



#### PySpark a datumy
Dost často člověk potřebuje pracovat s datumovým datovým typem. Jelikož kvůli tomu se obvykle musí dělat konverze a i počítání rozdílů mezi dvěma dny nebývá úplně zjevné, věnujme této problematice trochu času.  
Nejprve si vytvoříme dataframe. Všimněte si, že ačkoli druhý sloupec obsahuje de facto datumy, pro Spark jsou to stringy.

In [69]:
dates_data = [
    [1,"20.12.2021"],
    [2,"05.01.2022"],
    [3,"08.01.2022"],
    [4,"12.02.2022"]
    
]
dates_frame = spark.createDataFrame(dates_data, ["number","date"])
dates_frame.printSchema()

root
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)



Konverzi provedeme s pomocí *functions.to_date*. První parametrem je konvertovaný sloupec, druhý pak obsahuje předpis popisující původní data. Význam jednotlivých písmen, které se v něm mohou objevit, nalezneme [zde](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html).

In [70]:
from pyspark.sql import functions as f


dates_frame = dates_frame.withColumn(
    "date_converted",
     f.to_date(f.col("date"), "dd.MM.yyyy")
)

dates_frame.printSchema()
dates_frame.show()

root
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)
 |-- date_converted: date (nullable = true)

+------+----------+--------------+
|number|      date|date_converted|
+------+----------+--------------+
|     1|20.12.2021|    2021-12-20|
|     2|05.01.2022|    2022-01-05|
|     3|08.01.2022|    2022-01-08|
|     4|12.02.2022|    2022-02-12|
+------+----------+--------------+



Aktuální datum získáme s pomocí *functions.current_date*.

In [71]:
dates_frame = dates_frame.withColumn(
    "current_date",
     f.current_date()
)

dates_frame.printSchema()
dates_frame.show()

root
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)
 |-- date_converted: date (nullable = true)
 |-- current_date: date (nullable = false)

+------+----------+--------------+------------+
|number|      date|date_converted|current_date|
+------+----------+--------------+------------+
|     1|20.12.2021|    2021-12-20|  2022-07-18|
|     2|05.01.2022|    2022-01-05|  2022-07-18|
|     3|08.01.2022|    2022-01-08|  2022-07-18|
|     4|12.02.2022|    2022-02-12|  2022-07-18|
+------+----------+--------------+------------+



Rozdíl datumů ve dnech spočítáme prostřednictvím *functions.datediff*.

In [73]:
dates_frame.withColumn(
    "current_minus_converted",
     f.datediff(f.col("current_date"), f.col("date_converted"))
).show()

+------+----------+--------------+------------+-----------------------+
|number|      date|date_converted|current_date|current_minus_converted|
+------+----------+--------------+------------+-----------------------+
|     1|20.12.2021|    2021-12-20|  2022-07-18|                    210|
|     2|05.01.2022|    2022-01-05|  2022-07-18|                    194|
|     3|08.01.2022|    2022-01-08|  2022-07-18|                    191|
|     4|12.02.2022|    2022-02-12|  2022-07-18|                    156|
+------+----------+--------------+------------+-----------------------+



Pakliže potřebujeme k datu přičíst či odečíst den či měsíc, sáhneme po *functions.add_months* a *functions.date_add* (obě funkce dokáží přijmout kladné i záporné počty měsíců/dnů).

In [74]:
(
    dates_frame
    .withColumn("added_months", f.add_months(f.col("date_converted"),2))
    .withColumn("substr_months", f.add_months(f.col("date_converted"),-2)) 
    .withColumn("added_days", f.date_add(f.col("date_converted"),15)) 
    .withColumn("substr_days", f.date_add(f.col("date_converted"),-15))
    .show()
)

+------+----------+--------------+------------+------------+-------------+----------+-----------+
|number|      date|date_converted|current_date|added_months|substr_months|added_days|substr_days|
+------+----------+--------------+------------+------------+-------------+----------+-----------+
|     1|20.12.2021|    2021-12-20|  2022-07-18|  2022-02-20|   2021-10-20|2022-01-04| 2021-12-05|
|     2|05.01.2022|    2022-01-05|  2022-07-18|  2022-03-05|   2021-11-05|2022-01-20| 2021-12-21|
|     3|08.01.2022|    2022-01-08|  2022-07-18|  2022-03-08|   2021-11-08|2022-01-23| 2021-12-24|
|     4|12.02.2022|    2022-02-12|  2022-07-18|  2022-04-12|   2021-12-12|2022-02-27| 2022-01-28|
+------+----------+--------------+------------+------------+-------------+----------+-----------+



Poznamenejme, že funkce *add_month* může občas vracet poněkud překvapující výsledky - viz následující tabulka. 

|Original date| Original date + 1 month|
|-|-|
|2022-03-29|2022-04-29|
|2022-03-30|2022-04-30|
|2022-03-31|2022-04-30|
|2022-04-29|2022-05-29|
|2022-04-30|2022-05-31|
|2022-01-31|2022-02-28|
|2022-02-28|2022-03-31|

Existují i funkce na extrakci roku, měsíce, dne atd. z datumu: 

In [75]:
(
    dates_frame
    .withColumn("extracted_year", f.year(f.col("date_converted")))
    .withColumn("extracted_month", f.month(f.col("date_converted"))) 
    .withColumn("extracted_day", f.dayofmonth(f.col("date_converted"))) 
    .withColumn("day_of_year", f.dayofyear(f.col("date_converted")))
    .withColumn("day_of_week", f.dayofweek(f.col("date_converted")))
    .withColumn("week_of_year", f.weekofyear(f.col("date_converted")))
    .show()
)

+------+----------+--------------+------------+--------------+---------------+-------------+-----------+-----------+------------+
|number|      date|date_converted|current_date|extracted_year|extracted_month|extracted_day|day_of_year|day_of_week|week_of_year|
+------+----------+--------------+------------+--------------+---------------+-------------+-----------+-----------+------------+
|     1|20.12.2021|    2021-12-20|  2022-07-18|          2021|             12|           20|        354|          2|          51|
|     2|05.01.2022|    2022-01-05|  2022-07-18|          2022|              1|            5|          5|          4|           1|
|     3|08.01.2022|    2022-01-08|  2022-07-18|          2022|              1|            8|          8|          7|           1|
|     4|12.02.2022|    2022-02-12|  2022-07-18|          2022|              2|           12|         43|          7|           6|
+------+----------+--------------+------------+--------------+---------------+------------

#### Window funkce
Někdy se hodí provádět určité operace nikoli na celém dataframu, ale pouze na jeho části určené buďto hodnotou nějakého sloupce, anebo počtem řádek.  
Nejprve si vytvořme pokusný dataframe:

In [76]:
for_window_list = [
    ["one", 10],
    ["one", 10],
    ["one", 30],
    ["one", 40],
    ["one", 50],
    ["two", 60],
    ["two", 70],
    ["two", 80],
    ["two", 90],
    ["three", 100]
    
]
for_window_frame = spark.createDataFrame(
    for_window_list,
    schema=["word", "number"]
)

První ukázka se bude týkat číslování řádek. Nejprve si vytvoříme definici okna. Budeme chtít pro jednotlivá slova samostatné číslování, proto *partitionBy("word")*. Záznamy v jedné skupině pak chceme setřídit podle hodnoty sloupce "number", proto následuje *orderBy("number")*.   

Čísla řádků budeme chtít umístit do nového sloupce, proto použijeme metodu *withColumn*. Na samotné napočítání vložíme do druhého parametru této sloupec vytvářející metody *functions.row_number().over(window_definition)*.

In [77]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f

window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "row_number",
    f.row_number().over(window_definition)
).show()

+-----+------+----------+
| word|number|row_number|
+-----+------+----------+
|  two|    60|         1|
|  two|    70|         2|
|  two|    80|         3|
|  two|    90|         4|
|  one|    10|         1|
|  one|    10|         2|
|  one|    30|         3|
|  one|    40|         4|
|  one|    50|         5|
|three|   100|         1|
+-----+------+----------+



Pokud bychom chtěli nikoli číslo řádky, ale pořadí čísel ve sloupci "number", použijeme namísto funkce *row_number* funkci *rank*. Rozdíl je patrný pro řádky, kde hodnota sloupce "word" je rovna "one" a hodnota sloupce "number" odpovídá 10. Zatímco u *row_number* měl u sebe jeden řádek jedničku a druhý dvojku, v případě *rank* mají jedničku oba. Následující řádek má ale trojku, tj. pořadí 2 není. 

In [78]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "rank",
    f.rank().over(window_definition)
).show()

+-----+------+----+
| word|number|rank|
+-----+------+----+
|  two|    60|   1|
|  two|    70|   2|
|  two|    80|   3|
|  two|    90|   4|
|  one|    10|   1|
|  one|    10|   1|
|  one|    30|   3|
|  one|    40|   4|
|  one|    50|   5|
|three|   100|   1|
+-----+------+----+



Pokud by nám to vadilo, použijeme namísto *rank* funkci *dense_rank*.

In [79]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "dense_rank",
    f.dense_rank().over(window_definition)
).show()

+-----+------+----------+
| word|number|dense_rank|
+-----+------+----------+
|  two|    60|         1|
|  two|    70|         2|
|  two|    80|         3|
|  two|    90|         4|
|  one|    10|         1|
|  one|    10|         1|
|  one|    30|         2|
|  one|    40|         3|
|  one|    50|         4|
|three|   100|         1|
+-----+------+----------+



Pokud potřebujeme mít sloupec, který bude o jednu či více řádek posunutý níže, aplikujeme funkci *lag*. Kde nebude co dát, tam se objeví null. Pokud bychom chtěli mít sloupec posunout opačným směrem (tj. o N řádků výše), použijeme funkci *lead*.

In [80]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "lagged_column",
    f.lag("number", 1).over(window_definition)
).show()

+-----+------+-------------+
| word|number|lagged_column|
+-----+------+-------------+
|  two|    60|         null|
|  two|    70|           60|
|  two|    80|           70|
|  two|    90|           80|
|  one|    10|         null|
|  one|    10|           10|
|  one|    30|           10|
|  one|    40|           30|
|  one|    50|           40|
|three|   100|         null|
+-----+------+-------------+



Pakliže potřebujeme kumulativní sumu, aplikujeme *functions.sum*.

In [81]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "cumulative_sum",
    f.sum("number").over(window_definition)
).show()

+-----+------+--------------+
| word|number|cumulative_sum|
+-----+------+--------------+
|  two|    60|            60|
|  two|    70|           130|
|  two|    80|           210|
|  two|    90|           300|
|  one|    10|            20|
|  one|    10|            20|
|  one|    30|            50|
|  one|    40|            90|
|  one|    50|           140|
|three|   100|           100|
+-----+------+--------------+



### SQL přístup
Doposud jsme si ukazovali práci s PySparkem, která se svým modem operandi dosti podobala práci s pandami. Nicméně existuje ještě jeden přístup, který se podobá zacházení s SQL databázemi.  
Nejprve musíme na dataframu provolat metodu *createOrReplaceTempView*. Té předáme jen jeden parametr - jméno onoho dočasného pohledu.

In [45]:
table_from_csv.createOrReplaceTempView("some_sql_table")

Tento pohled můžeme posléze používat v sql dotazech, které vložíme jako parametr do metody *sql*, která je napojená na samotnou pysparkovou sešnu. Výsledkem je další pysparkový dataframe, na který, pokud chceme vidět jeho obsah, musíme napojit metodu *show*.

In [46]:
spark.sql("select * from some_sql_table").show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



### Cachování
Při psaní této sekce jsem extenzivně čerpal  [odsud](https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34?gi=407c957f2bcb) - doporučuji přečíst.  
Představme si, že bychom s pomocí selectování a filtrování vyrobili z nějakého dataframu dataframe menší a ten umístili do nové proměnné. Když bychom na této nové tabulce začali provádět nějaké akce, dojde bohužel k tomu, že se vytváření onoho redukovaného dataframu provede pro každou z těchto akcí zvlášť. Aby se zbytečně neplýtvalo výpočetními prostředky, existuje možnost si dataframe nacachovat, tj. uložit do paměti/na harddisk pro budoucí využití.  
Cachování se realizuje provoláním dataframové metody *cache* resp. *persist*. Zdůrazněme, že tyto metody jsou [de facto transformací](https://docs.microsoft.com/en-us/azure/databricks/kb/scala/best-practice-cache-count-take), takže k samotné operaci cachování dojde až po provolání nějaké akce.

In [43]:
table_from_csv.cache()

DataFrame[id: string, first_name: string, last_name: string, age: string, gender: string]

Rozdíl mezi nimi spočívá jen v tom, že u persistu lze specifikovat, kam přesně budou data uložena. Nicméně ve většině případů si člověk vystačí s defaultním MEMORY_AND_DISK, tj. data se prioritně ukládají do RAMky a když ta dojde, míří zbytek na disk.  
V případě panda-like přístupu je cachování lazy transformace. U SQL přístupu, který má podobu

In [47]:
spark.sql("cache table some_sql_table")

DataFrame[]

to ale neplatí - zde máme co do činění s eager operací, tj. potřebná posloupnost transformací ihned proběhne a umístí data do úložiště. Nicméně toto defaultní chování máme možnost změnit na lazy s použitím odpovídajícího klíčového slova 

In [86]:
spark.sql("cache lazy table some_sql_table")

DataFrame[]

Pro zjištění, zda je dataframe nacachovaný, se použije atribut *is_cached*:

In [44]:
table_from_csv.is_cached

True

Pokud bychom ručně chtěli data z cache odstranit, použijeme

In [88]:
table_from_csv.unpersist()

DataFrame[id: string, first_name: string, last_name: string, age: string, gender: string]

resp. 

In [48]:
spark.sql("uncache table some_sql_table")

DataFrame[]

Předejdeme tak situaci, kdy dojde paměť a Spark samotný začne vyhazovat tabulky na základě míry jejich nepoužívanosti. Navíc čím víc je volné paměti, tím operace probíhají rychleji (není nutné swapovat na disk). Ze stejného důvodu může být nastaven administrátorem PySparkového clusteru timeout (k nalezení v případě Cloudery v Cloudera Manageru v sekci Spark -> Configuration -> proměnná spark.dynamicAllocation.cachedExecutorIdleTimeout) - pokud se na dataframe nesáhlo po dobu delší než je tento časový úsek, je onen dataframe z cache automaticky odstraněn.  
Stejnou motivaci má i rada cachovat nikoli celé dataframy, ale pouze ty jejich části, které potřebujeme.  
Zmiňme nakonec, že ne vždy cachování vede k rychlejšímu běhu programu. Pokud bychom měli relativně velký parquet soubor a prováděli na něm filtrování, asi tato operace odběhne rychleji, než kdyby se musel napřed celý načíst do paměti (nebo dokonce swapovat na disk).

## Machine learning
ML algoritmy jsou vedle manipulace s daty druhou důležitou věcí, které PySpark umožňuje. Nicméně hned na začátku člověk narazí na ten problém, že tu existují dvě podobně vyhlížející knihovny - MLLib (pyspark.mllib) a ML (pyspark.ml). První z nich je starší, založená na práci s RDD a již se nevyvíjí. Proto zde budeme mluvit pouze o ML.  
Pro začátek se podíváme na něco triviálního - vytvoříme model pro klasifikaci kosatců. Data seženeme [zde](https://archive.ics.uci.edu/ml/datasets/iris). Všimněme si, že v datovém souboru není přítomna hlavička. Musíme tak datové schéma definovat ručně, i když by PySpark asi datové typy určil správně.

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

file_table_schema = StructType([
    StructField("sepal_length_cm", DoubleType()),
    StructField("sepal_width_cm", DoubleType()),
    StructField("petal_length_cm", DoubleType()),
    StructField("petal_width_cm", DoubleType()),
    StructField("flower_class", StringType())
])


iris_frame = (
    spark.read.csv(
        path="iris.data",
        header=False,
        schema=file_table_schema
    )
)

Data rozdělíme na trénovací a testovací množinu pomocí metody *randomSplit*, která je navázána na pysparkovský dataframe. Abychom byli z hlediska opakovaných experimentů konzistentní, zafixujeme i *seed*.

In [4]:
iris_train, iris_test = iris_frame.randomSplit([0.8, 0.2], seed=42)

### String indexer
PySparkovské modely se narozdíl od těch scikit-learnovských nedokáží srovnat s tím, že by byla třída záznamu uložená jako string. Pokud bychom se o to přesto pokusili, koukali bychom o pár buněk níže při samotném trénování na chybovou hlášku
```
(IllegalArgumentException: requirement failed: Column flower_class must be of type numeric but was actually of type string.).
```
Na převod třídy-stringu na třídu-číslo použijeme *StringIndexer*. Ten třídám defaultně přidělí čísla z intervalu \[0, počet tříd). Přitom defaultně řazení bude podle míry výskytu, tj. nejzastoupenější třída bude mít index 0. Nicméně abychom věděli, které třídě odpovídá které číslo, asi bude lepší použít parametr *stringOrderType*, kterému podhodíme hodnotu "alphabetAsc".  
Zdůrazněme jednu obecnou věc vlastní pysparkovským konstrukcím s *fit* a *transform*. Bez řádku, kde do dedikované proměnné (*string_indexer_iris_fit*) vkládáme výsledek *fit* metody, by nám kód nefungoval. Narozdíl od scikit-learnu totiž samotné odpálení totiž nevede k zpřístupnění transform metody.  

In [5]:
from pyspark.ml.feature import StringIndexer

string_indexer_iris = StringIndexer(
    inputCol="flower_class", 
    outputCol="flower_class_indexed", 
    stringOrderType="alphabetAsc"
)
string_indexer_iris_fit = string_indexer_iris.fit(iris_train)
iris_train = string_indexer_iris_fit.transform(iris_train)
iris_test = string_indexer_iris_fit.transform(iris_test)

iris_train.show(n=5)

+---------------+--------------+---------------+--------------+------------+--------------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|flower_class|flower_class_indexed|
+---------------+--------------+---------------+--------------+------------+--------------------+
|            4.3|           3.0|            1.1|           0.1| Iris-setosa|                 0.0|
|            4.4|           2.9|            1.4|           0.2| Iris-setosa|                 0.0|
|            4.4|           3.2|            1.3|           0.2| Iris-setosa|                 0.0|
|            4.5|           2.3|            1.3|           0.3| Iris-setosa|                 0.0|
|            4.6|           3.1|            1.5|           0.2| Iris-setosa|                 0.0|
+---------------+--------------+---------------+--------------+------------+--------------------+
only showing top 5 rows



### Vektorizace
Nyní bychom rádi provedli škálování (scaling). U škálovacího objektu v Pysparku je věcí, kterou se předávají jména upravovaných sloupců, parametr *inputCol*. Už z jednotného čísla v názvu tohoto parametru je vidět, že list s názvy sloupců očekáván není. Neočekává se ale ani string s názvem jednoho obyčejného sloupce plného doublů či integerů. Škálované totiž mohou být pouze vektorizované sloupce umístěné v jednom výsledném sloupci. I když se ale více sloupců sloučí do jednoho, jejich data se stále naštěstí škálují separátně.  
Co se ale myslí tou vektorizací? Hodnoty sloupců platných pro konkrétní záznam (alias jeden řádek tabulky) se umístí do vektoru (plus mínus je vektor jednorozměrné pole). Pro praktickou realizaci této operace musíme použít *VectorAssembler*, do kterého nasázíme list vstupních sloupců a jméno sloupce výstupního. Následně na takto vytvořeném objektu zavoláme metodu *transform* (*fit* tu absentuje).

In [6]:
from pyspark.ml.feature import VectorAssembler

vector_assembler = VectorAssembler(
    inputCols=["sepal_length_cm", "sepal_width_cm", "petal_length_cm", "petal_width_cm"],
    outputCol="vectorized_params"
)

iris_train_vec = vector_assembler.transform(iris_train)
iris_test_vec = vector_assembler.transform(iris_test)

iris_train_vec.show(n=5)

+---------------+--------------+---------------+--------------+------------+--------------------+-----------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|flower_class|flower_class_indexed|vectorized_params|
+---------------+--------------+---------------+--------------+------------+--------------------+-----------------+
|            4.3|           3.0|            1.1|           0.1| Iris-setosa|                 0.0|[4.3,3.0,1.1,0.1]|
|            4.4|           2.9|            1.4|           0.2| Iris-setosa|                 0.0|[4.4,2.9,1.4,0.2]|
|            4.4|           3.2|            1.3|           0.2| Iris-setosa|                 0.0|[4.4,3.2,1.3,0.2]|
|            4.5|           2.3|            1.3|           0.3| Iris-setosa|                 0.0|[4.5,2.3,1.3,0.3]|
|            4.6|           3.1|            1.5|           0.2| Iris-setosa|                 0.0|[4.6,3.1,1.5,0.2]|
+---------------+--------------+---------------+--------------+---------

### Scaling
Nyní už můžeme provést škálování. Škálovacích objektů existuje více (např *MinMaxScaler*), my zde použije *StandardScaler*. Pokud zapomeneme do scaleru uvést jméno výstupního sloupce, bude se jeho jméno nést v duchu "MinMaxScaler_8c8aaa300eb8__output". Pozor - u StandardScaleru nejsou *withMean*=True, *withStd*=True defaulty, ale bez nich výstupy moc nedávají smysl.

In [7]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="vectorized_params", outputCol="scaled_params", withMean=True, withStd=True)
scaler_fit = scaler.fit(iris_train_vec)
iris_train_scal = scaler_fit.transform(iris_train_vec)
iris_test_scal = scaler_fit.transform(iris_test_vec)

print(scaler_fit.mean, scaler_fit.std)

iris_train_scal.select("vectorized_params", "scaled_params").show(n=5)

[5.896031746031745,3.041269841269841,3.8515873015873,1.2301587301587302] [0.838799217324461,0.43968531315382203,1.742170330454936,0.7524115726138021]
+-----------------+--------------------+
|vectorized_params|       scaled_params|
+-----------------+--------------------+
|[4.3,3.0,1.1,0.1]|[-1.9027577912180...|
|[4.4,2.9,1.4,0.2]|[-1.7835397496002...|
|[4.4,3.2,1.3,0.2]|[-1.7835397496002...|
|[4.5,2.3,1.3,0.3]|[-1.6643217079823...|
|[4.6,3.1,1.5,0.2]|[-1.5451036663645...|
+-----------------+--------------------+
only showing top 5 rows



### Trénování modelu
Teď již máme data připravená k tomu, abychom je protáhli ML algortimem. Pro jednoduchost použijeme logistickou regresi. Přesněji tedy *LogisticRegression*, kterému podhodíme jednak vektorizovaný škálovaný mnohosloupec s featury (parametr *featuresCol*), jednak sloupec s štítkem třídy (parametr *labelCol*). Opět si vytvoříme separátní objekt s nafitovaným modelem, na kterém poté provoláme *transform* metodu. Ta do výsledného dataframu jednak zkopíruje svůj vstup (trénovací či testovací dataframy), jednak k němu přilepí sloupce rawProbability (hodnota, která jde do logistické funkce), probability (pravděpodobnostit toho, že záznam patří do  jednotlivých tříd) a prediction (index třídy s největší pravděpodobností).

In [8]:
from pyspark.ml.classification import LogisticRegression

log_reg_model = LogisticRegression(featuresCol="scaled_params", labelCol="flower_class_indexed")
log_reg_model_fitted = log_reg_model.fit(iris_train_scal)

predictions_train = log_reg_model_fitted.transform(iris_train_scal)
predictions_test = log_reg_model_fitted.transform(iris_test_scal)

predictions_train.select(
    "flower_class", "flower_class_indexed", "rawPrediction","probability","prediction"
).show(n=5)

+------------+--------------------+--------------------+--------------------+----------+
|flower_class|flower_class_indexed|       rawPrediction|         probability|prediction|
+------------+--------------------+--------------------+--------------------+----------+
| Iris-setosa|                 0.0|[72.9291354525068...|[1.0,4.6345127662...|       0.0|
| Iris-setosa|                 0.0|[61.5820297479640...|[1.0,1.0695348023...|       0.0|
| Iris-setosa|                 0.0|[74.4702770586939...|[1.0,1.8428463586...|       0.0|
| Iris-setosa|                 0.0|[36.0972767602046...|[0.99999999999995...|       0.0|
| Iris-setosa|                 0.0|[66.4935520663596...|[1.0,1.0682311689...|       0.0|
+------------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



Pakliže bychom potřebovali znát parametry modelu logistické regrese, dostaneme se k nim skrze atributy *coefficientMatrix* a *interceptVector*.

In [9]:
log_reg_model_fitted.coefficientMatrix

DenseMatrix(3, 4, [-6.3791, 16.9901, -22.5747, -21.3316, 4.1024, -7.0033, 3.658, 4.4611, 2.2767, -9.9868, 18.9167, 16.8706], 1)

In [10]:
log_reg_model_fitted.interceptVector

DenseVector([-5.3097, 11.1964, -5.8866])

### Vyhodnocení přesnosti modelu
Natrénováno i oštítkováno máme, nyní bychom rádi predikce vyhodnotili. Nejprve bychom chtěli accuracy, precision, recall a f1. Jenže nemáme binární problém, třídy jsou tři. Vzpomeňme, že u scikit-learnu jsme měli pro každou metriku speciální funkci, u které jsme u vícetřídových problémů museli nastavovat parametr average. Pokud se ten nastavil jako "macro", spočítaly se pro každou třídu TP, FP a FN, sečetly se dohromady a na základě těchto součtů se metriky spočítaly. Oproti tomu pro average="micro" se pro každou třídu spočítaly metriky separátně a tyto metriky se poté zprůměrovaly.  
Jaký z těchto postupů byl vybrán ve Sparku? Toť otázkou - popravdě ani jednou z metod nedokážu dosáhnout přesně těch hodnot, které Spark poskytuje. Nicméně možná je ve hře i nějaké zaokrouhlení - potom bych tipoval, že tu máme micro přístup.   
Člověk musí v za metriky odpovědném objektu nastavit s pomocí *metricLabel* třídu, pro kterou se spočítá metrika uvedená v parametru *metricName*. Zde zdůrazněme, že můžeme chtít jak metriky brané dohromady za všechny třídy ("accuracy",  "weightedPrecision", "weightedRecall", "f1" - tehdy pochopitelně *metricLabel* nehraje roli), tak metriky vztažené vždy k jedné konkrétní třídě ("precisionByLabel", "recallByLabel","fMeasureByLabel" atd.). Dále se musí stanovit jméno sloupce s reálnou třídou (parametr *labelCol*) a s predikcí (parametr *predictionCol*). Následně se na takto vzniklém objektu zavolá metoda *evaluate*, které se podhodí dataframe, na kterém vyhodnocení má proběhnout.

In [11]:
#for PySpark 3
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

metrics_list = [
    "accuracy",  "weightedPrecision", "weightedRecall", "f1",
    "truePositiveRateByLabel", "falsePositiveRateByLabel",
    "precisionByLabel", "recallByLabel","fMeasureByLabel"
]
classes_list = [0, 1, 2]

for one_class in classes_list:
    for one_metrics in metrics_list:
        evaluator = MulticlassClassificationEvaluator(
            metricName=one_metrics, 
            labelCol="flower_class_indexed", 
            predictionCol="prediction",
            metricLabel=one_class
        )
        
        metrics_train_value = evaluator.evaluate(predictions_train)
        metrics_test_value = evaluator.evaluate(predictions_test)
        
        print(
            f"Metrics: {one_metrics}, class: {one_class}, "
            f"value on train: {metrics_train_value}, "
            f"value on test: {metrics_test_value}"
        )
    print()


Metrics: accuracy, class: 0, value on train: 0.9841269841269841, value on test: 1.0
Metrics: weightedPrecision, class: 0, value on train: 0.9841269841269842, value on test: 1.0
Metrics: weightedRecall, class: 0, value on train: 0.9841269841269842, value on test: 1.0
Metrics: f1, class: 0, value on train: 0.9841269841269842, value on test: 1.0
Metrics: truePositiveRateByLabel, class: 0, value on train: 1.0, value on test: 1.0
Metrics: falsePositiveRateByLabel, class: 0, value on train: 0.0, value on test: 0.0
Metrics: precisionByLabel, class: 0, value on train: 1.0, value on test: 1.0
Metrics: recallByLabel, class: 0, value on train: 1.0, value on test: 1.0
Metrics: fMeasureByLabel, class: 0, value on train: 1.0, value on test: 1.0

Metrics: accuracy, class: 1, value on train: 0.9841269841269841, value on test: 1.0
Metrics: weightedPrecision, class: 1, value on train: 0.9841269841269842, value on test: 1.0
Metrics: weightedRecall, class: 1, value on train: 0.9841269841269842, value on t

In [14]:
(39+43+42)/(39+43+42+1)

0.992

In [None]:
(0.9767441860465116*0.9767441860465116)/2

In [12]:
(1+0.9772727272727273+0.9767441860465116)/3

0.9846723044397464

Výše uvedený kód bohužel v PySparku 2 nebude fungovat. Dvojkový Spark "něcoByLabel" nezná, nýbrž pracuje se zprůměrovanými metrikami přes všechny třídy. Kód by tudíž vypadal následovně:

In [None]:
#for PySpark 2
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

metrics_list = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]


for one_metrics in metrics_list:
    evaluator = MulticlassClassificationEvaluator(
        metricName=one_metrics, 
        labelCol="flower_class_indexed", 
        predictionCol="prediction",
        metricLabel=one_class
    )
        
    metrics_train_value = evaluator.evaluate(predictions_train)
    metrics_test_value = evaluator.evaluate(predictions_test)
        
    print(
        f"Metrics: {one_metrics}, "
        f"value on train: {metrics_train_value}, "
        f"value on test: {metrics_test_value}"
    )
print()

Ok, možná bychom k těmto číslům byli zpočátku nedůvěřiví. V rámci ověření bychom si chtěli vytvořit confusion matici a z ní vybrat čísla pro definici výpočtu metrik. Jenže jak onu matici vyrobit? V MLLib sice existuje *MulticlassMetrics.confusionMatrix*, ale to je jednak zastaralé, jednak určené pro RDD a ne pro dataframy. Správným řešením je metoda *crosstab* vlastní pysparkovským dataframům. Ta jak první parametr přebírá jméno sloupce, jehož hodnoty budou tvořit řádky, zatímco druhý parametr bude jméno sloupce zodpovědného za sloupce:

In [13]:
predictions_train.crosstab("flower_class_indexed", "prediction").show()

+-------------------------------+---+---+---+
|flower_class_indexed_prediction|0.0|1.0|2.0|
+-------------------------------+---+---+---+
|                            2.0|  0|  1| 42|
|                            1.0|  0| 43|  1|
|                            0.0| 39|  0|  0|
+-------------------------------+---+---+---+



Pokud chceme spočítat pro multiclass model AUC (area under curve of receiver operator characteristic), tak... to nebude úplně lehké. Zejména pokud pracujeme se starší verzí PySparku. Řešení, které tu ukazuji, totiž používá funkci *vector_to_array*, která je v PySparku až od verze 3.0.0.  
A k čemu vlastně onu funkci potřebujeme? AUC se (by definition) dá spočítat jen pro binární problém, tj. musíme mít k dispozici pravděpodobnost, že záznam patří do třídy X, a flag (0/1), že tomu tak opravdu je. Jenže v predikčních dataframech máme uložené pravděpodobnosti dohromady a navíc právě jako vektor. Z tohoto vektorového sloupce si tudíž napřed vytvoříme array sloupec, který posléze roztrhneme na tři části. Taktéž si ze sloupce s třídou vytvoříme tři flagové sloupce.   

In [14]:
from pyspark.ml.functions import vector_to_array 
from pyspark.sql import functions as f

(
    predictions_train
    .withColumn("probability_array", vector_to_array("probability"))
    .select([f.col("probability_array")[class_index] for class_index in range(3)] + ["flower_class_indexed"])
    .withColumn("is_class_zero",f.when(f.col("flower_class_indexed") == 0,1.0).otherwise(0.0))
    .withColumn("is_class_one",f.when(f.col("flower_class_indexed") == 1,1.0).otherwise(0.0))
    .withColumn("is_class_two",f.when(f.col("flower_class_indexed") == 2,1.0).otherwise(0.0))
    .show(n=5)
)

+--------------------+--------------------+--------------------+--------------------+-------------+------------+------------+
|probability_array[0]|probability_array[1]|probability_array[2]|flower_class_indexed|is_class_zero|is_class_one|is_class_two|
+--------------------+--------------------+--------------------+--------------------+-------------+------------+------------+
|                 1.0|4.634512766260516...|2.069343685718057...|                 0.0|          1.0|         0.0|         0.0|
|                 1.0|1.069534802345435...|5.452528787959966...|                 0.0|          1.0|         0.0|         0.0|
|                 1.0|1.842846358623886...|5.109985808186763...|                 0.0|          1.0|         0.0|         0.0|
|  0.9999999999999578|4.229003000406716...|2.204030908750582...|                 0.0|          1.0|         0.0|         0.0|
|                 1.0|1.068231168988486...|2.177642182715996...|                 0.0|          1.0|         0.0|      

Metrika AUC je v PySpakrku dostupná skrze *BinaryClassificationEvaluator*. U toho se musí nastavit *metricName* jako "areaUnderROC" a *labelCol* jako sloupec s 0/1 flagem pro danou třídu, pro kterou chceme AUC spočítat. Nakonec do rawPredictionCol se musí vložit sloupec s pravděpodobností dané třídy. Člověk by si mohl myslet (i po nahlédnutí do dokumentace), že do tohoto parametru by spíš patřila rawPrediction dané třídy, ale s tím to prostě nedává očekávané výsledky.

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

for_auc_train_frame = (
    predictions_train
    .withColumn("probability_array", vector_to_array("probability"))
    .select([f.col("probability_array")[class_index] for class_index in range(3)] + ["flower_class_indexed"])
    .withColumn("is_class_zero",f.when(f.col("flower_class_indexed") == 0,1.0).otherwise(0.0))
    .withColumn("is_class_one",f.when(f.col("flower_class_indexed") == 1,1.0).otherwise(0.0))
    .withColumn("is_class_two",f.when(f.col("flower_class_indexed") == 2,1.0).otherwise(0.0))
    )

col_names = [
    ("probability_array[0]", "is_class_zero"),
    ("probability_array[1]", "is_class_one"),
    ("probability_array[2]", "is_class_two")
]

for class_index, one_name_tuple in enumerate(col_names):
    
    binary_evaluator = BinaryClassificationEvaluator(
        metricName="areaUnderROC",
        rawPredictionCol=one_name_tuple[0],
        labelCol=one_name_tuple[1]
    )
    
    metrics_train_value = binary_evaluator.evaluate(for_auc_train_frame)
    
    print(f"AUC for class {class_index}: {metrics_train_value}")

AUC for class 0: 1.0
AUC for class 1: 0.9983370288248337
AUC for class 2: 0.9983188568226394


### Feature selection
Představme si, že bychom měli více než 4 feature sloupce a potřebovali jejich počet redukovat. Na to lze použít [chi-squared test] (https://en.wikipedia.org/wiki/Chi-squared_test) implementovaný v podobě třídy *ChiSqSelector*. Do jejího konstruktoru vložíme parametr *numTopFeaturesnumTopFeatures* říkající, kolik nejdůležitějších featurů chceme na výstupu mít. Alternativně by šlo uvést jejich procento v parametru *percentile* či by šlo použít pár dalších podmínek. Do parametru  *featuresCol* se vloží jméno vektorizovaného sloupce s featurami, *outputCol* bude obsahovat jméno nového sloupce obsahujícího jen vybrané featury a *labelCol* jméno sloupce s indexy tříd. Následuje obvyklé kolečko s fitem a transformací framu.  
Popravdě mi není úplně jasné, proč následující kód nevyvolá chybu. Přeci jen naše featury obsahují reálná čísla a nikoli diskrétní kategorie, které by chi-square z definice vyžadovalo. Možná se v implementaci provádí bucketování? Anebo zkrátka v definici třídy žádná kontrola není a dostáváme úplné nesmysly :D, což je asi nejpravděpodobnější varianta.

In [16]:
from pyspark.ml.feature import ChiSqSelector

selector = ChiSqSelector(
    numTopFeatures=2, 
    featuresCol="scaled_params", 
    outputCol="selected_scaled_params",
    labelCol="flower_class_indexed"
)

fitted_selector = selector.fit(iris_train_scal)
selected_frame = fitted_selector.transform(iris_train_scal)

selected_frame.select("scaled_params","selected_scaled_params").show(n=2, truncate=False)

+----------------------------------------------------------------------------------+-----------------------------------------+
|scaled_params                                                                     |selected_scaled_params                   |
+----------------------------------------------------------------------------------+-----------------------------------------+
|[-1.9027577912180789,-0.09386222381142634,-1.5794019984651975,-1.5020485746021588]|[-1.5794019984651975,-1.5020485746021588]|
|[-1.7835397496002372,-0.3212976122775758,-1.4072029919985565,-1.3691425911752826] |[-1.4072029919985565,-1.3691425911752826]|
+----------------------------------------------------------------------------------+-----------------------------------------+
only showing top 2 rows



Pokud chceme vědět, které sloupce výběrem prošly, podíváme se u nafitovaného selectoru na atribut *selectedFeatures*.

In [17]:
fitted_selector.selectedFeatures

[2, 3]

Podle dokumentace je nicméně od verze 3.1.0 *ChiSqSelector* ve stavu deprecated s tím, že by se měl namísto něho používat *UnivariateFeatureSelector* (v PySparku od verze 3.1.1). V jeho konstruktoru musíme v parametru *selectionMode* specifikovat,  jakých způsobem chceme stanovit množství sloupců, které selectorem prolezou - v příkladu volíme "numTopFeatures". Avšak počet sloupců se nenastavuje v konstuktoru, ale až posléze v metodě *setSelectionThreshold*. Na stejném místě se s pomocí metod *setFeatureType* a *setLabelType* PySparku říká, zda jsou featury a labely kontinuální či kategorické. Na základě toho se PySpark pokusí o redukci sloupců pomocí chi-square (featury i labely kategorické), ANOVY (featury spojité, labely kategorické) či jakési F-value (featury i labely spojité).

In [18]:
#only PySpark 3
from pyspark.ml.feature import UnivariateFeatureSelector

selector = UnivariateFeatureSelector(
    selectionMode="numTopFeatures", 
    featuresCol="scaled_params", 
    outputCol="selected_scaled_params",
    labelCol="flower_class_indexed"
)
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(2)
fitted_selector = selector.fit(iris_train_scal)
selected_frame = fitted_selector.transform(iris_train_scal)

selected_frame.select("scaled_params","selected_scaled_params").show(n=2, truncate=False)

+----------------------------------------------------------------------------------+-----------------------------------------+
|scaled_params                                                                     |selected_scaled_params                   |
+----------------------------------------------------------------------------------+-----------------------------------------+
|[-1.9027577912180789,-0.09386222381142634,-1.5794019984651975,-1.5020485746021588]|[-1.9027577912180789,-1.5794019984651975]|
|[-1.7835397496002372,-0.3212976122775758,-1.4072029919985565,-1.3691425911752826] |[-1.7835397496002372,-1.4072029919985565]|
+----------------------------------------------------------------------------------+-----------------------------------------+
only showing top 2 rows



In [19]:
fitted_selector.selectedFeatures

[0, 2]

### Gridsearch
I relativně jednoduché modely mají hromadu parametrů. Jejich vysvětlení lze nahlédnout s použitím metody *explainParams*:

In [20]:
log_reg_model_fitted.explainParams()

"aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)\nelasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)\nfamily: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)\nfeaturesCol: features column name. (default: features, current: scaled_params)\nfitIntercept: whether to fit an intercept term. (default: True)\nlabelCol: label column name. (default: label, current: flower_class_indexed)\nlowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)\nlowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound c

Faktické hodnoty těchto parametrů obdržíme po provolání metody *extractParamMap*:

In [21]:
log_reg_model_fitted.extractParamMap()

{Param(parent='LogisticRegression_01b9ff7ff564', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_01b9ff7ff564', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_01b9ff7ff564', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_01b9ff7ff564', name='featuresCol', doc='features column name.'): 'scaled_params',
 Param(parent='LogisticRegression_01b9ff7ff564', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_01b9ff7ff564', name='labelCol', doc='label column name.'): 'flower_class_indexed',
 Param(parent='LogisticRegression_01b9ff7ff564', name='maxBlockSizeInMB', doc='maximu

Pokud chceme znát hodnotu jen jednoho parametru, můžeme taktéž použít provolání typu jmeno_modelu.\_java_obj.parent().getXYZ, kde XYZ je název parametru (první písmeno slova za getem je velké!):

In [22]:
log_reg_model_fitted._java_obj.parent().getRegParam()

0.0

V případě, kdy chceme vyzkoušet, jak bude model performovat pro větší počet hodnot různých parametrů, musíme sáhnout po crossvalidaci. Nejprve si vyrobíme instanci chtěného modelu. Následně vytvoříme mapu parametrů s pomocí *ParamGridBuilder*. Parametry k testování do něj vkládáme pomocí metody *addGrid* - u ní je prvním parametrem kýžený parametr modelu, druhým parametrem pak list se seznamem potenciálních hodnot parametru (to je tedy věta). Na konci tohoto řetězu musí být metoda *build*. Posléze si vytvoříme evaluátor, podle kterého budeme jednotlivé model s různými parametry poměřovat. Nakonec vyrobíme cross validátor, do kterého nasypeme instanci modelu (parametr *estimator*), seznam parametrů a jejich hodnot (*estimatorParamMaps*), evaluátor (*evaluator*) a počet foldů, na kterých bude cross validace probíhat (*numFolds*). Nyní se k cross validátoru chováme jako k obyčejnému modelu, tj. následuje klasické kolečko fitu a transformace, přičemž na konci je predikční dataframe vyrobený modelem s nejlepšími parametry.

In [23]:
#PySpark 3
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

log_reg_model = LogisticRegression(featuresCol="scaled_params", labelCol="flower_class_indexed")

parameter_grid = (
    ParamGridBuilder()
    .addGrid(log_reg_model.elasticNetParam, [0, 1])
    .addGrid(log_reg_model.regParam, [5, 10, 100])
    .build()
)

multi_evaluator = MulticlassClassificationEvaluator(
    metricName="f1", 
    labelCol="flower_class_indexed", 
    predictionCol="prediction",
    metricLabel=0
)

crossvalidator = CrossValidator(
    estimator=log_reg_model,
    estimatorParamMaps=parameter_grid,
    evaluator=multi_evaluator,
    numFolds=5    
)

crossval_model = crossvalidator.fit(iris_train_scal)
predictions_train = crossval_model.transform(iris_train_scal)

predictions_train.show(n=2)

+---------------+--------------+---------------+--------------+------------+--------------------+-----------------+--------------------+--------------------+--------------------+----------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|flower_class|flower_class_indexed|vectorized_params|       scaled_params|       rawPrediction|         probability|prediction|
+---------------+--------------+---------------+--------------+------------+--------------------+-----------------+--------------------+--------------------+--------------------+----------+
|            4.3|           3.0|            1.1|           0.1| Iris-setosa|                 0.0|[4.3,3.0,1.1,0.1]|[-1.9027577912180...|[0.23203651806669...|[0.41241663443775...|       0.0|
|            4.4|           2.9|            1.4|           0.2| Iris-setosa|                 0.0|[4.4,2.9,1.4,0.2]|[-1.7835397496002...|[0.19368770028198...|[0.39875039812181...|       0.0|
+---------------+--------------+---------------+--

V dvojkovém PySparku nemůžeme mít v *MulticlassClassificationEvaluator* parametr *metricLabel*:

In [None]:
#PySpark 2
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

log_reg_model = LogisticRegression(featuresCol="scaled_params", labelCol="flower_class_indexed")

parameter_grid = (
    ParamGridBuilder()
    .addGrid(log_reg_model.elasticNetParam, [0, 1])
    .addGrid(log_reg_model.regParam, [5, 10, 100])
    .build()
)

multi_evaluator = MulticlassClassificationEvaluator(
    metricName="f1", 
    labelCol="flower_class_indexed", 
    predictionCol="prediction"
)

crossvalidator = CrossValidator(
    estimator=log_reg_model,
    estimatorParamMaps=parameter_grid,
    evaluator=multi_evaluator,
    numFolds=5    
)

crossval_model = crossvalidator.fit(iris_train_scal)
predictions_train = crossval_model.transform(iris_train_scal)

predictions_train.show(n=2)

Nejlepší model můžeme explicitně vybrat z atributu nafitovaného cross validátoru *bestModel*. Zvolené hodnoty atributů si opět zobrazíme pomocí *extractParamMap*.

In [24]:
best_model = crossval_model.bestModel
best_model.extractParamMap()

{Param(parent='LogisticRegression_62751db598fc', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_62751db598fc', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_62751db598fc', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_62751db598fc', name='featuresCol', doc='features column name.'): 'scaled_params',
 Param(parent='LogisticRegression_62751db598fc', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_62751db598fc', name='labelCol', doc='label column name.'): 'flower_class_indexed',
 Param(parent='LogisticRegression_62751db598fc', name='maxBlockSizeInMB', doc='maximu

### Pipelina
V praxi nebudeme chtít mít model a nutné data zpracovávací operace rozstrkané v jednotlivých objektech. Namísto toho si budeme přát mít všechno uložené v jednom objektu - v pipelině. Objekty zpracovávající sloupce (ať už featury, anebo labely) si vyrobíme jako obvykle. Pouze v případech, kdy jeden objekt-operace bude používat sloupce vytvořené operací předešlou, nenapíšeme do parametru *inputCol* jméno sloupce jako string, ale *nazev_predesleho_objektu_v_pipeline.getOutputCol()*. Nakonec si vytvoříme objekt typu *Pipeline*, do jejíhož konstruktoru vložíme do parametru *stages* list s operacemi/objekty, jak by měly jít za sebou. S pipelinou pak můžeme zacházet jako s obyčejným modelem, tj. lze použít *fit* a *transform* operace.

In [25]:
from pyspark.ml import Pipeline

iris_train, iris_test = iris_frame.randomSplit([0.8, 0.2], seed=42)

cols_for_scaling = ["sepal_length_cm", "sepal_width_cm", "petal_length_cm", "petal_width_cm"]

vector_col_to_scale = VectorAssembler(
    inputCols=cols_for_scaling, 
    outputCol="vectorized_cols"
)
scaling_stage = StandardScaler(
    inputCol=vector_col_to_scale.getOutputCol(), 
    outputCol="scaledFeatures"
)

target_indexer = StringIndexer(
    inputCol="flower_class", 
    outputCol="flower_class_indexed", 
    stringOrderType="alphabetAsc"
)

ml_model = LogisticRegression(
    featuresCol=scaling_stage.getOutputCol(), 
    labelCol=target_indexer.getOutputCol()
)

classification_pipeline = Pipeline(
    stages=[vector_col_to_scale, scaling_stage, target_indexer, ml_model]
)

fited_model = classification_pipeline.fit(iris_train)

prediction_train = fited_model.transform(iris_train)
prediction_test = fited_model.transform(iris_test)

prediction_test.select("flower_class", "flower_class_indexed", "prediction").show()

+---------------+--------------------+----------+
|   flower_class|flower_class_indexed|prediction|
+---------------+--------------------+----------+
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|Iris-versicolor|                 1.0|       1.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|Iris-versicolor|                 1.0|       1.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|    Iris-setosa|                 0.0|       0.0|
|Iris-versicolor|                 1.0|       1.0|
|Iris-versicolor|                 1.0|       1.0|
|Iris-versicolor|                 1.0|       1.0|
| Iris-virginica|                 2.0|       2.0|


### Ukládání modelu
Model uložíme pomocí metody *save*. Na lokále při naší windows instalaci tento příkaz povede k pádu. Na hdfs se ale model uloží - defaultně do user složky. Zdůrazněme, že narozdíl od klasických jednosouborových picklů má v případě PySparku uložený model podobu vcelku složité adresářové struktury.

```python
fited_model.save("saved_model")
```

Pro načtení modelu musíme napřed naimportovat odpovídající typ modelu. Nicméně ne ten samý, který jsme importovali při trénovaní. Namísto toho musíme chtít JmenoTypuModelu**Model** (tj. například u lineární regrese musíme z  *pyspark.ml.classification* naimportovat *LogisticRegressionModel*). V případě uložené pipeliny potřebujeme *PipelineModel*.

```python
from  pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("saved_model")

prediction_test = loaded_model.transform(iris_test)
prediction_test.select("flower_class", "flower_class_indexed", "prediction").show()
```

### Oversampling v PySparku
V aktuálních kosatcových datech jsou všechny třídy zastoupeny stejnoměrně. Co kdyby ale byl počet záznamů patřících k jedné třídě řádově menší než počty záznamů z tříd ostatních? Tehdy by nastala potřeba provést oversampling, který v té nejjednodušší formě nabývá podoby naklonování minoritních dat. Bohužel narozdíl od obyčejného Pythonu při práci s PySparkem přístup k balíčku imbalanced-learn nemáme. Nezbývá tudíž nic jiného oversamplování provést s pomocí metody *sample*.  
Napřed si vytvořme dataframe, ve kterém bude pouze minoritní třída - třídy ostatní budou v separátním datasetu. Na tomto dataframu provoláme *sample*. Zde si musíme dát pozor, abychom jednak nezapomenuli na parametr *withReplacement*, jednak aby v parametru *fraction* říkajícím, kolikrát má být výsledný dataframe větší než ten původní, byl float. Tj. i kdybychom chtěli počet záznamů minoritní třídy nafouknout třikrát, nepíšeme zde 3, nýbrž 3.0.  
Nakonec musíme spojit oversamplovaný frame s framem pro zbylé třídy. To provedeme provoláním metody *union* na jednom z nich s tím, že dataframe druhý je parametrem unionu.  
Všimněme si, že metoda *sample* není co do počtu vyprodukovaných záznamů nikteram přesná. Tato cifra se bude lišit spuštění od spuštění, což říká ostatně i samotná [dokumentace](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.sample.html).  

Pozn.: tento příklad je přitažený za vlasy, neboť fakticky v něm imbalanced dataframe vyrábíme. Nicméně pokud bych chtěl napřed část záznamů jedné ze tříd odstranit, musel bych opět použít sample, což by vysvětlení spíše zatemnilo než osvětlilo.

In [27]:
from pyspark.sql import functions as f

dataframe_only_class_setosa = iris_frame.filter(f.col("flower_class")=="Iris-setosa")
dataframe_other_classes = iris_frame.filter(f.col("flower_class")!="Iris-setosa")

dataframe_setosa_oversampled = dataframe_only_class_setosa.sample(withReplacement=True, fraction=4.0)
oversampled_frame = dataframe_other_classes.union(dataframe_setosa_oversampled)

oversampled_frame.groupby("flower_class").agg(f.count("*")).show()

+---------------+--------+
|   flower_class|count(1)|
+---------------+--------+
| Iris-virginica|      50|
|    Iris-setosa|     197|
|Iris-versicolor|      50|
+---------------+--------+



### Bucketování
Zejména při práci s datumy bývá užitečné hodnoty určitého sloupce roztřídit do předem připravených kategorií. Takovouto operaci obstarává v PySparku *Buckerizer*. Do jeho konstruktoru se krom vstupního a výstupního sloupce musí do parametru *split* uvést, jaké jsou hranice jednotlivých intervalů. Je třeba, aby tato čísla zahrnovala celou množinu hodnot ve vstupních sloupci - v opačném případě se setkáme s hláškou
```
IllegalArgumentException: Bucketizer_e751d7c9ec1e parameter splits given invalid value [5.0,6.0].
```
Následně už lze vytvořit nový dataframe s použitím *transform* metody bucketizerovského objektu (fit není třeba).

In [28]:
from pyspark.ml.feature import Bucketizer
bucketizer_sepal_length = Bucketizer(
    splits=[0, 5.0, 5.5, 6.0, 10], 
    inputCol="sepal_length_cm", outputCol="bucketized_sep_len"
)
bucketized_frame = bucketizer_sepal_length.transform(iris_frame)
bucketized_frame.show()

+---------------+--------------+---------------+--------------+------------+------------------+
|sepal_length_cm|sepal_width_cm|petal_length_cm|petal_width_cm|flower_class|bucketized_sep_len|
+---------------+--------------+---------------+--------------+------------+------------------+
|            5.1|           3.5|            1.4|           0.2| Iris-setosa|               1.0|
|            4.9|           3.0|            1.4|           0.2| Iris-setosa|               0.0|
|            4.7|           3.2|            1.3|           0.2| Iris-setosa|               0.0|
|            4.6|           3.1|            1.5|           0.2| Iris-setosa|               0.0|
|            5.0|           3.6|            1.4|           0.2| Iris-setosa|               1.0|
|            5.4|           3.9|            1.7|           0.4| Iris-setosa|               1.0|
|            4.6|           3.4|            1.4|           0.3| Iris-setosa|               0.0|
|            5.0|           3.4|        

### Imputing
Ukažme si ještě jeden příklad vytvoření ML modelu ve Sparku. Tentokrát se bude jednat o regresi - budeme se snažit určit hodnocení cereálií na základě jejich výživových i jiných vlastností. Data jsou k nalezení [zde](https://perso.telecom-paristech.fr/eagan/class/igr204/datasets), jejich popis je [zde](http://lib.stat.cmu.edu/datasets/1993.expo/).  

Nejprve si data načtěme. Všimněme si, že na druhém řádku jsou datové typy, nikoli relevantní záznam. Kdybychom tuto skutečnost neošetřili, buďto by se nám onen špatný záznam vložil na první řádek tabulky (PySpark 3), anebo by kvůli konfliktu se schématem byla tabulka nablněná samými nully (PySpark 2). Řešení spočívá v přidání parametru *mode* s hodnotou *dropmalformed*.  

Pakliže pracujeme s PySparkem 2, bude nutné nastavit datové typy pro sloupce "sugars" a  "potass" na DoubleType. Pokud bychom ponechali IntegerType, selhalo by imputování.

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

file_table_schema = StructType([
    StructField("name", StringType()),
    StructField("mfr", StringType()),
    StructField("type", StringType()),
    StructField("calories", IntegerType()),
    StructField("protein", IntegerType()),
    StructField("fat", IntegerType()),
    StructField("sodium", IntegerType()),
    StructField("fiber", DoubleType()),
    StructField("carbo", DoubleType()),
    StructField("sugars", IntegerType()),
    StructField("potass", IntegerType()),
    StructField("vitamins", IntegerType()),
    StructField("shelf", IntegerType()),
    StructField("weight", DoubleType()),
    StructField("cups", DoubleType()),
    StructField("rating", DoubleType())
])


cereal_frame = (
    spark.read.csv(
        path="cereal.csv",
        header=True,
        sep=";",
        schema=file_table_schema,
        mode="dropmalformed"
    )
)

cereal_frame.show(n=5)

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
+---------------

Vidíme, že ve sloupci potass (aka potassium neboli draslík) je místo váhy v miligramech mínus jednička. Nevyskytuje se něco podobného i v dalších sloupcích? Vypusťme na dataframe metodu *summary*. Jelikož PySparkový výstup moc čitelný není a tabulka produkovaná *summary* metodou je opravdu malá, převedeme ji do pandího dataframu.  
Vidíme, že mínus jednička se kromě sloupce *potass* objevuje i v *carbo* a *sugars*. Nedá se předpokládat, že by v nějakých cereáliích bylo nula sacharidů. Spíš byla hodnota neznámá. Abychom s odpovídajícími záznamy mohli dále pracovat, musíme na místo mínus jedničky vložit nějakou rozumnou hodnotu, třeba průměr zbylých hodnot v dotčeném sloupci.

In [5]:
cereal_frame.summary().toPandas()

Unnamed: 0,summary,name,mfr,type,calories,protein,fat,sodium,fiber,carbo,sugars,potass,vitamins,shelf,weight,cups,rating
0,count,77,77,77,77.0,77.0,77.0,77.0,77.0,77.0,77.0,77.0,77.0,77.0,77.0,77.0,77.0
1,mean,,,,106.88311688311688,2.5454545454545454,1.0129870129870129,159.67532467532467,2.151948051948052,14.597402597402596,6.922077922077922,96.07792207792208,28.246753246753247,2.207792207792208,1.0296103896103894,0.8210389610389613,42.66570498701299
2,stddev,,,,19.484119056820845,1.0947897484455342,1.0064725594803927,83.83229524009316,2.383363964387222,4.278956280325907,4.444885392419359,71.2868125092621,22.342522500566307,0.832524100135788,0.1504767997368921,0.2327161384469138,14.047288743735216
3,min,100% Bran,A,C,50.0,1.0,0.0,0.0,0.0,-1.0,-1.0,-1.0,0.0,1.0,0.5,0.25,18.042851
4,25%,,,,100.0,2.0,0.0,130.0,1.0,12.0,3.0,40.0,25.0,1.0,1.0,0.67,33.174094
5,50%,,,,110.0,3.0,1.0,180.0,2.0,14.0,7.0,90.0,25.0,2.0,1.0,0.75,40.400208
6,75%,,,,110.0,3.0,2.0,210.0,3.0,17.0,11.0,120.0,25.0,3.0,1.0,1.0,50.828392
7,max,Wheaties Honey Gold,R,H,160.0,6.0,5.0,320.0,14.0,23.0,15.0,330.0,100.0,3.0,1.5,1.5,93.704912


BTW pokud bychom chtěli vidět minimum jen u jednoho sloupce, použijeme následující kód:

In [31]:
from pyspark.sql import functions as f

cereal_frame.select(f.min("carbo")).show()

+----------+
|min(carbo)|
+----------+
|      -1.0|
+----------+



Nahrazování provedeme s pomocí *Imputeru*. Ten napřed z imputovaného sloupce odfiltruje hodnoty určené v parametru *missingValue* (jeho defaultní hodnota je nan) a na hodnotách zbylých provede výpočet určený v parametru *strategy*. Zdůrazněme, že parametry pro vstupní a výstupní sloupce se jmenují inputCol**s** a outputCol**s**. Tj. není možné předat jen jeden sloupce v podobě stringu, nýbrž v imputovacím konstrukoru se musí nacházet listy se seznamem jmen sloupců.  
Všimněme si též alternativního zápisu *fit* a *transform* metod - zde jsou napojeny za sebou. Jde o i jinde, nicméně z hlediska konzistence našeho textu budeme i nadále používat víceřádkový zápis.

In [6]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["carbo", "sugars", "potass"], 
    outputCols=["carbo", "sugars", "potass"],
    strategy="mean",
    missingValue=-1
)
cereal_frame = imputer.fit(cereal_frame).transform(cereal_frame)
cereal_frame.show(n=5)

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    98|      25|    3|   1.0|0.75|34.384843|
+---------------

Bohužel ani v PySparku 3 není možné provádět [imputování kategorických veličin](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html). Tj. textové řetězce jsou mimo hru. Nezbývá tak než použít regulárních výrazů. Například kdybychom chtěli nahradit libovolný počet mezer písmenem X, použijeme následující kód:

In [14]:
# works optimally for PySpark 2
from pyspark.sql import functions as f

space_dataframe_pandas = pd.DataFrame({
    "text": ["normal", "zero", "one", "two", "three", "normal again"],
    "spaces": ["something", "", " ", "  ", "   ", "anything"]    
})
space_dataframe_spark = spark.createDataFrame(space_dataframe_pandas)

space_dataframe_spark = space_dataframe_spark.withColumn(
    "without_spaces",
    f.regexp_replace(f.col("spaces"), "^\s*$", "X")
)

space_dataframe_spark.show()

+------------+---------+--------------+
|        text|   spaces|without_spaces|
+------------+---------+--------------+
|      normal|something|     something|
|        zero|         |              |
|         one|         |             X|
|         two|         |             X|
|       three|         |             X|
|normal again| anything|      anything|
+------------+---------+--------------+



Výše uvedený kód funguje v PySparku 2 podle očekávání. V PySparku 3 se ale zjevně něco změnilo v definici regulárních výrazů. Prázdný řetězec tak musíme ošetřit separátně pomocí when-otherwise konstrukce.

In [15]:
#for PySpark 3
from pyspark.sql import functions as f

space_dataframe_pandas = pd.DataFrame({
    "text": ["normal", "zero", "one", "two", "three", "normal again"],
    "spaces": ["something", "", " ", "  ", "   ", "anything"]    
})
space_dataframe_spark = spark.createDataFrame(space_dataframe_pandas)

space_dataframe_spark = space_dataframe_spark.withColumn(
    "without_spaces",
    f.regexp_replace(f.col("spaces"), "^\s*$", "X")
)

space_dataframe_spark = space_dataframe_spark.withColumn(
    "without_spaces", f.when(f.col("without_spaces")=="", "X").otherwise(f.col("without_spaces"))
)

space_dataframe_spark.show()

+------------+---------+--------------+
|        text|   spaces|without_spaces|
+------------+---------+--------------+
|      normal|something|     something|
|        zero|         |             X|
|         one|         |             X|
|         two|         |             X|
|       three|         |             X|
|normal again| anything|      anything|
+------------+---------+--------------+



### One-hot encoding

Pokračujme dál v přípravě modelu. Rozdělme si napřed data na trénovací a testovací sadu.

In [9]:
cereal_train, cereal_test = cereal_frame.randomSplit([0.8, 0.2], seed=42)

Oproti kosatcovému modelu zde budeme muset provést one-hot encoding. Ten v PySparku ale vyžaduje, aby byly třídy již ze stringové podoby převedeny na posloupnost integerů. Musíme tudíž na sloupce s kategorickými proměnnými vypustit StringIndexer. Pokud bychom měli jen jeden kategorický sloupec, mohli bychom psát následující: 

In [10]:
from pyspark.ml.feature import StringIndexer

string_indexer_mfr = StringIndexer(
    inputCol="mfr", 
    outputCol="mfr_index", 
    stringOrderType="alphabetAsc"
)
string_indexer_mfr_fit = string_indexer_mfr.fit(cereal_train)
cereal_train_indexer = string_indexer_mfr_fit.transform(cereal_train)
cereal_train_indexer.select("mfr", "mfr_index").show(n=10)

+---+---------+
|mfr|mfr_index|
+---+---------+
|  N|      3.0|
|  Q|      5.0|
|  K|      2.0|
|  R|      6.0|
|  G|      1.0|
|  G|      1.0|
|  P|      4.0|
|  Q|      5.0|
|  G|      1.0|
|  G|      1.0|
+---+---------+
only showing top 10 rows



Nicméně kategorických sloupců máme několik a tak namísto parametrů *inputCol* a *outputCol* musíme sáhnout po *inputCols* a *outputCols*. V dvojkovém PySparku tato možnost nebyla, což pozdější kroky poněkud komplikovalo.

In [11]:
#only PySpark 3
cols_for_one_hot = ["mfr", "type", "vitamins", "shelf"]

string_indexer_mfr = StringIndexer(
    inputCols=cols_for_one_hot, 
    outputCols=[column + "_index" for column in cols_for_one_hot], 
    stringOrderType="alphabetAsc"
)
string_indexer_mfr_fit = string_indexer_mfr.fit(cereal_train)
cereal_train_indexer = string_indexer_mfr_fit.transform(cereal_train)
cereal_train_indexer.select(
    "mfr", "type", "vitamins", "shelf",
    "mfr_index", "type_index", "vitamins_index", "shelf_index"
).show(n=10)

+---+----+--------+-----+---------+----------+--------------+-----------+
|mfr|type|vitamins|shelf|mfr_index|type_index|vitamins_index|shelf_index|
+---+----+--------+-----+---------+----------+--------------+-----------+
|  N|   C|      25|    3|      3.0|       0.0|           2.0|        2.0|
|  Q|   C|       0|    3|      5.0|       0.0|           0.0|        2.0|
|  K|   C|      25|    3|      2.0|       0.0|           2.0|        2.0|
|  R|   C|      25|    3|      6.0|       0.0|           2.0|        2.0|
|  G|   C|      25|    1|      1.0|       0.0|           2.0|        0.0|
|  G|   C|      25|    3|      1.0|       0.0|           2.0|        2.0|
|  P|   C|      25|    3|      4.0|       0.0|           2.0|        2.0|
|  Q|   C|      25|    2|      5.0|       0.0|           2.0|        1.0|
|  G|   C|      25|    1|      1.0|       0.0|           2.0|        0.0|
|  G|   C|      25|    2|      1.0|       0.0|           2.0|        1.0|
+---+----+--------+-----+---------+---

Na výstup *StringIndexeru* se posléze napojí *OneHotEncoder*. Zde ukazujeme příklad pro jeden sloupec. A co vlastně v novém sloupci vidíme? Pro jeho pochopení si musíme uvědomit, že se jedná o sparse vektor. První číslo značí počet tříd s tím, že poslední třída je vyřazená (to jsou ty záznamy typu (6,\[\],\[\]), tj. například pro pět tříd bychom zde viděli čtyřku. Druhé číslo označuje aktuální třídu alias index, na kterém se nachází třetí číslo - příznak přítomnosti třídy v podobě 1.0.

In [12]:
#only PySpark 3
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="mfr_index", outputCol="mfr_encoded")
encoder_fit = encoder.fit(cereal_train_indexer)
encoder_fit.transform(cereal_train_indexer).select("mfr", "mfr_index", "mfr_encoded").show()

+---+---------+-------------+
|mfr|mfr_index|  mfr_encoded|
+---+---------+-------------+
|  N|      3.0|(6,[3],[1.0])|
|  Q|      5.0|(6,[5],[1.0])|
|  K|      2.0|(6,[2],[1.0])|
|  R|      6.0|    (6,[],[])|
|  G|      1.0|(6,[1],[1.0])|
|  G|      1.0|(6,[1],[1.0])|
|  P|      4.0|(6,[4],[1.0])|
|  Q|      5.0|(6,[5],[1.0])|
|  G|      1.0|(6,[1],[1.0])|
|  G|      1.0|(6,[1],[1.0])|
|  G|      1.0|(6,[1],[1.0])|
|  R|      6.0|    (6,[],[])|
|  K|      2.0|(6,[2],[1.0])|
|  K|      2.0|(6,[2],[1.0])|
|  G|      1.0|(6,[1],[1.0])|
|  N|      3.0|(6,[3],[1.0])|
|  K|      2.0|(6,[2],[1.0])|
|  G|      1.0|(6,[1],[1.0])|
|  K|      2.0|(6,[2],[1.0])|
|  K|      2.0|(6,[2],[1.0])|
+---+---------+-------------+
only showing top 20 rows



Příklad uplatnění na více třídách najednou:

In [13]:
#only PySpark 3
encoder = OneHotEncoder(
    inputCols=[column + "_index" for column in cols_for_one_hot], 
    outputCols=[column + "_encoded" for column in cols_for_one_hot]
)
encoder_fit = encoder.fit(cereal_train_indexer)
encoder_fit.transform(cereal_train_indexer).select(
    "mfr", "mfr_index", "mfr_encoded", 
    "vitamins", "vitamins_index", "vitamins_encoded"
).show()

+---+---------+-------------+--------+--------------+----------------+
|mfr|mfr_index|  mfr_encoded|vitamins|vitamins_index|vitamins_encoded|
+---+---------+-------------+--------+--------------+----------------+
|  N|      3.0|(6,[3],[1.0])|      25|           2.0|       (2,[],[])|
|  Q|      5.0|(6,[5],[1.0])|       0|           0.0|   (2,[0],[1.0])|
|  K|      2.0|(6,[2],[1.0])|      25|           2.0|       (2,[],[])|
|  R|      6.0|    (6,[],[])|      25|           2.0|       (2,[],[])|
|  G|      1.0|(6,[1],[1.0])|      25|           2.0|       (2,[],[])|
|  G|      1.0|(6,[1],[1.0])|      25|           2.0|       (2,[],[])|
|  P|      4.0|(6,[4],[1.0])|      25|           2.0|       (2,[],[])|
|  Q|      5.0|(6,[5],[1.0])|      25|           2.0|       (2,[],[])|
|  G|      1.0|(6,[1],[1.0])|      25|           2.0|       (2,[],[])|
|  G|      1.0|(6,[1],[1.0])|      25|           2.0|       (2,[],[])|
|  G|      1.0|(6,[1],[1.0])|      25|           2.0|       (2,[],[])|
|  R| 

Nyní je čas, abychom si řekli, že one-hot encoding se mezi PySparkem 2 a 3 poněkud změnit. To, co se v trojkové verzi označuje jako *OneHotEncoder*, neslo ve verzi dvojkové jméno *OneHotEncoderEstimator*. *OneHotEncoder* tehdy též existoval, ale dokázal zpracovat pouze jeden sloupec. Navíc jak již padlo, dvojkový *StringIndexer* nemohl pracovat s více sloupci naráz. Toto omezení se pak muselo obcházet pomocí kódu podobnému tomuto:

In [None]:
#only for PySpark 2
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml import Pipeline

cols_for_one_hot = ["mfr", "type", "vitamins", "shelf"]
cols_for_one_hot_index = [name + "_index" for name in cols_for_one_hot]
cols_for_one_hot_out = [name + "_out" for name in cols_for_one_hot]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in cols_for_onehot]
encoder = OneHotEncoderEstimator(inputCols=cols_for_onehot_index, outputCols=cols_for_onehot_out)
pipeline = Pipeline(stages=indexers+[encoder])
pipeline_fitted = pipeline.fit(cereal_train)
dataframe_edited = pipeline_fitted.transform(cereal_train)


Ukažme si natrénování celého modelu s tím, že budeme opět pracovat s pipelinou. Nejprve ve variantě pro PySpark 3:

In [15]:
#only for PySpark 3
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression

cereal_train, cereal_test = cereal_frame.randomSplit([0.8, 0.2], seed=42)

target_col = "rating"
cols_for_scaling = [
    "calories", "protein", "fat", "sodium", "fiber", 
    "carbo", "sugars", "potass", "weight", "cups"
]
cols_for_one_hot = ["mfr", "type", "vitamins", "shelf"]
cols_indexed = [col + "_index" for col in cols_for_one_hot]
cols_encoded = [col + "_out" for col in cols_for_one_hot]

assembler_col_to_scale = VectorAssembler(
    inputCols=cols_for_scaling, 
    outputCol="for_scaling_cols"
)
scaling_stage = StandardScaler(
    inputCol=assembler_col_to_scale.getOutputCol(),
    outputCol="scaled_features"
)
indexer = StringIndexer(
    inputCols=cols_for_one_hot, 
    outputCols=cols_indexed
)
encoder = OneHotEncoder(
    inputCols=cols_indexed, 
    outputCols=cols_encoded
)

assembler_all = VectorAssembler(
    inputCols=cols_encoded+["scaled_features"], 
    outputCol="features"
)

ml_model = LinearRegression(
    featuresCol=assembler_all.getOutputCol(), 
    labelCol=target_col
)

regression_pipeline = Pipeline(
    stages=[assembler_col_to_scale, scaling_stage, indexer, encoder, assembler_all, ml_model]
)

fitted_pipeline = regression_pipeline.fit(cereal_train)

prediction_train = fitted_pipeline.transform(cereal_train)
prediction_test = fitted_pipeline.transform(cereal_test)

prediction_test.select("rating", "prediction").show()

+---------+------------------+
|   rating|        prediction|
+---------+------------------+
|59.425505| 60.54358006629596|
|33.174094| 33.12394706709574|
|49.120253| 47.95570274543439|
|40.400208|40.373308841998984|
|40.448772| 41.35500976017807|
|44.330856| 46.23330342733896|
|28.025765| 27.70918673759257|
|21.871292| 21.40251073220914|
|34.139765| 33.73729220897782|
|30.313351|30.790146911007668|
|40.105965| 39.53195632838818|
| 40.69232|39.877234283445205|
|30.450843|31.506571421117194|
|63.005645| 63.54161635528529|
|40.560159| 40.58543925790734|
|38.839746|38.314970495626596|
+---------+------------------+



A následně pro PySpark 2:

In [None]:
#only for PySpark 2
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression

cereal_train, cereal_test = cereal_frame.randomSplit([0.8, 0.2], seed=42)

target_col = "rating"
cols_for_scaling = [
    "calories", "protein", "fat", "sodium", "fiber", 
    "carbo", "sugars", "potass", "weight", "cups"
]
cols_for_one_hot = ["mfr", "type", "vitamins", "shelf"]
cols_indexed = [col + "_index" for col in cols_for_one_hot]
cols_encoded = [col + "_out" for col in cols_for_one_hot]

assembler_col_to_scale = VectorAssembler(
    inputCols=cols_for_scaling, 
    outputCol="for_scaling_cols"
)
scaling_stage = StandardScaler(
    inputCol=assembler_col_to_scale.getOutputCol(),
    outputCol="scaled_features"
)
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index") for column in cols_for_one_hot
]
encoder = OneHotEncoderEstimator(
    inputCols=cols_indexed, 
    outputCols=cols_encoded
)

assembler_all = VectorAssembler(
    inputCols=cols_encoded+["scaled_features"], 
    outputCol="features"
)

ml_model = LinearRegression(
    featuresCol=assembler_all.getOutputCol(), 
    labelCol=target_col
)

regression_pipeline = Pipeline(
    stages=[assembler_col_to_scale, scaling_stage] + indexers + [encoder, assembler_all, ml_model]
)

fitted_pipeline = regression_pipeline.fit(cereal_train)

prediction_train = fitted_pipeline.transform(cereal_train)
prediction_test = fitted_pipeline.transform(cereal_test)

prediction_test.select("rating", "prediction").show()

Na vyhodnocení přesnosti natrénovaného regresního modelu použijeme *RegressionEvaluator*. Jako *metricName* můžeme použít například "r2" či "rmse".

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="r2")

print(f"R2 metrics for train dataset: {evaluator.evaluate(prediction_train)}") 
print(f"R2 metrics for test dataset: {evaluator.evaluate(prediction_test)}") 

R2 metrics for train dataset: 0.9959503861337553
R2 metrics for test dataset: 0.9940305796610737


In [9]:
spark.stop()