In [7]:
import ee
from google.cloud import storage
import time
import json

# 1) Initialize Earth Engine
PROJECT_ID = 'force-445212'
ee.Initialize(project=PROJECT_ID)

# 2) Initialize GCS client
GCS_BUCKET = 'tessera-representation-workshop'
GCS_PREFIX = '2020'  # folder in the bucket
ASSET_FOLDER = f'projects/{PROJECT_ID}/assets/tessera_embeddings_{GCS_PREFIX}'
COLLECTION_ID = f'projects/{PROJECT_ID}/assets/tessera_embeddings_{GCS_PREFIX}_collection'

storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET)

In [9]:

# Helper: create and start ingestion with proper band configuration
def ingest_tif_to_ee(gs_uri, asset_id):
    """Ingest a single TIFF file to Earth Engine with 128 bands."""
    
    # Configure bands for 128-channel float32 data
    bands = []
    for i in range(128):
        bands.append({
            'id': f'b{i+1}',  # Band names: b1, b2, ..., b128
            'tileset_band_index': i,
            'pyramiding_policy': 'MODE'  # or 'MEAN' depending on your data
        })
    
    manifest = {
        'name': asset_id,
        'tilesets': [{
            'sources': [{
                'uris': [gs_uri]
            }]
        }],
        'bands': bands,
        'pyramiding_policy': 'MODE',  # Global pyramiding policy
        'start_time': '2020-01-01T00:00:00Z',  # Optional: add time metadata
        'end_time': '2020-12-31T23:59:59Z'
    }
    
    try:
        ee.data.startIngestion(None, manifest)
        print(f'Ingestion started: {gs_uri} → {asset_id}')
        return True
    except Exception as e:
        print(f'Failed to start ingestion for {gs_uri}: {e}')
        return False

# Alternative simpler version - let EE auto-detect bands
def ingest_tif_to_ee_simple(gs_uri, asset_id):
    """Simpler ingestion - let Earth Engine auto-detect the 128 bands."""
    
    manifest = {
        'name': asset_id,
        'tilesets': [{
            'sources': [{
                'uris': [gs_uri]
            }]
        }],
        'pyramiding_policy': 'MODE',
        'properties': {
            'source_file': gs_uri.split('/')[-1],
            'year': 2020
        }
    }
    
    try:
        ee.data.startIngestion(None, manifest)
        print(f'Ingestion started: {gs_uri} → {asset_id}')
        return True
    except Exception as e:
        print(f'Failed to start ingestion for {gs_uri}: {e}')
        return False
def check_ingestion_status(asset_id):
    """Check if an asset exists and get its ingestion status."""
    try:
        asset_info = ee.data.getAsset(asset_id)
        return 'READY'  # Asset exists and is ready
    except:
        # Check if there's an ongoing ingestion task
        try:
            tasks = ee.data.listOperations()
            for task in tasks:
                if asset_id in task.get('name', ''):
                    return task.get('metadata', {}).get('state', 'UNKNOWN')
            return 'NOT_FOUND'
        except:
            return 'ERROR'

# Helper: wait for all ingestions to complete
def wait_for_ingestions(asset_ids, check_interval=120, max_wait=7200):
    """Wait for all assets to be ingested."""
    start_time = time.time()
    pending = asset_ids.copy()
    
    while pending and (time.time() - start_time) < max_wait:
        still_pending = []
        for asset_id in pending:
            status = check_ingestion_status(asset_id)
            if status == 'READY':
                print(f'✓ {asset_id} is ready')
            elif status in ['RUNNING', 'PENDING']:
                still_pending.append(asset_id)
            else:
                print(f'✗ {asset_id} failed or not found')
        
        pending = still_pending
        if pending:
            print(f'{len(pending)} assets still pending, waiting {check_interval}s...')
            time.sleep(check_interval)
    
    return len(pending) == 0

# Helper: create ImageCollection from ingested assets
def create_image_collection(asset_ids, collection_id):
    """Create an ImageCollection from individual Image assets."""
    
    # First, check if the collection already exists
    try:
        ee.data.getAsset(collection_id)
        print(f'Collection {collection_id} already exists')
        # Optionally delete it: ee.data.deleteAsset(collection_id)
        return False
    except:
        pass  # Collection doesn't exist, we can create it
    
    # Create the ImageCollection
    try:
        # Method 1: Create empty collection and add images
        ee.data.createAsset({
            'type': 'IMAGE_COLLECTION',
            'name': collection_id
        })
        print(f'Created empty ImageCollection: {collection_id}')
        
        # Add each image to the collection
        for asset_id in asset_ids:
            try:
                # Copy image to collection
                image_name = asset_id.split('/')[-1]
                collection_image_id = f'{collection_id}/{image_name}'
                ee.data.copyAsset(asset_id, collection_image_id)
                print(f'Added {image_name} to collection')
            except Exception as e:
                print(f'Failed to add {asset_id} to collection: {e}')
        
        return True
        
    except Exception as e:
        print(f'Failed to create ImageCollection: {e}')
        return False

