<a href="https://colab.research.google.com/github/kplr-training/Airflow/blob/main/Ateliers/Enonces/04-Pipeline%20-%20Postgres.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Construction d'une pipeline d'exécution avec PostgreSQL 

![atelier4](https://user-images.githubusercontent.com/123757632/231912146-232774b4-c30c-4901-b4c1-ea27db59122b.png)

### Tâches de création de table

Il est possible d'utiliser PostgresOperator afin de définir des tâches qui créeront des tables dans la base de données postgres.

Il est prévu de créer deux tables : une pour faciliter le nettoyage des données (employees_temp) et une autre pour stocker les données nettoyées (employees).

* Tâche 1 :  
  - Tâche nommée "create_employees_table". 
  - Cette tâche est associée à une instance de PostgreSQL. 
  - La tâche exécute la commande SQL pour créer une table nommée "employees"  si elle n'existe pas avec des colonnes : 
      * "Serial Number" NUMERIC PRIMARY KEY,
      * "Company Name" TEXT,
      * "Employee Markme" TEXT,
      * "Description" TEXT,
      * "Leave" INTEGER

* Tâche 2 : 
  - Tâche nommée "create_employees_temp_table".
  - Cette tâche est associée à une instance de PostgreSQL. 
  - La tâche exécute une commande SQL pour supprimer une table nommée "employees_temp" si elle existe, puis crée une nouvelle table portant le même nom avec des colonnes : 
      * "Serial Number" NUMERIC PRIMARY KEY,
      * "Company Name" TEXT,
      * "Employee Markme" TEXT,
      * "Description" TEXT,
      * "Leave" INTEGER

### Tâche de récupération de données

Les données sont récupérées, enregistrées dans un fichier sur l'instance Airflow, puis chargées à partir de ce fichier dans une table intermédiaire afin de pouvoir exécuter les étapes de nettoyage des données.



Définir une fonction nommée **"get_data()"** qui est décorée par **"@task"**, indiquant qu'il s'agit d'une tâche Airflow. 

* Cette tâche télécharge un fichier CSV à partir de l'URL "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv" , 

* Utilise la bibliothèque **"requests"** pour envoyer une requête GET à l'URL et récupérer les données. 


In [None]:
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"

* Les données sont ensuite écrites dans un fichier situé dans le chemin "data_path", qui est défini pour être "/opt/airflow/dags/files/employees.csv". Si le chemin n'existe pas, il est créé .  

In [None]:
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)


* Ensuite, la tâche utilise la bibliothèque **"psycopg2"** pour se connecter à la base de données PostgreSQL avec l'identifiant de connexion. 


In [None]:
postgres_hook = PostgresHook(postgres_conn_id="******")
conn = postgres_hook.get_conn()
cur = conn.cursor()


* Elle insère les données du fichier CSV dans une table temporaire nommée **"employees_temp"** à l'aide de la méthode "copy_expert" qui permet de copier les données du fichier dans la table.
    
                


In [None]:
with open(data_path, "r") as file:
        cur.copy_expert(
            "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
            file,
        )

* Enfin, la tâche confirme la transaction en appelant "conn.commit()".

### Tâche de fusion de données

Des enregistrements complètement uniques sont sélectionnés à partir des données récupérées, puis il est vérifié si des numéros de série d'employés sont déjà présents dans la base de données. Si tel est le cas, ces enregistrements sont mis à jour avec les nouvelles données.

Définir une fonction nommée "merge_data()" qui est décorée par "@task", indiquant qu'il s'agit d'une tâche Airflow.

* La tâche fusionne les données d'une table temporaire appelée **employees_temp** dans une table permanente appelée **employees**. Pour ce faire, elle sélectionne toutes les lignes distinctes de employees_temp et les insère dans employees, ou met à jour les lignes existantes s'il y a un conflit sur la colonne "Serial Number".

* La tâche est exécutée à l'aide d'un **PostgresHook**, qui se connecte à une base de données **PostgreSQL** et exécute des requêtes SQL. 

* La requête SQL utilisée dans cette tâche est définie dans la variable de requête et utilise la syntaxe INSERT INTO ... 

In [None]:
query = """
        INSERT INTO employees
        SELECT *
        FROM (
            SELECT DISTINCT *
            FROM employees_temp
        ) t
        ON CONFLICT ("Serial Number") DO UPDATE
        SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
    """

* ON CONFLICT de Postgres pour gérer les conflits lors de l'insertion de données dans la table employees.
Si la tâche s'exécute avec succès, elle renvoie 0. Si une exception se produit, elle renvoie 1. (Utilisez Exception)

### Finalisation du DAG

* Utiliser l'opérateur **>>** pour définir les dépendances entre les tâches. Cela indique que les tâches à gauche de l'opérateur doivent être exécutées avant la tâche à droite de l'opérateur.

* Plus précisément, la ligne de code établit les dépendances suivantes :

  * Les tâches create_employees_table et create_employees_temp_table doivent être exécutées avant la tâche get_data().
  * La tâche get_data() doit être exécutée avant la tâche merge_data().