# Librairies

In [1]:
# Divers
import os
import time
from datetime import datetime
import numpy as np
from shapely.geometry import box
from scipy.sparse import coo_matrix

# Intake
from dotenv import load_dotenv
from intake import open_catalog

# Pandas
import pandas as pd
import geopandas as gpd

# H3
import h3
from h3pandas.util import shapely
from tobler.util import h3fy
from tobler.area_weighted import area_interpolate

# SQL
from sqlalchemy import *
from geoalchemy2 import Geometry, WKTElement

# Dask
import dask_geopandas as ddg
from dask.distributed import Client

# Constantes

In [2]:
# Aire de chaque hexagone (de la résolution 0 à 15) en Km2
hex_area=[4250546.8477000, 607220.9782429, 86745.8540347, 12392.2648621, 1770.3235517, 252.9033645, 36.1290521,
              5.1612932, 0.7373276, 0.1053325, 0.0150475, 0.0021496, 0.0003071, 0.0000439, 0.0000063, 0.0000009]

In [3]:
# Chargement des constantes d'environnement
load_dotenv()

usr=os.getenv("DB_USER")
pswd=os.getenv("DB_PWD")
host=os.getenv("DB_HOST")
port=os.getenv("DB_PORT")
home=os.getenv("HOME_PATH")
db_traitement=os.getenv("DB_WORKSPACE")
db_ref=os.getenv("DB_REF")
db_externe=os.getenv("DB_EXT")
dwh_fact_strategy=os.getenv("DWH_FACT_STRATEGY")
dwh_dim_strategy=os.getenv("DWH_DIM_STRATEGY")


commun_path=os.getenv("COMMUN_PATH")
project_dir=os.getenv("PROJECT_PATH")
data_catalog_dir=os.getenv("DATA_CATALOG_DIR")
data_output_dir=os.getenv("DATA_OUTPUT_DIR")
sig_data_path=os.getenv("SIG_DATA_PATH")
db_workspace=os.getenv("DB_WORKSPACE")
db_workspace=os.getenv("DB_REF")

# Fonctions

## Fonctions Dask

### Récupération du client

In [None]:
# Récupération du scheduler orchestrant les workers
client = Client() # Scheduler / Workers locaux  "192.168.1.24:8786"

In [None]:
client # Infos sur le client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 15.78 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:58104,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 15.78 GiB

0,1
Comm: tcp://127.0.0.1:58131,Total threads: 4
Dashboard: http://127.0.0.1:58132/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:58108,
Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-ov7fwl9a,Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-ov7fwl9a

0,1
Comm: tcp://127.0.0.1:58134,Total threads: 4
Dashboard: http://127.0.0.1:58135/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:58107,
Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-y7kwps_3,Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-y7kwps_3

0,1
Comm: tcp://127.0.0.1:58138,Total threads: 4
Dashboard: http://127.0.0.1:58141/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:58109,
Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-utjm06ou,Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-utjm06ou

0,1
Comm: tcp://127.0.0.1:58137,Total threads: 4
Dashboard: http://127.0.0.1:58139/status,Memory: 3.95 GiB
Nanny: tcp://127.0.0.1:58110,
Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-50zhgbjp,Local directory: c:\Users\stagiaire\Documents\bilbo\python\dask-worker-space\worker-50zhgbjp


In [None]:
# Fermeture du client
client.close()

### Fonctions

In [None]:
"""
Fonction retournant un DataFrame après ajout d'une colonne hex_id en index.
En d'autres termes, cette fonction indexe un GeoDataFrame sur une grille uniforme.
Elle utilise les procédés de parallélisation et de clustering de la librairie Dask 
afin d'accélérer les temps de calculs.

param gdf: GeoDataFrame en entrée
param npartitions: nombre de tâches à effectuer en parallèle
param resolution: résolution des hexagones

return: DataFrame 

"""
def indexation_dask(gdf, npartitions, resolution):

    # Structure du DataFrame renvoyé en sortie
    # On ne conserve pas la colonne géométrie
    df_meta = pd.DataFrame(columns=list(gdf.columns))
    df_meta.drop(columns=[gdf.geometry.name], inplace=True)
    df_meta.index.names = ['hex_id']
    
    data = ddg.from_geopandas(gdf,npartitions)
    gdf_map = data.map_partitions(func=indexation, resolution=resolution, meta=df_meta)
    client.persist(gdf_map)
    return gdf_map.compute()

In [None]:
def compact_dask(tab_name, schema, gdf, tx_spatial, res_min, res_max, nb_cluster=1, i_start=1, fast=False):
    if nb_cluster > len(gdf):
        nb_cluster = len(gdf)

    with open('suivi.txt', 'a') as f:
            f.write(f"{datetime.now()} (compact_dask)\nnom: {schema}.{tab_name}\n")
            f.write(f"nb_cluster: {nb_cluster} | res: {res_min}-{res_max}\n")

    list_of_sep = np.linspace(0, len(gdf), num=nb_cluster+1, endpoint=True, dtype=int)
    for i in range (i_start-1, len(list_of_sep)-1):
        start_time = time.time()
        print('Cluster ' +  str((i+1)))

        gdf_cluster = gdf.iloc[list_of_sep[i]:list_of_sep[i+1]]
        cluster_output = compact_dask_fct(gdf_cluster, len(gdf_cluster)-1, tx_spatial, res_min, res_max, fast)

        # Connexion à la base de données "oeil_traitement"
        connection = getEngine()    

        # Intégration de la table dans le DWH
        updateTable(cluster_output, tab_name, connection, schema, methode="append", geom=False)    
        connection.dispose()

        with open('suivi.txt', 'a') as f:
            f.write('Cluster ' +  str((i+1)) + '\n')
            f.write("   --- %s objets ---\n" % (len(cluster_output)))
            f.write("   --- %s secs ---\n" % (round((time.time() - start_time), 2)))

        print("   --- %s objets ---" % (len(cluster_output)))
        print("   --- %s secs ---" % (round((time.time() - start_time), 2))) 

    with open('suivi.txt', 'a') as f:
        f.write("\n") 

