> Instrukcja wykonania projektu
# Projekt Apache Spark

> Instrukcja wykonania projektu
# Wprowadzenie

Wykorzystując ten notatnik jako szablon zrealizuj projekt *Apache Spark* zgodnie z przydzielonym zestawem. 

Kilka uwag:

* W szablonie nie wolno zmieniać paragrafów *markdown*, zawierających *instrukcje wykonania projektu*. <br>
  Każdy taki paragraf jest oznaczony za pomocą `> Instrukcja wykonania projektu`
* Istniejące paragrafy zawierające *kod* uzupełnij w razie potrzeby zgodnie z instrukcjami
    - nie usuwaj ich
    - modyfikuj je tylko w zakresie zgodnym z instrukcjami
* Możesz dodawać nowe paragrafy zarówno zawierające kod jak i komentarze dotyczące tego kodu (markdown)
* Nie możesz zmieniać kolejności paragrafów zawierających instrukcje. Notatnik ma mieć niezmienioną strukturę.
* Utworzony notatnik musi być "samowystarczalny" w kontekście wykorzystywanej platformy przy założeniu dostepu do sieci.<br>
  Nie może wykorzystywać komponentów, które: 
    -  nie są domyślnie dostepne w ramach platformy i/lub
    -  nie są samodzielnie pobierane przez ten notatnik z oficjalnych repozytoriów (*Maven").  

> Instrukcja wykonania projektu

Poniżej w paragrafie markdown umieść numer oraz pełny tytuł przydzielonego zestawu

# Zestaw 15 - food-orders-restaurants

> Instrukcja wykonania projektu

W ponizszym paragrafie określ wartość zmiennej `input_dir` wskazującej miejsce, w którym znajdują się Twoje dane źródłowe (katalogi `datasource1` oraz `datasource4`).

Ze zmiennej tej należy korzystać we wszystkich miejscach, w których odwołujemy się do danych źródłowych. Pełni ona rolę "parametru" naszego notatnika. 

***Pamiętaj*** aby przed rejestracją rozwiązania usunąć zawartość tej zmiennej. Osoba korzystająca z tego notatnika (np podczas oceny) samodzielnie ustawi sobie jej wartość na odpowiednią dla swojej konfiguracji.

In [None]:
input_dir = ""

> Instrukcja wykonania projektu

# Konteksty
W poniższych paragrafach utwórz odpowiednie konteksty. Nasza aplikacja *Apache Spark* ma działać na klastrze z wykorzystaniem platformy *Hadoop (YARN)*. 

## Dodatkowe działania 

W zależności od potrzeb, wykorzystaj poniższe paragrafy do wykonania dodatkowych działań niezbędnych do utworzenia kontekstu.

> Instrukcja wykonania projektu

## Kontekst dla *DataFrame API*

Wykorzystaj poniższy paragraf do utworzenia obiektu kontekstu dla *DataFrame API*. 

Nazwij go `spark`

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Projekt2-Zestaw15-FoodOrders") \
    .master("yarn") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

> Instrukcja wykonania projektu

## Kontekst dla *RDD API*

Wykorzystaj poniższy paragraf do utworzenia obiektu kontekstu dla *RDD API*. 

Nazwij go `sc`. 

Utwórz go z powyżej utworzonego kontekstu. Dzięki temu całość notatnika będzie działała w ramach jednej aplikacji *Apache Spark*

In [None]:
sc = spark.sparkContext

> Instrukcja wykonania projektu

# Część 1 - Spark Core (*RDD API*)

Pamiętaj o czystości Twojego API. Musi być ona zachowana od samego początku implementacji (utworzenia kontekstu), poprzez definicje źródeł i transformacje, aż do samego końca czyli wygenerowanie ostatecznego wyniku. 

Wykorzystaj odpowiedni kontekst utworzony w początkowej części notatnika.

> Instrukcja wykonania projektu

## Dane źródłowe

W poniższych paragrafach utwórz dwa obiekty RDD, których zawartością będą Twoje dane źródłowe. 

Użyj nazw czytelnie odnoszących się zawartości Twoich danych źródłowych. 

In [None]:
orders_raw_rdd = sc.textFile(f"{input_dir}/datasource1/*.csv")

header = "order_id,restaurant_id,order_date,items_count,total_price_usd,payment_type,status"
orders_rdd = orders_raw_rdd.filter(lambda line: line != header and len(line.strip()) > 0)

In [None]:
restaurants_raw_rdd = sc.textFile(f"{input_dir}/datasource4/restaurants.csv")

header_restaurants = "restaurant_id,name,city,country,cuisine"
restaurants_rdd = restaurants_raw_rdd.filter(lambda line: line != header_restaurants and len(line.strip()) > 0)

> Instrukcja wykonania projektu

## Przetwarzanie - etap 1 - "MapReduce" 

### Przetwarzanie

Twoim zadaniem jest utworzenie obiektu RDD o nazwie `mapreduce_RDD`, który w oparciu o: 

- utworzony kontekst, oraz
- pierwszy ze źródłowych RDD (`datasource1`)

będzie zawierał wynik zgodny z oczekiwaniami treści sekcji *Program MapReduce* dla Twojego zestawu. 

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Etap 1: MapReduce - statystyki zamówień dla restauracji i typów płatności

# Parsowanie, wczesna selekcja i projekcja
def parse_and_filter_orders(line):

    fields = line.split(',')
    
    if len(fields) < 7:
        return None
    
    restaurant_id = fields[1].strip()
    total_price = fields[4].strip()
    payment_type = fields[5].strip()
    status = fields[6].strip()
    
    # Wczesna selekcja
    if status != "Completed":
        return None
    
    try:
        # Wczesna projekcja
        return (restaurant_id, payment_type, float(total_price))
    except ValueError:
        return None

# Stosujemy parsowanie i filtrowanie
parsed_orders = orders_rdd \
    .map(parse_and_filter_orders) \
    .filter(lambda x: x is not None)

In [None]:
# Map
# Klucz: (restaurant_id, payment_type) Wartość: (total_price, count)

mapped_orders = parsed_orders.map(
    lambda x: ((x[0], x[1]), (x[2], 1))
)

# Agregacja wielopoziomowa z combinerem
# Combiner: sumujemy na poziomie partycji
def sum_combiner(a, b):
    """Sumuje pary (sum_price, count)"""
    return (a[0] + b[0], a[1] + b[1])

# Reduce: agregacja z combinerem
aggregated_orders = mapped_orders.reduceByKey(sum_combiner)

mapreduce_RDD = aggregated_orders.map(
    lambda x: (
        x[0][0],
        x[0][1],
        x[1][1],
        round(x[1][0] / x[1][1], 6)
    )
)

> Instrukcja wykonania projektu

### Wynik

Pobierz liczbę elementów z `mapreduce_RDD`. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
mapreduce_count = mapreduce_RDD.count()
print(f"Liczba grup: {mapreduce_count}")

> Instrukcja wykonania projektu

Posortuj rosnąco elementy `mapreduce_RDD` względem wszystkich wymaganych wynikowych atrybutów (w kolejności określonej w treści sekcji *Program MapReduce* dla Twojego zestawu). 

A następnie pobierz i wyświetl w oddzielnych liniach 10 pierwszych elementów. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
# Sortujemy według wszystkich atrybutów: restaurant_id, payment_type
sorted_mapreduce = mapreduce_RDD.sortBy(lambda x: (x[0], x[1]))

top_10 = sorted_mapreduce.take(10)

for item in top_10:
    print(f"{item[0]}\t{item[1]}\t{item[2]}\t{item[3]}")

> Instrukcja wykonania projektu

## Przetwarzanie - etap 2 - "Hive" 

### Przetwarzanie

Twoim zadaniem jest utworzenie obiektu RDD o nazwie `hive_RDD`, który w oparciu o: 

- utworzony powyżej `mapreduce_RDD` 
- drugi ze źródłowych RDD (`datasource4`)

będzie zawierał wynik zgodny z oczekiwaniami treści z sekcji *Program Hive* dla Twojego zestawu. 

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Etap 2: Hive - łączenie danych i agregacja na poziomie krajów i rodzajów kuchni

# Parsowanie danych restaurants z wczesną projekcją
def parse_restaurants(line):

    fields = line.split(',')
    
    if len(fields) < 5:
        return None
    
    restaurant_id = fields[0].strip().strip('"')
    country = fields[3].strip().strip('"')
    cuisine = fields[4].strip().strip('"')
    
    # Wczesna projekcja
    return (restaurant_id, (country, cuisine))

# Stosujemy parsowanie
parsed_restaurants = restaurants_rdd \
    .map(parse_restaurants) \
    .filter(lambda x: x is not None)

In [None]:
# Klucz: restaurant_id, Wartość: (payment_type, orders_count, avg_total_price)
mapreduce_for_join = mapreduce_RDD.map(
    lambda x: (x[0], (x[1], x[2], x[3]))
)

In [None]:
# Join: łączymy dane zamówień z informacjami o restauracjach

joined_data = mapreduce_for_join.join(parsed_restaurants)

country_cuisine_data = joined_data.map(
    lambda x: (
        (x[1][1][0], x[1][1][1]),  # Klucz: (country, cuisine)
        (x[1][0][1], x[1][0][1] * x[1][0][2])  # Wartość: (orders_count, total_price_sum)
    )
)

# Agregacja na poziomie (country, cuisine)
def sum_aggregator(a, b):
    """Sumuje (orders_count, total_price_sum)"""
    return (a[0] + b[0], a[1] + b[1])

aggregated_by_country_cuisine = country_cuisine_data.reduceByKey(sum_aggregator)

# Obliczenie średniej i przygotowanie do rankingu
country_cuisine_stats = aggregated_by_country_cuisine.map(
    lambda x: (
        x[0][0],  # country
        (x[0][1], x[1][0], round(x[1][1] / x[1][0], 6))  # (cuisine, total_orders, avg_total_price)
    )
)

# Grupowanie po kraju dla obliczenia rankingu
grouped_by_country = country_cuisine_stats.groupByKey()

# Obliczanie rankingu w ramach każdego kraju
def add_ranking(country_data):
    """
    Dodaje ranking dla kuchni w ramach kraju
    country_data: (country, [(cuisine, total_orders, avg_total_price), ...])
    """
    country = country_data[0]
    cuisines = list(country_data[1])
    
    # Sortujemy według total_orders malejąco
    sorted_cuisines = sorted(cuisines, key=lambda x: x[1], reverse=True)
    
    # Dodajemy ranking
    result = []
    for rank, cuisine_data in enumerate(sorted_cuisines, start=1):
        result.append((
            country,
            cuisine_data[0],
            cuisine_data[1],
            cuisine_data[2],
            rank
        ))
    
    return result

# Stosujemy ranking
hive_RDD = grouped_by_country.flatMap(add_ranking)

> Instrukcja wykonania projektu

### Zapis wyniku 

Mając obiekt RDD o nazwie `hive_RDD` dokonaj jego zapisu w docelowym miejscu określonym w opisie projektu dla tej części projektu.

*Uwaga!* uwzględnij fakt, że docelowe miejsce może być "zajęte". 
Przed utworzeniem nowego wyniku usuń ewentualne efekty poprzednich uruchomień notatnika.  
Zwróć uwagę, że efekty te mogą istnieć na wielu poziomach.

#### Usunięcie poprzednich wyników

Wykonaj operacje usuwające ewentualne poprzednio utworzone wyniki.

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Usuwamy poprzednie wyniki (jeśli istnieją)
import subprocess

try:
    subprocess.run(
        ["hadoop", "fs", "-rm", "-r", "/tmp/output1"],
        stderr=subprocess.DEVNULL,
        check=False
    )
    print("Poprzednie wyniki usunięte")
except:
    print("Brak poprzednich wyników do usunięcia")

> Instrukcja wykonania projektu

#### Zapis nowe wyniki

Dokonaj zapisu nowego wyniku.

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Zapisujemy wynik w formacie Pickle
hive_RDD.saveAsPickleFile("/tmp/output1")
print("Wyniki zapisane w /tmp/output1")

> Instrukcja wykonania projektu

### Wynik 

Odczytaj do zmiennej `result_RDD` dane z docelowego miejsca, w którym została zapisana zawartość `hive_RDD`.

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Wczytujemy wynik z /tmp/output1 (format Pickle)
result_RDD = sc.pickleFile("/tmp/output1")

> Instrukcja wykonania projektu

Pobierz liczbę elementów z `result_RDD`. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
# Liczba wyników
result_count = result_RDD.count()
print(f"Liczba wyników: {result_count}")

> Instrukcja wykonania projektu

Posortuj rosnąco elementy `result_RDD` względem wszystkich wymaganych wynikowych atrybutów (w kolejności określonej w treści sekcji *Program Hive* dla Twojego zestawu). 

A następnie pobierz i wyświetl w oddzielnych liniach 10 pierwszych elementów. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
# Sortujemy według wszystkich atrybutów
sorted_result = result_RDD.sortBy(lambda x: (x[0], x[1], x[2], x[3], x[4]))

top_10_results = sorted_result.take(10)

for item in top_10_results:
    print(f"{item[0]}\t{item[1]}\t{item[2]}\t{item[3]}\t{item[4]}")

> Instrukcja wykonania projektu

# Część 2 - Spark SQL (*DataFrame API*)

Pamiętaj o czystości Twojego API. Musi być ona zachowana od samego początku implementacji (utworzenia kontekstu), poprzez definicje źródeł i transformacje, aż do samego końca czyli wygenerowanie ostatecznego wyniku. 

Wykorzystaj odpowiedni kontekst utworzony w początkowej części notatnika.

> Instrukcja wykonania projektu

## Dane źródłowe

W poniższych paragrafach utwórz dwa obiekty `DataFrame`, których zawartością będą Twoje dane źródłowe. 

Użyj nazw czytelnie odnoszących się zawartości Twoich danych źródłowych. 

In [None]:
# Wczytanie danych orders z datasource1 jako DataFrame
orders_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f"{input_dir}/datasource1/*.csv")

