# Wprowadzenie do Spark RDD

## RDD = Resilient Distributed Datasets
Rozproszone obiekty RDD są reprezentacją rozproszonego zbioru danych.

Na obiektach RDD dozwolone są dwa typy operacji:

1. Transformacje: operacje, które tworzą nowe zbiory danych z istniejących RDD. 

Przykłady to `map`, `filter`, `flatMap`, `groupByKey`, `reduceByKey`, `sample`, `union`, `intersection`, `distinct`, `coalesce`, `repartition`.


2. Akcje: operacje, które zwracają wartość do sterownika programu po przetworzeniu danych.

Przykłady to `reduce`, `collect`, `count`, `first`, `take`, `takeSample`, `takeOrdered`, `saveAsTextFile`, `countByKey`, `foreach`.

W tym zeszycie skupimy się na podstawowych transformacjach i akcjach na obiektach RDD.


## Inicjowanie Sparka
Pracę ze Sparkiem zaczynamy od zainicjowania sesji Sparka.
Aby zainicjować Sparka, musimy zaimportować pakiet `findspark` i uruchomić metodę `init()`:

In [1]:
import findspark
findspark.init() 

Następnie tworzymy obiekt sesji Sparka. Zwróć uwagę na ustawienie nazwy aplikacji Sparka: 

In [3]:
from pyspark.sql import SparkSession

In [4]:
# obiekt sesji zwykle ma nazwę "spark"
spark = SparkSession.builder.appName("DataScience").getOrCreate() 
spark

24/04/27 11:27:29 WARN Utils: Your hostname, Pablos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.94 instead (on interface en0)
24/04/27 11:27:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/27 11:27:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Mając obiekt sesji możemy wyciągnąć z niego tzw. kontekst Sparka, który pozwala na bezpośrednią pracę z kolekcjami obiektów RDD oraz podstawowymi usługami Sparka.

In [7]:
sc = spark.sparkContext
sc

## Tworzenie obiektów RDD

W pierwszym ćwiczeniu spróbujemy stworzyć obiekty RDD z różnych kolekcji danych.

W tym celu wykonamy następujące kroki:
- utworzymy obiekt RDD z listy liczb
- utworzymy obiekt RDD z pliku tekstowego

### Utworzenie obiektu RDD z listy liczb

In [5]:
rdd = sc.parallelize([1, 2, 3, 4, 5])       # parallelize() jest tranformacją, która tworzy obiekt RDD z listy
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [6]:
rdd.collect()   # collect() jest akcją, która zwraca wartość obiektu RDD do sterownika programu

[1, 2, 3, 4, 5]

Transformacja `parallelize` przyjmuje nie tylko proste listy, ale również bardziej złożone struktury danych, takie jak listy zagnieżdżone, słowniki, itp.

In [7]:
rdd = sc.parallelize([1, 2, [3, 4], 5, {"a": 1, "b": 2}])
rdd.collect()

[1, 2, [3, 4], 5, {'a': 1, 'b': 2}]

Może przyjmować również inne kolekcje danych, np. generator:

In [8]:
rdd = sc.parallelize(range(10))
rdd.collect()

                                                                                

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### Utworzenie obiektu RDD z plików

RDD może czytać dane z plików w formatach binarnych oraz tekstowych. Każdy odczyt jest transformacją.

Poniżej użyliśmy textFile() do odczytania tekstu książki:

In [23]:
book_rdd = sc.textFile("../../data/books/ulysses.txt") # textFile() jest tranformacją tworzącą obiekt RDD z pliku tekstowego
book_rdd

../../data/books/ulysses.txt MapPartitionsRDD[34] at textFile at NativeMethodAccessorImpl.java:0


RDD nie posiada wbudowanych funkcji do czytania plików CSV czy JSON. Obróbkę można zrobić samemu lub skorzystać z zewnętrznych bibliotek. W praktyce korzysta się z modułu DataFrame. 

In [24]:
stock_rdd = sc.textFile("../../data/stock/stock.csv")  # przeczyta plik CSV jak zwykły plik tekstowy
stock_rdd

../../data/stock/stock.csv MapPartitionsRDD[36] at textFile at NativeMethodAccessorImpl.java:0

## Podstawowe akcje na obiektach RDD
Akcja to operacja, która zwraca wartość do sterownika programu po przetworzeniu danych.

In [25]:
rdd.collect()   # collect() jest akcją, która zwraca całą zawartość kolekcji RDD

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [44]:
rdd.count()     # count() jest akcją, która zwraca ilość elementów obiektu RDD

10

In [27]:
rdd.first()   # first() jest akcją zwracającą pierwszy element obiektu RDD

