This notebook contains code for quantifying green space (approximated by NDVI) from Sentinel-2 imagery over the generated grids and then calculating the mean NDVI value for each 100 m x 100 m grid cell. The code relies on Google Earth Engine for processing and on Google Cloud Storage for intermediate storage of the data. STAC APIs are a possible alternative way of doing the processing.

In [None]:
import ee
import geopandas as gpd
import time
import rasterio
from rasterio.windows import from_bounds
import os
import numpy as np
import logging
from multiprocess import Pool
import glob
from functools import partial

### data collection

In [None]:
# function to mask Sentinel-2 pixels using the Scene Classification Layer (SCL)
def maskS2CloudsUsingSCL(image):
    """
    Mask a Sentinel-2 image by its SCL band and rescale it.

    Parameters:
    - image: Earth Engine image that exposes an 'SCL' band.

    Returns:
    - The image with every pixel whose SCL value is below 4 or above 6
      masked out, divided by 10000.
    """
    scene_class = image.select('SCL')
    # pixels are rejected when their class lies outside the 4..6 range
    above_range = scene_class.gt(6)
    below_range = scene_class.lt(4)
    rejected = above_range.Or(below_range)
    kept = image.updateMask(rejected.Not())
    return kept.divide(10000)  # scale reflectance

# function to append an NDVI band to a Sentinel-2 image
def addNDVI(image):
    """
    Add a band named 'NDVI' to the image.

    Parameters:
    - image: Earth Engine image that exposes bands 'B8' (NIR) and 'B4' (Red).

    Returns:
    - The input image with the normalized difference of B8 and B4 added
      as the band 'NDVI'.
    """
    nir_red_index = image.normalizedDifference(['B8', 'B4'])
    return image.addBands(nir_red_index.rename('NDVI'))

# function to reduce an image collection to its first quartile
def firstQuartile(imageCollection):
    """
    Reduce an image collection with a 25th-percentile reducer.

    Parameters:
    - imageCollection: Earth Engine image collection to reduce.

    Returns:
    - The result of reducing the collection with ee.Reducer.percentile([25]).
    """
    p25_reducer = ee.Reducer.percentile([25])
    return imageCollection.reduce(p25_reducer)

# build an Earth Engine Geometry Polygon from a Shapely Polygon
def shapely_to_ee_polygon(shapely_geom):
    """
    Convert a Shapely polygon to an ee.Geometry.Polygon.

    Only the exterior ring is read; interior rings (holes) of the input
    are not carried over.

    Parameters:
    - shapely_geom: Shapely polygon exposing `exterior.coords`.

    Returns:
    - ee.Geometry.Polygon built from the exterior ring coordinates.
    """
    outer_ring = shapely_geom.exterior
    return ee.Geometry.Polygon(list(outer_ring.coords))

# function to throttle Earth Engine task submission so the task queue does not overflow
def wait_for_slots(max_tasks=2000, check_interval=60):
    """
    Block until fewer than `max_tasks` tasks are in state RUNNING or READY.

    Parameters:
    - max_tasks: Number of RUNNING/READY tasks at which the queue counts as full.
    - check_interval: Seconds to sleep between two checks of the task list.
    """
    active_states = ('RUNNING', 'READY')

    def count_active():
        # the task list is fetched anew on every call
        return sum(1 for task in ee.data.getTaskList() if task['state'] in active_states)

    active = count_active()
    while active >= max_tasks:
        print(f"Queue full with {active} tasks. Waiting...")
        time.sleep(check_interval)
        active = count_active()

In [None]:
# authenticate with Earth Engine and initialize the session used by the cells below
ee.Authenticate()
ee.Initialize()

# load the 100km x 100km grid and reproject it to EPSG:4326;
# eu_4326 is the frame whose rows are iterated when submitting exports
eu_gdf = gpd.read_file('data/grid_100km_surf.gpkg')
eu_4326 = eu_gdf.to_crs('epsg:4326')

In [None]:
# submit one NDVI export task per selected cell of the reprojected 100km grid
for i, row in enumerate(eu_4326.itertuples(), start=1):
    # grid 611 condition (in order to not process everything)
    # NOTE(review): i is already 1-based (start=1), so this test selects the
    # row with i == 610 and the export below is named NDVI_10m_610 -- confirm
    # whether grid 610 or grid 611 is the intended target.
    if i + 1 != 611:
        continue

    geometry = shapely_to_ee_polygon(row.geometry)
    dataset = (ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED')
               .filterDate('2018-03-01', '2018-09-30')
               .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20))
               .filterBounds(geometry)
               .map(maskS2CloudsUsingSCL)
               .map(addNDVI))

    # nothing to export when no image survives the filters
    if dataset.size().getInfo() == 0:
        print(f"No images found for index {i}, skipping...")
        continue

    # 25th-percentile composite; masked pixels are filled from a
    # 1000 m circular focal median of the same composite
    firstQuartileComposite = firstQuartile(dataset)
    medianFiltered = firstQuartileComposite.focal_median(1000, 'circle', 'meters')
    filledComposite = firstQuartileComposite.unmask(medianFiltered).select(['NDVI_p25'])

    exportParams = {
        'image': filledComposite,
        'description': f"NDVI_10m_{i}",
        'bucket': 'cog-bucket-test',    # your bucket name
        'fileNamePrefix': f"eu_ndvi/NDVI_10m_{i}",
        'scale': 10,
        'region': geometry,
        'crs': 'EPSG:3035',
        'fileFormat': 'GeoTIFF',
        'formatOptions': {'cloudOptimized': True},
        'maxPixels': 1e13
    }

    # throttle submission: wait until the task queue has room before
    # starting another export (matters once the grid filter is removed)
    wait_for_slots()

    task = ee.batch.Export.image.toCloudStorage(**exportParams)
    task.start()