In [None]:
def compact_dask_partition(tab_name, schema, gdf, decoupage, nb_cluster, npartitions, colonne, tx, res_min, res_max, i_start=1, tx_spatial=0.5, debug=False):   
    gdf.index.names = ['index']
    if nb_cluster > len(decoupage):
        nb_cluster = len(decoupage)

    len_buffer = np.sqrt(hex_area[res_min]*1000000)*0.6

    if not debug:
        with open('suivi.txt', 'a') as f:
                f.write(f"{datetime.now()} (compact_dask_partition)\nnom: {schema}.{tab_name}\n")
                f.write(f"nb_cluster: {nb_cluster} | npartitions: {npartitions} | colonne: {colonne} | res: {res_min}-{res_max}\n")

    list_of_sep = np.linspace(0, len(decoupage), num=nb_cluster+1, endpoint=True, dtype=int)
    for i in range (i_start-1, len(list_of_sep)-1):
        start_time = time.time()
        print('Cluster ' +  str((i+1)))
        decoupage_cluster = decoupage.iloc[list_of_sep[i]:list_of_sep[i+1]]
        # gdf_cluster = gpd.clip(gdf.to_crs(4326), gpd.GeoDataFrame(columns={'geometry':decoupage_cluster.geometry.union}, geometry='geometry', crs="EPSG:4326"), keep_geom_type=True)
        gdf_cluster = gpd.clip(gdf, decoupage_cluster["geometry"].buffer(len_buffer))
        cluster_output = compact_dask_partition_fct(gdf_cluster, decoupage_cluster, npartitions, colonne, tx, res_min, res_max, tx_spatial)

        if not debug:
            # Connexion à la base de données "oeil_traitement"
            connection = getEngine()    
            # Intégration de la table dans le DWH
            updateTable(cluster_output, tab_name, connection, schema, methode="append", geom=False)    
            connection.dispose()

            with open('suivi.txt', 'a') as f:
                f.write('Cluster ' +  str((i+1)) + '\n')
                f.write("   --- %s objets ---\n" % (len(cluster_output)))
                f.write("   --- %s secs ---\n" % (round((time.time() - start_time), 2)))

        print("   --- %s objets ---" % (len(cluster_output)))
        print("   --- %s secs ---" % (round((time.time() - start_time), 2)))
        
    if not debug:
        with open('suivi.txt', 'a') as f:
                f.write("\n")

## Fonctions de traitements

In [7]:
"""
Fonction retournant un DataFrame après ajout d'une colonne hex_id en index.
En d'autres termes, cette fonction indexe un GeoDataFrame sur une grille uniforme.

param gdf: GeoDataFrame en entrée
param resolution: résolution des hexagones
return: DataFrame

"""
def indexation(gdf, resolution):
    # Mise en conformité de la colonne "geometry"
    if(gdf.geometry.name != 'geometry'):
        gdf.rename_geometry('geometry',inplace=True)

    gdf = h3_polyfill(gdf.to_crs(epsg=4326), resolution) # Indexation du DataFrame
    df = pd.DataFrame(gdf.drop(columns={gdf.geometry.name})) # Suppression de la colonne "geometry"
    df = df.explode('hex_id') # Explosion des liste d'identifiants hexagonaux
    df.set_index('hex_id', inplace=True) # Définition de l'index
    
    return df

In [8]:
"""
Fonction indexant un GeoDataFrame sur une grille dynamique.

param gdf: GeoDataFrame en entrée
param tx_spatial: taux minimal de remplissage d'une zone de données pour chaque échelle de résolution
    (sous forme de dictionnaire)
param res_min: résolution minimale des hexagones
param res_max: résolution maximale des hexagones
return: DataFrame

"""
def compact(gdf, tx_spatial, res_min, res_max, fast=False):
    
    tx_spatial[res_max] = 0.5
    
    # Création d'un GeoDataFrame vide
    output = pd.DataFrame(columns=gdf.drop(columns=[gdf.geometry.name]).columns.tolist())
    output.index.names = ['hex_id']

    len_buffer = np.sqrt(hex_area[res_min]*1000000)*1
    gdf_init = gpd.GeoSeries(gdf.unary_union.buffer(len_buffer), crs="EPSG:3163")
    hex = h3fy(gdf_init,res_min)

    for index_obj, row_obj in gdf.iterrows():
        res = res_min
        gdf_invalid = hex
        geom_obj = row_obj[gdf.geometry.name]

        while(res <= res_max):
            valid_cells = [] # Cellules n'ayant pas besoin d'être divisées
            valid_geom = []
            invalid_cells = [] # Cellules ayant besoin d'être divisées
            invalid_geom = []

            clip = to_children(gdf_invalid, res)
            for index_clip, row_clip in clip.iterrows():
                hex_geom = row_clip.geometry
                area = geom_obj.intersection(hex_geom).area
                if fast:
                    intersection = area
                else:
                    intersection = geom_obj.intersection(hex_geom.buffer(np.sqrt(hex_area[res]*1000000)*0.1)).area
                if  intersection == 0:
                    continue
                elif area/hex_geom.area >= tx_spatial[res]:
                    valid_cells.append(index_clip)
                    valid_geom.append(hex_geom)
                elif res < res_max:
                    invalid_cells.append(index_clip)
                    invalid_geom.append(hex_geom)

            data = {}
            for key, value in row_obj.iteritems():
                if key != gdf.geometry.name:
                    data[key] = value
            valid_gdf = pd.DataFrame(data, index=valid_cells)
            valid_gdf.index.name = "hex_id"

            if res < res_max:
                gdf_invalid = clip.loc[invalid_cells]

            output = pd.concat([output, valid_gdf], ignore_index=False) 
            res+=1
    return output

