# Tentative de propositon d'aggréation pour avoir une ligne par prévélement dans une table 
***

## Le travail ayant déjà été fait par des bénévoles

- https://github.com/dataforgoodfr/13_pollution_eau/pull/45 par SebM42
- https://github.com/dataforgoodfr/13_pollution_eau/pull/49 par LounesAbd
- https://outline.services.dataforgood.fr/doc/notes-sur-la-table-edc_prelevements-N7BwMGDZcQ par LounesAbd

**Résumé de ces études**
- Il y a un `referenceprel` présent en double, à deux dates différentes : 06600184310

- Nous avons trois types de cas dans la table edc_prelevements :<br>
[1] les prélèvements réalisés directement sur l'UDI en question (pas de 'cdreseauamont' dans ce cas)<br>
[2] les prélèvements réalisés sur un UDI en amont qui dispose lui aussi de sa ligne dans la table (respecte le cas 1)<br>
[3] les prélèvements réalisés en amont sur un 'cdreseauamont' qui n'a pas de ligne propre dans la table (ne respecte pas le cas 1)<br>

***

In [1]:
# [TO RUN]  Packages
import pandas as pd

pd.set_option("display.max_columns", None)  # show all cols
pd.set_option("display.max_colwidth", None)  # show full width of showing cols
pd.set_option(
    "display.expand_frame_repr", False
)  # print cols side by side as it's supposed to be

In [2]:
# [TO RUN]
import duckdb
from pipelines.tasks.config.common import DUCKDB_FILE

con = duckdb.connect(database=DUCKDB_FILE, read_only=True)

In [3]:
# [OPTIONAL] Preview edc_prelevements
preview_prel = con.sql("SELECT * FROM edc_prelevements LIMIT 2").df()
preview_prel

Unnamed: 0,cddept,cdreseau,inseecommuneprinc,nomcommuneprinc,cdreseauamont,nomreseauamont,pourcentdebit,referenceprel,dateprel,heureprel,conclusionprel,ugelib,distrlib,moalib,plvconformitebacterio,plvconformitechimique,plvconformitereferencebact,plvconformitereferencechim,de_partition,de_ingestion_date,de_dataset_datetime
0,1,1000003,1007,AMBRONAY,,,,100119766,2020-02-13,11h40,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,SI REGION D'AMBERIEU-EN-BUGEY,SIE REGION D'AMBERIEU-EN-BUGEY,SIE REGION D'AMBERIEU-EN-BUGEY,C,C,C,C,2020,2025-02-14,20230811-150005
1,1,1000003,1007,AMBRONAY,1001304.0,TTP (CLG) AMBRONAY,100 %,100120290,2020-03-17,11h15,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,SI REGION D'AMBERIEU-EN-BUGEY,SIE REGION D'AMBERIEU-EN-BUGEY,SIE REGION D'AMBERIEU-EN-BUGEY,C,C,C,C,2020,2025-02-14,20230811-150005


# 1. Supression de la referenceprel en double
***

In [4]:
query_Test1 = """
SELECT
    referenceprel,
    COUNT(DISTINCT dateprel) AS nb
FROM 
    edc_prelevements
GROUP BY
   1
HAVING 
    nb >1
"""
con.sql(query_Test1).show()

┌───────────────┬───────┐
│ referenceprel │  nb   │
│    varchar    │ int64 │
├───────────────┼───────┤
│ 06600184310   │     2 │
└───────────────┴───────┘



In [5]:
query_ranked = """
WITH 
ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER ( 
                PARTITION BY referenceprel
                ORDER BY
                    dateprel,
                    heureprel
            )  AS row_num
        FROM 
            edc_prelevements
        )

SELECT 
    * EXCLUDE (row_num)
FROM 
        ranked
WHERE 
       row_num = 1
"""

