#Tworzenie środowiska Composer

In [None]:
my-composer-environment # Nazwa
data_stream_airflow # Zasobnik niestandardowy
europe-central2 # Lokalizacja
europe-central2-c # Strefa

#Ustawianie zmiennych środowiskowych Apache Airflow

- Zmienne Apache Airflow `gcp_project, gcs_bucket, gce_zone` to specyficzna koncepcja Airflow, która różni się od zmiennych środowiskowych.

- Ta metoda nie działa!

In [None]:
COMPOSER_INSTANCE=my-composer-environment

In [None]:
PROJECT_ID=third-essence-345723

In [None]:
GCS_BUCKET=gs://data_stream_airflow

In [None]:
GCE_ZONE=europe-central2-c

In [None]:
gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location europe-central2 variables -- --set gcp_project ${PROJECT_ID}

In [None]:
gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location europe-central2 variables -- --set gcs_bucket ${GCS_BUCKET}

In [None]:
gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location europe-central2 variables -- --set gce_zone ${GCE_ZONE}

## Użycie interfejsu użytkownika Airflow:
- Zaloguj się do interfejsu użytkownika Airflow w Cloud Composer.
- Przejdź do sekcji Admin > Variables i dodaj/edytuj zmienne ręcznie.

#DAG

In [None]:
# Importowanie bibliotek i modułów
import datetime  # Importowanie modułu datetime do manipulacji datami i czasami.
import os  # Importowanie modułu os do obsługi zmiennych środowiskowych i operacji na ścieżkach.

from airflow import models  # Importowanie models z Airflow do tworzenia DAG-ów.
from airflow.contrib.operators import dataproc_operator  # Importowanie operatorów Dataproc do zarządzania klastrami Dataproc.
from airflow.utils import trigger_rule  # Importowanie trigger_rule do definiowania warunków uruchamiania zadań.

# Określenie ścieżki wyjściowej dla wyników zadania Dataproc w Google Cloud Storage.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep  # Ścieżka pliku jest dynamiczna i zawiera znacznik czasu.

# Ścieżka do przykładowego pliku JAR Hadoop wordcount dostępnego na każdym klastrze Dataproc.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

# Argumenty przekazywane do zadania Dataproc.
input_file = 'gs://pub/shakespeare/rose.txt'  # Ścieżka do pliku wejściowego w GCS (zawierającego tekst).
wordcount_args = ['wordcount', input_file, output_file]  # Argumenty dla zadania Hadoop.

# Ustalenie daty rozpoczęcia DAG-a (dzień wczorajszy).
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# Argumenty domyślne dla DAG-a.
default_dag_args = {
    'start_date': yesterday,  # Data startowa.
    'email_on_failure': False,  # Wyłączone wysyłanie e-maili przy błędach.
    'email_on_retry': False,  # Wyłączone wysyłanie e-maili przy ponownych próbach.
    'retries': 1,  # Liczba ponownych prób w przypadku błędu zadania.
    'retry_delay': datetime.timedelta(minutes=5),  # Opóźnienie między próbami.
    'project_id': models.Variable.get('gcp_project')  # Identyfikator projektu GCP pobrany z zmiennych Airflow.
}

# Definicja DAG-a.
with models.DAG(
        'composer_hadoop_tutorial',  # Nazwa DAG-a.
        schedule_interval=datetime.timedelta(days=1),  # Harmonogram uruchamiania - raz dziennie.
        default_args=default_dag_args) as dag:

    # Zadanie tworzenia klastra Cloud Dataproc.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',  # Unikalny identyfikator zadania.
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',  # Nazwa klastra z dynamicznie generowaną datą.
        num_workers=2,  # Liczba węzłów roboczych.
        zone=models.Variable.get('gce_zone'),  # Strefa GCE pobrana z zmiennych Airflow.
        master_machine_type='n1-standard-1',  # Typ maszyny dla węzła master.
        worker_machine_type='n1-standard-1')  # Typ maszyn dla węzłów roboczych.

    # Zadanie uruchomienia przykładu Hadoop wordcount.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',  # Unikalny identyfikator zadania.
        main_jar=WORDCOUNT_JAR,  # Ścieżka do pliku JAR Hadoop.
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',  # Nazwa klastra.
        arguments=wordcount_args)  # Argumenty dla zadania.

    # Zadanie usunięcia klastra Cloud Dataproc.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',  # Unikalny identyfikator zadania.
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',  # Nazwa klastra.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)  # Reguła uruchamiania - uruchom nawet w przypadku błędu poprzednich zadań.

    # Definicja zależności między zadaniami.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

In [None]:
import datetime
import os
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator, DataprocSubmitJobOperator, DataprocDeleteClusterOperator
from airflow.utils import trigger_rule

# Ścieżka wyjściowa dla zadania Dataproc
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

# Ścieżka do przykładowego pliku JAR Hadoop wordcount
WORDCOUNT_JAR = 'gs://hadoop-jar-files/hadoop-mapreduce-examples.jar'

# Plik wejściowy dla zadania
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

# Data startowa dla DAG
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# Domyślne argumenty dla DAG
default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# Definicja regionu (należy ustawić odpowiednią zmienną Airflow lub podać region bezpośrednio)
region = models.Variable.get('dataproc_region', 'europe-central2')  # Ustaw domyślny region

with models.DAG(
        'composer_hadoop',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),  # Ustaw strefę z odpowiedniej zmiennej
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        region=region,  # Dodano region
        internal_ip_only=False  # Ustawienie internal_ip_only na false
    )

    run_dataproc_hadoop = DataprocSubmitJobOperator(
        task_id='run_dataproc_hadoop',
        job={
            "placement": {"cluster_name": 'composer-hadoop-tutorial-cluster-{{ ds_nodash }}'},
            "hadoop_job": {
                "main_class": "org.apache.hadoop.examples.WordCount",
                "jar_file_uris": [WORDCOUNT_JAR],
                "args": wordcount_args,
            },
        },
        region=region  # Dodano region
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
        region=region  # Dodano region
    )

    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster