In [26]:
def replace_special_characters(string):
    string = string.replace(" ", "_")
    string = string.replace("/", "_")
    string = string.replace("[", "")
    string = string.replace("]", "")
    string = string.replace("(", "")
    string = string.replace(")", "")
    string = string.replace("&", "and")
    string = string.replace("'", "")

    return string

In [27]:
import ee
import geopandas as gpd

from pathlib import Path
from IPython.display import display, clear_output

from utils.atmosheric_correction import apply_atmospheric_correction

Authentication

In [28]:
ee.Authenticate() # Trigger the authentication flow.
ee.Initialize() # Initialize the library.


Successfully saved authorization token.


Downloading all images to Google Drive

In [29]:
CRS = 'EPSG:3857'
START_DATE = '2023-01-01'
END_DATE = '2024-05-24'
CLOUD_FILTER = 15
CLD_PRB_THRESH = 50
NIR_DRK_THRESH = 0.15
CLD_PRJ_DIST = 1
BUFFER = 10
COLLECTION = 'COPERNICUS/S2_SR_HARMONIZED'

In [30]:
def get_s2_cld_col(aoi, start_date, end_date):
    # Import and filter S2 SR.
    s2_sr_col = (ee.ImageCollection(COLLECTION)
        .filterBounds(aoi)
        .filterDate(start_date, end_date)
        .filter(ee.Filter.lte('CLOUDY_PIXEL_PERCENTAGE', CLOUD_FILTER)))
    
    # Import and filter s2cloudless.
    s2_cloudless_col = (ee.ImageCollection('COPERNICUS/S2_CLOUD_PROBABILITY')
        .filterBounds(aoi)
        .filterDate(start_date, end_date))

    # Join the filtered s2cloudless collection to the SR collection by the 'system:index' property.
    return ee.ImageCollection(ee.Join.saveFirst('s2cloudless').apply(**{
        'primary': s2_sr_col,
        'secondary': s2_cloudless_col,
        'condition': ee.Filter.equals(**{
            'leftField': 'system:index',
            'rightField': 'system:index'
        })
    }))

def add_cloud_bands(img):
    # Get s2cloudless image, subset the probability band.
    cld_prb = ee.Image(img.get('s2cloudless')).select('probability')
    
    # Condition s2cloudless by the probability threshold value.
    is_cloud = cld_prb.gt(CLD_PRB_THRESH).rename('clouds')

    # Add the cloud probability layer and cloud mask as image bands.
    return img.addBands(ee.Image([cld_prb, is_cloud]))

def add_shadow_bands(img):
    # Identify water pixels from the SCL band.
    not_water = img.select('SCL').neq(6)

    # Identify dark NIR pixels that are not water (potential cloud shadow pixels).
    SR_BAND_SCALE = 1e4
    # dark_pixels = img.select('B8').lt(NIR_DRK_THRESH*SR_BAND_SCALE).multiply(not_water).rename('dark_pixels')
    dark_pixels = img.select('B8').lt(NIR_DRK_THRESH*SR_BAND_SCALE).rename('dark_pixels')

    # Determine the direction to project cloud shadow from clouds (assumes UTM projection).
    shadow_azimuth = ee.Number(90).subtract(ee.Number(img.get('MEAN_SOLAR_AZIMUTH_ANGLE')));

    # Project shadows from clouds for the distance specified by the CLD_PRJ_DIST input.
    cld_proj = (img.select('clouds').directionalDistanceTransform(shadow_azimuth, CLD_PRJ_DIST*10)
        .reproject(**{'crs': img.select(0).projection(), 'scale': 100})
        .select('distance')
        .mask()
        .rename('cloud_transform'))

    # Identify the intersection of dark pixels with cloud shadow projection.
    shadows = cld_proj.multiply(dark_pixels).rename('shadows')

    # Add dark pixels, cloud projection, and identified shadows as image bands.
    return img.addBands(ee.Image([dark_pixels, cld_proj, shadows]))

def add_cld_shdw_mask(img):
    # Add cloud component bands.
    img_cloud = add_cloud_bands(img)

    # Add cloud shadow component bands.
    img_cloud_shadow = add_shadow_bands(img_cloud)

    # Combine cloud and shadow mask, set cloud and shadow as value 1, else 0.
    is_cld_shdw = img_cloud_shadow.select('clouds').add(img_cloud_shadow.select('shadows')).gt(0)

    # Remove small cloud-shadow patches and dilate remaining pixels by BUFFER input.
    # 20 m scale is for speed, and assumes clouds don't require 10 m precision.
    is_cld_shdw = (is_cld_shdw.focalMin(2).focalMax(BUFFER*2/20)
        .reproject(**{'crs': img.select([0]).projection(), 'scale': 20})
        .rename('cloudmask'))

    # Add the final cloud-shadow mask to the image.
    return img_cloud_shadow.addBands(is_cld_shdw)

def apply_cld_shdw_mask(img):
    # Subset the cloudmask band and invert it so clouds/shadow are 0, else 1.
    not_cld_shdw = img.select('cloudmask').Not()

    # Subset reflectance bands and update their masks, return the result.
    return img.select('B.*').updateMask(not_cld_shdw)

In [31]:
# Dynaimc file region
sica_cities = '../data/0/SICA_DO_cities.parquet'
gdf = gpd.read_parquet(sica_cities)

num_cities = len(gdf.index)
print("Number of cities:", num_cities)

tasks = []
skipped_cities = []
task_number = 1