ranked_df = con.sql(query_ranked).df()
ranked_df.head()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,cddept,cdreseau,inseecommuneprinc,nomcommuneprinc,cdreseauamont,nomreseauamont,pourcentdebit,referenceprel,dateprel,heureprel,conclusionprel,ugelib,distrlib,moalib,plvconformitebacterio,plvconformitechimique,plvconformitereferencebact,plvconformitereferencechim,de_partition,de_ingestion_date,de_dataset_datetime
0,51,51000460,51065,BLACY,,,,5100131511,2022-12-21,12h40,"Eau d’alimentation non conforme aux exigences réglementaires fixées à 0,1 µg/l en distribution par molécule individuelle pour les paramètr es pesticide. Cependant, la/les valeur(s) détectée(s) reste(nt) inférieure(s) à la valeur sanitaire transitoire fixée à 3 µg/l pour les métabolites du Chloridazone. Il n’y a donc pas lieu de restreindre la consommation d’eau. Toutefois, afin de suivre l’évolution de la chloridazone et de ses deux métabolites, une surveillance renforcée est mise en place.",CDC VITRY CHAMPAGNE ET DER VEOLIA,VEOLIA EAU,CDC DE VITRY CHAMPAGNE ET DER,C,N,C,C,2022,2025-02-14,20230707-102238
1,51,51004051,51582,TRIGNY,,,,5100131515,2022-12-16,12h21,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,CU GRAND REIMS - EAU ARDRE ET VESLE,EAU ARDRE ET VESLE (CLIG51),COMMUNAUTE URBAINE GRAND REIMS,C,C,C,C,2022,2025-02-14,20230707-102238
2,51,51000885,51454,REIMS,,,,5100131538,2022-12-28,14h33,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,CU GRAND REIMS REGIE,COMMUNAUTE URBAINE GRAND REIMS,COMMUNAUTE URBAINE GRAND REIMS,C,C,C,C,2022,2025-02-14,20230707-102238
3,51,51000461,51328,LOISY-SUR-MARNE,,,,5100131595,2023-01-05,10h29,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,CDC VITRY CHAMPAGNE ET DER VEOLIA,VEOLIA EAU,CDC DE VITRY CHAMPAGNE ET DER,C,C,C,C,2023,2025-02-14,20241014-073810
4,51,51000643,51380,MONTMIRAIL,,,,5100131630,2023-01-10,13h37,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,CDC BRIE CHAMPENOISE LDE,SUEZ-EAU-FRANCE,CDC DE LA BRIE CHAMPENOISE,C,C,C,C,2023,2025-02-14,20241014-073810


In [6]:
query_Test1 = """
SELECT
    referenceprel,
    COUNT(DISTINCT dateprel) AS nb
FROM 
   ranked_df
GROUP BY
   1
HAVING 
    nb >1
"""
con.sql(query_Test1).show()

┌───────────────┬───────┐
│ referenceprel │  nb   │
│    varchar    │ int64 │
├───────────────┴───────┤
│        0 rows         │
└───────────────────────┘



# 2. Avoir une ligne par referenceprel 
***

In [7]:
# Compter le nombre de paramètre unique par referenceprel