0

In [28]:
rdd.take(5)     # take() jest akcją, która zwraca pierwsze N elementów obiektu RDD

[0, 1, 2, 3, 4]

In [29]:
rdd.takeOrdered(5)      # takeOrdered() jest akcją, która zwraca N najmniejszych elementów obiektu RDD

[0, 1, 2, 3, 4]

In [30]:
rdd.top(5)      # top() jest akcją, która zwraca N największych elementów obiektu RDD

[9, 8, 7, 6, 5]

In [31]:
# to samo co takeOrdered(5) można uzyskać za pomocą dodatkowego argumentu w metodzie top()
rdd.top(5, key=lambda x: -x)      # top() jest akcją, która zwraca N najmniejszych elementów obiektu RDD

[0, 1, 2, 3, 4]

In [32]:
rdd.takeSample(False, 5)                    # zwraca 5 losowych elementów obiektu RDD

[6, 3, 5, 8, 0]

In [9]:
result = rdd.collect()  # collect() jest akcją, która zwraca wartość obiektu RDD do sterownika programu
result[:5]             # wyświetlenie pierwszych 5 elementów obiektu RDD - za pomocą wbudowanej operacji języka Python

[0, 1, 2, 3, 4]

## Akcje agregujące

Akcje agregujące to operacje, które łączą elementy obiektu RDD za pomocą funkcji.
Przykłady to `reduce`, `fold`, `aggregate`, `countByKey`, `countByValue`, `sum`, `mean`, `max`, `min`, `stdev`, `variance`.

In [16]:
rdd = sc.parallelize([1, 2, 3, 4, 5])

### reduce()
reduce() ma za zadanie zredukować elementy obiektu RDD do jednego elementu za pomocą funkcji.

Przykładem może być suma lub maksimum elementów w RDD. Kluczową rzeczą jest to, że funkcja przekazywana do reduce musi być przemienna (wynik operacji nie zależy od kolejności argumentów) oraz łączna (wynik jest taki sam niezależnie od grupowania operacji).


In [22]:
rdd.reduce(lambda x, y: x + y)

15

In [26]:
rdd.reduce(lambda x, y: x * y)

120

### fold()

fold() działa podobnie do reduce(), ale wymaga wartości początkowej.

 Wartość początkowa jest używana jako początkowy wynik oraz jako argument dla operacji w przypadku pustych partycji RDD. To sprawia, że fold jest bardziej ogólna niż reduce. 
 Wartość początkowa oraz funkcja użyta w fold również muszą być przemienne i łączne, aby zagwarantować poprawność wyniku w rozproszonym środowisku obliczeniowym.
 

In [23]:
rdd.fold(0, lambda x, y: x + y)

15

In [25]:
rdd.fold(1, lambda x, y: x * y)

120

### aggregate()

aggregate() jest bardziej ogólna niż reduce() i fold(). Pozwala na zdefiniowanie dwóch funkcji: funkcji agregującej oraz funkcji łączącej.

Funkcja agregująca jest wywoływana na każdej partycji obiektu RDD i zwraca wartość typu U. Funkcja łącząca łączy wyniki funkcji agregującej z różnych partycji.



In [27]:
rdd.aggregate((0, 0), 
              (lambda acc, value: (acc[0] + value, acc[1] + 1)), 
              (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))


(15, 5)

In [28]:
rdd.aggregate((1, 1), 
              (lambda acc, value: (acc[0] * value, acc[1] + 1)), 
              (lambda acc1, acc2: (acc1[0] * acc2[0], acc1[1] + acc2[1]))
             )

(120, 16)

### countByKey()

countByKey() zlicza liczbę wystąpień każdego klucza w obiekcie RDD, który jest zbiorem par klucz-wartość.


In [31]:
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
rdd.countByKey()

defaultdict(int, {'a': 2, 'b': 2, 'c': 1})

### countByValue()

countByValue() zlicza liczbę wystąpień każdej wartości w obiekcie RDD.

In [33]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 1, 2, 3, 4, 5])
rdd.countByValue()

defaultdict(int, {1: 2, 2: 2, 3: 2, 4: 2, 5: 2})

### sum()

sum() zwraca sumę elementów obiektu RDD.


In [35]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.sum()

15

### mean(), max(), min(), stdev(), variance()

mean(), max(), min(), stdev(), variance() zwracają odpowiednio średnią, maksimum, minimum, odchylenie standardowe oraz wariancję elementów obiektu RDD.

In [38]:
rdd.mean()

3.0

In [40]:
rdd.max()

5

In [41]:
rdd.min()

1

## Transformacje na obiektach RDD