## Fonctions BDD

In [10]:
"""
Fonction retournant l'engine de connexion à la base de données.

param user: user
param pswd: mot de passe
param host: hôte
param dbase: nom de la base de données

"""
def getEngine(user=usr, pswd=pswd, host=host, dbase=db_traitement):
    connection = f'postgresql://{user}:{pswd}@{host}:{port}/{dbase}'
    return create_engine(connection)

In [11]:
"""
Fonction d'intégration ou de mise à jour des données dans le DWH.

param new_lines: DataFrame contenant les données à intégrer
param table_name: nom de la table de destination 
    (en mode 'append' si la table est inexistante ou en mode 'replace', une nouvelle table sera créee)
param engine: engine de connexion à la base de données
param schema: schéma dans lequel se trouve la table de destination
param methode: 'append' pour ajouter des données à une table déjà existante ou 
    'replace' pour écraser la table de destination si elle existe déjà
param geom: Boolean indiquant si les données contiennent une dimension géométrique
param dtype: dictionnaire {nom_champ:type} permettant d'indique le type de certains champs

"""

def updateTable(new_lines, table_name, engine, schema, methode='append', geom=True, dtype=None, geometry_type='POLYGON', index_label='hex_id', chunksize=None):
    dict_types = {'geometry': Geometry(geometry_type, srid=3163)}
    
    if(methode=='replace'): # Suppression de la table de destination si elle existe déjà
        engine.execute(f'DROP TABLE IF EXISTS {schema}.{table_name} CASCADE')
    if(dtype is not None): # Mise à jour du dictionnaire des types de champs
        dict_types.update(dtype)
        
    if(not new_lines.empty): # Si des données sont à integrer
        if(geom): # Si les données contiennent une dimension géométrique
            if(new_lines.geometry.name != 'geom'):
                new_lines = new_lines.rename_geometry('geom')
            new_lines['geometry'] = new_lines['geom'].apply(lambda x: WKTElement(x.wkt, srid=3163))
            new_lines.drop('geom', 1, inplace=True)
            new_lines.to_sql(name=table_name, con=engine, schema=schema, if_exists=methode, index=True, index_label=index_label, dtype=dict_types, chunksize=chunksize)
        else:
            new_lines.to_sql(name=table_name, con=engine, schema=schema, if_exists=methode, index=True, index_label=index_label, dtype=dict_types, chunksize=chunksize)
    return new_lines

In [12]:
"""
Génération d'une vue ajoutant une colonne de géométrie à la table donnée en argument.

param table_name: nom de la table
param engine: engine de connexion à la base de données
param schema: schéma

"""
def geomView(table_name, engine, schema):
    query = f'DROP VIEW IF EXISTS {schema}.view_{table_name};' + f'CREATE VIEW {schema}.view_{table_name} AS (SELECT row_number() OVER() AS id, *, h3_to_geo_boundary(hex_id::h3index)::geometry AS geometry FROM {schema}.{table_name})'
    engine.execute(query)

## Fonctions utiles

In [22]:
"""
Fonction permettant de charger une table sous forme de DataFrame à partir 
d'un catalogue Intake.

param catalog: catalogue intake
param table_name: nom de la table référencé dans le catalogue

return: DataFrame

"""

def loadData(catalog, table_name):
    dataName = f"{table_name}"
    entryCatalog = getattr(open_catalog(catalog),dataName)
    data = entryCatalog
    return data.read()

In [24]:
"""
Fonction permettant de remplacer les valeurs d'un ou plusieurs champ(s) 
d'une table par les valeurs d'un champ d'une autre table (appelé champ 
de standardisation) suivant une jointure définie.

param df: DataFrame en entrée
df_right: DataFrame de jointure indexé sur son champ de jointure
std_field_right: champ de standardisation de field_right
dic: {join_field_df: champ de jointure de df, num_col: numéro de colonne du futur champ standardisé}

return: DataFrame standardisé

"""

def standardizeField(df, df_right, std_field_right, dic):
    for join_field_df, num_col in dic.items():
        std_field_df = df.join(df_right, on=join_field_df)[std_field_right] # Création du champ standardisé
        df = df.drop(join_field_df, axis= 1) # Suppression du champ non standardisé
        df.insert(num_col, join_field_df, std_field_df, allow_duplicates=True) # Insertion du champ standardisé
    return df

In [25]:
"""
Fonction retourant un dictionnaire associant à chaque résolution le taux à partir duquel un 
élément classifiant est considéré comme unique dans chaque cellule.

param min_carto_unit_m: unité minimale de cartographie (plus petit détail visible) en m
return: dictionnaire

"""
def compute_dict_tx(min_carto_unit_m):
    dic = {}
    for i in range (16):
        val = 1-(((min_carto_unit_m**2)/1000000)/hex_area[i])
        if val >= 0:
            dic[i] = val
        else:
            dic[i] = 0
    return dic

In [26]:
"""
Fonction adaptée de h3pandas.polyfill. 
Fonction ajoutant une colonne contenant les idenifiants des hexagones relatifs à 
l'objet concerné pour la résolution donnée.

param gdf: GeoDataFrame en entrée
param resolution: résolution des hexagones

return: GeoDataFrame

"""
def h3_polyfill(gdf, resolution):
    def func(row):
        return list(shapely.polyfill(row.geometry, resolution, True))
    result = gdf.apply(func, axis=1) # Application de la fonction à chaque ligne
    assign_args = {"hex_id": result}
    return gdf.assign(**assign_args)