query_per_referenceprel = """
WITH 
PER_REF AS (
    SELECT
      referenceprel ,
      COUNT(DISTINCT COALESCE(inseecommuneprinc ,'NULL')) AS nb_inseecommuneprinc  ,
      COUNT(DISTINCT COALESCE(nomcommuneprinc,'NULL')) AS nb_nomcommuneprinc ,
      COUNT(DISTINCT COALESCE(dateprel,'1970-01-01')) AS nb_dateprel ,	
      COUNT(DISTINCT COALESCE(heureprel,'NULL')	) AS nb_heureprel ,
      COUNT(DISTINCT COALESCE(conclusionprel,'NULL')	) AS nb_conclusionprel ,
      COUNT(DISTINCT COALESCE(ugelib,'NULL')	) AS nb_ugelib ,
      COUNT(DISTINCT COALESCE(distrlib,'NULL')) AS nb_distrlib ,
      COUNT(DISTINCT COALESCE(moalib,'NULL')) AS nb_moalib ,
      COUNT(DISTINCT COALESCE(plvconformitebacterio,'NULL')) AS nb_plvconformitebacterio ,	
      COUNT(DISTINCT COALESCE(plvconformitechimique,'NULL')	) AS nb_plvconformitechimique ,
      COUNT(DISTINCT COALESCE(plvconformitereferencebact,'NULL')) AS nb_plvconformitereferencebact ,
      COUNT(DISTINCT COALESCE(plvconformitereferencechim,'NULL')) AS nb_plvconformitereferencechim ,
      COUNT(DISTINCT COALESCE(cdreseau,'NULL')) AS nb_cdreseau ,
      COUNT(DISTINCT COALESCE(cdreseauamont,'NULL')) AS nb_cdreseauamont ,
      COUNT(DISTINCT COALESCE(pourcentdebit,'NULL')) AS nb_pourcentdebit ,
    FROM 
        edc_prelevements
    WHERE
        referenceprel != '06600184310'
    GROUP BY
        referenceprel 
        )
        
SELECT
  COUNT(referenceprel) AS nb_referenceprel ,
  SUM(nb_inseecommuneprinc) AS nb_inseecommuneprinc ,
  SUM(nb_nomcommuneprinc) AS nb_nomcommuneprinc ,
  SUM(nb_dateprel) AS nb_dateprel ,	
  SUM(nb_heureprel) AS nb_heureprel ,
  SUM(nb_conclusionprel) AS nb_conclusionprel ,
  SUM(nb_ugelib) AS nb_ugelib ,
  SUM(nb_distrlib) AS nb_distrlib ,
  SUM(nb_moalib) AS nb_moalib ,
  SUM(nb_plvconformitebacterio) AS nb_plvconformitebacterio ,	
  SUM(nb_plvconformitechimique) AS nb_plvconformitechimique ,
  SUM(nb_plvconformitereferencebact) AS nb_plvconformitereferencebact ,
  SUM(nb_plvconformitereferencechim) AS nb_plvconformitereferencechim ,
  SUM(nb_cdreseau) AS nb_cdreseau ,
  SUM(nb_cdreseauamont) AS nb_cdreseauamont ,
  SUM(nb_pourcentdebit) AS nb_pourcentdebit ,
FROM
  PER_REF
"""

per_referenceprel_df = con.sql(query_per_referenceprel).df()
per_referenceprel_df

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,nb_referenceprel,nb_inseecommuneprinc,nb_nomcommuneprinc,nb_dateprel,nb_heureprel,nb_conclusionprel,nb_ugelib,nb_distrlib,nb_moalib,nb_plvconformitebacterio,nb_plvconformitechimique,nb_plvconformitereferencebact,nb_plvconformitereferencechim,nb_cdreseau,nb_cdreseauamont,nb_pourcentdebit
0,1436113,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,1436113.0,2083343.0,1654880.0,1779642.0


Les champs qui sont bien unique par `referenceprel` sont :
- inseecommuneprinc
- nomcommuneprincdateprel
- conclusionprel
- ugelib
- distrlib
- moalib
- plvconformitebacterio
- plvconformitechimique
- plvconformitereferencebact
- plvconformitereferencechim
- heureprel

Les champs qui varient par `referenceprel` sont :
- cdreseau
- cdreseauamont
- pourcentdebit

# Proposition agrégations
***

Dans le données "brutes" 
- `cdreseau` est le "Code de l'installation (unité de distribution)"
- `cdreseauamont` le "Code de l’installation se trouvant en amont (niveau 1) de l’unité de distribution (cdreseau). Il s’agit généralement d’une
          installation de traitement, transport ou production d’eau.
          Cette donnée n’est renseignée que si le prélevement a été effectivement réalisé sur un point de surveillance d’une
          installation amont."

Ici on crée `cdfirstreseauamont` est le Code SISE-Eaux de **la première installation** (unité de distribution) du prélèvement



[1] les prélèvements réalisés directement sur l'UDI en question (pas de 'cdreseauamont' dans ce cas)<br>
--> Si cdreseauamont IS NULL alors cdfirstreseauamont = cdreseau


[2] les prélèvements réalisés sur un UDI en amont qui dispose lui aussi de sa ligne dans la table (respecte le cas 1)<br>
--> Si cdreseauamont IS NOT NULL ET cdreseau=cdreseauamont alors cdfirstreseauamont = cdreseau = cdreseauamont

