# Wprowadzenie do Apache Spark

**Apache Spark** to og√≥lnego przeznaczenia, przetwarzajƒÖcy dane w pamiƒôci silnik obliczeniowy. Spark mo≈ºe byƒá u≈ºywany wraz z Hadoopem, Yarnem i innymi komponentami Big Data, aby w pe≈Çni wykorzystaƒá jego mo≈ºliwo≈õci oraz poprawiƒá wydajno≈õƒá aplikacji. Oferuje wysokopoziomowe interfejsy API w jƒôzykach Scala, Java, Python, R i SQL.

## Architektura Spark

Apache Spark dzia≈Ça w architekturze mistrz-podw≈Çadny (master-slave), gdzie g≈Ç√≥wny wƒôze≈Ç nazywany jest ‚ÄûDriver‚Äù, a wƒôz≈Çy podrzƒôdne to ‚ÄûWorkers‚Äù. Punktem startowym aplikacji Spark jest `sc`, czyli instancja klasy SparkContext, kt√≥ra dzia≈Ça wewnƒÖtrz Drivera.

![Architektura Spark 1](https://miro.medium.com/v2/resize:fit:1192/0*XzNeTtwEgIy5yWR_)

![Architektura Spark 2](https://miro.medium.com/v2/resize:fit:1266/0*-PltnPR9row8iUDo)

## G≈Ç√≥wne komponenty Apache Spark

### **Spark Core**

Czƒôsto nazywany r√≥wnie≈º samym ‚ÄûSpark‚Äù. Podstawowym elementem Sparka jest **RDD (Resilient Distributed Dataset)** ‚Äî odporna na b≈Çƒôdy, rozproszona kolekcja danych, przetwarzana r√≥wnolegle na wielu wƒôz≈Çach klastra.

### **Spark SQL**

W tej bibliotece dane reprezentowane sƒÖ jako **DataFrame**, czyli struktura danych podobna do tabeli relacyjnej. Spark SQL umo≈ºliwia analizƒô danych z u≈ºyciem sk≈Çadni podobnej do SQL oraz funkcji przetwarzajƒÖcych dane.

### **Spark Streaming**

Biblioteka ta wprowadza pojƒôcie **D-Stream (Discretized Stream)** ‚Äî strumienia danych dzielonego na ma≈Çe porcje (mikropartie), kt√≥re mo≈ºna przetwarzaƒá niemal w czasie rzeczywistym.

### **MLlib**

Biblioteka do uczenia maszynowego, zawierajƒÖca popularne algorytmy, takie jak filtrowanie kolaboracyjne, klasyfikacja, klasteryzacja czy regresja.

### **GraphX**

Biblioteka s≈Çu≈ºƒÖca do przetwarzania graf√≥w. Umo≈ºliwia rozwiƒÖzywanie problem√≥w z zakresu teorii graf√≥w, takich jak PageRank, komponenty sp√≥jne i inne.


## Uruchomienie Apache Spark
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab6").getOrCreate()
```

```python
spark

SparkSession - in-memory

SparkContext

...
```

### Przyk≈Çad 1 - dane ralizujƒÖce szereg czasowy

```python
czujnik_temperatury = ((12.5, "2019-01-02 12:00:00"),
(17.6, "2019-01-02 12:00:20"),
(14.6,  "2019-01-02 12:00:30"),
(22.9,  "2019-01-02 12:01:15"),
(17.4,  "2019-01-02 12:01:30"),
(25.8,  "2019-01-02 12:03:25"),
(27.1,  "2019-01-02 12:02:40"),
)
```
Dane realizujƒÖce pomiar temperatury w czasie.

Aby wygenerowaƒá DataFrame nale≈ºy u≈ºyƒá metod `createDataFrame`. 
Jednak nale≈ºy pamiƒôtaƒá aby zdefiniowaƒá typy danych. 

W nastƒôpnych laboratoriach szerzej opiszemy typy danych w Sparku.

Zdefiuniujmy schemat naszych danych.
```python
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("temperatura", DoubleType(), True),
    StructField("czas", StringType(), True),
])
```
Jak widaƒá praktycznie wszystkie elementy poza nazwƒÖ kolumny oraz parametrem True sƒÖ przedstawione jako obiekty. 

```python
df = (spark.createDataFrame(czujnik_temperatury, schema=schema)
      .withColumn("czas", to_timestamp("czas")))
```

Sprawd≈∫my jak wyglƒÖda schemat utworzonej tabeli.
```python
df.printSchema()

root
 |-- temperatura: double (nullable = true)
 |-- czas: timestamp (nullable = true)


