In [None]:
import geopandas as gpd
import dask_geopandas as dask_gpd
from dask.distributed import Client, LocalCluster
import psycopg2
from tqdm import tqdm
# Start a local Dask cluster with 12 workers (n_workers=12)
cluster = LocalCluster(n_workers=12)

# Client connected to that cluster; later cells submit tasks through it
client = Client(cluster)
# Bare expression: as the last line of the cell, the notebook displays it
client

In [None]:

def to_run_in_workers(batch_size, batch_count):
    """Rewrite ``proprio_ty`` for one page of 'COLLECTIVITE' rows.

    Reads up to ``batch_size`` rows of
    ``occupation_sol.faits_foncier_mos_formation_arboree_dafe_2014`` whose
    ``proprio_ty`` is 'COLLECTIVITE' (ordered by ``id_thematique`` descending,
    starting at offset ``batch_count * batch_size``) from the
    ``oeil_traitement`` database. For each row, the ``proprio_na`` values of
    the ``bdfoncier.type_foncier`` rows (``data_externe`` database) whose
    geometry intersects a point on the surface of the row's geometry are
    joined with ``', '`` and written into ``proprio_ty`` (NULL when nothing
    matches). All updates of the page are committed as one transaction.

    Note: the UPDATE overwrites the very column the SELECT filters on, so a
    processed row stops matching the filter unless the joined value is itself
    'COLLECTIVITE'.

    Args:
        batch_size: Maximum number of rows to process (SQL ``LIMIT``).
        batch_count: Zero-based page index; the SQL ``OFFSET`` is
            ``batch_count * batch_size``.

    Returns:
        The number of rows updated and committed, or ``None`` when an error
        occurred during processing (the transaction is then rolled back and
        the error is printed).

    Raises:
        Whatever ``psycopg2.connect`` raises when a connection cannot be
        opened; connection errors are not swallowed.
    """
    from contextlib import closing

    import psycopg2

    select_batch_sql = """
        SELECT id_thematique, id_split, geometry, proprio_ty
        FROM "occupation_sol"."faits_foncier_mos_formation_arboree_dafe_2014"
        WHERE proprio_ty = 'COLLECTIVITE'
        ORDER BY id_thematique DESC
        LIMIT %s OFFSET %s
    """
    lookup_owner_sql = """
        SELECT  proprio_na
        FROM bdfoncier.type_foncier
        WHERE ST_Intersects(ST_PointOnSurface(%s), geometry)
    """
    # NOTE(review): rows are matched on geometry equality rather than on the
    # selected id columns -- confirm this is intended.
    update_owner_sql = """
        UPDATE occupation_sol.faits_foncier_mos_formation_arboree_dafe_2014
        SET proprio_ty = %s
        WHERE geometry = %s
    """

    # NOTE(review): credentials are hardcoded in the notebook; they should be
    # moved to environment variables or a secrets store.
    # Source database (oeil_traitement). `closing` guarantees the connection is
    # closed even if opening the second connection or the processing fails.
    conn1 = psycopg2.connect(host="172.20.12.13", dbname="oeil_traitement", user="hroussaffa", password="mcot")
    with closing(conn1):
        # Lookup database (data_externe)
        conn2 = psycopg2.connect(host="172.20.12.13", dbname="data_externe", user="hroussaffa", password="mcot")
        with closing(conn2):
            cursor1 = conn1.cursor()
            cursor2 = conn2.cursor()
            try:
                # Make sure the whole page is written in a single transaction
                conn1.autocommit = False

                cursor1.execute(select_batch_sql, (batch_size, batch_count * batch_size))
                rows1 = cursor1.fetchall()

                # The context manager closes the bar even when a row fails
                with tqdm(total=len(rows1), desc="Progression") as progress_bar:
                    for row1 in rows1:
                        geom = row1[2]  # only the geometry column is needed

                        cursor2.execute(lookup_owner_sql, (geom,))
                        # Skip NULL names: str.join raises TypeError on None
                        proprio_na_values = [
                            row2[0] for row2 in cursor2.fetchall() if row2[0] is not None
                        ]
                        aggregated_proprio_na = ', '.join(proprio_na_values) if proprio_na_values else None

                        cursor1.execute(update_owner_sql, (aggregated_proprio_na, geom))
                        progress_bar.update(1)

                # Commit every update of the page at once
                conn1.commit()
                return len(rows1)

            except Exception as e:
                # Best effort: undo the partial page and report, do not raise
                conn1.rollback()
                print("Erreur lors de la mise à jour :", e)
                return None



In [None]:
import dask.distributed

# Rows handled by each worker per round, and index of the last round to run
batch_size = 200
MAX_BATCH_COUNT = 1000

# Identifiers of the workers currently registered with the scheduler
worker_ids = list(client.scheduler_info()['workers'])
print("worker_ids: ",worker_ids)
# Number of workers in the cluster
num_workers = len(worker_ids)
batch_count=0

while batch_count <= MAX_BATCH_COUNT:

    # Futures of the tasks submitted during this round
    futures = []

    # One progress bar per round; the context manager closes it even on error
    with tqdm(total=batch_size * num_workers, desc="Progression Generale") as progress_bar:

        # Send one task to each worker. The worker's position in the list is
        # what is passed as the page index (the function's batch_count); the
        # loop counter itself is not forwarded.
        for worker_index, worker_id in enumerate(worker_ids):
            # pure=False: the task has database side effects and is submitted
            # with identical arguments every round. With the default
            # pure=True the key would be identical, so a still-referenced
            # future from the previous round would be reused instead of
            # running the task again.
            future = client.submit(
                to_run_in_workers,
                batch_size,
                worker_index,
                workers=[worker_id],
                pure=False,
            )
            # Bind this round's bar explicitly rather than late-binding the name
            future.add_done_callback(lambda f, bar=progress_bar: bar.update(batch_size))
            futures.append(future)

        # Wait for every task of the round to finish
        dask.distributed.wait(futures)

    batch_count += 1

    # Stop early once every worker successfully reports that it found no row.
    # A task that errored, or that returns None, never triggers the stop.
    rows_done = [f.result() if f.status == "finished" else None for f in futures]
    if rows_done and all(count == 0 for count in rows_done):
        break
