In [1]:
from intake import open_catalog
import intake

import pandas as pd
import numpy as np
import geopandas as gpd

from pystac_client import Client as psc
import stackstac
import rasterio.features

from pyproj import CRS
import math
from shapely.geometry import mapping
from dotenv import load_dotenv
load_dotenv()
import os
from datetime import datetime, timedelta

from dask.distributed import Client, LocalCluster, Variable
import dask
import dask_geopandas
import logging
import xarray as xr

#### dask worker tcp://172.20.12.11:8786 --nthreads=12

In [2]:
def getDaskClient(local=False):

    def createClient(schedulerIp="172.20.12.11:8786"):
        client = Client(schedulerIp)
        return client

    if local :
        # Démarrer un cluster local avec 4 cœurs
        cluster = LocalCluster(n_workers=2,threads_per_worker=8,silence_logs='DEBUG',memory_limit='20GB', timeout="60s",heartbeat_interval="10s")

        client = Client(cluster)
        return client


    if 'client' in globals():   
        if client.scheduler != None:
        # La variable client existe dans l'espace de noms global
            return client
        else:
            schedulerIp = os.getenv("SCHEDULER_IP")
            if schedulerIp:
                client = createClient(schedulerIp)
            else:
                client = createClient(schedulerIp)
                
    else:
        schedulerIp = os.getenv("SCHEDULER_IP")
        if schedulerIp:
            client = createClient(schedulerIp)
        else:
            client = createClient()

    return client

client = getDaskClient(local=False)
client

0,1
Connection method: Direct,
Dashboard: http://172.20.12.11:8787/status,

0,1
Comm: tcp://172.28.0.9:8786,Workers: 1
Dashboard: http://172.28.0.9:8787/status,Total threads: 12
Started: 3 weeks ago,Total memory: 23.84 GiB

0,1
Comm: tcp://172.20.10.113:54786,Total threads: 12
Dashboard: http://172.20.10.113:54787/status,Memory: 23.84 GiB
Nanny: tcp://172.20.10.113:54782,
Local directory: C:\Users\ORIANE~1.BRU\AppData\Local\Temp\dask-scratch-space\worker-hif7kpr0,Local directory: C:\Users\ORIANE~1.BRU\AppData\Local\Temp\dask-scratch-space\worker-hif7kpr0
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 131.78 MiB,Spilled bytes: 0 B
Read bytes: 28.63 kiB,Write bytes: 50.67 kiB


In [3]:
table_source = "sentinel_surfaces_detectees"

date_start=datetime(2023,12,1)
date_end=datetime(2023,12,31)

URL = "https://earth-search.aws.element84.com/v1"
URL_2 = "https://catalogue.dataspace.copernicus.eu/stac"
URL_3 = "https://services.sentinel-hub.com/api/v1/catalog/1.0.0/"
URL_4 = "https://earthengine.openeo.org/v1.0/"

collection = "SENTINEL-2"
collect_amazon ='sentinel-2-l2a'
collect_EE = 'COPERNICUS/S2_SR_HARMONIZED'

interval_before=150
interval_after=40

offset = 0
limit = 12

output_path = "A:/INDICATEUR_FEUX/2023/"

In [4]:
catalog_path = f'{os.getenv("PROJECT_PATH")}/data_reference_feux.yaml'

sql = f"""SELECT row_number() OVER () AS id,
si.date_,
si.surface_id_h3,
si.geometry
FROM feux_cq.{table_source} si
WHERE si.date_ >= '{pd.to_datetime(date_start).strftime('%Y-%m-%d')}' AND si.date_ <= '{pd.to_datetime(date_end).strftime('%Y-%m-%d')}'
"""

catalog = open_catalog(catalog_path)
dataCatalog = getattr(catalog, table_source)(sql_expr=sql)
gdf = dataCatalog.read()

if gdf.duplicated(subset=['surface_id_h3']).sum()>=1:
    gdf = gdf.drop_duplicates(subset=['surface_id_h3'])

gdf = gdf.to_crs(epsg=4326)

gdf['date_']= pd.to_datetime(gdf['date_'], format='%Y-%m-%d').dt.date
#gdf = gdf.explode()
gdf=gdf.reset_index(drop=True)