``` 

Nastƒôpnie mo≈ºemy sprawdziƒá jak przedstawia siƒô sama tabela.
```python
df.show()
+-----------+-------------------+
|temperatura|               czas|
+-----------+-------------------+
|       12.5|2019-01-02 12:00:00|
|       17.6|2019-01-02 12:00:20|
|       14.6|2019-01-02 12:00:30|
|       22.9|2019-01-02 12:01:15|
|       17.4|2019-01-02 12:01:30|
|       25.8|2019-01-02 12:03:25|
|       27.1|2019-01-02 12:02:40|
+-----------+-------------------+
```

### Spark jako SQL

Ramki danych w sparku pozwalajƒÖ wykorzystaƒá jƒôzyk sql:
```python
df.createOrReplaceTempView("czujnik_temperatury")
spark.sql("SELECT * FROM czujnik_temperatury where temperatura > 21").show()
+-------------------+-----------+
|               czas|temperatura|
+-------------------+-----------+
|2019-01-02 12:01:15|       22.9|
|2019-01-02 12:03:25|       25.8|
|2019-01-02 12:02:40|       27.1|
+-------------------+-----------+
```

### Grupowanie danych

Standardowy grupowanie danych w sparku po zmiennej "czas" wygeneruje nam liczbƒô wierszy w ka≈ºdym grupie. Ze wzglƒôdu, i≈º zmienne czasowe majƒÖ r√≥≈ºne warto≈õci,  ilo≈õƒá otrzymanych grup bƒôdzie r√≥wna ilo≈õci wierszy w tabeli.
 
```python
df2 = df.groupBy("czas").count()
df2.show()
```

WykorzystujƒÖc funkcjƒô `window` mo≈ºemy wygenerowaƒá grupy czasowe w zale≈ºno≈õci od wybranego okna czasowego. 

```python
# Thumbling window

import pyspark.sql.functions as F

df2 = df.groupBy(F.window("czas","30 seconds")).count()
df2.show(truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2019-01-02 12:00:00, 2019-01-02 12:00:30}|2    |
|{2019-01-02 12:00:30, 2019-01-02 12:01:00}|1    |
|{2019-01-02 12:01:00, 2019-01-02 12:01:30}|1    |
|{2019-01-02 12:01:30, 2019-01-02 12:02:00}|1    |
|{2019-01-02 12:03:00, 2019-01-02 12:03:30}|1    |
|{2019-01-02 12:02:30, 2019-01-02 12:03:00}|1    |
+------------------------------------------+-----+
```
Sprawd≈∫my schemat
```python
df2.printSchema()
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
``` 
Podstawowa r√≥≈ºnica miƒôdzy pandasowymi ramkami danych i sparkowymi jest taka, ≈ºe w kom√≥rce danych sparkowych mo≈ºna u≈ºywaƒá typ√≥w z≈Ço≈ºonych - np `struct`.

# üîå ≈πr√≥d≈Ça danych w Spark Structured Streaming

Spark Structured Streaming pozwala na przetwarzanie danych w czasie rzeczywistym z r√≥≈ºnych ≈∫r√≥de≈Ç strumieniowych. Najpopularniejsze z nich to:

### ‚úÖ rate ‚Äî ≈∫r√≥d≈Ço testowe

-	Automatycznie generuje dane: co sekundƒô dodaje wiersz.
-	Ka≈ºdy wiersz zawiera:
-	timestamp ‚Äì znacznik czasu,
-	value ‚Äì licznik rosnƒÖcy (0, 1, 2, ‚Ä¶).
-	U≈ºywane do testowania logiki strumieniowania bez konieczno≈õci podpinania zewnƒôtrznych ≈∫r√≥de≈Ç.

```python
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
```
### üì° Inne ≈∫r√≥d≈Ça strumieni:

`Socket` (do test√≥w np. nc -lk 9999): To ≈∫r√≥d≈Ço nas≈Çuchuje na wskazanym porcie gniazda (socket) i wczytuje dowolne dane do Spark Streaming. R√≥wnie≈º s≈Çu≈ºy wy≈ÇƒÖcznie do cel√≥w testowych.

`Plik` (File): Nas≈Çuchuje okre≈õlonego katalogu i traktuje pojawiajƒÖce siƒô tam pliki jako dane strumieniowe. Obs≈Çuguje formaty takie jak CSV, JSON, ORC oraz Parquet (np. .csv, .json, .parquet).

`Kafka`: Odczytuje dane z Apache Kafka¬Æ i jest kompatybilne z brokerami w wersji 0.10.0 lub wy≈ºszej.


# üì§ Output Modes ‚Äì tryby wypisywania wynik√≥w

outputMode okre≈õla jak Spark wypisuje dane po ka≈ºdej mikroserii (micro-batch). Dostƒôpne tryby to:

`append` Wypisuje tylko nowe wiersze, kt√≥re zosta≈Çy dodane w tej mikroserii. Najczƒô≈õciej u≈ºywany.

`update` Wypisuje zmienione wiersze - czyli zaktualizowane agregaty.

`complete` Wypisuje ca≈ÇƒÖ tabelƒô agregacji po ka≈ºdej mikroserii. Wymaga pe≈Çnej agregacji (np. groupBy).





Utw√≥rzmy nasz pierwszy strumieniowy DataFrame w Sparku, korzystajƒÖc ze ≈∫r√≥d≈Ça danych typu `rate`. 

Mo≈ºemy zrealizowaƒá kod skryptu
```python
%%file streamrate.py
## uruchom przez spark-submit streamrate.py

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)


query = (df.writeStream 
    .format("console") 
    .outputMode("append") 
    .option("truncate", False) 
    .start()
) 

query.awaitTermination()
```

Albo uruchomiƒá kod w notatniku 
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def process_batch(df, batch_id, tstop=5):
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id == tstop:
        df.stop()


df = (spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
)

query = (df.writeStream 
    .format("console") 
    .outputMode("append")
    .foreachBatch(process_batch)
    .option("truncate", False) 
    .start()
)
```