# **Introduction à Apache Airflow**
Ce notebook est une mini formation sur **Apache Airflow**.

**Objectifs :**
- Comprendre le fonctionnement de **DAGs** et des **tasks**.
- Créer et exécuter un workflow Airflow.
- Manipuler les **Operators**, **Hooks** et **XComs**.


## **1. Installation d'Apache Airflow**

In [1]:
%pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.10.4-py3-none-any.whl.metadata (43 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.5.3-py3-none-any.whl.metadata (16 kB)
Collecting asgiref>=2.3.0 (from apache-airflow)
  Downloading asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting colorlog>=6.8.2 (from apache-airflow)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Downloading ConfigUpdater-3.2-py2.py3-none-any.whl.metadata (10 kB)
Collecting connexion<3.0,>=2.14.2 (from connexion[flask]<3.0,>=2.14.2->apache-airflow)
  Downloading connexion-2.14.2-py2.py3-none-any.whl.metadata (28 kB)
Collecting cron-descriptor>=1.2.24 (from apache-airflow)
  Downloading cron_descriptor-1.4.5-py3-none-any.whl.metadata (5.7 kB)
Collecting croniter>=2.0.2 (from apache-airflow)
  Using cached croniter-6.0.0-py2.py3-none-any.whl.metadata (32 kB)
Collecting flask-caching>=2.0.0 (from a

## **2. Configuration d'Airflow**
Avant de démarrer, nous devons initialiser la base de données d'Airflow.

In [None]:
# export AIRFLOW_HOME=~/airflow
# airflow db init
# airflow webserver -p 8080
# airflow scheduler

UsageError: Line magic function `%` not found.


## **3. Démarrage d'Airflow**
Airflow fonctionne avec un **scheduler** et une **interface web**.
Les commandes suivantes permettent de les lancer.

In [5]:
% airflow scheduler &

UsageError: Line magic function `%` not found.


In [None]:
%airflow webserver -p 8080 &

## **4. Création d'un DAG simple**
Un DAG (Directed Acyclic Graph) représente un workflow dans Airflow.

In [7]:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Fonction Python simple
def print_hello():
    print("Hello, Airflow!")

# Définition du DAG
dag = DAG(
    'simple_dag',
    description='Un DAG simple avec Airflow',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Définition des tâches
start_task = DummyOperator(task_id='start', dag=dag)
hello_task = PythonOperator(task_id='hello', python_callable=print_hello, dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)

# Définition de l'ordre d'exécution des tâches
start_task >> hello_task >> end_task


<Task(EmptyOperator): end>

## **5. Exécution d'un DAG**
Une fois notre DAG créé, nous pouvons l'exécuter avec la commande suivante :

In [8]:
%airflow dags list

UsageError: Line magic function `%airflow` not found.


In [None]:
%airflow dags trigger simple_dag

## **6. Utilisation d'Operators Avancés**
Airflow propose plusieurs types d'Operators pour orchestrer des tâches.

### **6.1 BashOperator : Exécuter une commande shell**

In [None]:

from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

start_task >> bash_task >> hello_task >> end_task


### **6.2 PythonOperator : Exécuter du code Python**

In [None]:

def print_message():
    print("Airflow exécute cette fonction Python !")

python_task = PythonOperator(
    task_id='print_message',
    python_callable=print_message,
    dag=dag
)

bash_task >> python_task >> end_task


## **7. Communication entre Tâches avec XComs**
Les XComs permettent d'échanger des données entre tâches.

In [None]:

from airflow.operators.python_operator import PythonOperator

def push_data(**kwargs):
    kwargs['ti'].xcom_push(key='message', value='Bonjour depuis la tâche A!')

def pull_data(**kwargs):
    message = kwargs['ti'].xcom_pull(task_ids='push_task', key='message')
    print(f"Message reçu : {message}")

push_task = PythonOperator(task_id='push_task', python_callable=push_data, provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=pull_data, provide_context=True, dag=dag)

push_task >> pull_task


## **8. Connexion à une Base de Données avec Airflow**
Airflow permet de se connecter à des bases de données avec les Hooks.

In [None]:

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

def fetch_data():
    hook = SqliteHook(sqlite_conn_id='sqlite_default')
    result = hook.get_records("SELECT * FROM example_table")
    print(result)

fetch_task = PythonOperator(task_id='fetch_data', python_callable=fetch_data, dag=dag)
pull_task >> fetch_task


## **9. Arrêt d'Airflow**

In [None]:
%pkill -f 'airflow webserver'

In [None]:
%pkill -f 'airflow scheduler'

## **10. Conclusion**
- Nous avons vu comment installer et utiliser Airflow.
- Création et exécution d'un DAG simple.
- Utilisation des Operators et XComs.

**À tester :** Essayez de créer votre propre DAG avec plusieurs tâches ! 🚀