# Dynamic DAG Creation : Création dynamique de DAGs pour écrire et lire des données dans PostgreSQL en utilisant les hooks 

![pgaware_multi_dynamic_hook](https://user-images.githubusercontent.com/123757632/231905847-f4620b8c-b165-4d35-9b9e-afa22bc73a71.png)

In [None]:
from pendulum import datetime
from airflow import DAG, settings, Dataset
from airflow.models import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def create_dag(dag_id, schedule, pg_conn_id, default_args):
    suffix = pg_conn_id[-2:]
    table_name = "table_stock_"+suffix

    pg_dataset=[Dataset(f"postgres://airflow:airflow@postgres:5432/mydatabase{suffix}?table={table_name}")]


La fonction create_dag commence par extraire le suffixe de l'ID de connexion Postgres (pg_conn_id) pour obtenir le nom de la table à utiliser. Elle crée ensuite un objet Dataset à partir de cette table en utilisant la classe Dataset fournie par Airflow. Cette classe permet de manipuler des données stockées dans différents types de sources de données (bases de données, fichiers, etc.) et de les passer entre les opérateurs du DAG.

Dans ce cas, l'objet Dataset est créé en utilisant la chaîne de connexion à la base de données Postgres, qui comprend le nom d'utilisateur (airflow), le mot de passe (airflow), le nom de l'hôte (postgres) et le port (5432). Le nom de la base de données est construit à partir du suffixe extrait de l'ID de connexion Postgres. La chaîne "?table={table_name}" est utilisée pour spécifier la table qui sera lue ou écrite.

In [None]:
 def write_to_postgres(*args):
        print("Hello Dynamic Postgre DAGS")
        print("This is DAG: {}".format(str(pg_conn_id)))

        # Create a PostgresHook
        hook = PostgresHook(postgres_conn_id=pg_conn_id)

        # Execute a query
        query = (f"""CREATE TABLE IF NOT EXISTS {table_name} (price TEXT,date TEXT)""")
        hook.run(query)

        query = (f"INSERT INTO {table_name} (col1, col2) VALUES ('price_stock_{suffix}', 'date_stock_{suffix}')")
        hook.run(query)

        query = (f"SELECT * FROM {table_name}")
        rows = hook.get_records(query)
        for row in rows:
                print(row)

La fonction **write_to_postgres** se connecte à une base de données Postgres via PostgresHook, exécute des requêtes SQL et affiche les résultats.

Elle prend un nombre variable d'arguments en entrée via l'argument *args, mais ne les utilise pas. Elle affiche simplement un message de bienvenue et le nom de la connexion pg_conn_id fourni en paramètre.

**write_to_postgres** crée un objet PostgresHook pour la connexion pg_conn_id fournie et exécute trois requêtes SQL. La première requête crée une table nommée table_stock_<suffix> si elle n'existe pas déjà. La seconde requête insère une ligne dans la table avec les valeurs price_stock_<suffix> et date_stock_<suffix>. La troisième requête sélectionne toutes les lignes de la table et les affiche à l'écran avec la boucle for.

La variable suffix est définie en extrayant les deux derniers caractères de la chaîne pg_conn_id. La variable table_name est définie comme table_stock_<suffix> pour éviter les conflits entre les tables créées par différentes instances de la fonction write_to_postgres.

In [None]:
dag = DAG(
        dag_id,
        schedule=schedule,
        default_args=default_args)

    with dag:
      t1 = PythonOperator(
            task_id="write_to_postgres",
            outlets=pg_dataset,
            python_callable=write_to_postgres)

    return dag

Création d'une instance de DAG avec une seule tâche définie, qui utilise la classe PythonOperator. Les arguments fournis à PythonOperator sont task_id, outlets et python_callable. task_id est l'identifiant unique pour la tâche, outlets est un ensemble de connexions à des systèmes externes que la tâche utilise, et python_callable est la fonction **write_to_postgres** Python à exécuter.

In [None]:
session = settings.Session()
conns = (
    session.query(Connection.conn_id)
    .filter(Connection.conn_id.ilike("%MY_DATABASE_CONN%"))
    .all()
)

Récupération de toutes les connexions dont l'identifiant contient la chaîne de caractères "MY_DATABASE_CONN". Il crée une session pour interagir avec la base de données d'Airflow et exécute une requête pour récupérer les identifiants de connexions correspondantes. Les résultats sont stockés dans la variable **conns**.

In [None]:
for conn in conns:

    # BEWARE : the returned connection ID format is messed UP 
    # and needs to be cleansed and sanitized first 
    # otherwise all the rest of the code will break.
    conn = str(conn).strip("(),'")

    dag_id = "pg_dynamic_{}".format(conn)

    default_args = {"owner": "airflow", "start_date": datetime(2023, 1, 1)}

    schedule = "@daily"
    pg_conn_id = conn

    globals()[dag_id] = create_dag(dag_id, schedule, pg_conn_id, default_args)


# Prepare pg_datasets
all_pg_datasets=[]

Pour chaque identifiant de connexion dans la liste conns, on nettoie la chaîne de caractères de l'identifiant et crée un nouvel identifiant de DAG unique. Ensuite, on définit des paramètres par défaut pour les DAG (propriétaire, date de début, etc.) et crée le DAG en appelant la fonction **create_dag** avec les paramètres appropriés.

Enfin, le code crée une liste vide **all_pg_datasets** pour stocker tous les jeux de données Postgres associés à chaque DAG créé ultérieurement.

In [None]:
for conn in conns:
    conn = str(conn).strip("(),'")
    suffix = conn[-2:]
    table_name = "table_stock_"+suffix
    all_pg_datasets.append(Dataset(f"postgres://airflow:airflow@postgres:5432/mydatabase{suffix}?table={table_name}"))

Pour chaque identifiant de connexion dans la liste conns, on nettoie la chaîne de caractères de l'identifiant et extrait les deux derniers caractères pour créer un suffixe unique. Ensuite, on crée un nom de table Postgres unique en utilisant ce suffixe et stocke un nouveau dataset Postgres dans la liste **all_pg_datasets** en utilisant la classe Dataset. Le dataset contient l'URL de connexion à la base de données Postgres correspondante, avec le nom de la table Postgres correspondant à **table_name**.

In [None]:
with DAG(
    'read_from_postgres_hook_aware_MULTI',
    start_date=datetime(2023, 1, 1),
    schedule = all_pg_datasets
) as dag:
   
    def read_from_postgres():

        for conn in conns:
            conn = str(conn).strip("(),'")
            suffix = conn[-2:]
            table_name = "table_stock_"+suffix

            # Execute hook
            hook = PostgresHook(postgres_conn_id=conn)

            # Execute query
            query = (f"SELECT * FROM {table_name}")
            rows = hook.get_records(query)
            for row in rows:
                    print(row)


Créer un DAG Airflow qui effectue la lecture de données à partir de tables spécifiques dans une base de données Postgres. Le DAG est planifié pour s'exécuter en fonction de la fréquence de chaque jeu de données Postgres, définie dans la liste all_pg_datasets.

Pour chaque connexion, le code crée un suffixe unique et un nom de table correspondant. Ensuite, il crée un hook Postgres en utilisant l'identifiant de connexion Postgres correspondant et exécute une requête SQL en utilisant le nom de table unique. Les données résultantes sont récupérées en utilisant la méthode get_records de l'objet hook Postgres, puis chaque ligne est imprimée sur la console en utilisant la fonction read_from_postgres.

Le DAG est nommé "read_from_postgres_hook_aware_MULTI" et une date de début datetime est définie.

In [None]:
read_task = PythonOperator(
        task_id='read_task',
        python_callable=read_from_postgres,
        dag=dag
    )

Création de la tâche "read_task" pour exécuter la fonction **read_from_postgres** définie précédemment en tant que callable Python.