In [None]:
def segmentation(gdf, res_min):
    # Application d'un buffer de la taille d'un demi-hexagone
    len_buffer = np.sqrt(hex_area[res_min]*1000000)*0.6
    gdf_init = gpd.GeoSeries(gdf.geometry.unary_union.buffer(len_buffer), crs="EPSG:3163")
    return h3fy(gdf_init,res_min)

In [27]:
"""
Fonction retournant la liste des enfants (et leur géométrie) des hexagones 
stockés dans un GeoDataFrame.

param gdf: GeoDataFrame en entrée (hex_id doit être en index)
param resolution: résolution des hexagones enfants

return: GeoDataFrame

"""

def to_children(gdf, resolution=None):
    list_index = []
    list_coord = []
    # Pour chaque hexagone en entrée
    for index, row in gdf.iterrows():
        id_children = h3.h3_to_children(index,resolution) # Index des enfants
        list_index += list(id_children)
    list_geom = [h3.h3_to_geo_boundary(index) for index in list_index] # Géométrie des enfants

    # Inversion des coordonnées
    for elem in list_geom:
        coord = []
        for point in elem:
            point = tuple(reversed(point))
            coord.append(point)
        list_coord.append(tuple(coord))

    list_geom = [shapely.Polygon(elem) for elem in list_coord] # Conversion des coordonnées en type Polygon
    output = gpd.GeoDataFrame({'geometry': list_geom}, geometry='geometry', crs='EPSG:4326', index=list_index)
    output.index.name = "hex_id"
    return output.to_crs(3163)

In [29]:
"""
Fonction adaptée de tobler.area_interpolate.

"""
def area_tables(source_df, target_df):

    # Il est en général plus performant d'utiliser le plus long DataFrame comme index spatial
    if source_df.shape[0] > target_df.shape[0]:
        spatial_index = "source"
    else:
        spatial_index = "target"

    # Index de liaison entre la source et la target par intersection
    if spatial_index == "source":
        ids_tgt, ids_src = source_df.sindex.query_bulk(target_df.geometry, predicate="intersects")
    elif spatial_index == "target":
        ids_src, ids_tgt = target_df.sindex.query_bulk(source_df.geometry, predicate="intersects")

    # Liste des aires d'intersection
    areas = source_df.geometry.values[ids_src].intersection(target_df.geometry.values[ids_tgt]).area

    # Co-matrice des aires
    table = coo_matrix((areas,(ids_src, ids_tgt)), shape=(source_df.shape[0], target_df.shape[0]), dtype=np.float32)
    table = table.tocsr()

    return table

In [30]:
"""
Fonction adaptée de tobler.area_interpolate.

"""

def h3_area_interpolate(source_df, target_df, categorical_variables):

    table = area_tables(source_df, target_df)
    for variable in categorical_variables: # Pour chaque champ de classe
        unique = source_df[variable].unique() # Liste des classes d'un champ
        for value in unique: # Pour chaque classe
            mask = source_df[variable] == value # Dataframe composé uniquement de la même classe
            target_df[value] = pd.DataFrame(np.asarray(table[mask].sum(axis=0))[0]).div(target_df.area.values, axis="rows")

    target_df.set_index("hex_id", inplace=True, drop=True)
    return target_df

### Dask

In [None]:
"""
Fonction indexant un GeoDataFrame sur une grille dynamique.
Elle utilise les procédés de parallélisation et de clustering de la librairie Dask 
afin d'accélérer les temps de calculs.

param gdf: GeoDataFrame en entrée
param tx_spatial: taux minimal de remplissage d'une zone de données pour chaque échelle de résolution
    (sous forme de dictionnaire)
param res_min: résolution minimale des hexagones
param res_max: résolution maximale des hexagones
return: DataFrame

"""
def compact_dask_fct(gdf, npartitions, tx_spatial, res_min, res_max, fast=False):

    # Structure du DataFrame renvoyé en sortie
    # On ne conserve pas la colonne géométrie
    df_meta = pd.DataFrame(columns=gdf.drop(columns=[gdf.geometry.name]).columns.tolist())
    df_meta.index.names = ['hex_id']
    
    data = ddg.from_geopandas(gdf,npartitions)
    gdf_map = data.map_partitions(func=compact, tx_spatial=tx_spatial, res_min=res_min, res_max=res_max, fast=fast, meta=df_meta)
    client.persist(gdf_map)
    return gdf_map.compute()

