In [1]:
import geopandas as gpd
from shapely import box
import ee 
import geemap

ee.Initialize()

In [2]:
gdf = gpd.read_file("data/Ethiopia_AdminBoundaries.shp").to_crs(4326)
study_regions = ["Tigray", "Amhara"]
study_area_gdf = gdf[gdf["R_NAME"].isin(study_regions)]

In [3]:
# Build uniform 1° grid for study area bounds
min_x, min_y, max_x, max_y = [round(coord) for coord in study_area_gdf.total_bounds.tolist()]
cell_size = 1
x_cells = int((max_x - min_x) / cell_size)
y_cells = int((max_y - min_y) / cell_size)

polygons = []
tile_ids = []
for i in range(x_cells):
    for j in range(y_cells):

        x1 = min_x + i * cell_size
        y1 = min_y + j * cell_size

        poly = box(x1, y1, x1 + cell_size, y1 + cell_size)
        tile_id = f"{x1}_{y1 + cell_size}"
        
        polygons.append(poly)
        tile_ids.append(tile_id)

tiles = gpd.GeoDataFrame({"tile_id": tile_ids, "geometry": polygons}, crs="EPSG:4326")

In [4]:
toi = tiles.iloc[27]

In [5]:
def cloudmask_landsat(image):
    qa = image.select('QA_PIXEL')
    
    # Bit flags for Landsat-8 C2 QA_PIXEL
    fill_bit = 1 << 0        # Bit 0: Fill
    dilated_cloud = 1 << 1   # Bit 1: Dilated Cloud
    cirrus = 1 << 2          # Bit 2: Cirrus
    cloud = 1 << 3           # Bit 3: Cloud
    cloud_shadow = 1 << 4    # Bit 4: Cloud Shadow
    
    mask_bits = fill_bit | dilated_cloud | cirrus | cloud | cloud_shadow  # = 31
    qa_mask = qa.bitwiseAnd(mask_bits).eq(0)
    
    return image.updateMask(qa_mask)

def get_landsat8(geometry, start, end):
    """Load and preprocess Landsat-8 C2 imagery"""
    l8 = (ee.ImageCollection('LANDSAT/LC08/C02/T1_TOA')
            .filterDate(start, end)
            .filterBounds(geometry)
            .select(
                  ['B2', 'B3', 'B4','B5', 'B6', 'B7', 'QA_PIXEL'],
                  ['blue', 'green', 'red', 'nir', 'swir1', 'swir2', 'QA_PIXEL']
            )
            .map(lambda img: (
                    img.addBands(img.normalizedDifference(['nir', 'red']).select(['nd'], ['NDVI']))
                )
            )
        )
    
    l8_cloud_masked = l8.map(cloudmask_landsat)
    
    return l8

In [6]:
def visualize_landsat_collection(collection, geometry):
    # Create the composite
    composite = collection.qualityMosaic('NDVI').clip(geometry).clip(geometry)

    # Create an interactive map
    Map = geemap.Map()
    Map.centerObject(tile_geom, zoom=11)

    # Add layers
    Map.addLayer(tile_geom, {'color': 'red'}, 'AOI')
    Map.addLayer(
        composite, 
        {'bands': ['red', 'green', 'blue'], 'min': 0, 'max': 0.3}, 
        'Landsat Composite'
    )

    # Display the map
    return Map

In [7]:
def gdf_to_ee_geometry(gdf_row):
    """Convert a GeoDataFrame row geometry to Earth Engine geometry"""
    return ee.Geometry.Polygon(gdf_row.geometry.__geo_interface__['coordinates'])

# build collection and visualize
tile_geom = gdf_to_ee_geometry(toi)
start_2020 = ee.Date('2020-08-01')
end_2020 = ee.Date('2020-10-30')
start_2022 = ee.Date('2022-09-01')
end_2022 = ee.Date('2022-09-30')
l8_2020 = get_landsat8(tile_geom, start_2020, end_2020)
l8_2022 = get_landsat8(tile_geom, start_2022, end_2022)

In [8]:
#visualize_landsat_collection(l8_2020, tile_geom)

In [9]:
#visualize_landsat_collection(l8_2022, tile_geom)

In [None]:
def cloudmask_sentinel2(max_cloud_probability):
    clouds = ee.Image(img.get('cloud_mask')).select('probability')
    isNotCloud = clouds.lt(max_cloud_probability)
    return img.updateMake(isNotCloud)

    

def get_sentinel2(geometry, start, end):
    """Load and preprocess Sentinel-2 imagery."""
    s2 = (ee.ImageCollection('COPERNICUS/S2_HARMONIZED')
        .filterBounds(geometry)
        .filterDate(start, end)
        .map(lambda img: img
            .addBands(img.normalizedDifference(['B8', 'B4']).select(['nd'], ['NDVI']))
            .addBands(img.metadata('system:time_start'))
        )
        .map(lambda img: (
            img.select(['B1','B2','B3','B4','B8','B10','B11','B12'])
            .divide(10000)
            .addBands(img.select(['QA60']))
            .addBands(img.select(['NDVI']))
            .copyProperties(img, ['system:time_start'])
        ))
        .select(
            ['NDVI','QA60','B1','B2','B3','B4','B8','B10','B11','B12'],
            ['NDVI','QA60','cb','blue','green','red','nir','cirrus','swir1','swir2']
        )
    )

    s2Clouds = (ee.ImageCollection('COPERNICUS/S2_CLOUD_PROBABILITY')
                .filterBounds(geometry)
                .filterDate(start, end)
    )



    return s2_with_cloudless

s2 = get_sentinel2(tile_geom, start_2020, end_2020)

In [11]:
def visualize_sentinel2_collection(collection, geometry):
    composite = collection.qualityMosaic('NDVI').clip(geometry)

    # Create an interactive map
    Map = geemap.Map()
    Map.centerObject(tile_geom, zoom=11)

    # Add layers
    Map.addLayer(tile_geom, {'color': 'red'}, 'AOI')
    Map.addLayer(
        composite, 
        {'bands': ['red', 'green', 'blue'], 'min': 0, 'max': 0.3}, 
        'Sentinel Composite'
    )

    # Display the map
    return Map

In [12]:
tile_geom = gdf_to_ee_geometry(toi)
start_2020 = ee.Date('2020-08-01')
end_2020 = ee.Date('2020-10-30')
start_2022 = ee.Date('2022-08-01')
end_2022 = ee.Date('2022-10-30')
s2_2020 = get_sentinel2(tile_geom, start_2020, end_2020)
s2_2022 = get_sentinel2(tile_geom, start_2022, end_2022)

In [13]:
visualize_sentinel2_collection(s2_2020, tile_geom)

Map(center=[12.500138500181183, 39.50000000000036], controls=(WidgetControl(options=['position', 'transparent_…