# Alternative: Create collection using ee.ImageCollection
def create_collection_from_list(asset_ids):
    """Create an ImageCollection programmatically from a list of asset IDs."""
    
    # Load all images
    images = []
    for asset_id in asset_ids:
        try:
            img = ee.Image(asset_id)
            # Add properties if needed
            img = img.set({
                'system:id': asset_id.split('/')[-1],
                'ingestion_time': ee.Date(time.time() * 1000)
            })
            images.append(img)
        except Exception as e:
            print(f'Failed to load {asset_id}: {e}')
    
    # Create collection
    collection = ee.ImageCollection(images)
    print(f'Created ImageCollection with {len(images)} images')
    
    return collection

# Main execution
if __name__ == '__main__':
    
    # Step 1: List all TIFF files
    blobs = bucket.list_blobs(prefix=GCS_PREFIX)
    tif_uris = [f'gs://{GCS_BUCKET}/{b.name}' for b in blobs if b.name.endswith('.tif')]
    
    print(f'Found {len(tif_uris)} TIFF files.')
    
    # Step 2: Prepare asset IDs
    asset_ids = []
    for uri in tif_uris:
        fname = uri.split('/')[-1].replace('.tif', '')
        # 把"."换为"_"，因为不被允许
        fname = fname.replace('.', '_')
        asset_id = f'{ASSET_FOLDER}/{fname}'
        asset_ids.append(asset_id)
    
    # Step 3: Check which assets already exist
    existing_assets = []
    new_assets = []
    
    for asset_id in asset_ids:
        if check_ingestion_status(asset_id) == 'READY':
            existing_assets.append(asset_id)
        else:
            new_assets.append(asset_id)
    
    print(f'Already ingested: {len(existing_assets)} assets')
    print(f'Need to ingest: {len(new_assets)} assets')
    
    # Step 4: Ingest new assets in batches
    if new_assets:
        BATCH_SIZE = 200
        ingested_asset_ids = []
        
        for i in range(0, len(new_assets), BATCH_SIZE):
            batch_asset_ids = new_assets[i:i+BATCH_SIZE]
            batch_uris = [tif_uris[asset_ids.index(aid)] for aid in batch_asset_ids]
            
            for uri, asset_id in zip(batch_uris, batch_asset_ids):
                # Try the detailed version first, if it fails, you can switch to simple version
                if ingest_tif_to_ee(uri, asset_id):  # or use ingest_tif_to_ee_simple
                    ingested_asset_ids.append(asset_id)
            
            # Avoid hitting API rate limits
            print(f'Batch {i//BATCH_SIZE+1} submitted, sleeping 30s...')
            time.sleep(30)
        
        # Step 5: Wait for all ingestions to complete
        print('\nWaiting for all ingestions to complete...')
        if wait_for_ingestions(ingested_asset_ids, check_interval=60):
            print('All ingestions completed successfully!')
        else:
            print('Some ingestions may have failed or timed out.')
    
    # Step 6: Create ImageCollection
    all_ready_assets = existing_assets + [aid for aid in new_assets if check_ingestion_status(aid) == 'READY']
    
    print(f'\nTotal ready assets: {len(all_ready_assets)}')
    
    # Option A: Create persistent collection asset
    if create_image_collection(all_ready_assets, COLLECTION_ID):
        print(f'Successfully created ImageCollection: {COLLECTION_ID}')
    
    # Option B: Create collection programmatically (for immediate use)
    collection = create_collection_from_list(all_ready_assets)
    
    # Test the collection
    print('\nTesting the collection:')
    print(f'Collection size: {collection.size().getInfo()}')
    first_image = ee.Image(collection.first())
    print(f'First image bands: {first_image.bandNames().getInfo()}')
    print(f'First image pixel type: {first_image.select(0).projection().getInfo()}')

Found 2 TIFF files.
Already ingested: 0 assets
Need to ingest: 2 assets
Ingestion started: gs://tessera-representation-workshop/2020/grid_117.75_4.95.tif → projects/force-445212/assets/tessera_embeddings_2020/grid_117_75_4_95
Ingestion started: gs://tessera-representation-workshop/2020/grid_117.85_4.95.tif → projects/force-445212/assets/tessera_embeddings_2020/grid_117_85_4_95
Batch 1 submitted, sleeping 30s...

Waiting for all ingestions to complete...
✗ projects/force-445212/assets/tessera_embeddings_2020/grid_117_75_4_95 failed or not found
✗ projects/force-445212/assets/tessera_embeddings_2020/grid_117_85_4_95 failed or not found
All ingestions completed successfully!

Total ready assets: 0
Collection projects/force-445212/assets/tessera_embeddings_2020_collection already exists
Created ImageCollection with 0 images

Testing the collection:
Collection size: 0


EEException: Image.bandNames: Parameter 'image' is required and may not be null.