In [None]:
def compact_dask_partition_fct(gdf, decoupage, npartitions, colonne, tx, res_min, res_max, tx_spatial=0.5):   
    
    gdf_invalid = decoupage
    # Fonction utilisée lors de la parallélisation
    def my_fct(geom_clip, gdf, colonne, tx, res_min, res_max, tx_spatial):
        clip = to_children(geom_clip, res_min)
        gdf_valid_bdd = compact_for_dask_use(clip=clip, gdf=gpd.clip(gdf, clip, keep_geom_type=True), colonne=colonne, tx=tx, res_min=res_min, res_max=res_max, tx_spatial=tx_spatial)
        return gdf_valid_bdd

    # Création d'un GeoDataFrame vide
    output = gpd.GeoDataFrame(columns=[colonne,'geometry'], geometry='geometry')
    output.index.names = ['hex_id']
    
    # Structure du DataFrame renvoyé en sortie
    df_meta = pd.DataFrame(columns=[colonne,'geometry','type'])
    df_meta.index.names = ['hex_id']

    while(res_min <= res_max):
        print('   Résolution ' +  str(res_min))

        # return my_fct(gdf_invalid, gdf=gdf, colonne=colonne, tx=tx, res_min=res_min, res_max=res_max)
        data = ddg.from_geopandas(gdf_invalid, npartitions)
        gdf_map = data.map_partitions(func=my_fct, gdf=gdf, colonne=colonne, tx=tx, res_min=res_min, res_max=res_max, tx_spatial=tx_spatial, meta=df_meta)
        client.persist(gdf_map)
        gdf_bdd = gdf_map.compute()
        
        gdf_valid = gdf_bdd[gdf_bdd['type']=='valid'].drop(columns=['type'])
        gdf_invalid = gdf_bdd[gdf_bdd['type']=='invalid'].drop(columns=['type',colonne])

        # Concaténation avec les cellules valides de la résolution précédente
        output = pd.concat([output, gdf_valid], ignore_index=False) 
        # Incrémentation de la résolution
        res_min+=1
    output.drop('geometry', axis=1, inplace=True) # Suppression de la colonne des géométries
    return output

In [45]:
"""
Fonction utilisée dans compact_partition_dask.

"""

def compact_for_dask_use(clip, gdf, colonne, tx, res_min, res_max, tx_spatial=0.5):
        
    valid_cells = [] # Cellules n'ayant pas besoin d'être divisées
    valid_label = [] # Classe associée à chaque cellule
    list_geom_valid = [] # Géométrie associée à chaque cellule
    invalid_cells=[]
    list_geom_invalid = []

    # Association des taux d'occupation de chaque classe pour chaque hexagone
    gdf_interp = h3_area_interpolate(source_df=gdf, target_df=clip.reset_index(), categorical_variables=[colonne])

    # Pour chaque enregistrement
    if len(gdf_interp.columns[1:]):
        for index, row in gdf_interp.iterrows():
            maximum = max(row[gdf_interp.columns[1:]])
            somme = row[gdf_interp.columns][1:].sum()
            end=False

            if somme < 1-tx[res_min] and (res_min < res_max):
                end = True
            
            else:
                for i in range(1,gdf_interp.columns.size): # Pour chaque colonne (sauf la géométrie)
                    valeur_i = row[gdf_interp.columns[i]]

                    if res_min < res_max: # Si la résolution maximale n'est pas atteinte
                        # Si le pourcentage de présence de la classe dans la cellule est supérieure à tx_spatial et est supérieure à tx_theme de l'ensemble des classes
                        if valeur_i >= tx[res_min]:
                            valid_cells.append(index)
                            valid_label.append(gdf_interp.columns[i])
                            list_geom_valid.append(row['geometry'])
                            end = True
                            break
                    else: # Si la résolution maximale est atteinte
                        # La valeur de cellule est déterminée par la classe majoritaire même si tx_theme n'est pas atteint
                        if(valeur_i == maximum) and (somme >= tx_spatial):
                            valid_cells.append(index)
                            valid_label.append(gdf_interp.columns[i])
                            list_geom_valid.append(row['geometry'])
                            end = True
                            break
            if (not end) and (res_min < res_max):
                invalid_cells.append(index)
                list_geom_invalid.append(row['geometry'])

        # Création d'un GeoDataFrame contenant les cellules valides avec leur classe associée
        gdf_valid_bdd = gpd.GeoDataFrame({colonne: valid_label, 'geometry': list_geom_valid, 'type':'valid'}, geometry='geometry', crs="EPSG:3163", index=valid_cells)
        gdf_valid_bdd.index.name = "hex_id"
        gdf_invalid_bdd = gpd.GeoDataFrame({colonne: None, 'geometry': list_geom_invalid, 'type':'invalid'}, geometry='geometry', crs="EPSG:3163", index=invalid_cells)
        gdf_invalid_bdd.index.name = "hex_id"
        return pd.concat([gdf_valid_bdd, gdf_invalid_bdd], ignore_index=False) 
    else:
        return gpd.GeoDataFrame({colonne: None, 'geometry': clip['geometry'], 'type':'invalid'}, geometry='geometry', crs="EPSG:3163", index=clip.index)

# Données

## Récupération des données sources

In [32]:
# Connexion à la base de données "oeil_traitement"
engine = getEngine()

In [26]:
# Connexion à la base de données du RDS
engineRDS = getEngine(user='postgres',pswd='XwUxFfrL6yRK5Wz',host='oeil-pg-aws.cluster-ck8dgtf46vxd.ap-southeast-2.rds.amazonaws.com',dbase='oeil')

In [33]:
catalog = f"{data_catalog_dir}bilbo_data.yaml" # Choix du catalogue de données

In [35]:
# Coordonnées d'une bbox sur l'île des Pins
xmin = 536159
xmax = 571819
ymin = 156515
ymax = 190058

## Tables de dimensions

### communes

In [48]:
%%time
# Récupétation de la table des communes (sur l'emprise souhaitée)
data_communes = loadData(catalog,'communes') # .cx[xmin:xmax, ymin:ymax]

Wall time: 2.99 s


In [69]:
data_communes # Visualisation de la table

