In [8]:
# %% [code]
import ee
from google.cloud import storage
import time
import os
import rasterio
import numpy as np

In [3]:
# 1) Initialize Earth Engine.
# Replace the env lookup with your GCP project id, or export EE_PROJECT_ID
# before starting the kernel.
PROJECT_ID = os.environ.get("EE_PROJECT_ID")
if not PROJECT_ID:
    # Without a project id ASSET_FOLDER below would silently become
    # 'projects/None/assets/...', and every ingestion/listing call would fail.
    raise ValueError("PROJECT_ID is not set: edit this cell or export EE_PROJECT_ID.")
ee.Initialize(project=PROJECT_ID)

# 2) Run configuration: target year and GCS / Earth Engine locations.
year = 2018
foldername = f"train_{year}"
GCS_BUCKET = 'efm-ingest'
GCS_PREFIX = f'rois/{foldername}'   # bucket prefix scanned for input .tif ROIs
GCS_OUTPUTS = 'efm_outputs'         # export root; joined with '/' downstream, so no trailing slash
ASSET_FOLDER = f'projects/{PROJECT_ID}/assets/{foldername}'

# Local directory with the ROI GeoTIFFs (assumes .tif files); used to read
# each ROI's bounds/CRS. Replace the env lookup or export EFM_ROI_DIR.
orig_path = os.environ.get("EFM_ROI_DIR")

# 3) Initialize the GCS client.
storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET)

In [4]:
# create and start ingestion
def ingest_tif_to_ee(gs_uri, asset_id, allow_overwrite=False):
    """Start an Earth Engine image-ingestion task for one GeoTIFF in GCS.

    Args:
        gs_uri: ``gs://bucket/path.tif`` URI of the source file.
        asset_id: Destination asset name, e.g.
            ``projects/<project>/assets/<folder>/<name>``.
        allow_overwrite: Forwarded to ``ee.data.startIngestion``; when True an
            existing asset with the same name may be replaced.

    Returns:
        The response of ``ee.data.startIngestion`` so callers can track the
        ingestion task (the original discarded it).
    """
    manifest = {
        'name': asset_id,
        'tilesets': [{'sources': [{'uris': [gs_uri]}]}],
    }
    # A unique request id makes the request safely retryable; EE documents that
    # an empty id makes the call more likely to fail.
    request_id = ee.data.newTaskId()[0]
    return ee.data.startIngestion(request_id, manifest, allow_overwrite)

# 1) List every GeoTIFF stored under GCS_PREFIX.
tif_uris = [
    f'gs://{GCS_BUCKET}/{blob.name}'
    for blob in bucket.list_blobs(prefix=GCS_PREFIX)
    if blob.name.endswith('.tif')
]

print(f'Found {len(tif_uris)} TIFF files.')

# 2) Start ingestion in batches, pausing between batches to avoid hammering
#    the API (no pause after the final batch).
BATCH_SIZE = 200
INGEST_PAUSE_SECONDS = 60
for batch_no, start in enumerate(range(0, len(tif_uris), BATCH_SIZE), start=1):
    for uri in tif_uris[start:start + BATCH_SIZE]:
        # Asset name = file name minus its '.tif' extension. Only the trailing
        # extension is removed (str.replace would also strip inner '.tif').
        fname = uri.rsplit('/', 1)[-1][:-len('.tif')]
        asset_id = f'{ASSET_FOLDER}/{fname}'
        ingest_tif_to_ee(uri, asset_id)
    if start + BATCH_SIZE < len(tif_uris):
        print(f'Batch {batch_no} ingested, sleeping {INGEST_PAUSE_SECONDS}s…')
        time.sleep(INGEST_PAUSE_SECONDS)
    else:
        print(f'Batch {batch_no} ingested.')

Found 1961 TIFF files.
Batch 1 ingested, sleeping 60s…


In [None]:
def wait_if_many_running(threshold=95, poll_seconds=60):
    """Block until fewer than ``threshold`` Earth Engine tasks are RUNNING.

    Polls ``ee.data.getTaskList()`` and sleeps ``poll_seconds`` between checks
    for as long as the RUNNING count is at or above ``threshold``.

    Args:
        threshold: Maximum number of RUNNING tasks tolerated before waiting.
        poll_seconds: Seconds to sleep between polls.
    """
    while True:
        n_running = sum(1 for t in ee.data.getTaskList() if t['state'] == 'RUNNING')
        if n_running < threshold:
            return
        print(f"Currently {n_running} tasks running, pausing {poll_seconds}s to avoid overflow.")
        time.sleep(poll_seconds)



def get_bbox_from_tif(tif_path):
    """Read the footprint and pixel grid size of a local GeoTIFF.

    Args:
        tif_path: Path to the raster file.

    Returns:
        ``(bounds, crs, width, height)``: rasterio ``src.bounds``, the CRS as a
        string, and the raster width/height in pixels.

    Raises:
        ValueError: if the raster has no CRS, since its bounds then cannot be
            turned into an export region.
    """
    with rasterio.open(tif_path) as src:
        if src.crs is None:
            raise ValueError(f"{tif_path} has no CRS; cannot derive an export region.")
        return src.bounds, src.crs.to_string(), src.width, src.height

