# Apache Airflow, część 2

## 1. Operatory bardziej szczegółowo.

W poprzednich zajęciach wspomniano o operatorach oraz ich głównych funkcjach w architekturze Apache Airflow. Baza operatorów już zdefiniowanych, tzw. `core operators` znajduje się poda adresem `URL 1`, a te które są zdefiniowane poza standardową biblioteką pod adresem `URL 2` lub `URL 3`.
> URL 1: https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html  
> URL 2: https://airflow.apache.org/docs/apache-airflow-providers/operators-and-hooks-ref/index.html  
> URL 3: https://registry.astronomer.io/modules?types=operators

Operatory wbudowane dzielą się na kilka rodzajów (patrz `URL 1`) i wyróżniamy:
* operatory bazowe -  klasy bazowe, z których następnie dziedziczą kolejne, specyficzne operatory
* **operatory wbudowane (core)**:
  *  `BashOperator` ([docs](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html#module-airflow.operators.bash)) - operator pozwalający na uruhcamianie poleceń powłoki bash,
  *  `PythonOperator` ([docs](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html#module-airflow.operators.python)) - operator pozwalający na wykonanie kodu języka Python, zalecane jest wykorzystanie dekoratora `@task` zamiast klasycznego podejścia z wykorzystaniem instancji klasy `PythonOperator`, zobacz opis i przykład [tu](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html)
  *  inne - np. `EmailOperator` , `EmptyOperator`.
*  **`Sensory`** - są to operatory, które wprowadzają możliwość pracy z taskami w formie eventów, gdyż te operatory zostały stworzone, aby dostarczać informacji (statusu) zainstnienia różnych zdarzeń, np.:
  *  `FileSensor` - pozwala sprawdzać czy w systemie plików pojawiły się określone pliki, co umożliwia wykonanie kolejnych tasków lub nie w zależności od wykrytego stanu,
  *  `ExternalTaskSensor` - sensor pozwalający na monitorowanie stanu wykonania w jakimś stopniu zależnych od siebie grafów zadań (ang. DAG), możliwe sprawdzenie stanu wykonania innego grafu,
  *  `TimeSensor` - oczekuje na określoną porę dnia,
  *  inne: `PythonSensor`, `BashSensor`, `TimeDeltaSensor`, `DayOfWeekSensor` i inne.

## 1.1 Przykład własnego grafu zadań wykorzystujących BashOperator oraz PythonOperator z wykorzystanie API TaskFlow.

#### 1.1.1 TaskFlow API

> Przewodnik z przykładem wykorzystania API TaskFlow: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

API TaskFlow jest wprowadzonym w wersji 2.0 zmodyfikowanym podejściem do definiowania grafów za pośrednictwem języka Python i bazuje na wykorzystaniu dekoratorów w miejsce tworzenia instancji klas samego DAG jak i poszczególnych operatorów.

Poniżej przykład z dokumentacji z linka powyżej.

_**Listing 1**_
```python
import json
import pendulum
from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)

def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
        
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
        
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    # nie jest konieczne jawne deklarowanie zależności między taskami, jeżeli powinny zostać wywołane sekwencyjnie
    # kolejność ich wywołania będzie to determinowała w tym przypadku
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

# i ostatecznie uruchomienie całego grafu 
tutorial_taskflow_api()
```

#### 1.1.1 Edycja konfiguracji i przebudowanie bazy Airflow.

Aby wskazać inny niż domyślny folder z grafami zadań należy zmienić wpis w konfiguracji Apache Airflow w pliku `airflow.cfg`. Domyślnie jest to ścieżka `/home/spark/airflow/airflow.cfg`. Jeżeli mapowanie tej ścieżki w oprogramowaniu docker zostało wykonane poprawnie (lab 8) to ten plik powinien być widoczny również z poziomu komputera hosta jeżeli w takiej konfiguracji zostało to uruchomione.

**Nie jest złym pomysłem wcześniejsze wykonanie kopii tego pliku**

Wpis w konfiguracji, który wskazuje położenie grafów to `core.dags_folder`, która znajduje się na samym początku pliku:
```bash
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
#
# Variable: AIRFLOW__CORE__DAGS_FOLDER
#
dags_folder = /home/spark/airflow/dags
```
Tutaj będziemy umieszczać zdefiniowane przez nas grafy.
Domyślnie Airflow wczytuje również wszystkie przykładowe grafy, które widoczne są na liście i na podstawie których przedstawiony był opis przykładowego grafu w lba 8. Aby wyłączyć wczytywanie przykładoweych grafów należy zmienić linię w konfiguracji z:
```bash
load_examples = True
```
na
```bash
load_examples = False
```

Lista grafów, które znajdują się na liście jest przeładowywana co pewien czas, ale informacje te znajdują się również w bazie, która jest konfigurowana przy pierwszym uruchomieniu Airflow.

Zmiana folderu grafów to dość daleko idąca zmiana, więc zazwyczaj wykonana w celu konfiguracji zupełnie nowego środowiska. Pozbędziemy się więc z bazy informacji, które już tam zostały zapisane.

Polecenie `airflow db reset` zresetuje bazę do ustawień początkowych. Po jej wykonaniu należy przeładować Apache Airflow i zalogować się nowym hasłem, które zostanie utworzone.
Lista dostępnych grafów powinna być teraz pusta.

> Więcej poleceń Airflow CLI dla bazy danych: https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#db

#### 1.1.2 Stworzenie własnego grafu zadań z wykorzystaniem TaskFlow API.

In [None]:
# sprawdzamy czy pakiet BeautifulSoup jest już zainstalowany
!pip list | grep beautifulsoup

In [None]:
# jeżeli nie to instalujemy
!pip install beautifulsoup4

**Listing 2**

In [None]:
# plik o nazwie bgg_top_games.py
import pendulum
from datetime import datetime, timedelta
from airflow.decorators import dag, task


@dag(
    schedule=timedelta(days=1),
    start_date=pendulum.datetime(2024, 12, 4, tz="UTC"),
    catchup=False,
    tags=["bgg"],
)
def bgg_top_games_list():
    """
    ### Zadania polegające na pobraniu aktualnego zestawienia najlepiej ocenianych gier planszowych
    z serwisu BoardGameGeek.com w postaci dokumentu HTML, parsowanie i zapisanie w konkretnym formacie
    danych.
    Adres zestawienia: https://boardgamegeek.com/browse/boardgame
    """

    # @task.bash(cwd='../data/bgg/raw/')
    # powyższa linia nie zadziała w naszym przypadku, gdyż narzędzie cwd nie jest zainstalowane w naszym obrazie dockerowym
    # będzie więc używana pełna ścieżka
    @task.bash
    def extract():
        """
        #### Zadanie ekstrakcji danych. Tu można podejść do tego na kilka sposobów. Np. pobrać
        dane bezpośrednio z poziomu Pythona, ale dla, żeby pokazać szersze spektrum zadań,
        użyte zostanie inne podejście. Dane zostaną pobrane z pomocą BashOperator i polecenia curl.
        """
        base_path = '/home/spark/airflow/data/bgg/raw/'
        filepath = f'{base_path}bgg_{datetime.strftime(datetime.now(), "%Y-%m-%d")}.html'
        command = f'curl -s https://boardgamegeek.com/browse/boardgame > {filepath} && echo {filepath}'

        return command
    
    @task()
    def transform(bgg_page_file: str):
        """
        #### Zadanie transformacji danych.
        """
        from bs4 import BeautifulSoup
        import csv

        csv_path = '/home/spark/airflow/data/bgg/csv/'

        print("-" * 100)
        print(f"Processing file: {bgg_page_file}")

        try:
            with open(bgg_page_file, 'r') as file:
                parsed_html = BeautifulSoup(file, 'html.parser')
        except OSError as err:
            raise OSError()

        # parsowanie tabeli i zapisanie danych jako json
        table_html = parsed_html.body.find('table', attrs={'class':'collection_table'})

        rows = table_html.find_all('tr')
        data = []
        col_names = []
        for row_id, row in enumerate(rows):
            if row_id == 0:
                col_names = [ele.text.strip() for ele in row.find_all('th')]
                continue
            cols = [ele.text.strip() for ele in row.find_all('td')]
            data.append([ele for ele in cols if ele])

        # zapisanie danych w formacie csv
        csv_filename = bgg_page_file.split('/')[-1].split('.')[0] + '.csv'
        try:
            with open(csv_path + csv_filename, 'w') as csvfile:
                bggwriter = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                bggwriter.writerow(col_names)
                bggwriter.writerows(data)
        except OSError as err:
            raise OSError()
        
        return csv_path + csv_filename

    @task
    def load(bgg_csv_file: str):
        import pandas as pd

        df = pd.read_csv(bgg_csv_file, header=0)
        print(df.info())
        print(df.head())
        

    bgg_page_of_the_day = extract()
    bgg_csv = transform(bgg_page_of_the_day)
    bgg_pandas_data = load(bgg_csv)


bgg_top_games_list()


> Definiowanie konfiguracji połączenia Spark w Airflow
https://airflow.apache.org/docs/apache-airflow-providers-apache-spark/stable/connections/spark-submit.html

> Aby możliwe było wywoływanie zadań Apache Spark z poziomu Apache Airflow konieczne będzie zainstalowanie poniższej paczki:
```console
pip install apache-airflow[apache-spark]
```

Kolejny przykład grafu zadań pokazuje wykorzystanie operatora `FileSensor`, który nasłuchuje czy w folderze `/home/spark/airflow/data/bgg/csv/` znajduje się jakiś plik, a jeżeli tak to kolejne zadanie z wykorzystaniem Sparka przetwarza ten plik.

Apache Airflow pozwala na zdefiniowanie połączeń do różnych usług i zapisanie ich do późniejszego wykorzystania w specjalnie do tego przeznaczonym miejscu. Dostajemy się do tych ustawień poprzez menu `Admin -> Connections`. W poniższym grafie wykorzystane są dwa takie połączenia.

Połączenie do systemu plików w zadaniu `wait_for_file` przekazywane przez parametr `fs_conn_id='fs_bgg'` zostało zdefiniowane w sposób przedstawiony na poniższym zrzucie ekranu.

![airflow_config_fs](airflow_config_fs.png)


Kolejne połączenie dotyczy zadania `read_csv_with_spark`, które wymaga połączenia do klastra Sparka. Ponownie w `connections` modyfikujemy lub dodajemy połączenie podając parametry jak na zrzucie ekranu poniżej.

![airflow_config_spark](airflow_config_spark.png)

In [None]:
# plik bgg_spark_warehouse.py
# przykład operatora Sensora
import pendulum
import os
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.sensors.filesystem import FileSensor
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from pyspark.sql import SparkSession
from pyspark import SparkContext


@dag(
    schedule=timedelta(hours=4),
    start_date=pendulum.datetime(2024, 12, 11, tz="UTC"),
    catchup=False,
    tags=["bgg"],
)
def bgg_save_to_warehouse():

    # base_filepath = '/home/spark/airflow/data/bgg/csv/'
    # today = datetime.strftime(datetime.now(), "%Y-%m-%d")

    # jeżeli ścieżka wskazuje na folder to nasłuchiwane będzie pojawienie się
    # jakiegokolwiek pliku w tym folderze
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        fs_conn_id='fs_bgg',
        filepath='csv',
        poke_interval=10,
        timeout=300
    )

    # @task.pyspark(conn_id="spark_default")
    # def read_csv_with_spark(spark: SparkSession, sc: SparkContext):
    #     print("Spark task!")
    #     for file in os.listdir(base_filepath):
    #         if file.endswith(".csv"):
    #             print("Plik odnaleziony!")
    #             print(f"{base_filepath + file}")
    #             print(spark.sparkContext)
    #             df = spark.read.csv(base_filepath + file, header=True, inferSchema=True)
    #             # print(df.show(5))

    spark_submit_task = SparkSubmitOperator(
        task_id='spark_read_csv',
        application='/opt/spark/work-dir/spark_scripts/read_bgg_csv.py',
        conn_id='spark_default',
        executor_cores=2,
        executor_memory='2g',
        num_executors=2,
        name='airflow-spark',
    )
    
    
    wait_for_file >> spark_submit_task
    

bgg_save_to_warehouse()


In [None]:
# plik read_bgg_csv.py
import os
from pyspark.sql import SparkSession


warehouse_location = '/opt/spark/work-dir/lab_07/metastore_db'
base_filepath = '/home/spark/airflow/data/bgg/csv/'

spark = SparkSession\
        .builder\
        .master("local[2]")\
        .appName("Apache SQL and Hive")\
        .config("spark.memory.offHeap.enabled","true")\
        .config("spark.memory.offHeap.size","4g")\
        .enableHiveSupport()\
        .config("spark.sql.warehouse.dir", warehouse_location)\
        .getOrCreate()

for file in os.listdir(base_filepath):
    if file.endswith(".csv"):
        df = spark.read.csv(base_filepath + file, header=True, inferSchema=True)
        print(df.show(5))

## Zadania

**Zadanie 1**  
Dodaj do kolekcji swoich grafów zadań w Apache Airflow graf z listingu 2. Uruchom graf i sprawdź czy każde zadanie wykonuje się poprawnie.

**Zadanie 2**  
Zmodyfikuj efekt działania grafu tak, aby:
* w kolumnie `Thumbnail image` znajdowała się ścieżka do pliku z grafiką (adres URL),
* w pliku znajdowała się jeszcze jedna kolumna o nazwie `description`, która zawierać będzie to, co w kolumnie `Title` jest opisem (tekst po dacie premiery gry w nawiasach () ),
* pozbądź się kolumny `shop`.

**Zadanie 3**  
Zmodyfikuj graf z listingu 2 tak, aby przetworzony plik csv po wykonaniu zadania `read_csv_with_spark` był przenoszony w inne miejsce, np. do podfolderu '/home/spark/airflow/data/bgg/processed/'. Dodaj nowy task w tym grafie i wykorzystaj operator bash do wykonania przeniesienia tego pliku.
