# Building a real-time data pipeline in Snowflake for health application logs
___

## 1. Initialization

**Create the `health_app` database**

In [None]:
CREATE OR ALTER DATABASE health_app;

Select `health_app` as the default working database.
All subsequent operations (creating schemas, tables, views, tasks, and so on) run in this database context unless a SQL statement explicitly names another database.

In [None]:
USE DATABASE health_app;

**Create the `raw` schema**

In [None]:
CREATE OR ALTER SCHEMA raw;

**Create the raw table that receives the events: `raw.raw_events`**

In [None]:
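CREATE OR ALTER TABLE raw.raw_events (
    -- Surrogate key read by later steps (raw.identify_new_data selects event_id).
    -- AUTOINCREMENT is an assumption: the loads in this notebook never supply this column.
    event_id NUMBER AUTOINCREMENT,
    event_timestamp TIMESTAMP,
    process_name STRING,
    process_id NUMBER,
    message STRING
);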

> At this point the pipeline is ready to receive data, but nothing is flowing yet.

## 2. Automatic ingestion

**Define the file format (`raw.csv_file`) and the internal stage (`raw.internal_stage`)**

In [None]:
-- CSV format: '|' separated fields, timestamps such as 20171223-22:15:29:606
CREATE OR ALTER FILE FORMAT raw.csv_file 
TYPE = CSV 
FIELD_DELIMITER = '|' 
TIMESTAMP_FORMAT = 'YYYYMMDD-HH24:MI:SS:FF3';

-- Internal stage
CREATE OR ALTER STAGE raw.internal_stage
FILE_FORMAT = raw.csv_file;
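
To make the file format concrete, the Python sketch below reads one line the way `raw.csv_file` is meant to: fields separated by `|`, and a timestamp laid out as `YYYYMMDD-HH24:MI:SS:FF3`. The sample line and the helper `parse_raw_line` are hypothetical; the line is assembled from the test event used in section 9, not taken from a real log file.

```python
from datetime import datetime

# Hypothetical sample line, assembled from the test event of section 9.
SAMPLE_LINE = "20171223-22:15:29:606|Step_LSC|30002312|onStandStepChanged 3579"


def parse_raw_line(line: str) -> dict:
    """Map the first four '|'-separated fields to the columns loaded by the pipe."""
    fields = line.rstrip("\n").split("|")
    ts, process_name, process_id, message = fields[:4]  # $1 .. $4 in the COPY statement
    return {
        # %f accepts the three-digit millisecond part (FF3).
        "event_timestamp": datetime.strptime(ts, "%Y%m%d-%H:%M:%S:%f"),
        "process_name": process_name,
        "process_id": int(process_id),
        "message": message,
    }


event = parse_raw_line(SAMPLE_LINE)
print(event)
```

This only illustrates the layout of a line; in the pipeline the parsing is done by Snowflake when the file is loaded.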

**Create a Snowpipe pipe (`raw.load_raw_data`) configured with `AUTO_INGEST = TRUE`**

In [None]:
CREATE OR REPLACE PIPE raw.load_raw_data
  AUTO_INGEST = TRUE
  AS
    COPY INTO RAW.RAW_EVENTS (event_timestamp, process_name, process_id, message)
    FROM
    (SELECT
        $1 AS event_timestamp,
        $2 AS process_name,
        $3 AS process_id,
        $4 AS message
    FROM @raw.internal_stage)
  FILE_FORMAT = raw.csv_file;

> As soon as a file lands in the stage, the pipe automatically loads it into `raw.raw_events`.
>
> This is *where data enters the pipeline*.

## 3. Stream on the raw table

In [None]:
CREATE OR REPLACE STREAM raw.raw_events_stream 
ON TABLE raw.raw_events 
APPEND_ONLY = TRUE;

> The stream `raw.raw_events_stream` captures incremental changes (`INSERT` only) on `raw.raw_events`.
>
> It acts as a buffer: downstream tasks process only the rows that arrived since the stream was last consumed.
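
The "buffer" behaviour can be pictured with a toy model in Python. This is a simplified sketch, not Snowflake's implementation: the class `AppendOnlyStream` and its methods are hypothetical names. The idea it illustrates is that a stream keeps an offset into the table rather than a copy of the rows, and that the offset moves only when the stream is consumed by a DML statement.

```python
class AppendOnlyStream:
    """Toy model: the stream stores an offset into the table, not the rows themselves."""

    def __init__(self, table: list):
        self.table = table
        self.offset = len(table)  # rows already present at creation are not reported

    def has_data(self) -> bool:
        return len(self.table) > self.offset

    def peek(self) -> list:
        # Like a plain SELECT on the stream: the offset does not move.
        return self.table[self.offset:]

    def consume(self) -> list:
        # Like reading the stream inside a committed INSERT: the offset advances.
        rows = self.peek()
        self.offset = len(self.table)
        return rows


raw_events = ["event A"]
stream = AppendOnlyStream(raw_events)
raw_events.append("event B")
raw_events.append("event C")

first_batch = stream.consume()   # the two rows inserted after the stream was created
second_batch = stream.consume()  # nothing new since the last consumption
print(first_batch, second_batch)
```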

## 4. Intermediate and logging tables

In [None]:
-- Data to process
CREATE OR ALTER TABLE raw.data_to_process (
    event_id NUMBER,
    event_timestamp TIMESTAMP,
    process_name STRING,
    process_id NUMBER,
    message STRING
);

-- Processing log table
CREATE OR ALTER TABLE raw.logging (
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    graph_run_group_id STRING,
    table_name STRING,
    n_rows NUMBER,
    error_message STRING DEFAULT NULL
);

-- Anomalies table
CREATE OR ALTER TABLE raw.data_anomalies (
    event_id NUMBER, 
    is_correct_timestamp BOOLEAN,
    is_correct_process_name BOOLEAN,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    graph_run_group_id STRING
);

-- Pipeline status
CREATE OR ALTER TABLE raw.transformation_pipline_status (
    graph_run_group_id STRING,
    started_at TIMESTAMP,
    finished_at TIMESTAMP,
    status STRING
);

- `raw.data_to_process`: staging area for the rows waiting to be transformed.

- `raw.logging`: execution log (rows loaded per table, error messages).

- `raw.data_anomalies`: anomalies detected by the quality checks.

- `raw.transformation_pipline_status`: overall state of each pipeline run (success / failure).

> These tables provide the operational support and tracking metadata for the pipeline.

## 5. Python UDFs

In [None]:
CREATE OR REPLACE FUNCTION raw.extract_log_trigger(message STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.12'
HANDLER = 'extract_log_trigger'
AS $$
def extract_log_trigger(message: str):
    return message.strip().split(" ")[0].split(":")[0].split("=")[0].strip()
$$;

CREATE OR REPLACE FUNCTION raw.extract_log_message(message STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.12'
HANDLER = 'extract_log_message'
AS $$
def extract_log_message(message: str):
    # Same trigger extraction as raw.extract_log_trigger.
    msg_trigger = message.strip().split(" ")[0].split(":")[0].split("=")[0].strip()
    # Remove only the leading occurrence, so a trigger repeated later in the text is kept.
    return message.replace(msg_trigger, "", 1).strip()
$$;

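`extract_log_trigger` and `extract_log_message` parse the `message` field: the first returns the leading token (the text before the first space, `:` or `=`), the second returns the rest of the message.

> The enrichment procedure (`raw.enrich_data`) uses them to split each log message into its parts.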
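
The parsing logic can be tried outside Snowflake. The sketch below mirrors the two handlers as plain Python functions and runs them on a few messages. Only the first message appears in this notebook (section 9); the other two are hypothetical and chosen to show how `:` and `=` are handled. Limiting the replacement to the first occurrence is an assumption about the intended behaviour.

```python
def extract_log_trigger(message: str) -> str:
    # Leading token: the text before the first space, ':' or '='.
    return message.strip().split(" ")[0].split(":")[0].split("=")[0].strip()


def extract_log_message(message: str) -> str:
    trigger = extract_log_trigger(message)
    # count=1 is an assumption about the intent: drop only the leading trigger.
    return message.replace(trigger, "", 1).strip()


# Hypothetical messages, except the first one, which is the test event of section 9.
samples = [
    "onStandStepChanged 3579",
    "REPORT : 7007 5002",
    "setTodaySteps=1514",
]

parsed = [(extract_log_trigger(m), extract_log_message(m)) for m in samples]
for trigger, rest in parsed:
    print(f"{trigger!r} -> {rest!r}")
```

Note that the separator itself stays in the remainder (`": 7007 5002"`, `"=1514"`); stripping it would need an extra step that the handlers above do not perform.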

## 6. Stored procedures (business logic)

**The procedures are defined below in their logical call order:**

In [None]:
-- 6.1 Log the results
CREATE OR REPLACE PROCEDURE raw.log_results(graph_run_group_id STRING, table_name STRING, n_rows NUMBER, error_message STRING)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
BEGIN
    -- A Snowflake Scripting body is written as a BEGIN ... END block.
    INSERT INTO raw.logging (graph_run_group_id, table_name, n_rows, error_message)
    VALUES (:graph_run_group_id, :table_name, :n_rows, :error_message);
    RETURN 'OK';
END;
$$;

-- 6.2 Identify new data
CREATE OR REPLACE PROCEDURE raw.identify_new_data(graph_run_group_id STRING)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
    insert_exception EXCEPTION (-20001, 'Exception in data loading into raw.data_to_process table');
BEGIN
    LET n_rows INT := 0;
    -- Reading the stream inside this INSERT consumes it: its offset advances on commit.
    INSERT INTO raw.data_to_process (event_id, event_timestamp, process_name, process_id, message)
    WITH source AS (
        SELECT event_id, event_timestamp, process_name, process_id, message
        FROM raw.raw_events_stream
    )
    SELECT * FROM source;
    n_rows := SQLROWCOUNT;
    CALL raw.log_results(:graph_run_group_id, 'raw.data_to_process', :n_rows, NULL);
    -- Return the row count on the success path.
    RETURN n_rows;
EXCEPTION
    WHEN OTHER THEN
        -- Log the failure, then re-raise so the task run is marked as failed.
        CALL raw.log_results(:graph_run_group_id, 'raw.data_to_process', NULL, :SQLERRM);
        RAISE insert_exception;
END;
$$;

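-- 6.3 Data quality checks
-- raw.check_correct_timestamp and raw.check_correct_process_name are not defined in the
-- cells shown here; they must exist before this procedure is called.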
CREATE OR REPLACE PROCEDURE raw.data_quality(graph_run_group_id STRING)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
BEGIN
    LET number_of_incorrect_lines INT := 0;
    INSERT INTO raw.data_anomalies (event_id, is_correct_timestamp, is_correct_process_name, graph_run_group_id)
    WITH source AS (
        SELECT
            event_id,
            raw.check_correct_timestamp(event_timestamp) AS is_correct_timestamp,
            raw.check_correct_process_name(process_name) AS is_correct_process_name
        FROM raw.data_to_process
    )
    SELECT *, :graph_run_group_id
    FROM source
    WHERE is_correct_timestamp = FALSE OR is_correct_process_name = FALSE;
    number_of_incorrect_lines := SQLROWCOUNT;
    RETURN :number_of_incorrect_lines;
END;
$$;

-- 6.4 Data enrichment
CREATE OR REPLACE PROCEDURE raw.enrich_data(table_name STRING, process_name STRING, graph_run_group_id STRING)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
    full_table_name STRING := CONCAT('staging.', :table_name);
    insert_exception EXCEPTION (-20002, 'Exception in data loading into staging tables');
BEGIN
    LET n_rows INT := 0;
    INSERT INTO IDENTIFIER(:full_table_name) (event_timestamp, process_id, log_trigger, message)
    WITH source AS (
        SELECT
            dtp.event_timestamp,
            dtp.process_name,
            dtp.process_id,
            raw.extract_log_trigger(dtp.message) AS log_trigger,
            raw.extract_log_message(dtp.message) AS message
        FROM raw.data_to_process dtp
        -- Anti-join: keep only the rows that have no entry in raw.data_anomalies.
        LEFT JOIN raw.data_anomalies da
        ON dtp.event_id = da.event_id
        WHERE dtp.process_name = :process_name
        AND da.event_id IS NULL
    )
    SELECT event_timestamp, process_id, log_trigger, message
    FROM source;
    n_rows := SQLROWCOUNT;
    CALL raw.log_results(:graph_run_group_id, :table_name, :n_rows, NULL);
    -- Return the row count on the success path.
    RETURN n_rows;
EXCEPTION
    WHEN OTHER THEN
        -- Log the failure, then re-raise so the task run is marked as failed.
        CALL raw.log_results(:graph_run_group_id, :table_name, NULL, :SQLERRM);
        RAISE insert_exception;
END;
$$;

-- 6.5 Finalize the pipeline
CREATE OR REPLACE PROCEDURE raw.finalize_transformation(graph_run_group_id STRING, started_at STRING)
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
    pipeline_exception EXCEPTION (-20003, 'Exception in the transformation pipeline');
BEGIN
    LET n_errors INT := 0;
    -- Count the errors logged for this graph run (the INTO target takes the ':' prefix).
    SELECT COUNT(*) INTO :n_errors
    FROM raw.logging
    WHERE graph_run_group_id = :graph_run_group_id AND error_message IS NOT NULL;

    INSERT INTO raw.transformation_pipline_status (graph_run_group_id, started_at, finished_at, status)
    SELECT
        :graph_run_group_id,
        :started_at,
        CURRENT_TIMESTAMP(),
        IFF(:n_errors > 0, 'FAILED', 'SUCCEEDED');

    IF (n_errors = 0) THEN
        -- Every step succeeded: the intermediate rows are no longer needed.
        TRUNCATE TABLE raw.data_to_process;
        RETURN 'SUCCEEDED';
    ELSE
        RAISE pipeline_exception;
    END IF;
END;
$$;

- `raw.identify_new_data`

    - Loads the new rows from the stream `raw.raw_events_stream` into `raw.data_to_process`.

    - Logs the result.

- `raw.data_quality`

    - Checks the validity of the data (timestamp, process_name).

    - Writes the failing rows to `raw.data_anomalies`.

- `raw.enrich_data`

    - Runs the parsing (`extract_log_trigger`, `extract_log_message`) and inserts the enriched rows into the matching `staging` table, which must already exist.

    - Called once per process type (e.g. `Step_LSC`, `HiH_HiSyncUtil…`).

- `raw.finalize_transformation`

    - Checks whether any errors were logged for the run.

    - Records the pipeline status (`SUCCEEDED` or `FAILED`).

    - Empties the intermediate table `raw.data_to_process` if everything went well.
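
The two checks called by `raw.data_quality` (`raw.check_correct_timestamp`, `raw.check_correct_process_name`) are not defined in the cells shown here. The Python sketch below shows one possible shape for their handlers. The validation rules are assumptions, not the author's: a timestamp counts as valid when it is present and not in the future, and a process name counts as valid when it is one of the names used by the enrichment tasks in section 7. The sample rows are hypothetical.

```python
from datetime import datetime
from typing import Optional

# Process names taken from the enrichment tasks in section 7.
KNOWN_PROCESSES = frozenset({
    "HiH_ListenerManager", "HiH_HiBroadcastUtil", "Step_StandStepCounter",
    "Step_SPUtils", "Step_LSC", "HiH_HiHealthDataInsertStore",
    "HiH_DataStatManager", "HiH_HiSyncUtil", "Step_StandReportReceiver",
    "Step_ScreenUtil",
})


def check_correct_timestamp(event_timestamp: Optional[datetime],
                            now: Optional[datetime] = None) -> bool:
    """Assumed rule: the timestamp is present and not in the future."""
    if event_timestamp is None:
        return False
    return event_timestamp <= (now or datetime.now())


def check_correct_process_name(process_name: Optional[str]) -> bool:
    """Assumed rule: the process name is one the pipeline knows how to enrich."""
    return process_name in KNOWN_PROCESSES


# Hypothetical rows: (event_id, event_timestamp, process_name)
rows = [
    (1, datetime(2017, 12, 23, 22, 15, 29), "Step_LSC"),
    (2, None, "Step_LSC"),
    (3, datetime(2017, 12, 23, 22, 15, 30), "Unknown_Process"),
]
reference = datetime(2018, 1, 1)  # fixed "now" so the example is reproducible

# Same filter as raw.data_quality: keep the rows where at least one check fails.
anomalies = [
    (event_id, check_correct_timestamp(ts, reference), check_correct_process_name(name))
    for event_id, ts, name in rows
    if not (check_correct_timestamp(ts, reference) and check_correct_process_name(name))
]
print(anomalies)
```

Rows 2 and 3 would land in `raw.data_anomalies`, and the anti-join in `raw.enrich_data` would then exclude them from the staging tables.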

## 7. Orchestration with tasks

Snowflake tasks define the automatic execution order:

**Logical chain:**

In [None]:
-- Root: pick up newly ingested data
CREATE OR ALTER TASK raw.identify_new_data_task
WAREHOUSE = COMPUTE_WH
WHEN SYSTEM$STREAM_HAS_DATA('raw.raw_events_stream')
AS
DECLARE
    graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN
    CALL raw.identify_new_data(:graph_run_group_id);
END;

-- Data quality checks
CREATE OR ALTER TASK raw.data_quality_task
WAREHOUSE = COMPUTE_WH
AFTER raw.identify_new_data_task
AS
DECLARE
    graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN
    CALL raw.data_quality(:graph_run_group_id);
END;

-- Parallel enrichment tasks (one per process type)
CREATE OR ALTER TASK raw.hih_listener_manager
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('hih_listener_manager', 'HiH_ListenerManager', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.hih_hibroadcastutil
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('hih_hi_broadcast_util', 'HiH_HiBroadcastUtil', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.step_standstepcounter
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('step_stand_step_counter', 'Step_StandStepCounter', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.step_sputils
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('step_sp_utils', 'Step_SPUtils', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.step_lsc
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('step_lsc', 'Step_LSC', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.hih_hihealthdatainsertstore
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('hih_hi_health_data_insert_store', 'HiH_HiHealthDataInsertStore', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.hih_datastatmanager
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('hih_data_stat_manager', 'HiH_DataStatManager', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.hih_hisyncutil
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('hih_hi_sync_util', 'HiH_HiSyncUtil', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.step_standreportreceiver
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('step_stand_report_receiver', 'Step_StandReportReceiver', :graph_run_group_id); END;

CREATE OR ALTER TASK raw.step_screenutil
WAREHOUSE = COMPUTE_WH
AFTER raw.data_quality_task
AS
DECLARE graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
BEGIN CALL raw.enrich_data('step_screen_util', 'Step_ScreenUtil', :graph_run_group_id); END;

-- Finalizer task
CREATE OR ALTER TASK raw.finalize_transformation_task
WAREHOUSE = COMPUTE_WH
FINALIZE = 'raw.identify_new_data_task'
AS
DECLARE
    graph_run_group_id STRING := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID');
    started_at TIMESTAMP := SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP');
BEGIN
    CALL raw.finalize_transformation(:graph_run_group_id, :started_at);
END;

- `raw.identify_new_data_task`: root task and main trigger (runs when the stream has data).

- `raw.data_quality_task`: depends on the root task (`AFTER raw.identify_new_data_task`).

- Enrichment tasks: all depend on `raw.data_quality_task` and run in parallel.

- `raw.finalize_transformation_task`: closes the cycle (`FINALIZE`).

> This orchestration gives an asynchronous, incremental and resilient pipeline, managed entirely inside Snowflake.
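
The shape of the task graph can be checked with a few lines of Python. The sketch below rebuilds the `AFTER` dependencies from the task definitions above and groups the tasks into successive "waves" with the standard-library `graphlib` module. It is only a model of the ordering: Snowflake schedules the tasks itself, and the finalizer is appended by hand because it is attached to the root with `FINALIZE`, not with `AFTER`.

```python
from graphlib import TopologicalSorter

# Task names as created in this section (schema prefix omitted).
ENRICHMENT_TASKS = [
    "hih_listener_manager", "hih_hibroadcastutil", "step_standstepcounter",
    "step_sputils", "step_lsc", "hih_hihealthdatainsertstore",
    "hih_datastatmanager", "hih_hisyncutil", "step_standreportreceiver",
    "step_screenutil",
]

# Each key maps to the set of tasks it runs AFTER.
graph = {"data_quality_task": {"identify_new_data_task"}}
for task in ENRICHMENT_TASKS:
    graph[task] = {"data_quality_task"}

sorter = TopologicalSorter(graph)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose predecessors are all done
    waves.append(ready)
    sorter.done(*ready)

# The finalizer runs once every other task of the graph has completed or failed.
waves.append(["finalize_transformation_task"])

for number, wave in enumerate(waves, start=1):
    print(number, wave)
```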

## 8. Activating the pipeline

In [None]:
ALTER TASK raw.identify_new_data_task SUSPEND;

ALTER TASK raw.data_quality_task RESUME;
ALTER TASK raw.hih_listener_manager RESUME;
ALTER TASK raw.hih_hibroadcastutil RESUME;
ALTER TASK raw.step_standstepcounter RESUME;
ALTER TASK raw.step_sputils RESUME;
ALTER TASK raw.step_lsc RESUME;
ALTER TASK raw.hih_hihealthdatainsertstore RESUME;
ALTER TASK raw.hih_datastatmanager RESUME;
ALTER TASK raw.hih_hisyncutil RESUME;
ALTER TASK raw.step_standreportreceiver RESUME;
ALTER TASK raw.step_screenutil RESUME;

-- Resume the finalizer while the root is still suspended, then start the root last.
ALTER TASK raw.finalize_transformation_task RESUME;
ALTER TASK raw.identify_new_data_task RESUME;

> The root task is suspended first, then the tasks are resumed one by one so that they are activated in the right order.
>
> Once every task has been resumed, the pipeline is fully operational.

## 9. Testing the pipeline

In [None]:
TRUNCATE TABLE raw.raw_events;
TRUNCATE TABLE raw.data_to_process;
TRUNCATE TABLE staging.step_lsc;

INSERT INTO raw.raw_events (event_timestamp, process_name, process_id, message)
VALUES ('2017-12-23 22:15:29.606'::TIMESTAMP, 'Step_LSC', 30002312, 'onStandStepChanged 3579');

SELECT * FROM raw.raw_events_stream;

SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START=>DATEADD('hour',-1,current_timestamp())
))
WHERE schema_name = 'RAW';

- Empty the tables (`TRUNCATE`) to start from a clean state.

- Insert a test event.

- Query the stream and the task history through `INFORMATION_SCHEMA.TASK_HISTORY`.

> This validates that the pipeline is triggered and that its tasks chain correctly.