# Script to extract images from sentinel dataset-2 and dynamic world v1 as tf records.

# Initialize Google earth engine module

In [2]:
import ee
import pandas as pd
import time

ee.Authenticate()


Successfully saved authorization token.


In [3]:
# Initialize the Earth Engine module.
ee.Initialize()

# Read CSV file where there are 6 columns:
- year
- cell id
- centroid latitude
- centroid longtitude
- label of wealth index

In [None]:
df = pd.read_csv('label.csv')

# Common variables
PREFIX = 'asset-index-3'
SCALE = 30  # spatial resolution., meaning 30m per pixel.
KSIZE = 112  # half the size of the desired patch in pixels (225 / 2)
BANDS = ['B2', 'B3', 'B4', 'B8', 'B11','B12'] # Selected band names from sentinel dataset 2
SCALE_FACTOR = 10000  # Scaling factor for Sentinel-2 reflectance values (This is from the documentation of API)
DATASET = 'COPERNICUS/S2' # Sentinel-2
DYNAMIC = 'GOOGLE/DYNAMICWORLD/V1' # Dynamic world v1 dataset


NUM_BATCH = 20 # Number of batches to extract at once. This was done to make sure if there is failure, we can restart from where it failed. 
BATCH_SIZE = len(df) // NUM_BATCH

In [7]:
def export_image(longtitude, latitude, year, wi):
    """
    Exports an image for a given location and year.

    Args:
        longtitude (float): Longitude of the location.
        latitude (float): Latitude of the location.
        year (int): Year for which the image is to be exported.
        wi (str): Additional identifier for the image.

    """
    
    # Define the point of interest and create a buffer region around it
    point = ee.Geometry.Point([longtitude, latitude])
    buffered_point = point.buffer(SCALE * KSIZE)

    def cloudiness(image):
        """
        Estimates cloud coverage over the region of interest.

        Args:
            image (ee.Image): Image to be processed.

        Returns:
            ee.Image: Image with an additional 'cloudiness' property.
        """
        qa_band = image.select('QA60')
        opaque_clouds = qa_band.bitwiseAnd(1 << 10).neq(0)
        cirrus_clouds = qa_band.bitwiseAnd(1 << 11).neq(0)
        total_cloud_mask = opaque_clouds.Or(cirrus_clouds)
        cloud_coverage_ratio = total_cloud_mask.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=buffered_point,
            scale=SCALE
        ).get('QA60')
        return image.set('cloudiness', cloud_coverage_ratio)
    
    quarterly_images = []

     # Retrieve the least cloudy image of the year as a fallback
    yearly_collection = ee.ImageCollection(DATASET) \
        .filterDate(f'{year}-01-01', f'{year}-12-31') \
        .filterBounds(buffered_point) \
        .map(cloudiness)

    yearly_least_cloudy = yearly_collection.sort('cloudiness').first().divide(SCALE_FACTOR)

    for i, (start_date, end_date) in enumerate(get_quarter_dates(year)):
        collection = ee.ImageCollection(DATASET) \
            .filterDate(start_date, end_date) \
            .filterBounds(buffered_point) \
            .map(cloudiness)
        
        # Check if the collection is empty
        count = collection.size().getInfo()
        if count == 0:
            least_cloudy_image = yearly_least_cloudy
        else:
            least_cloudy_image = collection.sort('cloudiness').first().divide(SCALE_FACTOR)

        quarter_suffix = f'_{i+1}'
        renamed_bands = [band + quarter_suffix for band in BANDS]
        least_cloudy_image = least_cloudy_image.select(BANDS, renamed_bands)


        quarterly_images.append(least_cloudy_image)
    
    # Yearly Dynamic World collection filtered by date and bounds
    # Compute the median of the 'label' band over the year
    # Rename band into landcover
    dwYearly = ee.ImageCollection(DYNAMIC) \
        .filterDate(f'{year}-01-01', f'{year}-12-31') \
        .filterBounds(buffered_point) \
        .select('label') \
        .median()
    
    quarterly_images.append(dwYearly)

    # Combine the quarterly images into a single image
    combined_image = ee.Image.cat(quarterly_images)
    
    # Define the point of interest as a feature collection
    points = ee.FeatureCollection([ee.Feature(point, {'longtitude': longtitude, 'latitude': latitude})])

        # Set up the export task
    task = get_array_patches(
        combined_image,
        points,
        longtitude,
        latitude,
        year,
        wi
    )
    # Start the export
    task.start()

def get_quarter_dates(year):
    """
    Returns a list of date ranges for each quarter.

    Args:
        year (int): The year for which the quarters are to be calculated.

    Returns:
        list: List of tuples representing the start and end dates of each quarter.
    """
    return [
        (f'{year}-01-01', f'{year}-03-31'), # Q1
        (f'{year}-04-01', f'{year}-06-30'), # Q2
        (f'{year}-07-01', f'{year}-09-30'), # Q3
        (f'{year}-10-01', f'{year}-12-31')  # Q4
    ]

def get_array_patches(img, points, longtitude, latitude, year, wi):
    """
    Creates an array of image patches and sets up an export task.

    Args:
        img (ee.Image): Combined image of quarterly and dynamic world data.
        points (ee.FeatureCollection): Feature collection of the point of interest.
        longtitude (float): Longitude of the location.
        latitude (float): Latitude of the location.
        year (int): Year for which the image is to be exported.
        wi (str): Additional identifier for the image.

    Returns:
        ee.batch.Task: Task to export the patches to Google Drive.
    """
    kern = ee.Kernel.square(radius=KSIZE, units='pixels')
    patches_array = img.neighborhoodToArray(kern)
    samples = points.map(lambda pt: sample_patch(pt, patches_array, longtitude, latitude, wi))

    # Export to TFRecord file
    task = ee.batch.Export.table.toDrive(
        collection=samples,
        description=PREFIX,
        folder=PREFIX,  
        fileNamePrefix=f'{year}_{longtitude}_{latitude}_{wi}',
        fileFormat='TFRecord'
    )
    return task

def sample_patch(point, patches_array, longtitude, latitude, wi):
    """
    Samples a patch and includes additional properties.

    Args:
        point (ee.Feature): Feature representing the point of interest.
        patches_array (ee.Image): Image array of patches.
        longtitude (float): Longitude of the location.
        latitude (float): Latitude of the location.
        wi (str): Additional identifier for the image.

    Returns:
        ee.Feature: Feature with sampled patch and additional properties.
    """
    arrays_samples = patches_array.sample(
        region=point.geometry(),
        scale=SCALE,
        projection='EPSG:3857',
        factor=None,
        numPixels=None,
        dropNulls=False,
        tileScale=12)
    sample_feature = arrays_samples.first()

    # Set additional properties to the feature
    sample_feature = sample_feature.set({
        'longtitude': longtitude,
        'latitude': latitude,
        'wi' : wi
    })
    return sample_feature

In [8]:
for i in range(NUM_BATCH):
    batch_df = df[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
    print(i)
    for idx, row in batch_df.iterrows():
        longtitude = row['centroid_longtitude']
        latitude = row['centroid_latitude']
        year = row['year']
        wi = row['wi']
        try:
            export_image(longtitude, latitude, 2016 if year == 2014 else year, wi)
        except:
            print(f'Error at {longtitude}, {latitude}, {year}')
            time.sleep(30) # sleep for 30s to avoid limitation from the google earth engine.

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
