# Hurtownie Danych i Big Data
## Laboratorium 13 - Uruchomienie przykładowych programów dla danych masywnych

Celem laboratoriów jest zapoznanie się z **koncepcją RDD** oraz podstawowymi rodzajami operacjami - _transformacjami_ i _akcjami_.

### Agenda
1. Czym jest RDD?
2. Tworzenie RDD (`parallelize()`, `textFile()`)
3. Podstawowe operacje (`filter()`, `map()`, `collect()`)
4. Próbkowanie (`sample()`, `takeSample()`)
5. Operacje na zbiorach (`union()`, `intersect()`, `subtract()`, `distinct()`, `cartesian()`)
6. Agregacje (`reduce()`, `fold()`, `aggregate()`)
7. Operacje klucz/wartość

### 1. Czym jest RDD?
_Resilient Distributed Datasets_ (RDD) są podstawową abstrakcją służącą do przechowywania danych we frameworku Apache Spark. Ich wykorzystanie gwarantuje przede wszystkim poprawność obsługiwania błędów (ang. _fault tolerant_) w przypadku awarii lub błędów jednego z węzłów. W praktyce wejściowy zbiór danych dzielony jest na mniejsze partycje, z których każda jest stała i niemodyfikowalna (ang. _immutable_). Spark pamięta kolejność wykonywania poszczególnych operacji na wejściowym zbiorze danych i jest w stanie dynamicznie odtworzyć każdą partycję danych (tworząc w pamięci graf operacji, tzw. _DAG_).

#### Dotychczasowe podejście - Hadoop MapReduce
MapReduce jest popularnym podejściem do radzenia sobie z przetwarzaniem dużej ilości danych. Z biegiem czasu powstało wiele wzorców projektowych usprawniających cały proces. Największym **problemem jest jednak sekwencyjne łączenie kolejnych zadań**, np. filtrowanie -> analiza, (tzw. *job-chaining*), które jest mało efektywne.

Wąskim gardłem okazują się być szczególnie:
- odczyt/zapis danych na dysku `I/O` (Hadoop po każdym zadaniu `"job"` zapisuje dane w zewnętrznym systemie plików (zazwyczaj jest to HDFS). Następnie, przy kolejnym zadaniu te dane są ponownie odczytywane co jest dosyć kosztowe).
- transfer/replikacja danych na inne węzły (`network I/O`),
- serializacja obiektów (`CPU`)

> Szacuje się, że ok **90%** czasu zadania przeznaczane jest na operacje I/O.