You can monitor the progress of the export tasks for your account at https://code.earthengine.google.com/. Once the images have finished processing and are stored in your Google Cloud Storage bucket, you can move on to the next part of the code.

### calculate NDVI for 100mx100m grids

In [None]:
# set the environment variable for Google Cloud credentials
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'path/to/your/credentials.json'

# paths of all parquet grid files found directly under data/
grids_list = glob.glob('data/*.parquet')

# # configure logging (recommended if you monitor processing over a lot of files)
# log_path = 'logs/green_spaces.log'

# # ensure log directory exists
# log_dir = os.path.dirname(log_path)
# if not os.path.exists(log_dir):
#     os.makedirs(log_dir)
    
# logging.basicConfig(filename=log_path, level=logging.INFO,
#                     format='%(asctime)s:%(levelname)s:%(message)s', force=True)

In [None]:
def calculate_ndvi_row(row, src):
    """
    Calculate the NDVI mean for a single row (geometry) of a GeoDataFrame.

    The raster is sampled over the bounding box of the row's geometry, and
    NaN pixels are ignored when averaging.

    Parameters:
    - row: A single row from GeoDataFrame; its `geometry.bounds` define the window.
    - src: Raster source object to read data from (first band is used).

    Returns:
    - float: The mean NDVI value for the geometry in the row, or NaN when
      the window contains no pixels or only NaN pixels.
    """
    window = from_bounds(*row.geometry.bounds, transform=src.transform)
    # read only the first band instead of loading every band and dropping the rest
    img = src.read(1, window=window)
    # nothing to average: return NaN directly rather than letting nanmean
    # emit a "mean of empty slice" RuntimeWarning for every such cell
    if img.size == 0 or np.isnan(img).all():
        return np.nan
    return np.nanmean(img)

def process_grid(grid_path):
    """
    Process each grid to calculate NDVI and save the results back to a parquet file.

    The grid number is taken from the file name (text after the last
    underscore, before the first dot) and selects the matching
    cloud-optimized GeoTIFF. Grids that already have an 'ndvi' column are
    skipped. Geometries are reprojected to the raster CRS only for
    sampling; the parquet file is written back with its original
    geometries plus the new 'ndvi' column.

    Errors are reported and swallowed so that one failing grid does not
    abort a batch run over many grids.

    Parameters:
    - grid_path: The file path to the grid parquet file.
    """
    try:
        # parse the file name only, so underscores in directory names cannot
        # end up in the grid number
        grid_number = os.path.basename(grid_path).split('_')[-1].split('.')[0]
        grid_gdf = gpd.read_parquet(grid_path)

        if 'ndvi' in grid_gdf.columns:
            # reported with print like every other message here; an
            # unconfigured logging.info call would be silently dropped
            # logging.info(f'Skipping grid {grid_number} as NDVI already calculated')
            print(f'Skipping grid {grid_number} as NDVI already calculated')
            return

        cog_path = f'gs://cog-bucket-test/eu_ndvi/NDVI_10m_{grid_number}.tif'
        # logging.info(f'Started processing grid {grid_number} and {cog_path}')
        print(f'Started processing grid {grid_number} and {cog_path}')

        with rasterio.open(cog_path) as src:
            # sample with geometries in the raster CRS, but keep grid_gdf
            # itself untouched so the saved file retains its original CRS
            if grid_gdf.crs != src.crs:
                sampling_gdf = grid_gdf.to_crs(src.crs)
            else:
                sampling_gdf = grid_gdf

            # the result shares sampling_gdf's index, which is grid_gdf's index
            grid_gdf['ndvi'] = sampling_gdf.apply(calculate_ndvi_row, axis=1, args=(src,))

        grid_gdf.to_parquet(grid_path)
        # logging.info(f'Successfully processed grid {grid_path}')
        print(f'Successfully processed grid {grid_path}')
    except Exception as e:
        # logging.error(f'Error processing grid {grid_path}: {e}')
        print(f'Error processing grid {grid_path}: {e}')

In [None]:
# sequential: handle the grid files one after another
for grid_file in grids_list:
    process_grid(grid_file)

# # configure the number of parallel processes
# num_processes = 10 

# # create a pool of workers to process grids in parallel
# with Pool(processes=num_processes) as pool:
#     pool.map(process_grid, grids_list)