for _, row in gdf.iterrows():
    city_poly = row.geometry
    city_name = row["city_ascii"]
    country_code = row["iso3"]
    minx, miny, maxx, maxy = city_poly.bounds
    city_bbox = [[[minx, miny],
                  [maxx, miny],
                  [maxx, maxy],
                  [minx, maxy],
                  [minx, miny]]]
    geometry = ee.Geometry.Polygon(city_bbox)

    AOI = geometry
    FILENAME = country_code + '_' + replace_special_characters(city_name) + f'_{END_DATE[:4]}'

    s2_cld_col = get_s2_cld_col(AOI, START_DATE, END_DATE)
    try:
        s2_cld_col = s2_cld_col.map(lambda x: x.clip(AOI))
        
        s2_cld_corr = (s2_cld_col.map(add_cld_shdw_mask)
                                 .map(apply_cld_shdw_mask)
                      )

        if COLLECTION == 'COPERNICUS/S2_HARMONIZED' or COLLECTION == 'COPERNICUS/S2':
            s2_sr_median = apply_atmospheric_correction(img_collection=s2_cld_corr, geometry=geometry).median()
        else:
            s2_sr_median = s2_cld_corr.median()
    except:
        print(f"Skipping file {FILENAME}")
        skipped_cities.append(FILENAME)
        task_number += 1
        continue
    
    s2_sr_sub = s2_sr_median.select(["B2", "B3", "B4", "B8"])

    task = ee.batch.Export.image.toDrive(image=s2_sr_sub,
                                        description=FILENAME,
                                        folder="SICA_UNITAC_cities_tifs",
                                        scale=10,
                                        region=AOI,
                                        fileNamePrefix=FILENAME,
                                        crs=CRS,
                                        fileFormat='GeoTIFF')
    tasks.append(task)
    clear_output(wait=True)
    display(f"Starting task {task_number}/{num_cities}")
    task_number += 1
    task.start()

clear_output(wait=True)
print(f"Started {num_cities-len(skipped_cities)}/{num_cities} tasks.")
if len(skipped_cities) > 0:
    print("Skipped the following cities:")
    for city in skipped_cities:
        print(city)

Started 11/11 tasks.


In [19]:
# gdf = gpd.read_file("../data/1/UNITAC_data/SantoDomingo_PS_modified.geojson")
# minx, miny, maxx, maxy = gdf.total_bounds
# city_bbox = [[[minx, miny],
#                 [maxx, miny],
#                 [maxx, maxy],
#                 [minx, maxy],
#                 [minx, miny]]]
# geometry = ee.Geometry.Polygon(city_bbox)
# AOI = geometry
# FILENAME = 'SantoDomingo_' + f'_{START_DATE, END_DATE[:4]}'

# s2_cld_col = get_s2_cld_col(AOI, START_DATE, END_DATE)
# try:
#     s2_cld_col = s2_cld_col.map(lambda x: x.clip(AOI))
    
#     s2_cld_corr = (s2_cld_col.map(add_cld_shdw_mask)
#                                 .map(apply_cld_shdw_mask))
#     s2_sr_median = s2_cld_corr.median()
# except:
#     print("Skipping file")

# s2_cld_col = get_s2_cld_col(AOI, START_DATE, END_DATE)
# s2_sr_sub = s2_sr_median.select(["B2", "B3", "B4", "B8"])

# task = ee.batch.Export.image.toDrive(image=s2_sr_sub,
#                                     description="unitacSDfile",
#                                     folder="UNITAC_pro_tifs",
#                                     scale=10,
#                                     region=AOI,
#                                     fileNamePrefix=FILENAME,
#                                     crs=CRS,
#                                     fileFormat='GeoTIFF')
# clear_output(wait=True)
# display(f"Starting task SD")
# task.start()

In [22]:
# # Median cloudless
# minx, miny, maxx, maxy = gdf.total_bounds
# city_bbox = [[[minx, miny],
#               [maxx, miny],
#               [maxx, maxy],
#               [minx, maxy],
#               [minx, miny]]]
# geometry = ee.Geometry.Polygon(city_bbox)
# AOI = geometry

# # Define the date range
# START_DATE = '2023-01-01'
# END_DATE = '2024-05-24'

# # Function to mask clouds using the QA60 band
# def maskS2clouds(image):
#     qa = image.select('QA60')
#     cloudBitMask = 1 << 10
#     cirrusBitMask = 1 << 11
#     mask = qa.bitwiseAnd(cloudBitMask).eq(0).And(qa.bitwiseAnd(cirrusBitMask).eq(0))
#     return image.updateMask(mask).divide(10000)

# # Load Sentinel-2 surface reflectance data
# collection = ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED') \
#     .filterDate(START_DATE, END_DATE) \
#     .filterBounds(AOI) \
#     .map(maskS2clouds)

# # Compute the median composite to reduce cloud cover
# median_image = collection.median().clip(AOI)

# # Define export parameters
# FILENAME = 'unitacSDfile_median'
# CRS = 'EPSG:4326'  # You can change to your preferred coordinate reference system

# # Export the image to Google Drive
# task = ee.batch.Export.image.toDrive(image=median_image,
#                                      description="unitacSDfile",
#                                      folder="UNITAC_pro_tifs",
#                                      scale=10,
#                                      region=AOI,
#                                      fileNamePrefix=FILENAME,
#                                      crs=CRS,
#                                      fileFormat='GeoTIFF')

# # Start the task
# task.start()

# # Print a message
# print("Starting task SD")

In [24]:
clear_output(wait=True)