In [14]:
import dask
from dask.diagnostics import ProgressBar
import xarray as xr
import fsspec
import dask.array as da
from sklearn.ensemble import RandomForestRegressor
import joblib
import ee
import numpy as np
import rasterio
from rasterio.enums import Resampling

# Initialize the Earth Engine API
ee.Initialize()

# load sentinel-1 data 
spring = ee.Filter.date('2022-03-01', '2022-04-20');
lateSpring = ee.Filter.date('2022-04-21', '2022-06-10');
summer = ee.Filter.date('2022-06-11', '2022-08-31');

# Define a function to filter and export GEE data to GeoTIFF
def export_gee_to_geotiff(dataset, band_names, region, start_date, end_date, output_path, cloud_mask_function=None):
    if dataset == 'COPERNICUS/S1_GRD':
        # Define a masking function
        def mask_edges(image):
            edge = image.lt(-30.0)  # Define an edge mask where values are less than -30
            masked_image = image.mask().And(edge.Not())  # Mask out edges
            return image.updateMask(masked_image)  # Apply the mask
            
        filtered = ee.ImageCollection('COPERNICUS/S1_GRD')\
                    .filterDate(start_date, end_date)\
                    .filterBounds(region)\
                    .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))\
                    .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VH'))\
                    .filter(ee.Filter.eq('instrumentMode', 'IW'))\
                    .filter(ee.Filter.inList('orbitProperties_pass', ['ASCENDING', 'DESCENDING']))
        
        # Select the VV and VH bands
        filtered = filtered.select('VV', 'VH')

        # Apply the masking function to each image in the collection
        filtered = filtered.map(mask_edges)
        # filtered = ee.Image.cat(
        #         filtered.filter(spring).mean(),
        #         filtered.filter(lateSpring).mean(),
        #         filtered.filter(summer).mean());
    else:
        collection = ee.ImageCollection(dataset)
        filtered = (collection
                    .filterDate(start_date, end_date)
                    .filterBounds(region)
                    .select(band_names))
    
    # Apply cloud masking function if provided
    if cloud_mask_function:
        filtered = filtered.map(cloud_mask_function)

    def to_geotiff(image, idx):
        print(type(image))
        # Coarser resolution
        # Reproject the image
        reprojected = image.reproject(
            crs='EPSG:4326',  # Coordinate Reference System
            scale=100  # Coarser resolution (in meters per pixel)
        )
        
        # Export the reprojected image to Google Cloud Storage
        task = ee.batch.Export.image.toCloudStorage(
            image=reprojected,
            description='ReprojectedImageToStorage',
            bucket='test-agb-bucket',  # Replace with your Cloud Storage bucket name
            fileNamePrefix='Data_2023/sentinel1-data',  # Replace with desired folder and file name
            scale=100,
            region=region,
            maxPixels=1e13  # Increase the maximum pixel limit
        )
        
        # Start the export task
        task.start()
        print("Export task started. Check your Google Cloud Storage bucket for results.")


    # Download each image and save as GeoTIFF
    for i in range(filtered.size().getInfo()):
        image = ee.Image(filtered.toList(filtered.size()).get(i))
        # Call the download function to fetch and save the image as GeoTIFF
        to_geotiff(image, i)

# Cloud mask function for Sentinel-2 using the QA60 band
def sentinel2_cloud_mask(image):
    qa60 = image.select('QA60')
    cloud_mask = qa60.bitwiseAnd(1).eq(0)  # 0 indicates clear pixels, 1 indicates cloud
    return image.updateMask(cloud_mask)

# Cloud mask function for Sentinel-1 based on metadata (example: quality mask)
def sentinel1_cloud_mask(image):
    # Assuming a simple quality mask, you can customize it based on your needs
    # For example, mask out pixels with low coherence or other criteria
    quality_mask = image.select('VV').gt(0)  # Example: keep pixels with valid VV value
    return image.updateMask(quality_mask)

# Define Amazon Basin region and temporal filter
amazon_basin = ee.Geometry.Polygon([
    [[-75.0, -5.0], [-75.0, -10.0], [-65.0, -10.0], [-65.0, -5.0], [-75.0, -5.0]]
])
start_date = '2023-03-01'
end_date = '2023-08-31'

# Export Sentinel-1 with cloud mask
export_gee_to_geotiff(
    "COPERNICUS/S1_GRD",
    ["VV", "VH"],
    amazon_basin,
    start_date,
    end_date,
    "gs://test-agb-bucket/Data_2023/sentinel1-data"
)

# Export Sentinel-2 with cloud mask
export_gee_to_geotiff(
    "COPERNICUS/S2",
    ["B2", "B3", "B4", "B8", "QA60"],
    amazon_basin,
    start_date,
    end_date,
    "gs://test-agb-bucket/Data_2023/sentinel2-data",
    cloud_mask_function=sentinel2_cloud_mask
)

# Export Landsat
export_gee_to_geotiff(
    "LANDSAT/LC08/C01/T1_SR",
    ["B2", "B3", "B4", "B5"],
    amazon_basin,
    start_date,
    end_date,
    "gs://test-agb-bucket/Data_2023/landsat-data"
)