def bounds_to_ee_rect(bounds, crs):
    """Turn rasterio-style bounds into an ``ee.Geometry.Rectangle``.

    Args:
        bounds: Object exposing ``left``, ``bottom``, ``right`` and ``top``
            (e.g. rasterio's ``src.bounds``), expressed in ``crs`` units.
        crs: Projection of ``bounds`` (e.g. ``'EPSG:32635'``).

    Returns:
        Rectangle ``[xmin, ymin, xmax, ymax]`` in ``crs``; the trailing
        ``False`` is the ``geodesic`` flag, i.e. a planar rectangle.
    """
    corners = [getattr(bounds, side) for side in ("left", "bottom", "right", "top")]
    return ee.Geometry.Rectangle(corners, crs, False)


def process_tiff_asset(asset_id, output_name, bounds, crs, year, foldername,
                       dimensions=(256, 256)):
    """Start a GCS export of the annual satellite-embedding layer over one ROI.

    Args:
        asset_id: Ingested ROI asset id. Not read here (the export region comes
            from ``bounds``/``crs``); kept for call compatibility.
        output_name: Base name of the exported file and of the task description.
        bounds: ROI bounds with ``left/bottom/right/top`` in ``crs`` units.
        crs: CRS of ``bounds``; the export is written in this same CRS so the
            output grid lines up with the ROI.
        year: Calendar year of the embedding layer to export.
        foldername: Sub-folder under ``GCS_OUTPUTS`` in ``GCS_BUCKET``.
        dimensions: Output raster size in pixels ``(width, height)``.

    Returns:
        The started ``ee.batch.Task``.
    """
    region = bounds_to_ee_rect(bounds, crs)
    print(foldername, "year:", year)

    year = int(year)
    # filterDate's end date is exclusive, so use Jan 1 of the next year to
    # cover the whole calendar year.
    emb = (
        ee.ImageCollection("GOOGLE/SATELLITE_EMBEDDING/V1/ANNUAL")
        .filterDate(f"{year}-01-01", f"{year + 1}-01-01")
        .mosaic()
        .clip(region)
    )

    # Earth Engine rejects task descriptions longer than 100 characters.
    description = f'EFM_Embeddings_{year}_{output_name}'[:100]
    # Strip a trailing '/' so the object prefix has no empty '//' segment.
    file_prefix = f"{GCS_OUTPUTS.rstrip('/')}/{foldername}/{output_name}"

    task = ee.batch.Export.image.toCloudStorage(
        image=emb,
        description=description,
        bucket=GCS_BUCKET,
        fileNamePrefix=file_prefix,
        region=region,
        dimensions=list(dimensions),
        crs=crs,
        maxPixels=1e13,
        fileFormat='GeoTIFF',
        formatOptions={'cloudOptimized': True},
    )
    task.start()
    return task

    
def _list_asset_ids(parent):
    """Return the ids of all assets directly under ``parent``, following pagination."""
    ids = []
    params = {'parent': parent}
    while True:
        response = ee.data.listAssets(params)
        ids.extend(a['id'] for a in response.get('assets', []))
        token = response.get('nextPageToken')
        if not token:
            return ids
        params = {'parent': parent, 'pageToken': token}


def running(foldername, task_batch=100, pause_seconds=60):
    """Launch one embedding export per ROI asset found in ``ASSET_FOLDER``.

    Assets whose id ends with ``_agbm`` or contains ``.tif`` are selected. For
    each, the bounds/CRS come from the same-named local GeoTIFF under the global
    ``orig_path``, and ``process_tiff_asset`` is started for the global ``year``.

    Args:
        foldername: Output sub-folder passed through to ``process_tiff_asset``.
        task_batch: Number of exports started per batch.
        pause_seconds: Pause between batches (skipped after the last batch).

    Returns:
        List of the started ``ee.batch.Task`` objects, for monitoring.

    Raises:
        ValueError: if ``orig_path`` has not been configured.
    """
    if orig_path is None:
        raise ValueError("orig_path is not set; point it at the directory holding the ROI GeoTIFFs.")

    asset_ids = [
        aid for aid in _list_asset_ids(ASSET_FOLDER)
        if aid.endswith('_agbm') or '.tif' in aid
    ]
    print(f'Found {len(asset_ids)} assets to process.')

    # Launch in batches so the number of concurrent EE tasks stays bounded.
    processing_tasks = []
    for start in range(0, len(asset_ids), task_batch):
        wait_if_many_running()

        for aid in asset_ids[start:start + task_batch]:
            name = aid.split('/')[-1]
            tif_path = os.path.join(orig_path, name)
            if not tif_path.endswith(".tif"):
                tif_path += ".tif"
            bounds, crs, _, _ = get_bbox_from_tif(tif_path)
            processing_tasks.append(
                process_tiff_asset(aid, name, bounds, crs, year, foldername)
            )

        batch_no = start // task_batch + 1
        if start + task_batch < len(asset_ids):
            print(f'Launched batch {batch_no}, sleeping…')
            time.sleep(pause_seconds)
        else:
            print(f'Launched batch {batch_no}.')

    return processing_tasks



# Launch the exports. Bind whatever running() returns to a name rather than
# leaving it as the cell's last expression, so it is not echoed as output.
launched_tasks = running(foldername)