# Sprawdzenie schematu
print("Schema orders_df:")
orders_df.printSchema()

In [None]:
# Wczytanie danych restaurants z datasource4 jako DataFrame
restaurants_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f"{input_dir}/datasource4/restaurants.csv")

# Sprawdzenie schematu
print("Schema restaurants_df:")
restaurants_df.printSchema()

> Instrukcja wykonania projektu

## Przetwarzanie - etap 1 - "MapReduce" 

### Przetwarzanie

Twoim zadaniem jest utworzenie obiektu `DataFrame` o nazwie `mapreduce_DF`, który w oparciu o: 

- utworzony kontekst, oraz
- pierwszy ze źródłowych `DataFrame` (`datasource1`)

będzie zawierał wynik zgodny z oczekiwaniami treści sekcji *Program MapReduce* dla Twojego zestawu. 

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Etap 1: MapReduce - statystyki zamówień dla restauracji i typów płatności

from pyspark.sql import functions as F

# Wczesna selekcja i projekcja
mapreduce_DF = orders_df \
    .filter(F.col("status") == "Completed") \
    .select("restaurant_id", "payment_type", "total_price_usd") \
    .groupBy("restaurant_id", "payment_type") \
    .agg(
        F.count("*").alias("orders_count"),
        F.round(F.avg("total_price_usd"), 6).alias("avg_total_price")
    )