# Export DEM without cloud mask function
export_gee_to_geotiff(
    "USGS/SRTMGL1_003",  # DEM dataset in Google Earth Engine
    ["elevation", "slope", "aspect"],  # DEM bands
    amazon_basin,
    start_date,
    end_date,
    "gs://test-agb-bucket/Data_2023/dem-data"
)

# Define GCS paths for input data (GeoTIFF files)
gcs_paths = {
    "sentinel1": "gs://test-agb-bucket/Data_2023/sentinel1-data",
    "sentinel2": "gs://test-agb-bucket/Data_2023/sentinel2-data",
    "landsat": "gs://test-agb-bucket/Data_2023/landsat-data",
    "dem": "gs://test-agb-bucket/Data_2023/dem-data",
    "gedi": [f"gs://test-agb-bucket/GEDIL4A2023/gedi-data-{i}.h5" for i in range(1, 57)]
}

# Set up Dask cluster (can be configured further if running on a large VM)
from dask.distributed import Client
client = Client()
print(client)

# Use fsspec to list all the files in the GEDI data path
fs = fsspec.filesystem('gs')

# List all GEDI HDF5 files in the GCS path (you can specify a folder here)
gedi_files = fs.glob("gs://test-agb-bucket/GEDIL4A2023/GEDI04_A_*.h5")  # Use a wildcard to match all files with the prefix

# Open the GEDI files and combine them
gedi_datasets = []

for gedi_file in gedi_files:
    with fs.open(gedi_file, 'rb') as f:
        gedi_datasets.append(xr.open_dataset(f, engine='h5netcdf', chunks={"lon": 100, "lat": 100}))

# Combine all GEDI datasets into one
gedi_combined = xr.concat(gedi_datasets, dim="time")

# Inspect the combined dataset
print(len(gedi_combined))

# Filter GEDI data where AGBD is not null and l4_flag is 1
gedi_filtered = gedi_combined.where(
    (gedi_combined['agbd'].notnull()) & (gedi_combined['l4_quality_flag'] == 1),
    drop=True
)

# Inspect the filtered dataset
print(len(gedi_filtered))

# Extract the filtered AGBD (Above-Ground Biomass Density)
gedi_agbd_filtered = gedi_filtered['agbd']


# Load Sentinel-1, Sentinel-2, Landsat, and DEM data
sentinel1 = xr.open_rasterio(gcs_paths['sentinel1'], chunks={"x": 100, "y": 100})
sentinel2 = xr.open_rasterio(gcs_paths['sentinel2'], chunks={"x": 100, "y": 100})
landsat = xr.open_rasterio(gcs_paths['landsat'], chunks={"x": 100, "y": 100})
dem = xr.open_rasterio(gcs_paths['dem'], chunks={"x": 100, "y": 100})

# Calculate predictor variables: NDVI, EVI, VV, VH, Slope, Aspect, and Sentinel-2 Bands
ndvi = (landsat['nir'] - landsat['red']) / (landsat['nir'] + landsat['red'])
evi = 2.5 * (landsat['nir'] - landsat['red']) / (landsat['nir'] + 6 * landsat['red'] - 7.5 * landsat['blue'] + 1)
s2_band2 = sentinel2['B2']
s2_band3 = sentinel2['B3']
s2_band4 = sentinel2['B4']
s2_band8 = sentinel2['B8']
vv = sentinel1['vv']
vh = sentinel1['vh']
slope = dem['slope']
aspect = dem['aspect']

# Combine predictor variables into a single dataset
data_vars = {
    "ndvi": ndvi,
    "evi": evi,
    "s2_band2": s2_band2,
    "s2_band3": s2_band3,
    "s2_band4": s2_band4,
    "s2_band8": s2_band8,
    "vv": vv,
    "vh": vh,
    "slope": slope,
    "aspect": aspect,
}
data_combined = xr.Dataset(data_vars)

# Flatten the combined data for ML model input
features = data_combined.to_array().stack(points=("lon", "lat")).transpose("points", "variable").values

# Mask and align the features and labels
gedi_agbd_flat = gedi_agbd.stack(points=("lon", "lat")).values
mask = ~np.isnan(gedi_agbd_flat)
features = features[mask]
labels = gedi_agbd_flat[mask]

# Train a RandomForest model
rf_model = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
rf_model.fit(features, labels)

# Save the trained model to GCS
model_path = "gs://your-bucket-name/rf_agbd_model.pkl"
with fs.open(model_path, 'wb') as f_out:
    joblib.dump(rf_model, f_out)

print("Model training complete. Saved to:", model_path)

# Predict AGBD using the trained model
predictions = rf_model.predict(features)

# Reshape predictions back to spatial grid
agbd_map = xr.DataArray(predictions, coords=[gedi_combined['lon'], gedi_combined['lat']], dims=["lon", "lat"])

# Save the result to NetCDF
out_path = "gs://your-bucket-name/agbd_map.nc"
with fs.open(out_path, 'wb') as f_out:
    agbd_map.to_netcdf(f_out)

print("AGBD map prediction complete! File saved to:", out_path)


<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Ima


Attention required for COPERNICUS/S2! You are using a deprecated asset.
To ensure continued functionality, please update it.
Learn more: https://developers.google.com/earth-engine/datasets/catalog/COPERNICUS_S2



<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Image'>
Export task started. Check your Google Cloud Storage bucket for results.
<class 'ee.image.Ima

EEException: Too many tasks already in the queue (3000, limit 3000).