In [5]:
def create_indicateur_spectraux(data_indices):
    """
    Correction des valeurs de bandes, on avait des valeurs négatives de reflectance par endroit
    """
    """
    data_indices["red+"]=data_indices["red"].where(lambda x : x>0, lambda x : -x)
    data_indices["nir+"]=data_indices["nir"].where(lambda x : x>0, lambda x : -x)
    data_indices["swir22+"]=data_indices["swir22"].where(lambda x : x>0, lambda x : -x)
    data_indices["swir16+"]=data_indices["swir16"].where(lambda x : x>0, lambda x : -x)
    data_indices["green+"]=data_indices["green"].where(lambda x : x>0, lambda x : -x)
    data_indices["blue+"]=data_indices["blue"].where(lambda x : x>0, lambda x : -x)
    data_indices["nir08+"]=data_indices["nir08"].where(lambda x : x>0, lambda x : -x)
    data_indices["rededge1+"]=data_indices["rededge1"].where(lambda x : x>0, lambda x : -x)
    data_indices["rededge2+"]=data_indices["rededge2"].where(lambda x : x>0, lambda x : -x)
    data_indices["rededge3+"]=data_indices["rededge3"].where(lambda x : x>0, lambda x : -x)
    """
    data_indices["red"] = data_indices["red"]+0.1  #B4
    data_indices["nir"] = data_indices["nir"]+0.1  #B8
    data_indices["swir22"] = data_indices["swir22"]+0.1  #B12
    data_indices["swir16"] = data_indices["swir16"]+0.1  #B11
    data_indices["green"] = data_indices["green"]+0.1 #B3
    data_indices["blue"] = data_indices["blue"]+0.1
    data_indices["nir08"] = data_indices["nir08"]+0.1  #B8A
    data_indices["rededge1"] = data_indices["rededge1"]+0.1  #B5
    data_indices["rededge2"] = data_indices["rededge2"]+0.1   #B6
    data_indices["rededge3"] = data_indices["rededge3"]+0.1   #B7

    data_indices['ndvi'] = ((data_indices['nir'].astype(float) - data_indices['red'].astype(float))/(data_indices['nir'].astype(float)+ data_indices['red'].astype(float)))

    data_indices['nbr'] = ((data_indices['nir'] - data_indices['swir22'])/(data_indices['nir'] + data_indices['swir22']))

    data_indices['nbr+'] = ((data_indices['swir22'] - data_indices['nir08'] - data_indices['green'] - data_indices['blue'])/(data_indices['swir22'] + data_indices['nir08'] + data_indices['green'] + data_indices['blue']))

    data_indices['bais2'] = (1-(np.sqrt((data_indices['rededge2'] * data_indices['rededge3'] * data_indices['nir08'])/data_indices['red']))*((data_indices['swir22'] - data_indices['nir08'] )/ np.sqrt((data_indices['swir22'] + data_indices['nir08'] ))+1))

    F1= ((data_indices['swir22'] + data_indices['swir16']) - (data_indices['nir'] + data_indices['nir08'])/np.sqrt((data_indices['swir22'] + data_indices['swir16']) + (data_indices['nir'] + data_indices['nir08'])))
    F2= (2-(np.sqrt(data_indices['rededge2'] * data_indices['rededge3'] * (data_indices['nir'] + data_indices['nir08'])/data_indices['red'] + data_indices['rededge1'])))

    data_indices['badi'] = F1 * F2

    data_indices['ndwi'] = ((data_indices['green'] - data_indices['nir'])/(data_indices['green'] + data_indices['nir']))

    return(data_indices)

In [6]:
def find_image_stac(bbox,dates):
    client = psc.open(URL)
    search = client.search(
        collections=[collect_amazon],
        bbox=bbox,
        datetime=dates
        )

    print(f"{search.matched()} scenes Sentinel-2 L2A trouvées dans l'intervalle temporel")
    items = search.item_collection()
    sentinel_stack = stackstac.stack(
                            items,
                            bounds_latlon=[bbox[0], bbox[1],  bbox[2],  bbox[3]],

                            gdal_env=stackstac.DEFAULT_GDAL_ENV.updated(
                                {'GDAL_HTTP_MAX_RETRY': 3,
                                    'GDAL_HTTP_RETRY_DELAY': 5,
                                }),
                            epsg=4326
                            ).rename({'x': 'lon', 'y': 'lat'})
    data_indices = sentinel_stack.sel(band=["blue","rededge1","rededge2", "rededge3", "green","red", "nir","nir08","swir16","swir22", "scl"]).to_dataset(dim='band')
    return(data_indices,sentinel_stack)

