# WIP

# PySpark

## Obsah
- [Co je Spark a PySpark](#Co-je-Spark-a-PySpark)
- [Instalace](#Instalace)
- [PySpark sešny](#PySpark-sešny)
- [Práce s daty](#Práce-s-daty)
  - [Výroba tabulkoidního objektu](#Výroba-tabulkoidního-objektu) 
  - [Dataframový přístup](#Dataframový-přístup)
    - [Informační metody](#Informační-metody)
    - [Manipulace se sloupci](#Manipulace-se-sloupci)
    - [PySpark a datumy](#PySpark-a-datumy)
    - [Window funkce](#Window-funkce)
  - [SQL přístup](#SQL-přístup)
  - [Cachování](#Cachování)

In [None]:
- něco o Spark UI
- pysparková vizualizace

## Co je Spark a PySpark
Jeden počítač sice z hlediska výpočetní síly určitou práci zastane, více počítačů najednou je ale mocnějších. Přitom zde nemluvíme pouze o nějakých složitých algoritmech, takovýto poznatek je možná ještě očividnější při "tupém" procházení velkého množství dat řádek po řádku. Nicméně hromada počítačů/serverů (alias cluster) sama o sobě tolik nezmůže - musí být ošetřena komunikace a rozdělování práce mezi nimi. Tuto úlohu může krom jiných zastávat právě Spark. Alternativně lze cluster počítačů řídit nečím jiným, třeba [YARNem](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html), přičemž na tuto věc se Spark napojí.  
Aplikace běžící ve Sparku se skládá z jednoho driver procesu a několika executor procesů. Driver proces má za úkol spravovat informace o sparkovské aplikaci, komunikovat s uživatelem a řídit a distribuovat práci executor procesů. Executory pak vykonávají práci jim zadanou a výsledky svých výpočtů vrací na driver. V případě našeho povídání, kdy si vše budeme ukazovat na lokálu, budou driver i executory de facto obyčejné procesy. V reálném prostředí by pravda šlo též o obyčejné procesy, ty by ale běžely na odlišných mašinách.  
Spark je napsán ve Scale, což není úplně nejsnazší jazyk na pochopení. Nicméně byla by škoda o sílu tohoto frameworku přijít. Proto vznikl PySpark jako svého druhu pythoní obal nad Sparkem. Z hodně naivního pohledu připomíná PySpark Pandas (balíček pro práci s tabulkami) zkřížený s balíčkem Scikit-learn (balíček pro strojové učení). Velký rozdíl ale tkví ve způsobu vyhodnocování příkazů. V Pandách je vyhodnocování "eager" (asi nemá cenu tento termín překládat - výsledek by čtenáře jen mátl). To znamená, že když spustíme jakoukoli pandí funkci či metodu, provede se okamžitě. Oproti tomu PySpark je lazy. Některé příkazy - tzv. transformace - se při odklepnutí v IDE fakticky nespustí, pouze se přidají do grafu vyhodnocování transformací. Tento graf, resp. operace v něm, odstartuje až když je to potřeba, obvykle kvůli příkazům typu akce. Příkladem transformace mohou být všelikaké úpravy pysparkového dataframu, zatímco příkladem akce by bylo zobrazení tohoto dataframu uživateli. Důvod takovéhoto chování je redukce míry načítání obřích dataframů do paměti a zefektivnění práce s clustery serverů. Člověk musí mít toto chování pořád na paměti, jinak zjistí, že mu aplikace padá. Proč? Jelikož samotné výpočty (obvykle řetěz transformací s obříma datama) probíhá na executor procesech, driver procesy nemají alokovány tolik paměti. Pakliže ale na tyto velká data vypustíme nevhodnou akci (třeba akci "vytiskni celou obří tabulku"), tak všechny executory svá data pošlou na driver a ten to nerozchodí.  

## Instalace
V praxi Pyspark buďto bydlí na Linuxu a je napojen na HDFS, anebo ho uživatel najde v cloudu, dejme tomu v Azure jako Databrics. Nicméně první by pro potřeby tohoto výukového textu znamenalo složitejší instalaci, druhé zase vytažení platební karty. Proto si zde povíme, jak PySpark nainstalovat na obyčejný počítač, kde jako operační systém slouží MS Windows.  
Při instalaci jsem postupoval podle tohoto [návodu](https://sparkbyexamples.com/pyspark/install-pyspark-in-anaconda-jupyter-notebook/). Pokud by odkaz nebyl v budoucnu funkční, shrnu zde hlavní body:  
- instalace [Anacondy](https://www.anaconda.com/)  
- instalace Javy (tu jsem měl nainstalovanou již dříve, nicméně podle návodu by to šlo udělat i přes anacondí repozitář pomocí "conda install openjdk")  
- instalace PySparku pomocí "conda install pyspark"  
- instalace balíčku findspark  

V mém případě toto bohužel nestačilo. Nejprve se při otestování funkčnosti (napsání "pyspark" do anacondí konzole) objevila hláška na způsob
```
Python was not found; run without arguments to install from the Microsoft Store
```
Bylo třeba klepnout na Start a psát (přesněji začít psát) "Manage app execution aliases". Po kliknutí na odpovídající položku ve Startu se v okně, které se objevilo, musely vypnout všechny věci vypadající, že mají nějakou spojitost s Pythonem.  
Při následovném pokusu o spuštění PySparku se v konzoli vypsala zpráva
```
Missing Python executable 'python3', defaulting to 'C:\Users\ThereWouldBeUserName\miniconda3\Scripts\..' for SPARK_HOME environment variable. Please install Python or specify the correct Python executable in PYSPARK_DRIVER_PYTHON or PYSPARK_PYTHON environment variable to detect SPARK_HOME safely.
```
Řešení spočívalo v nastavení proměnné PYSPARK_PYTHON. To se realizuje následujícím příkazem napsaným do konzole
```
set PYSPARK_PYTHON=python
```
Zdůrazněme, že takto nastavená proměnná existuje jen do té doby, dokud není konzole zavřena. Pokud bychom na tento krok zapomenuli a spustily Jupyter Notebook, projeví se to v nekonečném vytváření sparkové sešny (obvykle první věc, kterou budeme dělat po importování balíčků). Nicméně v konzoli, ve které jsme Jupyter pustili, by se objevila výše uvedená "missing python executable" chyba.  
Zmiňme nakonec, že v konzoli jak při použití Jupyteru, tak při spuštění PySpaarku napřímo uvidíme chybu spojenou s hadoopem:
```
java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
```
Ta naši práci neovlivní (žádný Hadoop u sebe nainstalovaný nemáme), takže ji můžeme ignorovat.

## PySpark sešny
Pro komunikaci se sparkovským enginem, bez kterého se neobejdeme, je třeba vytvořit sparkovskou sešnu. To provedeme následujícím kódem (vnější sada závorek tu je jen proto, abychom mohli provádět odřádkování a neměli tak jeden nekonečný řádek):

In [1]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("toy_application")
    .getOrCreate()
)

V tomto minimálním příkladu jsem sešnu pojmenovali (*appName*) a poté jsme zavolali metodu *getOrCreate*. Ta zajistí, že šesna bude singleton. Tj. pokud sešna neexistuje, bude vytvořena, ale pokud už nějaká sešna předtím vznikla, *getorCreate* vrátí právě tu. Toto chování si demonstrujme na zobrazení spark objektu a na vypsání jeho IDčka.  
Když vložíme do buňka samostojící jméno sešny, dostaneme odkaz na Spark UI - de facto webový inteface, ve kterém můžeme běh sešny kontrolovat. Pod labelem "Version" se skrývá verze Sparku, "Master" nám zase řekne, kde vlasntě Spark bydlí. Důležité pro nás v tomto bodě je "AppName", kde vidíme jméno naší aplikace - toy_application.

In [2]:
spark

Vypišme si idčko spark objektu.

In [37]:
id(spark)

2881290530624

Když se nyní pokusíme vytvořit novou sparkovsou sešnu, můžeme se přesvědčit, že nám *getOrCreate* opravdu vrátil sešnu původní.

In [38]:
spark_2 = (
    SparkSession
    .builder
    .appName("toy_application")
    .getOrCreate()
)

id(spark_2)

2881290530624

Dokonce ani snaha o nastavení odlišného appNamu nepovede k vytvoření opravdu nové sešny.

In [39]:
spark_3 = (
    SparkSession
    .builder
    .appName("another_toy_application")
    .getOrCreate()
)

id(spark_3)

2881290530624

Vidíme, že nové jméno ani nedokázalo přepsat jméno staré.

In [40]:
spark_3

Chceme-li z nějakého důvodu vytvořit sešnu novou, musíme na sešně staré zavolat *newSession*.

In [41]:
spark_4 = spark.newSession()
id(spark_4)

2881290526784

Nicméně takto vytvořená sešna není úplně oddělená od sešny původní. Reálně se hodí jen pokud bychom opravdu potřebovali mít dva od sebe oddělené namespacy, tj. pokud bychom potřebovali, aby se pod stejným jménem ve dvou sešnách nacházely jiné objekty. Koneckonců i v dokumentaci se píše, že 
```
Returns a new SparkSession as new session, that has separate SQLConf,
registered temporary views and UDFs, but shared SparkContext and
table cache.
```
Zde se mluví o jakémsi SparkContextu. Pokud bychom na internetu narazili na staré tutoriály (před Spark 2.0), viděli bychom, že se v nich SparkContext používá jako vstup do sparkovského enginu namísto SparkSession. Jaký je mezi těmito entitami rozdíl? SparkContext se musí složitěji (a explicitněji) nastavovat a na celém JVM (Java Virtual Machine alias to, na čem jede Spark) může existovat jen jeden. Nicméně i když ho v našem uživatelském kódu nikde nevytváříme, v pozadí pořád existuje. Když například chceme sešnu ukončit a provoláme na ní metodu *stop*, zastavíme i všechny ostatní sešny - koneckonců zkume si poté, co jsme ukončili sešnu spark_4, otevřít odkazy na Spark UI z předešlých sešen.

In [43]:
spark_4.stop()

Docstring v metodě *stop* totiž zmiňuje právě SparkContext:
```
Stop the underlying :class:`SparkContext`.
```
Jinak co se týče zastavování sešen, v našem případě, kdy pracujeme na lokálu, to potřeba není. Celý Spark se totiž vypne s vypnutím Jupyteru. Nicméně na normálním prostředí už by zastavování sešen po skončení práce bylo životně důležité, aby se neblokovaly hardwarové prostředky.  
Tyto hardwarové prostředky, které bude naše aplikace používat, bychom mohli specifikovat při vytváření sešny explicitně
```
spark_example = (
    SparkSession
    .builder
    .config("spark.executors.cores", "5")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "4G")
    .config("spark.driver.memory","2G")
    .appName("another_toy_application")
    .getOrCreate()
)
```
Asi častější ale bude, že někdo z infrastruktury pro nás vytvoří uživatelský pool s dedikovanými prostčedky, do kterého se přihlásíme variací na
```
spark_example = (
    SparkSession
    .builder
    .config("spark.yaml.queue", "pool_data_scientist")
    .appName("another_toy_application")
    .getOrCreate()
)
```

## Práce s daty
Základní stavební jednotkou dat, se kterým Spark pracuje, je tzv. Resilient Distributed Dataset (RDD). Můžeme si ho představit jako svého druhu tuple, na kterém lze efektivně práci paralelizovat. Zdůrazněme, že RDD má blíž k tuplu než k listu - je immutable, tj. operace na něm proběhla ho nemění, nýbrž vrací zeditovanou kopii. Na každý pád dnes není moc důvodů s RDD pracovat - vznikl totiž datový objekt, se kterým se snadněji pracuje - DataFrame. Jméno připomíná základní stavební jednotku balíčku Pandas a podobnost není čistě náhodná - opravdu se jedná o reprezentaci tabulkoidního objektu. Naše povídání se tak bude celou dobu a zejména v této kapitole týkat tohoto typu objektu. Pro úplnost ještě poznamenejme, že ve Sparku existují tzv. Datasety, které ale nejsou v pySparku implementovány.

### Výroba tabulkoidního objektu

In [1]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("toy_application")
    .getOrCreate()
)

Asi nejpřímočařejším, byť ne úplně využívaným postupem na vytvoření pysparkového dataframu, je jeho konstrukce z dat zadaných v programu v podobě listu. Tento list se skládá z podlistů či z tuplů. Ty v budoucí tabulce budou reprezentovat jednotlivé řádky.

In [2]:
data_in_list = [
    ["Švejk",40,"Praha"], 
    ["Vyskočil",50,"Horní Dolní"]
]

List vložíme jako parametr do funkce *createDataFrame*.

In [3]:
table_from_list = spark.createDataFrame(data_in_list)

Výsledný dataframe si zobrazíme pomocí metody *show* (a ano, *show* je akce). Všimněme si, že jelikož jsme nespecifikovali hlavičku tabulky, objevila se nám posloupnost čísel \_1, \_2 atd.

In [4]:
table_from_list.show()

+--------+---+-----------+
|      _1| _2|         _3|
+--------+---+-----------+
|   Švejk| 40|      Praha|
|Vyskočil| 50|Horní Dolní|
+--------+---+-----------+



Pro vytvoření dataframu s hlavičkou je speciální parametr v *createDataFrame* - *schema* s listem obsahujícím jména sloupců.

In [13]:
table_from_list = spark.createDataFrame(
    data_in_list,
    schema=["name", "age", "city"]
)
table_from_list.show()

+--------+---+-----------+
|    name|age|       city|
+--------+---+-----------+
|   Švejk| 40|      Praha|
|Vyskočil| 50|Horní Dolní|
+--------+---+-----------+



Pysparkový dataframe lze vytvořit i z pandího dataframu - zkrátka se do *createDataFrame* namísto listu vloží právě onen pandí dataframe.

In [14]:
import pandas as pd
pandas_dataframe = pd.DataFrame({
    "name": ["Švejk", "Vyskočil"],
    "age": [40, 50],
    "city": ["Praha", "Horní Dolní"]
})
table_from_pandas = spark.createDataFrame(pandas_dataframe)
table_from_pandas.show()

+--------+---+-----------+
|    name|age|       city|
+--------+---+-----------+
|   Švejk| 40|      Praha|
|Vyskočil| 50|Horní Dolní|
+--------+---+-----------+



Pokud v *createDataFrame* uvedeme jména sloupců, mají tyto jména oproti těm v pandím dataframu prioritu.

In [15]:
import pandas as pd
pandas_dataframe = pd.DataFrame({
    "name": ["Švejk", "Vyskočil"],
    "age": [40, 50],
    "city": ["Praha", "Horní Dolní"]
})
table_from_pandas = spark.createDataFrame(
    pandas_dataframe,
    schema=["last_name", "age", "address"]
)
table_from_pandas.show()

+---------+---+-----------+
|last_name|age|    address|
+---------+---+-----------+
|    Švejk| 40|      Praha|
| Vyskočil| 50|Horní Dolní|
+---------+---+-----------+



Pro úplnost zmiňme, že pokud bychom potřebovali konverzi opačným směrem, tj. z PySparku do Pand, použijeme metodu *toPandas*.

In [16]:
new_pandas_frame = table_from_pandas.toPandas()
new_pandas_frame

Unnamed: 0,last_name,age,address
0,Švejk,40,Praha
1,Vyskočil,50,Horní Dolní


Načíst data jde pochopitelně i z csv souboru. Nicméně popravdě metoda, kterou si zde ukážeme, nebudeme asi fungovat, když ono csvčko nebude na normálním disku, ale někde na HDFS.  
Načtení realizujeme s pomocí *read.csv*. Parametry jsou obdobné jaké v pandím *read_csv*. Zdůrazněme ale, že v parametru *path* specifikující cestu k souboru musí být string (resp. list stringů), nikoli pathlib.Path.

In [2]:
table_from_csv = spark.read.csv(path="pyspark_data/csv/data_for_csv_load.csv", sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Takto to vypadá neproblematicky. Nicméně zkusme na náš dataframe vypustit metodu *describe*, který ukáže datové typy sloupců. Uvidíme, že jsou všechny brány jako stringy.

In [18]:
table_from_csv.describe()

DataFrame[summary: string, id: string, first_name: string, last_name: string, age: string, gender: string]

Takové je chování, které nastává při nastavení parametru *inferSchema* na hodnotu None resp. False. Co dělat, kdybychom chtěli mít rozumné datové typy už od načtení? Když se *inferSchema* položí rovno True, PySpark načte prvních pár řádků ze souboru a z nich datový typ odvodí. Problém nastane v okamžiku, kdy se první řádky liší od řádků následujících a je tak automaticky zvolen nevhodný datový typ. Anebo se "zbytečně" kvůli typům načte celý soubor, což může trvat docela dlouho. Proto bývá vhodné definovat datový typ explicitně. Na to použijeme objekt typu *StructType* reprezentující schéma celého dataframu. Tento *StructType* obsahuje objekty *StructField* obsahující informace o jednotlivých sloupcích. První parametr konstruktoru *StructField* obsahuje jméno sloupce, druhý parametr datový typ. Třetí parametr by měl dle [dokumentace](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.types.StructField.html) obdsahovat informaci, zda pole může být nullable. Nicméně při mých pokusech na Sparku 3 se tento atribut po vytvoření dataframu nezávisle na tom, co jsem do konstuktoru vložil, automaticky nastavil na True (zjištěno prostřednictvím *table_from_csv.schema*). 

In [13]:
file_table_schema = StructType([
    StructField("id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", DoubleType()),
    StructField("gender", StringType())
])

table_from_csv = spark.read.csv(
    path="pyspark_data/csv/data_for_csv_load.csv", sep=",", 
    header=True, schema=file_table_schema
)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Když nějaký sloupec dostane špatný datový typ (například "age" vs souboru je ve fromátu 25.0, tj. je to float, ale my mu dáme IntegerType), celý sloupec se naplnít nully.

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

file_table_schema = StructType([
    StructField("id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType())
])

table_from_csv = spark.read.csv(
    path="pyspark_data/csv/data_for_csv_load.csv", 
    sep=",", header=True, schema=file_table_schema
)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|null|     M|
|200|      Mary|    Shelly|null|     F|
|300|    Johann|    Geothe|null|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|null|     M|
+---+----------+----------+----+------+



Výše jsme zmínili, že při načítání souborů mužeme uvést nikoli pouze string, ale i list.

In [5]:
csv_files_list = [
    "pyspark_data/csv/data_for_csv_load.csv", 
    "pyspark_data/csv/data_for_csv_load_2.csv"
]
table_from_csv = spark.read.csv(path=csv_files_list, sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
|160|     Isaac|    Asimov|48.0|     M|
+---+----------+----------+----+------+



Nicméně musíme si dát pozor na to, aby soubory měly stejná schémata. V opačném případě se použije schéma jednoho ze souborů. U dat ze souborů ostatních se neexistující sloupce doplní nully a nadbytečné sloupce se useknou. Je otázkou, co určuje schéma dominující soubor - pořadí v listu to není.

In [16]:
csv_files_list = [
    "pyspark_data/csv/data_for_csv_load.csv", 
    "pyspark_data/csv/data_for_csv_load_3.csv"
]
table_from_csv = spark.read.csv(path=csv_files_list, sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
|160|     Isaac|    Asimov|48.0|     M|
+---+----------+----------+----+------+



Údajně by dokonce mělo stačit do pathy vložit adresář obsahující csv soubory a do dataframu by se měl obsah všech souborů načíst. Nicméně aspoň na lokálu se mi to nepodařilo.

Uložení dataframu do csv souboru by se podle dokumentace mělo provést pomocí
```
table_from_csv.write.csv("data_after_saving_to_csv.csv", sep="|")
```
Když to ale provedeme u sebe na lokále na Windows, obdržíme chybovou hlášku mluvící o Hadoopu. Proto pokud chceme dataframe uložit na (normální) disk, bude asi nejsnazší ho napřed zkonvertovat do pandí verze pomocí toPandas a poté uložit pandí metodou *to_csv*.

Mějme json o obsahu
```json
{"first_name": "John", "last_name": "Doe"},
{"first_name": "Jane", "last_name": "Doe"}
```
Do dataframu ho načteme příkazem *spark.read.json*:

In [25]:
table_from_json = spark.read.json(path="pyspark_data/json/data_for_json_load.json")
table_from_json.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|      Doe|
+----------+---------+



Nicméně pokud budou záznamy roztaženy na více než jeden řádek, např. takto:
```json
{
  "first_name": "John", 
  "last_name": "Doe"
},
{
  "first_name": "Jane", 
  "last_name": "Doe"
}
```
stejný kód nám spadne s chybovou hláškou:

In [26]:
table_from_json = spark.read.json(path="pyspark_data/json/data_for_json_load_multiline.json")
table_from_json.show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

Při pohledu do dokumentace by se zdálo, že řešením je použití parametru multiLine s hodnotou True. Jenže ouha, v takto vytvořené tabulce je jen jeden záznam.

In [30]:
table_from_json = spark.read.json(
    path="pyspark_data/json/data_for_json_load_multiline.json",
    multiLine=True
)
table_from_json.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
+----------+---------+



Aby Spark správně multiline jsony interpretoval, musí tyto začínat a končit hranatými závorkami, tj. musí vypadat takto:
```json
[{
  "first_name": "John", 
  "last_name": "Doe"
},
{
  "first_name": "Jane", 
  "last_name": "Doe"
}]
```

In [31]:
table_from_json = spark.read.json(
    path="pyspark_data/json/data_for_json_load_multiline_2.json",
    multiLine=True
)
table_from_json.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|      Doe|
+----------+---------+



Hnízděné jsony lze načíst též, ale poté musí následovat zprocesování dataframu do použitelnější podoby.

In [26]:
table_from_json = spark.read.json(
    path="pyspark_data/json/data_for_json_load_complicated.json",
)
table_from_json.show()

+--------------------+----------+---------+
|             address|first_name|last_name|
+--------------------+----------+---------+
|[{Prague, Some st...|      John|      Doe|
|[{Brno, Some stre...|      Jane|      Doe|
+--------------------+----------+---------+



### Dataframový přístup
#### Informační metody
V této podkapitole se budeme věnovat operacím s dataframy. Víceméně nám půjde o to najít ekvivalenty k pandasím konstrukcím.  
V předcházející podkapitole jsme viděli, že se dataframe (přesněji prvních pár řádků) dá zobrazit pomocí metody *show*.

In [3]:
table_from_csv = spark.read.csv(path="pyspark_data/csv/data_for_csv_load.csv", sep=",", header=True)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



V případě, že chceme vidět jen prvních N řádků, vložíme toto číslo N jako parametr do *show*. Ten bude tudíž ekvivalentní pandasímu *head*. Defaultní hodnota tohoto parametru nesoucího jméno *n* je 20.

In [6]:
table_from_csv.show(2)

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|100|    Victor|     Hugo|25.0|     M|
|200|      Mary|   Shelly|30.0|     F|
+---+----------+---------+----+------+
only showing top 2 rows



Pakliže je délka řetězce v některém sloupci delší než 20 znaků, je z hlediska toho, co zobrazuje *show*, tento řetězec oříznut (reálně se ale obsah buňky neztrácí).

In [3]:
numbers_list = [
    [number, number*10, "abc"*number] for number in range(100)
]
numbers_frame = spark.createDataFrame(
    numbers_list,
    schema=["numbers_1", "numbers_2", "long_word"]
)
numbers_frame.show()

+---------+---------+--------------------+
|numbers_1|numbers_2|           long_word|
+---------+---------+--------------------+
|        0|        0|                    |
|        1|       10|                 abc|
|        2|       20|              abcabc|
|        3|       30|           abcabcabc|
|        4|       40|        abcabcabcabc|
|        5|       50|     abcabcabcabcabc|
|        6|       60|  abcabcabcabcabcabc|
|        7|       70|abcabcabcabcabcab...|
|        8|       80|abcabcabcabcabcab...|
|        9|       90|abcabcabcabcabcab...|
|       10|      100|abcabcabcabcabcab...|
|       11|      110|abcabcabcabcabcab...|
|       12|      120|abcabcabcabcabcab...|
|       13|      130|abcabcabcabcabcab...|
|       14|      140|abcabcabcabcabcab...|
|       15|      150|abcabcabcabcabcab...|
|       16|      160|abcabcabcabcabcab...|
|       17|      170|abcabcabcabcabcab...|
|       18|      180|abcabcabcabcabcab...|
|       19|      190|abcabcabcabcabcab...|
+---------+

Pokud ořezávání chceme vypnout, přidáme do *show* parametr *truncate* s hodnotou False.

In [22]:
numbers_frame.show(truncate=False)

+---------+---------+---------------------------------------------------------+
|numbers_1|numbers_2|long_word                                                |
+---------+---------+---------------------------------------------------------+
|0        |0        |                                                         |
|1        |10       |abc                                                      |
|2        |20       |abcabc                                                   |
|3        |30       |abcabcabc                                                |
|4        |40       |abcabcabcabc                                             |
|5        |50       |abcabcabcabcabc                                          |
|6        |60       |abcabcabcabcabcabc                                       |
|7        |70       |abcabcabcabcabcabcabc                                    |
|8        |80       |abcabcabcabcabcabcabcabc                                 |
|9        |90       |abcabcabcabcabcabca

Zdůrazněme, že *show* slouží opravdu jen na zobrazení prvních pár řádků. Pokud ale potřebujeme z dataframu prvních několik řádků vybrat, aby se s nimi posléze dály nějaké další operace, musí se použít metoda *limit*:

In [8]:
numbers_frame.limit(5).show()

+---------+---------+------------+
|numbers_1|numbers_2|   long_word|
+---------+---------+------------+
|        0|        0|            |
|        1|       10|         abc|
|        2|       20|      abcabc|
|        3|       30|   abcabcabc|
|        4|       40|abcabcabcabc|
+---------+---------+------------+



Nechat si zobrazit prvních pár řádků bývá užitečné, avšak dobré je dostat se i k nějakým informacím popisujícím celý dataframe. Například pokud chceme znát schéma dataframu (aka datové typy sloupců), použijeme metodu *printSchema*.

In [26]:
numbers_frame.printSchema()

root
 |-- numbers_1: long (nullable = true)
 |-- numbers_2: long (nullable = true)
 |-- long_word: string (nullable = true)



Shrnující statistiky přináší metoda *summary*. Jelikož vrací dataframe, musí se za její provolání nalepit *show*.

In [28]:
numbers_frame.summary().show()

+-------+------------------+-----------------+--------------------+
|summary|         numbers_1|        numbers_2|           long_word|
+-------+------------------+-----------------+--------------------+
|  count|               100|              100|                 100|
|   mean|              49.5|            495.0|                null|
| stddev|29.011491975882016|290.1149197588202|                null|
|    min|                 0|                0|                    |
|    25%|                24|              240|                null|
|    50%|                49|              490|                null|
|    75%|                74|              740|                null|
|    max|                99|              990|abcabcabcabcabcab...|
+-------+------------------+-----------------+--------------------+



Parametry *summary* jsou chtěné statistiky. Defaultně se ukazuje téměř vše, co *summary* dokáže. Výjimkou je snad jen možnost specifikovat chtěný percentil: 

In [34]:
numbers_frame.summary("10%").show()

+-------+---------+---------+---------+
|summary|numbers_1|numbers_2|long_word|
+-------+---------+---------+---------+
|    10%|        9|       90|     null|
+-------+---------+---------+---------+



Extrémně podobnou funkcí je *describe*. Zde se ale jako parametry neudávají statistiky, ale sloupce, nad kterými se statistiky mají napočítat.

In [33]:
numbers_frame.describe().show()

+-------+------------------+-----------------+--------------------+
|summary|         numbers_1|        numbers_2|           long_word|
+-------+------------------+-----------------+--------------------+
|  count|               100|              100|                 100|
|   mean|              49.5|            495.0|                null|
| stddev|29.011491975882016|290.1149197588202|                null|
|    min|                 0|                0|                    |
|    max|                99|              990|abcabcabcabcabcab...|
+-------+------------------+-----------------+--------------------+



In [35]:
numbers_frame.describe("numbers_2", "long_word").show()

+-------+-----------------+--------------------+
|summary|        numbers_2|           long_word|
+-------+-----------------+--------------------+
|  count|              100|                 100|
|   mean|            495.0|                null|
| stddev|290.1149197588202|                null|
|    min|                0|                    |
|    max|              990|abcabcabcabcabcab...|
+-------+-----------------+--------------------+



Pro získání počtu řádků dataframu se v PySparku nepoužije *len*, nýbrž metoda *count*.

In [4]:
numbers_frame.count()

100

#### Manipulace se sloupci
PySparkové datasety jsou immutable. To znamená, že je jejich metody nemění, nýbrž po svém doběhnutí vrací nový dataframe. Ukažme si to na metodě přidávající/přepisující sloupec *withColumn*. Jejím prvním parametrem je název nového/přepisovaného sloupce, parametrem druhým pak hodnoty onoho sloupce. Pro jejich vytvoření budeme obvykle potřebovat funkce z *pyspark.sql.functions*. Kupříklad na šáhnutí na jiný sloupec se použije funkce *functions.col*.

In [5]:
from pyspark.sql import functions as f
table_from_csv.withColumn("age_2", f.col("age")+10)
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Vidíme že tabulka je pořád stejná. Když chceme její o sloupec bohatší verzi, musíme do proměnné vložit dataframe z výstupu metody. Připomeňme, že withColumn je transformace, tudíž aby se něco vůbec udělalo (nejen zobrazilo, ale aby opravdu PySpark udělal něco jiného než si jen transformaci zapsal do sady úkolů), musí se zavolat akce - v našem případě metoda *show*.

In [6]:
table_from_csv = table_from_csv.withColumn("age_2", f.col("age")+10)
table_from_csv.show()

+---+----------+----------+----+------+-----+
| id|first_name| last_name| age|gender|age_2|
+---+----------+----------+----+------+-----+
|100|    Victor|      Hugo|25.0|     M| 35.0|
|200|      Mary|    Shelly|30.0|     F| 40.0|
|300|    Johann|    Geothe|75.0|     M| 85.0|
|400|    Albert|     Camus|null|     M| null|
|500|   William|Shakespear|38.0|     M| 48.0|
+---+----------+----------+----+------+-----+



Pro vyhození sloupce slouží metoda *drop*:

In [59]:
table_from_csv = table_from_csv.drop("age_2")
table_from_csv.show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Sloupce přejmenujeme pomocí *withColumnRenamed*, kde prvním parametrem je staré jméno sloupce a parametrem druhým jméno nové:

In [60]:
table_from_csv.withColumnRenamed("last_name", "author_last_name").show()

+---+----------+----------------+----+------+
| id|first_name|author_last_name| age|gender|
+---+----------+----------------+----+------+
|100|    Victor|            Hugo|25.0|     M|
|200|      Mary|          Shelly|30.0|     F|
|300|    Johann|          Geothe|75.0|     M|
|400|    Albert|           Camus|null|     M|
|500|   William|      Shakespear|38.0|     M|
+---+----------+----------------+----+------+



Občas je potřeba provést konverzi datového typu sloupce 

In [61]:
data_types_list = [
    ["zero", "0"],
    ["one", "1"],
    ["two", "2"]
]
data_types_frame = spark.createDataFrame(
    data_types_list,
    schema=["words", "numbers"]
)
data_types_frame.printSchema()

root
 |-- words: string (nullable = true)
 |-- numbers: string (nullable = true)



To se provede s použitím metody *cast*, která obsahuje chtěný datový typ a která je vypuštěna na příslušný sloupec vybraný pomocí *functions.col*. Efektivně starý sloupec přepisujeme novým:

In [63]:
from pyspark.sql.types import IntegerType

data_types_frame = data_types_frame.withColumn(
    "numbers",
    f.col("numbers").cast(IntegerType())
)
data_types_frame.printSchema()

root
 |-- words: string (nullable = true)
 |-- numbers: integer (nullable = true)



Pokud bychom chtěli z nějakého důvodu dostat najednou data z celého dataframu, použijeme metodu *collect*. Všimněte si na níže uvedeném příkladu, že tato metoda vrací list řádků.  
Chceme ale opravdu najednou dostat data z celého dataframu? No, obvykle ne. Znamená to totiž, že se z jednotlivých executorů pošlou data na driver, což pro dostatečně velký dataframe vyústí v "OutOfMemory" error.

In [30]:
table_from_csv.collect()

[Row(id='100', first_name='Victor', last_name='Hugo', age='25.0', gender='M'),
 Row(id='200', first_name='Mary', last_name='Shelly', age='30.0', gender='F'),
 Row(id='300', first_name='Johann', last_name='Geothe', age='75.0', gender='M'),
 Row(id='400', first_name='Albert', last_name='Camus', age=None, gender='M'),
 Row(id='500', first_name='William', last_name='Shakespear', age='38.0', gender='M')]

Každopádně pokud už hodláme data získat pomocí collectu, můžeme z nich vyzobnout jednu buňku s pomocí indexů. Takový postup je pak vzdáleně podobný pandasímu *iloc*.

In [32]:
table_from_csv.collect()[4][1]

'William'

Jak tedy provádět vybírání sloupců a řádků správně? Pro výběr sloupců se použije metoda select. Zdůrazněme, že narozdíl od Pandas se tu u více sloupců neočekává, že budou v listu, ale jdou jeden za druhým jako obyčejné argumenty.

In [36]:
table_from_csv.select("first_name", "last_name").show()

+----------+----------+
|first_name| last_name|
+----------+----------+
|    Victor|      Hugo|
|      Mary|    Shelly|
|    Johann|    Geothe|
|    Albert|     Camus|
|   William|Shakespear|
+----------+----------+



Pro vybrání řádků na základě hodnoty se použije metoda *filter* (resp. *where* - jedná se o alias pro *filter*). Jelikož pro potřeby takovéto operace je nutno na sloupce šáhnout, musíme opět použít metodu *col* z *pyspark.sql.functions*:

In [37]:
table_from_csv.filter(f.col("age")>=30).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



In [17]:
table_from_csv.filter(f.col("last_name")=="Shakespear").show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Pro znegování podmínky slouží vlnka.

In [19]:
table_from_csv.filter(~(f.col("last_name")=="Shakespear")).show()

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|100|    Victor|     Hugo|25.0|     M|
|200|      Mary|   Shelly|30.0|     F|
|300|    Johann|   Geothe|75.0|     M|
|400|    Albert|    Camus|null|     M|
+---+----------+---------+----+------+



Co se týče použití více podmínek naráz, je situace stejná jako v pandách. Tj. jednotlivé podmínky se vloží do kulatých závorek a spojí se buďto & (v případě "and" logiky), anebo | (v případě "or" logiky).

In [40]:
table_from_csv.filter(
    (f.col("last_name")=="Shakespear")
    | (f.col("first_name")=="Mary")
).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|200|      Mary|    Shelly|30.0|     F|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



In [41]:
table_from_csv.filter(
    (f.col("gender")=="M")
    & (f.col("age")>=30)
).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Pro vybrání řádků s null hodnotami použijeme metodu *isNull* aplikovanou na f.col("jmeno_sloupce"). Podobný modus operandi má metoda *isNotNull*.

In [51]:
table_from_csv.filter(f.col("age").isNull()).show()

+---+----------+---------+----+------+
| id|first_name|last_name| age|gender|
+---+----------+---------+----+------+
|400|    Albert|    Camus|null|     M|
+---+----------+---------+----+------+



In [52]:
table_from_csv.filter(f.col("age").isNotNull()).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Co máme dělat, když chceme selectování a filtrování provést najednou? Zkrátka selecty a filtery napíšeme za sebe. Abychom neměli jeden gigantický řádek, celý příkaz obalíme do kulatých závorek.

In [7]:
(
  table_from_csv
  .filter(f.col("last_name")=="Shakespear")
  .select("first_name", "last_name")
  .show()
)

+----------+----------+
|first_name| last_name|
+----------+----------+
|   William|Shakespear|
+----------+----------+



Pro seřazení řádků podle hodnot v určitém sloupci použijeme *sort*.

In [47]:
table_from_csv.sort("age").show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|400|    Albert|     Camus|null|     M|
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|500|   William|Shakespear|38.0|     M|
|300|    Johann|    Geothe|75.0|     M|
+---+----------+----------+----+------+



Pokud chceme sestupné řazení, přidáme do *sort* parametr *ascending* s hodnotou False.

In [49]:
table_from_csv.sort("age", ascending=False).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|100|    Victor|      Hugo|25.0|     M|
|400|    Albert|     Camus|null|     M|
+---+----------+----------+----+------+



Pokud chceme vidět unikátní hodnoty z určitého sloupce, napřed onen sloupec selectem vybereme a následně uplatníme metodu *distinct*.

In [50]:
table_from_csv.select("gender").distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
+------+



Pro vyhození řádků s nully se použije konstrukce *na.drop*:

In [4]:
table_from_csv.na.drop().show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Naopak pro nahrazení nullů nějakou hodotou použijeme *na.fill*. Té předáme parametr *value* říkající, čím se mají nully nahradit, a parametr *subset* specifikující, na kterých sloupcích má nahrazování probíhat.

In [6]:
table_from_csv.na.fill(value="?", subset=["age"]).show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|   ?|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



Pro agregované statistiky napřed data zhlukneme do tříd s pomocí *groupBy*. Jako parametr se uvede jméno sloupce, podle kterého grupování proběhne. Následně do metody *agg* vložíme metody (obvykle z *pyspark.sql.functions*) statistiky napočítávající. Pokud se nám nelíbí defaultní jména takto vyprodukovaných sloupců, použijeme metodu *alias*.

In [20]:
table_from_csv.groupBy("gender").agg(
  f.count("*"),
  f.min("age"),
  f.max("age"),
  f.avg("id").alias("nonsensical_aggregation")
).show()

+------+--------+--------+--------+-----------------------+
|gender|count(1)|min(age)|max(age)|nonsensical_aggregation|
+------+--------+--------+--------+-----------------------+
|     F|       1|    30.0|    30.0|                  200.0|
|     M|       4|    25.0|    75.0|                  325.0|
+------+--------+--------+--------+-----------------------+



Pro najoinování dvou dataframů na sebe použijeme metodu *join* zavolanou na jednom z nich. Proměnná s druhým dataframem tvoří první parametr metody *join*. Druhý parametr,  *on*, specifikuje joinovací sloupec. Pokud se ten v obou tabulkách jmenuje stejně, má formu obyčejného stringu.

In [14]:
books_list = [
    [100, 5],
    [300, 7],
    [400, 9]
]
books_frame = spark.createDataFrame(
    books_list,
    schema=["id", "books_count"]
)

joined_frame = table_from_csv.join(books_frame, on="id")
joined_frame.show()

+---+----------+---------+----+------+-----------+
| id|first_name|last_name| age|gender|books_count|
+---+----------+---------+----+------+-----------+
|100|    Victor|     Hugo|25.0|     M|          5|
|300|    Johann|   Geothe|75.0|     M|          7|
|400|    Albert|    Camus|null|     M|          9|
+---+----------+---------+----+------+-----------+



Pokud se ale joinovací sloupec v dataframech jmenuje odlišně, obsahuje *on* porovnávání.

In [15]:
books_list = [
    [100, 5],
    [300, 7],
    [400, 9]
]
books_frame = spark.createDataFrame(
    books_list,
    schema=["some_id", "books_count"]
)

joined_frame = table_from_csv.join(
    books_frame, 
    on=table_from_csv["id"]==books_frame["some_id"]
)
joined_frame.show()

+---+----------+---------+----+------+-------+-----------+
| id|first_name|last_name| age|gender|some_id|books_count|
+---+----------+---------+----+------+-------+-----------+
|100|    Victor|     Hugo|25.0|     M|    100|          5|
|300|    Johann|   Geothe|75.0|     M|    300|          7|
|400|    Albert|    Camus|null|     M|    400|          9|
+---+----------+---------+----+------+-------+-----------+



Pakliže chceme jiný typ joinu než inner join, musíme to specifikovat do parametru *how*.

In [18]:
books_list = [
    [100, 5],
    [300, 7],
    [400, 9]
]
books_frame = spark.createDataFrame(
    books_list,
    schema=["id", "books_count"]
)

joined_frame = table_from_csv.join(books_frame, on="id", how="left")
joined_frame.show()

+---+----------+----------+----+------+-----------+
| id|first_name| last_name| age|gender|books_count|
+---+----------+----------+----+------+-----------+
|500|   William|Shakespear|38.0|     M|       null|
|100|    Victor|      Hugo|25.0|     M|          5|
|200|      Mary|    Shelly|30.0|     F|       null|
|400|    Albert|     Camus|null|     M|          9|
|300|    Johann|    Geothe|75.0|     M|          7|
+---+----------+----------+----+------+-----------+



#### PySpark a datumy
Dost často člověk potřebuje pracovat s datumovým datovým typem. Jelikož kvůli tomu se obvykle musí dělat konverze a i počítání rozdílů mezi dvěma dny nebývá úplně zjevné, věnujme této problematice trochu času.  
Nejprve si vytvoříme dataframe. Všimněte si, že ačkoli druhý sloupec obsahuje de facto datumy, pro Spark jsou to stringy.

In [43]:
dates_data = [
    [1,"20.12.2021"],
    [2,"05.01.2022"],
    [3,"08.01.2022"],
    [4,"12.02.2022"]
    
]
dates_frame = spark.createDataFrame(dates_data, ["number","date"])
dates_frame.printSchema()

root
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)



Konverzi provedeme s pomocí *functions.to_date*. První parametrem je konvertovaný sloupec, druhý pak obsahuje předpis popisující původní data. Význam jednotlivých písmen, které se v něm mohou objevit, nalezneme [zde](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html).

In [44]:
from pyspark.sql import functions as f


dates_frame = dates_frame.withColumn(
    "date_converted",
     f.to_date(f.col("date"), "dd.MM.yyyy")
)

dates_frame.printSchema()
dates_frame.show()

root
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)
 |-- date_converted: date (nullable = true)

+------+----------+--------------+
|number|      date|date_converted|
+------+----------+--------------+
|     1|20.12.2021|    2021-12-20|
|     2|05.01.2022|    2022-01-05|
|     3|08.01.2022|    2022-01-08|
|     4|12.02.2022|    2022-02-12|
+------+----------+--------------+



Aktuální datum získáme s pomocí *functions.current_date*.

In [45]:
dates_frame = dates_frame.withColumn(
    "current_date",
     f.current_date()
)

dates_frame.printSchema()
dates_frame.show()

root
 |-- number: long (nullable = true)
 |-- date: string (nullable = true)
 |-- date_converted: date (nullable = true)
 |-- current_date: date (nullable = false)

+------+----------+--------------+------------+
|number|      date|date_converted|current_date|
+------+----------+--------------+------------+
|     1|20.12.2021|    2021-12-20|  2022-06-05|
|     2|05.01.2022|    2022-01-05|  2022-06-05|
|     3|08.01.2022|    2022-01-08|  2022-06-05|
|     4|12.02.2022|    2022-02-12|  2022-06-05|
+------+----------+--------------+------------+



Rozdíl datumů ve dnech spočítáme prostřednictvím *functions.datediff*.

In [46]:
dates_frame.withColumn(
    "current_minus_converted",
     f.datediff(f.col("date_converted"),f.col("current_date"))
).show()

+------+----------+--------------+------------+
|number|      date|date_converted|current_date|
+------+----------+--------------+------------+
|     1|20.12.2021|    2021-12-20|  2022-06-05|
|     2|05.01.2022|    2022-01-05|  2022-06-05|
|     3|08.01.2022|    2022-01-08|  2022-06-05|
|     4|12.02.2022|    2022-02-12|  2022-06-05|
+------+----------+--------------+------------+



Pakliže potřebujeme k datu přičíst či odečíst den či měsíc, sáhneme po *functions.add_months* a *functions.date_add* (obě funkce dokáží přijmout kladné i záporné počty měsíců/dnů).

In [47]:
(
    dates_frame
    .withColumn("added_months", f.add_months(f.col("date_converted"),2))
    .withColumn("substr_months", f.add_months(f.col("date_converted"),-2)) 
    .withColumn("added_days", f.date_add(f.col("date_converted"),15)) 
    .withColumn("substr_days", f.date_add(f.col("date_converted"),-15))
    .show()
)

+------+----------+--------------+------------+------------+-------------+----------+-----------+
|number|      date|date_converted|current_date|added_months|substr_months|added_days|substr_days|
+------+----------+--------------+------------+------------+-------------+----------+-----------+
|     1|20.12.2021|    2021-12-20|  2022-06-05|  2022-02-20|   2021-10-20|2022-01-04| 2021-12-05|
|     2|05.01.2022|    2022-01-05|  2022-06-05|  2022-03-05|   2021-11-05|2022-01-20| 2021-12-21|
|     3|08.01.2022|    2022-01-08|  2022-06-05|  2022-03-08|   2021-11-08|2022-01-23| 2021-12-24|
|     4|12.02.2022|    2022-02-12|  2022-06-05|  2022-04-12|   2021-12-12|2022-02-27| 2022-01-28|
+------+----------+--------------+------------+------------+-------------+----------+-----------+



Existují i funkce na extrakci roku, měsíce, dne atd. z datumu: 

In [50]:
(
    dates_frame
    .withColumn("extracted_year", f.year(f.col("date_converted")))
    .withColumn("extracted_month", f.month(f.col("date_converted"))) 
    .withColumn("extracted_day", f.dayofmonth(f.col("date_converted"))) 
    .withColumn("day_of_year", f.dayofyear(f.col("date_converted")))
    .withColumn("day_of_week", f.dayofweek(f.col("date_converted")))
    .withColumn("week_of_year", f.weekofyear(f.col("date_converted")))
    .show()
)

+------+----------+--------------+------------+--------------+---------------+-------------+-----------+-----------+------------+
|number|      date|date_converted|current_date|extracted_year|extracted_month|extracted_day|day_of_year|day_of_week|week_of_year|
+------+----------+--------------+------------+--------------+---------------+-------------+-----------+-----------+------------+
|     1|20.12.2021|    2021-12-20|  2022-06-05|          2021|             12|           20|        354|          2|          51|
|     2|05.01.2022|    2022-01-05|  2022-06-05|          2022|              1|            5|          5|          4|           1|
|     3|08.01.2022|    2022-01-08|  2022-06-05|          2022|              1|            8|          8|          7|           1|
|     4|12.02.2022|    2022-02-12|  2022-06-05|          2022|              2|           12|         43|          7|           6|
+------+----------+--------------+------------+--------------+---------------+------------

#### Window funkce
Někdy se hodí provádět určité operace nikoli na celém dataframu, ale pouze na jeho části určené buďto hodnotou nějakého sloupce, anebo počtem řádek.  
Nejprve si vytvořme pokusný dataframe:

In [4]:
for_window_list = [
    ["one", 10],
    ["one", 10],
    ["one", 30],
    ["one", 40],
    ["one", 50],
    ["two", 60],
    ["two", 70],
    ["two", 80],
    ["two", 90],
    ["three", 100]
    
]
for_window_frame = spark.createDataFrame(
    for_window_list,
    schema=["word", "number"]
)

První ukázka se bude týkat číslování řádek. Nejprve si vytvoříme definici okna. Budeme chtít pro jednotlivá slova samostatné číslování, proto *partitionBy("word")*. Záznamy v jedné skupině pak chceme setřídit podle hodnoty sloupce "number", proto následuje *orderBy("number")*.   

Čísla řádků budeme chtít umístit do nového sloupce, proto použijeme metodu *withColumn*. Na samotné napočítání vložíme do druhého parametru této sloupec vytvářející metody *functions.row_number().over(window_definition)*.

In [5]:
from pyspark.sql.window import Window
from pyspark.sql import functions as f

window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "row_number",
    f.row_number().over(window_definition)
).show()

+-----+------+----------+
| word|number|row_number|
+-----+------+----------+
|  two|    60|         1|
|  two|    70|         2|
|  two|    80|         3|
|  two|    90|         4|
|  one|    10|         1|
|  one|    10|         2|
|  one|    30|         3|
|  one|    40|         4|
|  one|    50|         5|
|three|   100|         1|
+-----+------+----------+



Pokud bychom chtěli nikoli číslo řádky, ale pořadí čísel ve sloupci "number", použijeme namísto funkce *row_number* funkci *rank*. Rozdíl je patrný pro řádky, kde hodnota sloupce "word" je rovna "one" a hodnota sloupce "number" odpovídá 10. Zatímco u *row_number* měl u sebe jeden řádek jedničku a druhý dvojku, v případě *rank* mají jedničku oba. Následující řádek má ale trojku, tj. pořadí 2 není. 

In [7]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "rank",
    f.rank().over(window_definition)
).show()

+-----+------+-----------+
| word|number|rank_number|
+-----+------+-----------+
|  two|    60|          1|
|  two|    70|          2|
|  two|    80|          3|
|  two|    90|          4|
|  one|    10|          1|
|  one|    10|          1|
|  one|    30|          3|
|  one|    40|          4|
|  one|    50|          5|
|three|   100|          1|
+-----+------+-----------+



Pokud by nám to vadilo, použijeme namísto *rank* funkci *dense_rank*.

In [8]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "dense_rank",
    f.dense_rank().over(window_definition)
).show()

+-----+------+----------+
| word|number|dense_rank|
+-----+------+----------+
|  two|    60|         1|
|  two|    70|         2|
|  two|    80|         3|
|  two|    90|         4|
|  one|    10|         1|
|  one|    10|         1|
|  one|    30|         2|
|  one|    40|         3|
|  one|    50|         4|
|three|   100|         1|
+-----+------+----------+



Pokud potřebujeme mít sloupec, který bude o jednu či více řádek posunutý níže, aplikujeme funkci *lag*. Kde nebude co dát, tam se objeví null. Pokud bychom chtěli mít sloupec posunout opačným směrem (tj. o N řádků výše), použijeme funkci *lead*.

In [13]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "lagged_column",
    f.lag("number", 1).over(window_definition)
).show()

+-----+------+-------------+
| word|number|lagged_column|
+-----+------+-------------+
|  two|    60|         null|
|  two|    70|           60|
|  two|    80|           70|
|  two|    90|           80|
|  one|    10|         null|
|  one|    10|           10|
|  one|    30|           10|
|  one|    40|           30|
|  one|    50|           40|
|three|   100|         null|
+-----+------+-------------+



Pakliže potřebujeme kumulativní sumu, aplikujeme *functions.sum*.

In [14]:
window_definition  = Window.partitionBy("word").orderBy("number")

for_window_frame.withColumn(
    "cumulative_sum",
    f.sum("number").over(window_definition)
).show()

+-----+------+-------------+
| word|number|lagged_column|
+-----+------+-------------+
|  two|    60|           60|
|  two|    70|          130|
|  two|    80|          210|
|  two|    90|          300|
|  one|    10|           20|
|  one|    10|           20|
|  one|    30|           50|
|  one|    40|           90|
|  one|    50|          140|
|three|   100|          100|
+-----+------+-------------+



### SQL přístup
Doposud jsme si ukazovali práci s PySparkem, která se svým modem operandi dosti podobala práci s pandami. Nicméně existuje ještě jeden přístup, který se podobá zacházení s SQL databázemi.  
Nejprve musíme na dataframu provolat metodu *createOrReplaceTempView*. Té předáme jen jeden parametr - jméno onoho dočasného pohledu.

In [7]:
table_from_csv.createOrReplaceTempView("some_sql_table")

Tento pohled můžeme posléze používat v sql dotazech, které vložíme jako parametr do metody *sql*, která je napojená na samotnou pysparkovou sešnu. Výsledkem je další pysparkový dataframe, na který, pokud chceme vidět jeho obsah, musíme napojit metodu *show*.

In [10]:
spark.sql("select * from some_sql_table").show()

+---+----------+----------+----+------+
| id|first_name| last_name| age|gender|
+---+----------+----------+----+------+
|100|    Victor|      Hugo|25.0|     M|
|200|      Mary|    Shelly|30.0|     F|
|300|    Johann|    Geothe|75.0|     M|
|400|    Albert|     Camus|null|     M|
|500|   William|Shakespear|38.0|     M|
+---+----------+----------+----+------+



### Cachování
Při psaní této sekce jsem extenzivně čerpal  [odsud](https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34?gi=407c957f2bcb) - doporučuji přečíst.  
Představme si, že bychom s pomocí selectování a filtrování vyrobili z nějakého dataframu dataframe menší a ten umístili do nové proměnné. Když bychom na této nové tabulce začali provádět nějaké akce, dojde bohužel k tomu, že se vytváření onoho redukovaného dataframu provede pro každou z těchto akcí zvlášť. Aby se zbytečně neplýtvalo výpočetními prostředky, existuje možnost si dataframe nacachovat, tj. uložit do paměti/na harddisk pro budoucí využití.
Cachování se realizuje provoláním dataframové metody *cache* resp. *persist*.

In [5]:
table_from_csv.cache()

DataFrame[id: string, first_name: string, last_name: string, age: string, gender: string]

Rozdíl mezi nimi spočívá jen v tom, že u persistu lze specifikovat, kam přesně budou data uložena. Nicméně ve většině případů si člověk vystačí s defaultním MEMORY_AND_DISK, tj. data se prioritně ukládají do RAMky a když ta dojde, míří zbytek na disk.  
V případě panda-like přístupu je cachování lazy transformace. U SQL přístupu, který má podobu

In [13]:
spark.sql("cache table some_sql_table")

DataFrame[]

to ale neplatí - zde máme co do činění s eager operací, tj. potřebná posloupnost transformací ihned proběhne a umístí data do úložiště. Nicméně toto defaultní chování máme možnost změnit na lazy s použitím odpovídajícího klíčového slova 

In [14]:
spark.sql("cache lazy table some_sql_table")

DataFrame[]

Pro zjištění, zda je dataframe nacachovaný, se použije atribut *is_cached*:

In [14]:
table_from_csv.is_cached

False

Pokud bychom ručně chtěli data z cache odstanit, použijeme

In [13]:
table_from_csv.unpersist()

DataFrame[id: string, first_name: string, last_name: string, age: string, gender: string]

resp. 

In [16]:
spark.sql("uncache table some_sql_table")

DataFrame[]

Předejdeme tak situaci, kdy dojde paměť a Spark samotný začne vyhazovat tabulky na základě míry jejich nepoužívanosti. Navíc čím víc je volné paměti, tím operace probíhají rychleji (není nutné swapovat na disk). Ze stejného důvodu může být nastaven administrátorem PySparkového clusteru timeout (k nalezení v případě Cloudery na XXX) - pokud se na dataframe nesáhlo po dobu delší než je tento časový úsek, je onen dataframe z cache automaticky odstraněn.  
Stejnou motivaci má i rada cachovat nikoli celé dataframy, ale pouze ty jejich části, které potřebujeme.  
Zmiňme nakonec, že ne vždy cachování vede k rychlejšímu běhu programu. Pokud bychom měli relativně velký parquet soubor a prováděli na něm filtrování, asi tato operace odběhne rychleji, než kdyby se musel napřed celý načíst do paměti (nebo dokonce swapovat na disk).

In [21]:
spark.stop()