# Lab 7 - PySpark i SQL, wiaderkowanie i partycjonowanie plików oraz zapis w hurtowni danych Hive.

In [1]:
# dla instalacji lokalnej

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_HOME'] = sys.executable

In [2]:
from pyspark.sql import SparkSession

# ścieżka do bazy danych hurtowni danych oraz plików
# należy dostosować do ścieżki względnej, w której umieszczony został bieżący notebook
warehouse_location = './data/metastore'

# utworzenie sesji Spark, ze wskazaniem włączenia obsługi Hive oraz
# lokalizacją przechowywania hurtowni danych
spark = SparkSession\
        .builder\
        .master("local[2]")\
        .appName("Apache SQL and Hive")\
        .config("spark.memory.offHeap.enabled","true")\
        .config("spark.memory.offHeap.size","4g")\
        .config("spark.executor.memory", "2g") \
        .config("spark.driver.memory", "1g") \
        .config("spark.driver.host", "localhost")\
        .enableHiveSupport()\
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .getOrCreate()

In [3]:
spark.sparkContext

## 1. Spark i SQL

Spark umożliwia zarejestrowanie obiektu DataFrame jako widoku, co umożliwia korzystanie z niego w sposób bardzo zbliżony do pracy z językiem SQL. Poniżej przykład.

In [4]:
# dostosuj ścieżkę do pliku do swoich danych, tutaj został utworzony mniejszy zbiór niż w poprzednim labie
df = spark.read.csv('./data/employee.csv', header=True, inferSchema=True)

In [6]:
# tworzymy widok tymczasowy w pamięci węzła
df.createOrReplaceTempView("EMPLOYEE_DATA")

In [7]:
# wypisanie tabeli, zwróć uwagę na to, czy stworzona tabela jest tymczasowa czy trwała
spark.catalog.listTables()

[Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [8]:
# pobranie danych jak z tabeli SQL
spark.sql("Select * from EMPLOYEE_DATA limit 4").show()
spark.sql("select firstname from EMPLOYEE_DATA").show(10)

+---+---------+-------------------+---+--------+
| id|firstname|           lastname|age|  salary|
+---+---------+-------------------+---+--------+
|  1| Zbigniew|           Barański| 46| 8797.82|
|  2|Krzysztof|           Kowalski| 33| 7441.71|
|  3|Krzysztof|Brzęczyszczykiewicz| 60| 8502.79|
|  4|     Adam|         Wróblewski| 36|10258.55|
+---+---------+-------------------+---+--------+

+----------+
| firstname|
+----------+
|  Zbigniew|
| Krzysztof|
| Krzysztof|
|      Adam|
|   Wisława|
|Aleksandra|
|     Agata|
| Krzysztof|
|Mieczysław|
| Krzysztof|
+----------+
only showing top 10 rows



In [9]:
spark.sql("select firstname, count(firstname), avg(salary) from EMPLOYEE_DATA group by firstname").show()

+----------+----------------+-----------------+
| firstname|count(firstname)|      avg(salary)|
+----------+----------------+-----------------+
|   Wisława|         2000553|7849.951081895842|
|Mieczysław|         2000421|7849.003746451405|
|     Agata|         2001261|7849.029105189236|
| Krzysztof|         1999791|7850.056622832048|
|     Marek|         2000754|7851.120718589093|
|      Adam|         1997466|7849.711405380585|
| Katarzyna|         2001252|  7850.3247199353|
|  Wojciech|         2000286|7851.790970511204|
|  Zbigniew|         1999992|7850.794013165997|
|Aleksandra|         1998224|7849.870652984829|
+----------+----------------+-----------------+



In [11]:
the_raise = 0.1 # 10% podwyżki
spark.sql(f"select firstname, lastname, salary, round(salary + salary * {the_raise},2) as with_raise from EMPLOYEE_DATA").show(5)

+---------+-------------------+--------+----------+
|firstname|           lastname|  salary|with_raise|
+---------+-------------------+--------+----------+
| Zbigniew|           Barański| 8797.82|    9677.6|
|Krzysztof|           Kowalski| 7441.71|   8185.88|
|Krzysztof|Brzęczyszczykiewicz| 8502.79|   9353.07|
|     Adam|         Wróblewski|10258.55|   11284.4|
|  Wisława|           Barański|  9006.9|   9907.59|
+---------+-------------------+--------+----------+
only showing top 5 rows



## 2. Apache Hive

https://hive.apache.org/


Apache Hive, który pierwotnie został stworzony w 2007 przez Facebooka, a następnie w 2008 przekazany pod skrzydła Apache Foundation, jest nazywany hurtownią danych. Dane przechowywane są głównie w systemie **HDFS** (**Hadoop Distributed File System**), ale Hive integruje się również z innymi silnikami baz danych.

Dostęp do danych jest realizowany przez **Hive QL**, który bardzo przypomina język SQL i taki sposób obsługi różnorodnych danych był jedną z głównych motywacji powstania Hive.

Za pomocą zapytań Hive QL (HQL) można wykonać takie zapytania jak:
* tworzenie i zmiana struktur tabel,
* import i export danych,
* agregacja danych, filtrowanie i złączenia danych.

Apache Hive jest wykorzystywany w dużych ekosystemach i mimo wymienionych wyżej zalet posiada również kilka ograniczeń:
* opóźnienie w czasie przetwarzania ze zwględu na wsadową naturę przetwarzania,
* brak możliwości przetwarzania real-time,
* język HQL nie daje możliwości wykonania takich operacji jak modyfikacja danych na poziomie wiersza,
* brak możliwości przeprowadzenia zaawansowanych analiz jak współczesne nowoczesne bazy SQL.

Alternatywne technologie:

* Presto
* Snowflake
* Apache Impala
* IBM Db2
* Google BigQuery
* Amazon Redshift
* ClickHouse
* Apache Hadoop
* Apache HBase
* Oracle Exadata
* Teradata Vantage
* Cloudera Impala

### 2.1 Hive QL

> Dokumentacja Apache Hive QL (dość archaiczna) jest dostępna pod adresem: https://cwiki.apache.org/confluence/display/Hive/LanguageManual

In [12]:
spark.catalog.currentCatalog()

'spark_catalog'

In [13]:
# dla zrealizowania kolejnych przykładów dokonamy kilku modyfikacji pliku employee
# 1. dodanie kolumny ID - indeksu
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("ID", monotonically_increasing_id())

In [14]:
df.show(10)

+---+----------+-------------------+---+--------+
| ID| firstname|           lastname|age|  salary|
+---+----------+-------------------+---+--------+
|  0|  Zbigniew|           Barański| 46| 8797.82|
|  1| Krzysztof|           Kowalski| 33| 7441.71|
|  2| Krzysztof|Brzęczyszczykiewicz| 60| 8502.79|
|  3|      Adam|         Wróblewski| 36|10258.55|
|  4|   Wisława|           Barański| 43|  9006.9|
|  5|Aleksandra|         Wróblewski| 38| 8796.75|
|  6|     Agata|               Glut| 64| 9252.93|
|  7| Krzysztof|             Wlotka| 58| 8470.38|
|  8|Mieczysław|              Pysla| 51|10216.48|
|  9| Krzysztof|Brzęczyszczykiewicz| 48| 7853.63|
+---+----------+-------------------+---+--------+
only showing top 10 rows



In [15]:
# dokonamy podziału danych i zapisania w różnych formatach
splits = df.randomSplit(weights=[0.3, 0.7], seed=19)

In [16]:
splits[0].count(), splits[1].count()

(5998745, 14001255)

In [None]:
# to dość dziwne zjawisko niezbyt równego podziału danych jest opisane w artykułach:
# https://medium.com/udemy-engineering/pyspark-under-the-hood-randomsplit-and-sample-inconsistencies-examined-7c6ec62644bc
# oraz
# https://www.geeksforgeeks.org/pyspark-randomsplit-and-sample-methods/

In [17]:
# większa część trafi do nowej tymczasowej tabeli
splits[1].createOrReplaceTempView("EMPLOYEE_DATA_SPLIT_1")

In [18]:
spark.catalog.listTables()

[Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='EMPLOYEE_DATA_SPLIT_1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [19]:
# a mniejsza do plików JSON
splits[0].write.json('./data/json/employee', mode='overwrite')

In [20]:
!ls ./data/json/employee/*.json

./data/json/employee/part-00000-a64d7312-c16a-4c93-b286-0301d9ceda44-c000.json
./data/json/employee/part-00001-a64d7312-c16a-4c93-b286-0301d9ceda44-c000.json
./data/json/employee/part-00002-a64d7312-c16a-4c93-b286-0301d9ceda44-c000.json
./data/json/employee/part-00003-a64d7312-c16a-4c93-b286-0301d9ceda44-c000.json
./data/json/employee/part-00004-a64d7312-c16a-4c93-b286-0301d9ceda44-c000.json
./data/json/employee/part-00005-a64d7312-c16a-4c93-b286-0301d9ceda44-c000.json


In [21]:
# aby móc wykorzystać dane w przykładach ze złączaniem, zapiszemy jeszcze próbkę danych z głównej ramki
# z identyfikatorami oraz dodatkową kolumną z podwyżką
from pyspark.sql.functions import col, lit, round

lucky_guys = spark.sql("select * from EMPLOYEE_DATA").sample(0.01)\
.withColumn('raise', lit('10%')).withColumn('salary after raise', round(col('salary') * 1.1, 2))

In [22]:
# zapisujemy szczęściarzy do oddzielnej tabeli w hurtowni
lucky_guys.write.mode('overwrite').saveAsTable("lucky_employees", format='parquet')

In [23]:
spark.catalog.listTables()

[Table(name='lucky_employees', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='EMPLOYEE_DATA_SPLIT_1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

#### Złączenie danych z różnych źródeł

In [24]:
!ls ./data/metastore/lucky_employees/*.parquet

./data/metastore/lucky_employees/part-00000-a4cbca38-d881-4932-8d1a-0c6080487897-c000.snappy.parquet
./data/metastore/lucky_employees/part-00001-a4cbca38-d881-4932-8d1a-0c6080487897-c000.snappy.parquet
./data/metastore/lucky_employees/part-00002-a4cbca38-d881-4932-8d1a-0c6080487897-c000.snappy.parquet
./data/metastore/lucky_employees/part-00003-a4cbca38-d881-4932-8d1a-0c6080487897-c000.snappy.parquet
./data/metastore/lucky_employees/part-00004-a4cbca38-d881-4932-8d1a-0c6080487897-c000.snappy.parquet
./data/metastore/lucky_employees/part-00005-a4cbca38-d881-4932-8d1a-0c6080487897-c000.snappy.parquet


In [26]:
# przykład złączania danych na różnych źródłach
# zapytanie SQL bezpośrednio na plikach - tutaj zapisanych wcześniej JSON-ach oraz parquet
query = """
SELECT ed.ID, ed.firstname, ed.lastname, ed.salary, lucky.raise, lucky.`salary after raise`
FROM json.`./data/json/employee/` as jtable 
join EMPLOYEE_DATA ed on jtable.ID=ed.ID 
join parquet.`./data/metastore/lucky_employees/` as lucky on ed.ID=lucky.ID
"""
df_from_json = spark.sql(query).show(10)

+-----+----------+-------------------+-------+-----+------------------+
|   ID| firstname|           lastname| salary|raise|salary after raise|
+-----+----------+-------------------+-------+-----+------------------+
| 2764|      Adam|              Pysla|8391.06|  10%|           9230.17|
| 8037|  Zbigniew|             Wlotka| 6483.3|  10%|           7131.63|
|11574|Mieczysław|              Pysla|8753.58|  10%|           9628.94|
|12300|  Wojciech|               Glut|9688.33|  10%|          10657.16|
|20584|   Wisława|           Kowalski|8093.44|  10%|           8902.78|
|22784| Krzysztof|           Kowalski|6608.03|  10%|           7268.83|
|26282|     Marek|              Pysla|8264.48|  10%|           9090.93|
|27120|     Marek|             Szczaw|8055.54|  10%|           8861.09|
|31326|  Wojciech|Brzęczyszczykiewicz|7976.06|  10%|           8773.67|
|32890| Katarzyna|           Barański|8956.59|  10%|           9852.25|
+-----+----------+-------------------+-------+-----+------------

#### Dzielenie danych na wiaderka (ang. buckets) i partycje

Dzielenie danych na wiaderka jest rozwiązaniem, które stosowane jest do podziału danych na mniejsze części w sposób, który może przyspieszyć obliczenia poprzez zredukowanie liczby operacji przetasowania danych (ang. shuffle, a w kontekście Sparka mówimy o operacji exchange), które są bardzo kosztowne, gdyż wykonywane są często między węzłami (workerami).

In [27]:
# ten przykład pokazuje podział na 16 wiaderek danych bazując na podziale po kolumnie ID (tu używane jest hashowanie)
# dane posortowane są w każdym buckecie po kolumnie salary
# dane zapisywane są do hurtowni Hive, a informacje o zapisanych tam tabelach przechowywane są w
# Hive metastore (domyślnie jest do baza danych Derby)
df.write.bucketBy(16, 'ID').mode('overwrite').sortBy('salary').saveAsTable('employee_id_bucketed')

In [28]:
!ls ./data/metastore/employee_id_bucketed/*.parquet

./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00000.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00001.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00002.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00003.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00004.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00005.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00006.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00007.c000.snappy.parquet
./data/metastore/employee_id_bucketed/part-00000-2626a2cb-d44d-4e75-855e-7fc33daf774d_00008.c000.snappy.

In [31]:
spark.table('employee_id_bucketed').show(10)

+-------+---------+-------------------+---+-------+
|     ID|firstname|           lastname|age| salary|
+-------+---------+-------------------+---+-------+
|2422785|     Adam|Brzęczyszczykiewicz| 55|3121.65|
|1580567|Krzysztof|Brzęczyszczykiewicz| 41|3547.67|
|3397177|    Marek|Brzęczyszczykiewicz| 20|3630.98|
| 974730| Zbigniew|       Mieczykowski| 59|3640.82|
|1505525|    Agata|         Malinowski| 62|3681.82|
| 153754|Krzysztof|       Mieczykowski| 27|3736.18|
|2775459|  Wisława|Brzęczyszczykiewicz| 65|3755.73|
|1648207|     Adam|       Mieczykowski| 38|3765.32|
|1296355|    Marek|           Barański| 25|3783.08|
|2188892|     Adam|       Mieczykowski| 18|3785.55|
+-------+---------+-------------------+---+-------+
only showing top 10 rows



In [32]:
# wypisanie tabeli
spark.catalog.listTables()

[Table(name='employee_id_bucketed', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='lucky_employees', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='EMPLOYEE_DATA_SPLIT_1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [33]:
# usunięcie tabeli
spark.sql('DROP TABLE employee_id_bucketed')

DataFrame[]

In [34]:
# wypisanie tabeli
spark.catalog.listTables()

[Table(name='lucky_employees', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='EMPLOYEE_DATA', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='EMPLOYEE_DATA_SPLIT_1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [35]:
# jeżeli dane, z którymi pracujemy zawierają stosunkowo niewiele różnorodnych wartości w danych kolumnach
# lub filtrowanie i obliczenia często odbywają się na podgrupach danych to lepsze efekty uzyskamy
# poprzez wykorzystanie możliwości partycjonowania tych danych, które to partycjonowanie
# będzie również odzwierciedlone w fizycznej strukturze plików na dysku twardym w hurtowni danych

# zobaczmy przykład poniżej

df.write.partitionBy("lastname").mode('overwrite').saveAsTable("employees_partitioned_lastname")

In [36]:
# dobrym pomysłem jest też określenie ilości bucketów wynikających z danych w konkretnej kolumnie
# i wykorzystanie do podziału
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.bucketBy.html
buckets = spark.sql("select distinct firstname from EMPLOYEE_DATA").count()
buckets

10

In [37]:
# widok danych podzielonych na partycję z punktu widzenia systemu plików
!ls ./data/metastore/employees_partitioned_lastname

_SUCCESS
lastname=BaraĹ„ski
lastname=BrzÄ™czyszczykiewicz
lastname=Glut
lastname=Kowalski
lastname=Malinowski
lastname=Mieczykowski
lastname=Pysla
lastname=Szczaw
lastname=Wlotka
lastname=WrĂłblewski


In [38]:
df.filter(df.lastname == 'Pysla').groupby('lastname').agg({'salary': 'avg'}).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#19], functions=[avg(salary#21)])
   +- Exchange hashpartitioning(lastname#19, 200), ENSURE_REQUIREMENTS, [plan_id=820]
      +- HashAggregate(keys=[lastname#19], functions=[partial_avg(salary#21)])
         +- Filter (isnotnull(lastname#19) AND (lastname#19 = Pysla))
            +- FileScan csv [lastname#19,salary#21] Batched: false, DataFilters: [isnotnull(lastname#19), (lastname#19 = Pysla)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/Krzysztof/__projects/__pyspark_local_aiids/data/employe..., PartitionFilters: [], PushedFilters: [IsNotNull(lastname), EqualTo(lastname,Pysla)], ReadSchema: struct<lastname:string,salary:double>




In [39]:
%%time
df.filter(df.lastname == 'Pysla').groupby('lastname').agg({'salary': 'avg'}).show(10)

+--------+-----------------+
|lastname|      avg(salary)|
+--------+-----------------+
|   Pysla|7848.517332754915|
+--------+-----------------+

CPU times: total: 0 ns
Wall time: 10.8 s


In [40]:
spark.sql("select lastname, avg(salary) from employees_partitioned_lastname where lastname='Pysla' group by lastname").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#684], functions=[avg(salary#683)])
   +- Exchange hashpartitioning(lastname#684, 200), ENSURE_REQUIREMENTS, [plan_id=886]
      +- HashAggregate(keys=[lastname#684], functions=[partial_avg(salary#683)])
         +- FileScan parquet spark_catalog.default.employees_partitioned_lastname[salary#683,lastname#684] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/Krzysztof/__projects/__pyspark_local_aiids/data/metasto..., PartitionFilters: [isnotnull(lastname#684), (lastname#684 = Pysla)], PushedFilters: [], ReadSchema: struct<salary:double>




In [41]:
%%time
spark.sql("select lastname, avg(salary) from employees_partitioned_lastname where lastname='Pysla' group by lastname").show(10)

+--------+-----------------+
|lastname|      avg(salary)|
+--------+-----------------+
|   Pysla|7848.517332754809|
+--------+-----------------+

CPU times: total: 15.6 ms
Wall time: 317 ms


Jak widać, operacja wykonała się szybciej.

In [7]:
spark.sparkContext.stop()

### Zadania

**Zadanie 1**  
Pamiętacie plik zamówienia.txt ?
Plik został umieszczony w folderze z labem w repozytorium.

Wczytaj ten plik za pomocą Sparka do dowolnego typu danych (RDD, Spark DataFrame) i dokonaj transformacji tak aby:
* naprawić problemy z kodowaniem znaków (replace?) w kolumnie Sprzedawca
* poprawić format danych w kolumnie Utarg
* dodać odpowiednie typy danych
* kolumna idZamowienia powinna być traktowana jako klucz (indeks)

**Zadanie 2**  
Po wykonaniu zadania 1, wykorzystaj przykłady z laboratorium i:
* 2.1 wykonaj wiaderkowanie danych i wykonaj dowolne zapytanie agregujące na tych danych vs. dane niepodzielone na wiaderka - porównaj czas
* 2.2 wykonaj partycjonowanie danych i zapisz je w formcie csv (wypróbuj partycjonowanie wg. kraju, nazwiska
* 2.3 wykonaj zapytanie agregujące z filtrowanie po kolumnie, której użyłeś/-aś do partycjonowania na danych oryginalnych oraz partycjonowanych i porównaj czas wykonania

**Zadanie 3**  
Z danych wygeneruj 4 różne podzbiory próbek (wiersze wybrane losowo) i dodaj nową kolumnę w każdym z nich, np. w jednym stwórz kolumnę month wyciągając tylko miesiąc z daty, w drugim wartość netto zamówienia (przyjmując, że vat to 23%), w kolejnym zamień nazwisko na wielkie litery, w kolejnym dodaj kolumnę waluta z wartością PLN.

Następnie zapisz każdy z tych zbiorów tak, że:
* zbiór pierwszy to będzie tymczasowa tabela in-memory Sparka
* zbiór drugi to plik(i) parquet
* zbiór trzeci to plik(i) csv
* zbiór czwarty to plik(i) json

Wykonaj zapytanie złączające jak w przykładzie pobierając dane bezpośrednio z plików i wyświetl idZamowienia, Kraj, Sprzedawcę, Datę, Utarg oraz 4 nowo utworzone kolumny.