Transformacje to operacje, które tworzą nowe zbiory danych z istniejących RDD.

### map()

map() jest najbardziej podstawową transformacją. Przyjmuje funkcję, która jest stosowana do każdego elementu obiektu RDD i zwraca nowy obiekt RDD.


In [42]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.map(lambda x: x * 2).collect()

[2, 4, 6, 8, 10]

In [43]:
rdd.map(lambda x: x ** 2).collect()

[1, 4, 9, 16, 25]

### flatMap()

flatMap() działa podobnie do map(), ale zwraca listę wyników dla każdego elementu obiektu RDD.


In [44]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.flatMap(lambda x: [x, x * 2]).collect()

[1, 2, 2, 4, 3, 6, 4, 8, 5, 10]

### filter()

filter() zwraca nowy obiekt RDD, który zawiera tylko elementy spełniające warunek podany w funkcji.


In [45]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]

### groupByKey()

groupByKey() grupuje elementy obiektu RDD według klucza.

In [48]:
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
result = rdd.groupByKey().collect()
for key, value in result:
    print(key, list(value))

b [2, 4]
c [5]
a [1, 3]


In [49]:
rdd.groupByKey().mapValues(list).collect()

[('b', [2, 4]), ('c', [5]), ('a', [1, 3])]

In [50]:
rdd.groupByKey().mapValues(lambda x: sum(x)).collect()

[('b', 6), ('c', 5), ('a', 4)]

In [51]:
rdd.groupByKey().mapValues(lambda x: sum(x) / len(x)).collect()

[('b', 3.0), ('c', 5.0), ('a', 2.0)]

### reduceByKey()

reduceByKey() działa podobnie do groupByKey(), ale zamiast grupować elementy, redukuje je za pomocą funkcji przekazanej jako argument.


In [52]:
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
rdd.reduceByKey(lambda x, y: x + y).collect()

[('b', 6), ('c', 5), ('a', 4)]

In [53]:
rdd.reduceByKey(lambda x, y: x * y).collect()

[('b', 8), ('c', 5), ('a', 3)]

### sortByKey()

sortByKey() sortuje elementy obiektu RDD według klucza.


In [54]:
rdd = sc.parallelize([("b", 2), ("a", 1), ("c", 3)])
rdd.sortByKey().collect()

[('a', 1), ('b', 2), ('c', 3)]

### join()

join() łączy dwa obiekty RDD na podstawie klucza.


In [55]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 4), ("b", 5), ("c", 6)])
rdd1.join(rdd2).collect()


[('b', (2, 5)), ('c', (3, 6)), ('a', (1, 4))]

### union()

union() łączy dwa obiekty RDD w jeden obiekt RDD.

In [56]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd1.union(rdd2).collect()

[1, 2, 3, 4, 5, 6]

### intersection()

intersection() zwraca obiekt RDD, który zawiera elementy wspólne dwóch obiektów RDD.

In [61]:
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.intersection(rdd2).collect()

[4, 5]

### distinct()

distinct() zwraca obiekt RDD bez duplikatów.


In [57]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 1, 2, 3, 4, 5])
rdd.distinct().collect()

[1, 2, 3, 4, 5]

## Partycje

Partycje to podstawowa jednostka obliczeniowa w Sparku. Każda partycja to fragment danych, który jest przetwarzany przez pojedynczy wątek.
Transformacje na obiektach RDD są wykonywane na partycjach, a nie na pojedynczych elementach.

In [58]:
rdd.getNumPartitions()   # zwraca ilość partycji obiektu RDD

10

Domyślnie, Spark tworzy tyle partycji, ile jest rdzeni w klastrze. Możemy zmienić liczbę partycji za pomocą metody `repartition()` lub `coalesce()`.
Różnica między nimi polega na tym, że `repartition()` zawsze tworzy nowe partycje, podczas gdy `coalesce()` może łączyć istniejące partycje.

In [59]:
rdd = rdd.repartition(4)   # repartition() jest transformacją, która zmienia liczbę partycji obiektu RDD
rdd.getNumPartitions()

4

In [60]:
rdd = rdd.coalesce(2)     # coalesce() jest transformacją, która zmienia liczbę partycji obiektu RDD
rdd.getNumPartitions()

2

## Zapisywanie obiektów RDD

Obiekty RDD można zapisać do plików w formatach tekstowym oraz binarnym.
Tworzony jest katalog, w którym zapisywane są pliki z danymi.


In [62]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.saveAsTextFile("output")   # zapisuje obiekt RDD do pliku tekstowego

In [64]:
rdd.saveAsPickleFile("output2")   # zapisuje obiekt RDD do pliku binarnego