[3] les prélèvements réalisés en amont sur un 'cdreseauamont' qui n'a pas de ligne propre dans la table (ne respecte pas le cas 1)<br>
--> Si cdreseauamont IS NOT NULL ET cdreseau!=cdreseauamont alors cdfirstreseauamont = cdreseauamont

On peut donc résumé par <br>
Si cdreseauamont IS NULL alors cdfirstreseauamont = cdreseau<br>
Si cdreseauamont IS NOT NULL alors cdfirstreseauamont = cdreseauamont<br>


```
      (CASE 
        WHEN cdreseauamont IS NULL THEN cdreseau
        WHEN cdreseauamont IS NOT NULL THEN cdreseauamont
      END) AS cdfirstreseauamont,
```

In [8]:
query_AGG = """
WITH 
prelevements_cdfirstreseauamont AS (
    SELECT DISTINCT
      referenceprel , 
      (CASE 
        WHEN cdreseauamont IS NULL THEN cdreseau
        WHEN cdreseauamont IS NOT NULL THEN cdreseauamont
      END) AS cdfirstreseauamont,
      dateprel ,	
      heureprel ,
      TRY_STRPTIME(dateprel || ' ' || REPLACE(heureprel, 'h', ':'), '%Y-%m-%d %H:%M') AS datetimeprel,
      conclusionprel ,
      plvconformitebacterio ,	
      plvconformitechimique ,
      plvconformitereferencebact ,
      plvconformitereferencechim ,
    FROM
        edc_prelevements ),

ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER ( 
                PARTITION BY referenceprel
                ORDER BY
                    dateprel,
                    heureprel
            )  AS row_num
        FROM 
            prelevements_cdfirstreseauamont
        )

SELECT 
    * EXCLUDE (row_num)
FROM 
        ranked
WHERE 
       row_num = 1
"""

AGG_df = con.sql(query_AGG).df()
AGG_df.head()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,referenceprel,cdfirstreseauamont,dateprel,heureprel,datetimeprel,conclusionprel,plvconformitebacterio,plvconformitechimique,plvconformitereferencebact,plvconformitereferencechim
0,7600258662,76000741,2020-03-05,09h30,2020-03-05 09:30:00,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,C,C,C,C
1,7600258670,76000688,2020-02-20,08h50,2020-02-20 08:50:00,Eau non conforme aux exigences de qualité pour la déséthyl atrazine. L a collectivité bénéficie d'une dérogation préfectorale pen dant la réalisation des travaux nécessaires au retour à la conformité. L'eau peut être consommée sans risque pour la santé.,S,D,S,C
2,7600258773,76000393,2020-03-11,11h34,2020-03-11 11:34:00,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,C,C,C,C
3,7600258953,76002269,2020-03-12,13h51,2020-03-12 13:51:00,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,C,C,C,C
4,7600258958,76000797,2020-03-26,10h35,2020-03-26 10:35:00,Eau d'alimentation conforme aux exigences de qualité en vigueur pour l'ensemble des paramètres mesurés.,C,C,C,C


In [9]:
query_check = """
SELECT
    referenceprel,
    COUNT(DISTINCT cdfirstreseauamont) AS nb_cdfirstreseauamont
FROM 
    AGG_df
GROUP BY
   1
HAVING 
    nb_cdfirstreseauamont >1
"""
con.sql(query_check).show()

┌───────────────┬───────────────────────┐
│ referenceprel │ nb_cdfirstreseauamont │
│    varchar    │         int64         │
├───────────────┴───────────────────────┤
│                0 rows                 │
└───────────────────────────────────────┘



In [10]:
# [TO RUN] To check Test 2
query_check2 = """
    SELECT DISTINCT
        *
    FROM
        AGG_df 
    WHERE
        referenceprel = '06600184310'
"""
con.sql(query_check2).df()

Unnamed: 0,referenceprel,cdfirstreseauamont,dateprel,heureprel,datetimeprel,conclusionprel,plvconformitebacterio,plvconformitechimique,plvconformitereferencebact,plvconformitereferencechim
0,6600184310,66000125,2021-07-01,14h32,2021-07-01 14:32:00,Eau d'alimentation non conforme aux exigences de qualité en vigueur. Ce non respect des exigences porte sur la température de l'eau.,C,C,C,N