In [7]:
def create_mask_poly(geometry, data_indices):
    ba_test_filter = gpd.GeoDataFrame({'geometry': [geometry]}, crs='EPSG:4326', index=[0])

    shapes = ba_test_filter['geometry'].apply(mapping)
    
    # Création du masque avec rasterio
    ShapeMask = rasterio.features.geometry_mask(
        shapes,
        out_shape=(len(data_indices.lat), len(data_indices.lon)),
        transform=data_indices.transform,
        invert=True
    )
    
    # Convertir le masque en DataArray de xarray
    ShapeMask = xr.DataArray(ShapeMask, dims=("lat", "lon"))
    return ShapeMask

In [8]:
def process_partition(partition):
    row = partition.iloc[0]
    date_ = pd.to_datetime(row['date_'], format='%Y-%m-%d').date()
    
    bbox = row["geometry"].bounds
    datemin = (date_ - timedelta(days=interval_before)).strftime('%Y-%m-%d')
    datemax = (date_ + timedelta(days=interval_after)).strftime('%Y-%m-%d')
    dates = f'{datemin}/{datemax}'

    data_indices, sentinel_stack = find_image_stac(bbox, dates)
    images_to_keep = []
    for i, time in enumerate(pd.to_datetime(sentinel_stack['time']).date):
        if time == date_:
            images_to_keep.append(i)
        else:
            scl_data = sentinel_stack.isel(time=i).sel(band="scl").values  
            cloud_classes = [3, 8, 9, 10, 11]
            cloud_mask = np.isin(scl_data, cloud_classes)
            cloud_coverage = np.sum(cloud_mask) / scl_data.size
            
            if cloud_coverage == 0:
                images_to_keep.append(i)

    data_to_keep = sentinel_stack.isel(time=images_to_keep)
    data_indices = data_to_keep.sel(band=["blue", "rededge1", "rededge2", "rededge3", "green", "red", "nir", "nir08", "swir16", "swir22", "scl"]).to_dataset(dim='band')
    
    data_indices = create_indicateur_spectraux(data_indices)
    ShapeMask = create_mask_poly(row['geometry'], data_indices)
    data_indices['mask'] = ShapeMask

    dataset_save = data_indices.drop_vars([c for c in data_indices.coords if not (c in ['time', 'lat', 'lon'])])

    dataset_save = dataset_save.drop_vars([v for v in dataset_save.data_vars if not (v in ['ndvi','badi','ndwi', 'nbr', 'nbr+', 'bais2', 'mask'])])
    dataset_save.attrs['spec'] = str(dataset_save.attrs['spec'])

    return dataset_save, row['surface_id_h3'], row['date_']

nb_line = len(gdf)

if offset >= 0 and limit > 0:    
    while offset < nb_line:
        upper_bound = min(offset + limit, nb_line)
        dask_gdf = dask_geopandas.from_geopandas(gdf.iloc[offset:upper_bound], npartitions=12)
        result = dask_gdf.map_partitions(process_partition, meta=pd.DataFrame({"success": []}, dtype=bool))

        result_computed = result.compute()

        for ds, surface_id_h3, date_ in result_computed:
            filename = f'{surface_id_h3}_{date_}.nc'
            full_path = os.path.join(output_path, filename)
            
            os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
            
            ds.to_netcdf(full_path)
            os.chmod(full_path, 0o666) 

            print(f'Saved {filename}')

        offset += limit

Saved L2A_T58KDB_20231227_8f9f5e5419360c1_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e541923a50_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e56a81310e_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e545b0041a_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e5662aeac3_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e56844391d_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e56852844c_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e568476c43_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e568c888a0_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e56c6d97a3_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e5654a6475_2023-12-27.nc
Saved L2A_T58KDB_20231227_8f9f5e1b1a44a95_2023-12-27.nc
Saved L2A_T58KHB_20231231_8f9f561767a56ae_2023-12-31.nc
Saved L2A_T58KDC_20231202_8f9f5ad9d8eb065_2023-12-02.nc
Saved L2A_T58KHB_20231231_8f9f56176d49b16_2023-12-31.nc
Saved L2A_T58KGB_20231231_8f9f5291562a564_2023-12-31.nc
Saved L2A_T58KGB_20231231_8f9f529066d11a4_2023-12-31.nc
Saved L2A_T58KDC_20231202_8f9e2d264d659b5_2023-1