![Iterative operations on MapReduce](https://www.tutorialspoint.com/apache_spark/images/iterative_operations_on_mapreduce.jpg)

#### Jak Spark może pomóc?
Apache Spark rozwiązuje ten problem wykorzystujac koncepcję przechowywanie danych między poszczególnymi zadaniami w pamięci RAM poszczególnych maszyn. W ten sposób możliwe jest **przyspieszenie operacji 10-100x** (zredukowane użycie dysku oraz sieci).

> Dopiero w przypadku wyczerpania pamięci RAM dane będą zapisywane na dysku (domyślne ustawienie).

![Iterative operations on Apache Spark](https://www.tutorialspoint.com/apache_spark/images/iterative_operations_on_spark_rdd.jpg)

#### Cechy RDD
- niemodyfikowalna (`immutable` / `read-only`) kolekcja dowolnych obiektów (pożądana cecha w przypadku rozproszonego przetwarzania danych, skutecznie redukuje ryzyko związane z niespójnością danych),
- dzielenie kolekcji na części (`partitioning`) i ich efektywna dystrybucja w klastrze (rozbudowane możliwości personalizacji),
- odporność na błędy (`fault-tolerant`) - zapamiętanie sekwencji operacji (DAG) umożliwiające późniejsze odtworzenie stanu dowolnej partycji

### 2. Tworzenie RDD

RDD można utworzyć na 2 sposoby:
- na podstawie aktualnej kolekcji (metoda `parallelize()`) - w sterowniku aplikacji (`driver`),
- na podstawie zewnętrznych danych (np. HDFS, S3, HBase, ...)

Poniżej przedstawiono przykład pierwszego sposobu.

Zdefiniujmy listę (podstawowa kolekcja danych w Python-ie) ze 100 elementami będącymi kolejnymi liczbami naturalnymi.

In [0]:
a = range(0, 100)

Transformacja do RDD odbywa się za pomocą funkcji `parallelize()` wykonanej na obiekcie kontekstu - `sc`. Ponieważ bezpośrednie odwołanie do tego obiektu jest odradzane (zaszłość z wcześniejszych wersji frameworka utrzymywana w celu zapewnienia wstecznej kompatybilności), skorzystamy ze zmiennej `spark` która agreguje w sobie ww. kontekst jako atrybut.

In [0]:
# Poniższe wywołanie jest także poprawne jednak odradzane
# a_rdd = sc.parallelize(a)

a_rdd = spark.sparkContext.parallelize(a)

Następnie możemy wywołać naszą pierwszą **akcję** - `count()` - zwracającą ilości elementów w kolekcji.

> Wywołanie **akcji** sygnalizuje rozpoczęcie procesu przetwarzania danych. Do tego momentu wszystkie **operacje** konstruują graf przetwarzania danych - ale **nie uruchamiają** faktycznych obliczeń. Dopiero wywołanie akcji w sterowniku daje sygnał do rozpoczęcia przetwarzania, wysłania danych na osobne maszyny, a następnie ich wtórną agregację.

In [0]:
# Wywołanie akcji
a_rdd.count()

Metoda `count` jest jedną z wielu innych dostępnych akcji. Ten [link](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) zawiera odnośnik do dokumentacji przedstawiającej wszystkie dostępne metody w aktualnej wersji.

Framework liczy elementy dla każdej partycji zlokalizowanej na każdym węźle osobno, a następnie przesyła cząstkowe wyniki do sterownika, gdzie są one ostatecznie sumowane w celu podania ostatecznego wyniku.

Innym przykładem akcji jest metoda `collect()`. Przesyła ona elementy przetworzone elementy RDD z powrotem do sterownika.

W naszym przykładzie dane są małe i jej wywołanie nie stanowi problemu. Jednak w przypadku operacji na "dużych danych", przekierowanie takiej ilości do jednej maszyny prawdopodobnie nie jest dobrym pomysłem. Należy wcześniej oszczacować ilość przesłanych danych - inaczej prawdopodobnie przepełnimy pamięć sterownika.

Lepszym sposobem jest np. przesłanie kilku pierwszych elementów korzystając z akcji `take(num)`, gdzie `num` jest liczbą próbek.

In [0]:
# Należy być ostrożnym. `collect()` zwróci wszystkie elementy
# a_rdd.collect()

# Natomiast `take(5)` zwróci 5 pierwszych elementów z RDD 
a_rdd.take(5)

In [0]:
# W przypadku losowych próbek możemy wykorzystać `takeSample()`, gdzie pierwszy argument określa czy wylosowana liczba może trafić z powrotem do puli

a_rdd.takeSample(False, 5)

Załadujmy teraz nieco ciekawszy zbiór danych - *KDD Cup 1999*. Opisuje on około pół miliona sieciowych interakcji podzielonych na *"normalny"* oraz *"podejrzany"* ruch. Dokładny opis poszczególnych pól znajduje się [tutaj](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99).

Jest to typowy plik CSV, który po rozpakowaniu zajmuje ok. 75 MB. Każda linia pliku zawiera osobny wiersz danych odseparowanych znakiem `,`.

Załadujmy go jako RDD wykorzystując metodę `textFile()`.

> Spark automatycznie radzi sobie ze skompresowanymi plikami. Jednak w tym przypadku zbiór danych nie jest dzielony na partycje (używana jest 1 partycja, więc dane nie będą przetwarzane na wielu maszynach). Dobrą praktyką jest jawne zadeklarowanie partycjonowania - np. poprzez wywołanie operacji `repartition()`.

In [0]:
# Nalezy zmienić tą linię w zależności od lokalizacji pliku z danymi
local_file_path = '/FileStore/tables/kddcup_data_10_percent-d8e1d.gz'

# Tworzenie RDD z pliku
raw_data_rdd = sc.textFile(local_file_path)

Sprawdźmy czy dane zostały poprawnie załadowane wyświetlając całkowitą liczbę rekordów (w tym przypadku linii).

In [0]:
# Wywołanie akcji `count`
raw_data_rdd.count()

Sprawdzmy czy faktycznie, dane nie są podzielone na partycje

In [0]:
# `getNumPartitions()` zwraca liczbę partycji na którą podzielone są dane w RDD
raw_data_rdd.getNumPartitions()

Zwiększymy liczbę partycji do 5. Aby dowiedzieć się więcej jak działa mechanizm dzielenia danych w RDD na partycje zachęcam do zapoznania się z moim publicznym [artykułem](https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0).

In [0]:
raw_data_rdd = raw_data_rdd.repartition(5)
raw_data_rdd.getNumPartitions()

### 3. Podstawowe operacje
Zobaczmy jak wykorzystać 3 podstawowe operacje - `filter()` (transformacja), `map()` (transformacja) oraz `collect()` (akcja).

Zanim przejdziemy do wykonywania operacji dobrą praktyką jest zapoznanie się z danymi.

Rzućmy okiem na pierwsze dwa rekordy.

In [0]:
raw_data_rdd.take(2)

Wynikiem jest tablica zawierająca 2 elementy które są ciągami znaków, większość danych jest numeryczna, ostatnie pole określa typ interakcji (tutaj jest to `normal`).

#### Transformacja `filter()`
Transformacja `filter()` wykonuje zdefiniowaną _funkcję_ określającą czy dany element spełnia określone warunki zwracając nowy RDD.

Niżej przedstawiono przykład pobrania jedynie wierszy zawierających słowo "*normal*".

In [0]:
# Funkcja zwracająca `True` jeśli w danym elemencie znajduje się ciąg znaków "normal"
def is_normal_interaction(row):
  return 'normal' in row

normal_raw_data_rdd = raw_data_rdd.filter(is_normal_interaction)

Python umożliwia zapisanie tej samej funkcji w skondensowanej formie wykorzystując tzw. [wyrażenia lambda](http://stackoverflow.com/questions/890128/why-are-python-lambdas-useful) (anonimowa funkcja). W przypadku krótkich operacji jest to dobra metoda do zwiększenia przejszystości kodu.

In [0]:
# Analogiczny kod, jednak bardziej czytelny
normal_raw_data_rdd = raw_data_rdd.filter(lambda row: 'normal' in row)

Istotnym w tym przypadku jest to, że samo wywołanie funkcji, która jest _transformacją_ **nie powoduje** rozpoczęcia żadnych obliczeń. Spark dopisuje żądanie wykonania danej operacji na danej partycji, a następnie czeka do momentu wywołania pierwszej akcji (która uruchamia cały ciąg przetwarzania).

Możemy łatwo to zrobić wywołując wcześniej poznaną akcję `count()` zliczającą na świeżo utworzonym RDD `normal_raw_data_rdd`.

In [0]:
# Dopiero wywołanie akcji powoduje wykonanie kodu odpowiedzialnego za filtrowanie
normal_raw_data_rdd.count()

Liczba zwróconych rekordów jest mniejsza niż tych w początkowym RDD, co świadczy o tym że funkcja została faktycznie wykonana.

#### Transformacja `map()`
Transformacja `map()` wykonuje zadaną funkcję na każdym elemencie RDD zwracając nowy RDD. Funkcja ta może wykonywać dowolną operację na elemencie przekształcając go wedle uznania.

> Np. jeśli RDD przechowuje liczby funkcja może mnożyć każdą z nich przez określoną stałą, jeśli przechowywane są ciągi znaków możemy transformować je do małych liter.

W naszym przypadku każdy wiersz danych reprezentowany jest jako ciąg danych odseparowany znakiem `,` (czyli standardowy plik CSV). Wywołamy na nim standardową funkcję `split()` która podzieli go na części (na podstawie danego znaku) tworząc _listę_, gdzie każda komórka jest osobnym polem.

> Co chcemy uzyskać: `'a,b,c' => ['a', 'b', 'c']`

Tak przetworzone dane będą łatwiej dostępne w późniejszych obliczeniach.

In [0]:
# Zadeklarowanie transformacji `map()` na wszystkich danych
csv_data_rdd = raw_data_rdd.map(lambda row: row.split(','))

Zweryfikujmy poprawność operacji badając pierwszy element - akcja `first()` (analogiczna do `take(1)`).

In [0]:
print(csv_data_rdd.first())

Jak widać cel został osiągnięty. Wciąż nie jest to idealne rozwiązanie, ponieważ każde pole jest reprezentowane jako ciąg znaków. Docelowo pewnie chcielibyśmy rozpoznawać typy (np. liczby całkowite czy zmiennoprzecinkowe). 

Poniżej zdefiniujemy inne mapowanie. Będzie ono zwracało  "krotkę" (*tuple*) w postaci `<klucz, wartość>`. Klucz określa typ danej interakcji (normalna lub złośliwa), drugi oryginalny rekord. Badając dane wiemy, że typ interakcji zawsze występuje na tej samej pozycji (41 element).

> Reprezentacja elementów RDD w postacji `<klucz, wartość>`, jest bardzo popularna i umożliwia dostęp do szerokiej dedykowanej puli osobnych operacji.

In [0]:
def parse_interaction(line):
  elems = line.split(',') # podział wiersza na tablicę
  key = elems[41] # rodzaj interakcji, zawsze na tej pozycji
  vals = elems[:40] # reszta danych
  return (key, vals) # zwracamy dwu-elementową krotkę

csv_data_2_rdd = raw_data_rdd.map(parse_interaction)

print(csv_data_2_rdd.first())

#### Akcja `collect()`
Dotychczas używaliśmy jedynie akcji `count()`, `take()` i `first()`

> Wywołanie funkcji `collect()` sprawi pobranie **wszystkich** elementów RDD do pamięci sterownika (`driver`). **Należy używać tej funkcji ze szczególną ostrożnością**.

Jej użycie powinno być wykonane w momencie zbieranie finalnych danych statystycznych, które nie będą miały znaczącego wpływu na wydajność maszyny obsługującą sterownik.

In [0]:
# Pobranie wszystkich danych do lokalnej zmiennej
local_csv = csv_data_2_rdd.collect()

Możemy sprawdzić typ otrzymanej zmiennej i kilka przykładowych wartości

In [0]:
import sys

print('Typ zmiennej: {}'.format(type(local_csv)))
print('Ilość elementów: {}'.format(len(local_csv)))
print('Rozmiar danych: ~{} MB'.format(sys.getsizeof(local_csv) / 1048576))
print('Typ pojedynczego elementu: {}'.format(type(local_csv[0])))
print('Klucz pierwszego elementu: {}'.format(local_csv[0][0]))
print('Wartość pierwszego elementu: {}'.format(local_csv[0][1]))

### 4. Próbkowanie
Próbkowanie jest operacją zwracającą losowe elementy ze zbioru danych z określonym prawdopodobieństwem. Przydaje się szczególnie podczas szybkiej analizy dużych zbiorów danych, gdzie np. 10% wszystkich danych jest w stanie reprezentować całość populacji z dobrym przybliżeniem.

Spark udostępnia 2 metody dla próbkowania - transformację `sample()` i akcję `takeSample()`. Wywołanie akcji skutkuje oczywiście pobraniem danych do aktualnej maszyny (jak w przypadku `collect()`).

#### Transformacje `sample()`
Poniższy eksperyment polega na wybraniu 10% wszystkich elementów i sprawdzniu jaka część wierszy reprezentuje poprawną transmisję danych.

Na początku zdefiniujemy funkcję która dla danego RDD obliczy dwie wartości - liczbę wszystkich interakcji i liczbę normalnych interakcji.

In [0]:
def count_interactions(rdd):
  total_count = rdd.count()
  normal_count = rdd.filter(lambda x: 'normal' in x).count()
  return total_count, normal_count

W przypadku transformacji `sample()` należy podać 3 parametry:
1. czy próbkować ze zwracaniem elementów z powrotem do zbioru (typ `bool`),
2. jaką część oryginalnego zbioru chcemy otrzymać (typ `float`),
3. liczbę na podstawie której zainicjalizowany będzie generator liczb pseudolosowych (tzw. _seed_, typ `int`)

In [0]:
SAMPLE_PORTION = 0.1
SEED = 123

# Tworzymy nowy RDD jedynie z części danych
sample_raw_data_rdd = raw_data_rdd.sample(False, SAMPLE_PORTION, SEED)

# Wywołanie funkcji `count_interactions()`
sampled_count, sampled_normal_count = count_interactions(sample_raw_data_rdd)

print("{} / {} = {:.2f}% normalnych interakcji w zbiorze danych".format(sampled_normal_count, sampled_count, sampled_normal_count*100.0/sampled_count))

Zobaczmy jak wyglądają te wartości dla całej populacji

In [0]:
# Wywołanie funkcji `count_interactions()`
all_count, all_normal_count = count_interactions(raw_data_rdd)

print("{} / {} = {:.2f}% normalnych interakcji w zbiorze danych".format(all_normal_count, all_count, all_normal_count*100.0/all_count))

W tym przypadku wartości są bardzo zbliżone a operacje wykonywane były na 10x mniejszym zbiorze.

#### Akcja `takeSample()`
Akcja `takeSample()` pobiera wyznaczoną ilość rekordów do sterownika. Jest też często uznawana za bezpieczniejszy wariant `collect()` ponieważ możemy z góry oszacować jak dużo danych zostanie przesłanych.

Należy podać 3 parametry:
1. czy próbkować ze zwracaniem elementów z powrotem do zbioru (typ `bool`),
2. ile losowych elementów ma zostać zwróconych (typ `int`),
3. liczbę na podstawie której zainicjalizowany będzie generator liczb pseudolosowych (typ `int`)

Przykład niżej pobiera 5 losowych próbek z całego zbioru danych

In [0]:
rand_raw_data = raw_data_rdd.takeSample(False, 5, SEED)

In [0]:
for row in rand_raw_data:
  print(row)

### 5. Operacje na zbiorach
Spark umożliwia także proste operacje na zbiorach. Poniżej przedstawiono przykładowe wywołania w celu zrozumienia ogólnej idei. W celu stworzenie RDD używamy metody `parallelize()` przekazując jako argument tablicę elementów.

#### Suma - `union()`

In [0]:
rdd_1 = spark.sparkContext.parallelize(['A','B','C'])
rdd_2 = spark.sparkContext.parallelize(['G','H','I'])

spark.sparkContext.union([rdd_1, rdd_2]).collect()

#### Część wspólna - `intersect()`

In [0]:
rdd_1 = spark.sparkContext.parallelize(['A','B','C'])
rdd_2 = spark.sparkContext.parallelize(['G','B','C'])

rdd_1.intersection(rdd_2).collect()

#### Różnica - `subtract()`

In [0]:
rdd_1 = spark.sparkContext.parallelize(['A','B','C','D','E'])
rdd_2 = spark.sparkContext.parallelize(['A','B'])

rdd_1.subtract(rdd_2).collect()

#### Unikalne wartości - `distinct()`

In [0]:
rdd_1 = spark.sparkContext.parallelize(['A','B','C','A','C','B','A'])

rdd_1.distinct().collect()

#### Iloczyn kartezjański - `cartesian()`

In [0]:
rdd_1 = spark.sparkContext.parallelize(['A','B','C'])
rdd_2 = spark.sparkContext.parallelize(['1','2', '3'])

rdd_1.cartesian(rdd_2).collect()

### 6. Agregacje
Spark udostępnia 3 niskopoziomowe akcje służące agregowaniu danych numerycznych - `reduce()`, `fold()` i `aggregate()`

#### Akcje `reduce()` i `fold()`
Akcje `reduce()` i `fold()` są do siebie bardzo zbliżone. Obie pobierają jako argument funkcję agregującą dwa elementy. Akcja `fold()` pozwala dodatkowo określić wartość początkową dla skumulowanej operacji (w przypadku `reduce()` jest to domyślnie zero).

Poniższy przykład zademonstruje działanie funkcji `reduce()` w celu określenia długości trwania wszystkich normalnych i podejrzanych interakcji.

Na początek wyodrębnijmy dwa RDD - jeden zawierające normalne interakcje, drugi pozostałe.

In [0]:
# wyodrębnienie osobnych RDD na podstawie wcześniej obliczonego RDD `csv_data`
normal_data_rdd = csv_data_rdd.filter(lambda row: 'normal' in row[41])
attack_data_rdd = csv_data_rdd.filter(lambda row: 'normal' not in row[41])

Zdefiniujmy funkcję pomocniczą która:
1. wyłuska pierwszy pole z każdego elementu (to jest długość trwania interakcji) i przekształci je na typ liczbowy (rzutowanie `int()`),
2. na tak uzyskanym numerycznym RDD wykonamy akcję `reduce()` które doda kolejne elementy do siebie

In [0]:
def count_interaction_stats(rdd):
  interactions_count = rdd.count() # liczba interakcji
  total_interactions_duration = rdd.map(lambda row: int(row[0])).reduce(lambda x, y: x+y) # suma długości trwania wszystkich interakcji
  return interactions_count, total_interactions_duration

Zobaczmy czy istnieje różnica pomiędzy średnią długością interakcji pomiędzy normalnymi i podejrzanymi zdarzeniami.

In [0]:
# zlecenie rozproszonych obliczeń
normal_count, normal_total_duration = count_interaction_stats(normal_data_rdd)
attack_count, attack_total_duration = count_interaction_stats(attack_data_rdd)

# prezentacja wyników
print("Średni czas normalnej interakcji:\t{:.2f}".format(normal_total_duration/normal_count))
print("Średni czas podejrzanej interakcji:\t{:.2f}".format(attack_total_duration/attack_count))

Jak widać już wstępna i pobieżna analiza zachęca do dalszej eksploatacji danych.

#### Akcja `aggregate()`
Innym sposobem jest użycie akcji `aggregate()` (która jest wykorzystywania niskopoziomowo zarówno przez `reduce()` jak i `fold()`). Różnica polega na tym, że posiadamy jeszcze większą kontrolę nad procesem.

W przypadku funkcji `reduce()` oraz `fold()` mogliśmy decydować o tym jak łączone ze sobą były dwa elementy (ang. _combiner_).

Funkcja `aggregate()` daje dodatkowo możliwość tworzenia tzw. "_akumulatora_" przechowującego lokalnie dane. Użytkownik musi  sprecyzować:
- wartość początkową akumulatora (jak w przypadku `fold()`),
- funkcję dodającą wartości do akumulatora,
- funkcję dodającą do siebie dwa akumulatory

Zasadniczą korzyścią jest to, że **ostatecznie zwracany typ elementów nie musi być tego samego typu co elementy w początkowym RDD**.

Poniższa funkcja realizuje ten sam problem co powyżej - obliczaja średni czas trwania poszczególnych typów interakcji.

> Zwróć uwagę, że zwrócony typ danych jest dwu-elementową krotką (*tuple*) przechowującą informacje o:
1. całkowitym czasie ataku
2. ilości ataków

In [0]:
def count_interaction_stats_2(rdd):
  duration, count = rdd.aggregate(
    (0,0), # początkowa wartość akumulatora (sumaryczny czas trwania, ilość interakcji)
    (lambda acc, row: (acc[0] + int(row[0]), acc[1] + 1)),  # dodanie danych do akumulatora
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # dodanie dwóch akumulatorów
  )
  return duration, count

In [0]:
# przetworzenie danych
normal_total_duration, normal_count = count_interaction_stats_2(normal_data_rdd)
attack_total_duration, attack_count = count_interaction_stats_2(attack_data_rdd)

# prezentacja wyników
print("Średni czas normalnej interakcji:\t{:.2f}".format(normal_total_duration/normal_count))
print("Średni czas podejrzanej interakcji:\t{:.2f}".format(attack_total_duration/attack_count))

Jak widać wyniki są identyczne jak w przypadku funkcji `reduce()`. Co istotne wejściowy RDD przechowywał dane w formacie tekstowym, a rezultat operacji to krotka z dwoma wartościami numerycznymi. Udało nam się przenieść logikę odpowiedzialną za przekształcenie elementu, do funkcji dodającej dane do akumulatora.

### 7. Operacje klucz/wartość
Szczególnym typem operacji są operacje typu *klucz/wartość*. W Pythonie realizowane są za pomocą **dwu-elementowych krotek**. Na pierwszej pozycji znajduje się klucz rekordu, na drugim wartość (dowolnego typu). Taka reprezentacja danych umożliwia logiczny podział danych i sprawniejsze przetwarzanie (w szczególności z użyciem partycjonowania).

> Należy zwrócić uwagę na to aby dane były równomiernie rozmieszczone w kluczach (uniknięcie sytuacji kiedy jeden klucz zawiera 90% wszystkich danych).

Aby reprezentować zbiór danych w takiej formie wystarczy zmapować go do krotki. W poniższym przykładzie użyjemy RDD `csv_data_rdd`, gdzie kluczem będzie typ interakcji, a wartością cały wiersz rekordu.

In [0]:
# Dane reprezentowane w postaci krotki - <klucz, wartość> => <interakcja, oryginalny wiersz>
key_value_data_rdd = csv_data_rdd.map(lambda x: (x[41], x))
key_value_data_rdd.take(1)

Mając dane w tej postaci Spark udostępnia szerg metod specjalnie zoptymalizowanych pod ten typ danych, np. akcja `countByKey()` zlicza ilość wystąpień elementów każdego klucza.

In [0]:
# Zliczenie elementów dla każdego klucza
interaction_counts = key_value_data_rdd.countByKey()

# prezentacja wyników
for interaction, count in interaction_counts.items():
  print("{} => {}".format(interaction, count))

Inną akcją jest `reduceByKey()` gdzie możemy wykonać określoną funkcję tylko na elementach z określonym kluczem.

Poniższy przykład zlicza całkowity czas interakcji.

In [0]:
# Dane mapowane do krotki (typ interakcji, czas interakcji)
key_value_duration_rdd = csv_data_rdd.map(lambda row: (row[41], float(row[0])))

# Redukcja względem każdego klucza
durations_by_key_rdd = key_value_duration_rdd.reduceByKey(lambda x,y: x + y)

# Wyświetlenie wyników
durations_by_key_rdd.collect()

Inna funkcja agregująca - `combineByKey()` może zostać użyta do obliczenia krotki - czas trwania i ilość wystąpień dla każdego klucza. Działa ona na podobnej zasadzie co `aggregate()` z przykładu wyżej.

In [0]:
sum_counts_rdd = key_value_duration_rdd.combineByKey(
  (lambda x: (x, 1)), # wartość początkowa akumulatora (czas trwania interakcji, ilość elementów)
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
)

# Zapisanie rezultatu w postaci mapy
sum_counts_rdd.collectAsMap()

Posiadając dane w tej postaci możemy obliczyć średni czas trwania każdej interakcji.

In [0]:
duration_means_by_type = sum_counts_rdd \
  .map(lambda row: (row[0], round(row[1][0]/row[1][1], 3))) \
  .collectAsMap()

duration_means_by_type

Dla przejrzystości przesortujmy jeszcze dane

In [0]:
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
  print("{}: {}".format(tag, duration_means_by_type[tag]))

## Ćwiczenia
Efektem każdego podpunktu powinien być osobny obiekt RDD (który jest wejściem do kolejnego punktu).

1. Należy zaimportować tablicę ze zmiennej `rawText` do RDD.
2. Należy przetransformować wszystkie elementy do wielkich liter. Pomoc: [metoda `upper()`](https://docs.python.org/3/library/stdtypes.html#str.upper),  [transformacja `map()`](https://spark.apache.org/docs/latest/programming-guide.html#transformations)
3. Należy podzielić poszczególny wiersz na poszczególne słowa tworząc z nich nowy RDD. Pomoc: [metoda `split()`](https://docs.python.org/3/library/stdtypes.html#str.split), [transformacja `flatMap()`](https://spark.apache.org/docs/latest/programming-guide.html#transformations)
4. Należy odrzucić słowa zawierające mniej niż 3 znaki. Pomoc: [słowo kluczowe `len()`](https://docs.python.org/3/library/functions.html#len), [transformacja `filter()`](https://spark.apache.org/docs/latest/programming-guide.html#transformations)
5. Należy utworzyć RDD typu klucz/wartość, gdzie kluczem jest pierwsza litera, a wartością długość słowa. Pomoc: [transformacja `map()`](https://spark.apache.org/docs/latest/programming-guide.html#transformations),
6. Należy obliczyć średnią długość wyrazu rozpoczynającego się daną literą. Pomoc: [transformacja `combineByKey()`](https://spark.apache.org/docs/latest/programming-guide.html#transformations)

In [0]:
raw_text = [
  'All transformations in Spark are lazy in that they do not compute their results right away',
  'Spark can create distributed datasets from any storage source supported by Hadoop',
  'At a high level, every Spark application consists of a driver program that runs the users main function and executes various parallel operations on a cluster',
  'The Shuffle is an expensive operation since it involves disk IO data serialization and network IO'
]

In [0]:
# rdd_1 = ...