Unnamed: 0,objectid,nom,nom_minus,code_com,code_post,nom_fichier,shape_length,shape_area,shape
0,1,MONT DORE,Mont Dore,98817,98809,MONT_DORE,400107.916505,636181900.0,MULTIPOLYGON Z (((450513.398 219933.702 -10000...
1,2,LA FOA,La Foa,98813,98880,LA_FOA,298739.0472,460955300.0,MULTIPOLYGON Z (((371445.862 277499.216 -10000...
2,3,POUM,Poum,98826,98826,POUM,676002.442656,470295900.0,"MULTIPOLYGON Z (((191490.890 456172.812 1.000,..."
3,4,OUEGOA,Ouégoa,98819,98821,OUEGOA,365782.158975,649195700.0,MULTIPOLYGON Z (((235001.134 434615.994 -10000...
4,5,MOINDOU,Moindou,98816,98819,MOINDOU,207876.89136,321459700.0,MULTIPOLYGON Z (((371445.862 277499.216 -10000...
5,6,SARRAMEA,Sarraméa,98828,98882,SARRAMEA,58079.79504,105401900.0,MULTIPOLYGON Z (((376716.288 288761.483 9999.0...
6,7,POUEBO,Pouébo,98824,98824,POUEBO,178372.386245,195788700.0,MULTIPOLYGON Z (((272317.912 407594.073 -10000...
7,8,YATE,Yaté,98832,98834,YATE,351071.233169,1332883000.0,MULTIPOLYGON Z (((456960.568 261512.590 -10000...
8,9,PAITA,Païta,98821,98889,PAITA,398184.267799,692047300.0,MULTIPOLYGON Z (((439992.642 225404.891 -10000...
9,10,VOH,Voh,98831,98833,VOH,387363.290562,797185100.0,MULTIPOLYGON Z (((266338.429 348818.522 -10000...


### provinces

In [39]:
%%time
# Récupétation de la table des provinces (sur l'emprise souhaitée)
data_provinces = loadData(catalog,'provinces') # .cx[xmin:xmax, ymin:ymax]

Wall time: 2.77 s


In [72]:
data_provinces # Visualisation de la table

Unnamed: 0,objectid,nom,nom_fichier,shape_length,shape_area,shape
0,1,PROVINCE NORD,PROVINCE_NORD,2915291.0,9418838000.0,MULTIPOLYGON Z (((415373.355 295332.719 -10000...
1,2,PROVINCE SUD,PROVINCE_SUD,2572541.0,6983027000.0,MULTIPOLYGON Z (((415373.355 295332.719 -10000...
2,3,PROVINCE DES ILES,PROVINCE_DES_ILES,844664.7,1948349000.0,"MULTIPOLYGON Z (((518527.275 346132.548 1.000,..."


### dim_dates

In [9]:
%%time
# Récupétation de la table de standardisation des dates
# Le set_index sur le champ de jointure est nécessaire à l'application de la fonction "standardizeField"
data_date = pd.read_sql("SELECT * FROM pression_eau.dim_date",engine).set_index('date')

Wall time: 167 ms


In [None]:
data_date # Visualisation de la table

NameError: name 'data_date' is not defined

## Tables de faits

### incendies_Sentinel

In [15]:
%%time
# Récupétation de la table des incendies sur l'emprise souhaitée
data_feux_raw = loadData(catalog,'incendies_Sentinel') #.cx[xmin:xmax, ymin:ymax]

Wall time: 3.03 s


In [None]:
data_feux_raw # Visualisation de la table

Unnamed: 0,objectid,province,commune,surface_ha,idfusion,classification,begdate,enddate,fs_x,fs_y,...,pentemediane,penteminimale,pentemaximale,tailleincendie,derniere_detection,debutviirs,finviirs,shape_starea__,shape_stlength__,geom
117,15865,PROVINCE SUD,Ile des Pins,1.10923,S19698,Valide,2020-11-26,2020-11-26,557910,172898,...,,,,0ha - 10ha ...,2020-11-26,NaT,NaT,11092.3,879.694444,"POLYGON ((557883.772 172965.653, 557933.753 17..."
122,11385,Province Sud,ILE DES PINS,14.311355,941,Valide,2017-01-06,2017-01-06,547895,178828,...,,,,,2017-01-06,NaT,NaT,143113.6,5638.296097,"POLYGON ((547722.914 179070.783, 547723.057 17..."
923,13509,Province Sud,ILE DES PINS,3.517956,1126,Valide,2017-11-17,2017-11-17,548112,174638,...,,,,,2017-11-17,NaT,NaT,35179.56,1519.558478,"POLYGON ((548153.733 174755.040, 548153.877 17..."
1038,14190,PROVINCE SUD,Ile des Pins,1.019392,S11647,Valide,2020-08-13,2020-08-13,548490,176350,...,,,,0ha - 10ha ...,2020-08-13,NaT,NaT,10193.92,999.701704,"POLYGON ((548411.811 176406.448, 548441.802 17..."
1257,14512,PROVINCE SUD,Ile des Pins,1.35905,SGroup_90j_82,Valide,2020-01-26,2020-01-26,559470,170398,...,,,,0ha - 10ha ...,2020-01-26,NaT,NaT,13590.5,619.78356,"POLYGON ((559521.892 170388.482, 559521.893 17..."
1381,16237,PROVINCE SUD,Ile des Pins,3.258223,S23691,Valide,2020-12-21,2020-12-21,550116,165331,...,,,,0ha - 10ha ...,2020-12-21,NaT,NaT,32582.23,1679.541881,"POLYGON ((550120.275 165461.609, 550140.269 16..."
1520,10967,Province Sud,ILE DES PINS,14.191474,942,Valide,2017-01-06,2017-01-16,548398,177246,...,,,,,2017-01-16,NaT,NaT,141914.7,6598.018461,"MULTIPOLYGON (((548175.509 177294.512, 548275...."
1602,13892,PROVINCE SUD,Ile des Pins,1.259256,S11666,Valide,2020-08-18,2020-08-18,548458,175581,...,,,,0ha - 10ha ...,2020-08-18,NaT,NaT,12592.56,619.816959,"POLYGON ((548426.971 175686.752, 548456.961 17..."
1751,12265,,,3.947643,1032,Valide,2019-01-21,2019-01-21,546611,180302,...,2.0,0.0,9.013878,0ha - 10ha ...,2019-01-21,NaT,NaT,39476.43,1779.468467,"POLYGON ((546593.564 180422.339, 546593.635 18..."
1789,11211,Province Sud,ILE DES PINS,34.69821,1125,Valide,2017-12-27,2017-12-31,549509,179439,...,,,,,2018-01-06,NaT,NaT,346982.1,6597.926979,"MULTIPOLYGON (((549596.280 179923.944, 549616...."


In [16]:
# Standardisation des champs de dates
data_feux = standardizeField(data_feux_raw, data_date, 'date_id', {'begdate':5, 'enddate':6, 'derniere_detection':21}) #.cx[xmin:xmax, ymin:ymax]

In [None]:
data_feux # Visualisation de la table

Unnamed: 0,objectid,province,commune,surface_ha,idfusion,begdate,enddate,classification,fs_x,fs_y,...,pentemediane,penteminimale,pentemaximale,derniere_detection,tailleincendie,debutviirs,finviirs,shape_starea__,shape_stlength__,geom
117,15865,PROVINCE SUD,Ile des Pins,1.10923,S19698,20201126,20201126,Valide,557910,172898,...,,,,20201126,0ha - 10ha ...,NaT,NaT,11092.3,879.694444,"POLYGON ((557883.772 172965.653, 557933.753 17..."
122,11385,Province Sud,ILE DES PINS,14.311355,941,20170106,20170106,Valide,547895,178828,...,,,,20170106,,NaT,NaT,143113.6,5638.296097,"POLYGON ((547722.914 179070.783, 547723.057 17..."
923,13509,Province Sud,ILE DES PINS,3.517956,1126,20171117,20171117,Valide,548112,174638,...,,,,20171117,,NaT,NaT,35179.56,1519.558478,"POLYGON ((548153.733 174755.040, 548153.877 17..."
1038,14190,PROVINCE SUD,Ile des Pins,1.019392,S11647,20200813,20200813,Valide,548490,176350,...,,,,20200813,0ha - 10ha ...,NaT,NaT,10193.92,999.701704,"POLYGON ((548411.811 176406.448, 548441.802 17..."
1257,14512,PROVINCE SUD,Ile des Pins,1.35905,SGroup_90j_82,20200126,20200126,Valide,559470,170398,...,,,,20200126,0ha - 10ha ...,NaT,NaT,13590.5,619.78356,"POLYGON ((559521.892 170388.482, 559521.893 17..."
1381,16237,PROVINCE SUD,Ile des Pins,3.258223,S23691,20201221,20201221,Valide,550116,165331,...,,,,20201221,0ha - 10ha ...,NaT,NaT,32582.23,1679.541881,"POLYGON ((550120.275 165461.609, 550140.269 16..."
1520,10967,Province Sud,ILE DES PINS,14.191474,942,20170106,20170116,Valide,548398,177246,...,,,,20170116,,NaT,NaT,141914.7,6598.018461,"MULTIPOLYGON (((548175.509 177294.512, 548275...."
1602,13892,PROVINCE SUD,Ile des Pins,1.259256,S11666,20200818,20200818,Valide,548458,175581,...,,,,20200818,0ha - 10ha ...,NaT,NaT,12592.56,619.816959,"POLYGON ((548426.971 175686.752, 548456.961 17..."
1751,12265,,,3.947643,1032,20190121,20190121,Valide,546611,180302,...,2.0,0.0,9.013878,20190121,0ha - 10ha ...,NaT,NaT,39476.43,1779.468467,"POLYGON ((546593.564 180422.339, 546593.635 18..."
1789,11211,Province Sud,ILE DES PINS,34.69821,1125,20171227,20171231,Valide,549509,179439,...,,,,20180106,,NaT,NaT,346982.1,6597.926979,"MULTIPOLYGON (((549596.280 179923.944, 549616...."


### mos_2014

In [30]:
%%time
# Récupétation de la table de MOS de 2014 sur l'emprise souhaitée
data_mos2014 = loadData(catalog,'mos2014') #.cx[xmin:xmax, ymin:ymax]

Wall time: 2min 44s


In [None]:
data_mos2014 # Visualisation de la table

Unnamed: 0,objectid,idobj,c_2014_n1,c_2014_n2,c_2014_n3,l_2014_n1,l_2014_n2,l_2014_n3,source_14,d_srce_14,...,observ,surface,ombre,d_arbore,d_arbustif,d_herbace,d_autre,shape_length,shape_area,shape
63989,65684,65567.0,1,11,112,Territoires artificialisés,Zones urbanisées,Tissu urbain discontinu,SPOT6,29-06-2013,...,,107476.627135,0,0.0,0.0,0.0,0.0,2427.771791,107476.627135,"MULTIPOLYGON (((550250.573 180865.721, 550283...."
64338,66338,66342.0,1,11,113,Territoires artificialisés,Zones urbanisées,Habitat isolé,SPOT6,29-06-2013,...,,16058.550956,0,0.0,0.0,0.0,0.0,636.977211,16058.550956,"MULTIPOLYGON (((549233.485 170283.621, 549237...."
64353,66348,66352.0,1,11,113,Territoires artificialisés,Zones urbanisées,Habitat isolé,SPOT6,29-06-2013,...,,13666.539589,0,0.0,0.0,0.0,0.0,497.477330,13666.539589,"MULTIPOLYGON (((553491.915 173790.477, 553436...."
64579,69550,69549.0,3,31,311,Formation végétale,Formation arborée,Strate arborée,google earth - MOS2010,2013-2014,...,,17053.682250,1,0.0,0.0,0.0,0.0,608.379883,17053.682250,"MULTIPOLYGON (((550257.722 171406.377, 550244...."
65817,65648,65533.0,1,11,112,Territoires artificialisés,Zones urbanisées,Tissu urbain discontinu,SPOT6,29-06-2013,...,,24086.665232,0,0.0,0.0,0.0,0.0,647.001537,24086.665232,"MULTIPOLYGON (((552834.537 169426.364, 552832...."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
110401,110413,110418.0,5,51,510,Surfaces en eau,Eaux continentales,Eaux continentales,SPOT6,29-06-2013,...,,31638.104523,0,0.0,0.0,0.0,0.0,796.320340,31638.104523,"MULTIPOLYGON (((551107.798 175053.187, 551050...."
110403,110417,110422.0,5,51,510,Surfaces en eau,Eaux continentales,Eaux continentales,SPOT6,29-06-2013,...,,319423.218084,0,0.0,0.0,0.0,0.0,5151.010276,319423.218084,"MULTIPOLYGON (((552139.619 176476.590, 552169...."
110404,110418,110423.0,5,51,510,Surfaces en eau,Eaux continentales,Eaux continentales,SPOT6,29-06-2013,...,,33090.483115,0,0.0,0.0,0.0,0.0,843.625612,33090.483115,"MULTIPOLYGON (((549376.792 177662.598, 549377...."
110406,110419,110424.0,5,51,510,Surfaces en eau,Eaux continentales,Eaux continentales,SPOT6,29-06-2013,...,,83562.732784,0,0.0,0.0,0.0,0.0,2430.093709,83562.732783,"MULTIPOLYGON (((550575.416 177939.257, 550600...."


# Indexation des données

## Maillage simple

### communes

In [49]:
%%time
segmentation_communes = segmentation(data_communes,8)

Wall time: 1min 55s


In [70]:
%%time
compact_dask_partition(tab_name="dim_communes_8_v2", schema="bilbo", gdf=data_communes, decoupage=segmentation_communes, nb_cluster=10, npartitions=16, colonne='nom', tx=compute_dict_tx(7), res_min=8, res_max=8, i_start=1, tx_spatial=0)

Cluster 1
   Résolution 8
   --- 2596 objets ---
   --- 4.49 secs ---
Cluster 2
   Résolution 8
   --- 2596 objets ---
   --- 4.42 secs ---
Cluster 3
   Résolution 8
   --- 2596 objets ---
   --- 4.51 secs ---
Cluster 4
   Résolution 8
   --- 2596 objets ---
   --- 4.59 secs ---
Cluster 5
   Résolution 8
   --- 2597 objets ---
   --- 4.61 secs ---
Cluster 6
   Résolution 8
   --- 2596 objets ---
   --- 4.42 secs ---
Cluster 7
   Résolution 8
   --- 2596 objets ---
   --- 4.04 secs ---
Cluster 8
   Résolution 8
   --- 2596 objets ---
   --- 4.24 secs ---
Cluster 9
   Résolution 8
   --- 2596 objets ---
   --- 4.29 secs ---
Cluster 10
   Résolution 8
   --- 2597 objets ---
   --- 2.21 secs ---
Wall time: 41.9 s


### provinces

In [40]:
%%time
segmentation_provinces = segmentation(data_provinces,8)

Wall time: 1min 51s


In [68]:
%%time
compact_dask_partition(tab_name="dim_provinces_8_v2", schema="bilbo", gdf=data_provinces, decoupage=segmentation_provinces, nb_cluster=10, npartitions=16, colonne='nom', tx=compute_dict_tx(7), res_min=8, res_max=8, i_start=1, tx_spatial=0)

Cluster 1
   Résolution 8
   --- 2596 objets ---
   --- 3.83 secs ---
Cluster 2
   Résolution 8
   --- 2596 objets ---
   --- 3.83 secs ---
Cluster 3
   Résolution 8
   --- 2596 objets ---
   --- 3.71 secs ---
Cluster 4
   Résolution 8
   --- 2596 objets ---
   --- 4.12 secs ---
Cluster 5
   Résolution 8
   --- 2597 objets ---
   --- 4.04 secs ---
Cluster 6
   Résolution 8
   --- 2596 objets ---
   --- 3.8 secs ---
Cluster 7
   Résolution 8
   --- 2596 objets ---
   --- 3.67 secs ---
Cluster 8
   Résolution 8
   --- 2596 objets ---
   --- 3.79 secs ---
Cluster 9
   Résolution 8
   --- 2596 objets ---
   --- 3.77 secs ---
Cluster 10
   Résolution 8
   --- 2597 objets ---
   --- 2.09 secs ---
Wall time: 36.7 s


## Maillage adaptatif

### incendies_Sentinel

In [None]:
%%time
compact_dask(tab_name="test", schema="bilbo", gdf=data_feux, tx_spatial=compute_dict_tx(7), res_min=6, res_max=13, nb_cluster=2, i_start=1, fast=False)

NameError: name 'data_feux' is not defined

### mos_2014

In [None]:
segmentation_mos2014 = segmentation(data_mos2014,6)

In [None]:
%%time
compact_dask_partition(tab_name="test_grande_echelle_mos_iiiiiiiiiii", schema="bilbo", gdf=data_mos2014, decoupage=segmentation_mos2014, nb_cluster=200, npartitions=2, colonne='l_2014_n3', tx=compute_dict_tx(7), res_min=6, res_max=13, i_start=194)

NameError: name 'data_mos2014_nc' is not defined