> Instrukcja wykonania projektu

### Wynik

Pobierz liczbę elementów z `mapreduce_DF`. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
mapreduce_df_count = mapreduce_DF.count()
print(f"Liczba grup: {mapreduce_df_count}")

> Instrukcja wykonania projektu

Posortuj rosnąco elementy `mapreduce_DF` względem wszystkich wymaganych wynikowych atrybutów (w kolejności określonej w treści sekcji *Program MapReduce* dla Twojego zestawu). 

A następnie pobierz i wyświetl w oddzielnych liniach 10 pierwszych elementów. Skorzystaj z metody `show()`.

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
# Sortujemy według wszystkich atrybutów: restaurant_id, payment_type
mapreduce_DF \
    .orderBy("restaurant_id", "payment_type") \
    .show(10, truncate=False)

> Instrukcja wykonania projektu

## Przetwarzanie - etap 2 - "Hive" 

### Przetwarzanie

Twoim zadaniem jest utworzenie obiektu `DataFrame`o nazwie `hive_DF`, który w oparciu o: 

- utworzony powyżej `mapreduce_DF` 
- drugi ze źródłowych `DataFrame` (`datasource4`)

będzie zawierał wynik zgodny z oczekiwaniami treści z sekcji *Program Hive* dla Twojego zestawu. 

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Etap 2: Hive - łączenie danych i agregacja na poziomie krajów i rodzajów kuchni

from pyspark.sql import Window
from pyspark.sql import functions as F

# Wczesna projekcja
restaurants_selected = restaurants_df.select("restaurant_id", "country", "cuisine")

joined_df = mapreduce_DF.join(restaurants_selected, "restaurant_id", "inner")

In [None]:
# Agregacja na poziomie (country, cuisine)
country_cuisine_agg = joined_df \
    .groupBy("country", "cuisine") \
    .agg(
        F.sum("orders_count").alias("total_orders"),
        F.round(
            F.sum(F.col("orders_count") * F.col("avg_total_price")) / F.sum("orders_count"),
            6
        ).alias("avg_total_price")
    )

In [None]:
# Dodawanie rankingu w ramach każdego kraju

window_spec = Window.partitionBy("country").orderBy(F.col("total_orders").desc())

hive_DF = country_cuisine_agg \
    .withColumn("rank_in_country", F.rank().over(window_spec))

hive_DF = hive_DF.select(
    "country",
    "cuisine", 
    "total_orders",
    "avg_total_price",
    "rank_in_country"
)

> Instrukcja wykonania projektu

### Zapis wyniku 

Mając obiekt `DataFrame` o nazwie `hive_DF` dokonaj jego zapisu w docelowym miejscu określonym w opisie projektu dla tej części projektu.

*Uwaga!* uwzględnij fakt, że docelowe miejsce może być "zajęte". 
Przed utworzeniem nowego wyniku usuń ewentualne efekty poprzednich uruchomień notatnika.  
Zwróć uwagę, że efekty te mogą istnieć na wielu poziomach.

#### Usunięcie poprzednich wyników

Wykonaj operacje usuwające ewentualne poprzednio utworzone wyniki.

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
import subprocess

# Usuwamy katalog z danymi Delta Lake
try:
    subprocess.run(
        ["hadoop", "fs", "-rm", "-r", "/tmp/delta/output2"],
        stderr=subprocess.DEVNULL,
        check=False
    )
    print("Poprzednie wyniki Delta Lake usunięte")
except:
    print("Brak poprzednich wyników do usunięcia")

# Usuwamy tabelę z metastore (jeśli istnieje)
try:
    spark.sql("DROP TABLE IF EXISTS output2")
    print("Tabela output2 usunięta z metastore")
except:
    pass

> Instrukcja wykonania projektu

#### Zapis nowe wyniki

Dokonaj zapisu nowego wyniku.

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Zapisujemy DataFrame w formacie Delta Lake
hive_DF.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/tmp/delta/output2")

print("Wyniki zapisane w Delta Lake: /tmp/delta/output2")

# Opcjonalnie: rejestrujemy jako tabelę
spark.sql("""
    CREATE TABLE IF NOT EXISTS output2
    USING DELTA
    LOCATION '/tmp/delta/output2'
""")

print("Tabela output2 zarejestrowana w metastore")

> Instrukcja wykonania projektu

### Wynik 

Odczytaj do zmiennej `result_DF` dane z docelowego miejsca, w którym została zapisana zawartość `hive_DF`.

Wykorzystaj do tego celu poniższe paragrafy

In [None]:
# Wczytujemy wynik z Delta Lake
result_DF = spark.read.format("delta").load("/tmp/delta/output2")

# Lub alternatywnie z tabeli:
# result_DF = spark.table("output2")

> Instrukcja wykonania projektu

Pobierz liczbę elementów z `result_DF`. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
# Liczba wyników
result_df_count = result_DF.count()
print(f"Liczba wyników: {result_df_count}")

> Instrukcja wykonania projektu

Posortuj rosnąco elementy `result_DF` względem wszystkich wymaganych wynikowych atrybutów (w kolejności określonej w treści sekcji *Program Hive* dla Twojego zestawu). 

A następnie pobierz i wyświetl w oddzielnych liniach 10 pierwszych elementów. 

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
# Sortujemy według wszystkich atrybutów
result_DF \
    .orderBy("country", "cuisine", "total_orders", "avg_total_price", "rank_in_country") \
    .show(10, truncate=False)

> Instrukcja wykonania projektu

## Zamknięcie kontekstu 

Zamknij kontekst, wyłączając w ten sposób aplikację *Apache Spark*.

Wykorzystaj do tego celu poniższy paragraf.

In [None]:
spark.stop()
print("Sesja Spark zamknięta")

> Instrukcja wykonania projektu

# Podsumowanie 

Jeśli implementacja tego notatnika została przez Ciebie zakończona, koniecznie uruchom jego całość kilkukrotnie, aby upewnić się, że całość przetwarzania jest poprawna, a samo przetwarzanie jest powtarzalne. 

Na zakończenie, przed rejestracją tego notatnika w ramach projektu, pamiętaj o:

- usunięciu wartości zmiennej `input_dir`
- wyczyszczeniu wszystkich wyników (prawy klawisz myszy i pozycja *Clear Output of All Cells*). <br>
  Pozostawienie Twoich wyników może być potraktowane jako chęć wpływania na ocenę recenzentów
