# Automation Scripts

In [None]:
import sqlite3
from pathlib import Path
from observatorio_ipa.core.workflows import automation
from observatorio_ipa.utils import db

In [None]:
# general settings - Prefer reading settings from config.Settings object
#! JS code shows: users/observatorionieves/Cuencas/Andes # Verified on 2025-06-30

SENSOR = "MCD"
settings = {
    # User and credentials
    "user": "osn-imageautomation-dev@ee-observatorionieves.iam.gserviceaccount.com",
    "service_credentials_file": Path(
        "../secrets/ee-observatorionieves-288939dbc1cf.json"
    ),
    # DB
    "db_path": Path("../db/observatorio_ipa.db"),
    # ----- Image Export Input Assets ------
    "aoi_asset_path": Path("projects/ee-observatorionieves/assets/Modules/Andes"),
    "dem_asset_path": Path(
        "projects/ee-observatorionieves/assets/Modules/DEM_SRTM_reproj_MODIS_463_Andes"
    ),
    # Monthly Image Collections (Used for both Input and Output)
    "monthly_collection_path": Path(
        "projects/ee-observatorionieves/assets/Test/MODIS/Andes_MCDS4S5_Yearly_Monthly"
    ),
    "monthly_image_prefix": "Andes_MCDS4S5_Yearly_Monthly",
    "months_list": ["2024-07", "2024-08", "2024-09", "2024-10", "2024-11", "2024-12"],

    # Yearly Image Collections (Used for both Input and Output)
    "yearly_collection_path": Path(
        "projects/ee-observatorionieves/assets/Test/MODIS/Andes_MCDS4S5_Yearly"
    ),
    "yearly_image_prefix": "Andes_MCDS4S5_Yearly",

    # ----- Stats Export Input Assets ------
    "basins_asset_path": Path(
        "users/observatorionieves/DGA/Cuencas_BNA_Oficial"
    ),  # BASINS_BNA_ASSET_PATH
    "macrozones_asset_path": Path(
        "users/observatorionieves/DGA/Macrozonas_BNA_Oficial"
    ),  # MACROZONAS_BNA_ASSET_PATH
    "basins_cd_property": "COD_CUEN",
    "stats_dem_asset_path": Path(
        "users/observatorionieves/DEM/DEM_SRTM_reproj_Landsat_100_Andes"
    ),  # DEM_ASSET_PATH
    "salar_mask_asset_path": Path(
        "projects/ee-observatorionieves/assets/Vectores/Salar_mask"
    ),
    "snow_persistence_asset_path": Path(
        "users/observatorionieves/MODIS/MCD10A1_Andes_T48_Summary/MCD10A1_Andes_T48_Summary_SP"
    ),
    "snow_persistence_trend_asset_path": Path(
        "users/observatorionieves/MODIS/MCD10A1_Andes_T48_Summary/MCD10A1_Andes_T48_Summary_ST"
    ),
    # ----- Stats Common Output Config -----
    "stats_export_target": "storage",
    "stats_storage_bucket": "chompitest-odes",
    "stats_base_export_path": Path("OSN/stats/"),
    "basin_codes": ["023", "024", "030"],
    "exclude_basin_codes": None,
    "max_exports": None,
    "common_tbl_pre_prefix": "MCD",
    # ----- Elevation Output -----
    "elevation_tbl_export_path": Path("elev_ee"),
    "elev_basin_tbl_prefix": f"elev_BNA_",
    "sca_elev_basin_tbl_prefix": f"SCA_elev_BNA_",
    # ----- Month (across-years) Statistics Output -----
    "month_tbl_export_path": Path("month_ee"),
    "sca_m_basin_tbl_prefix": f"SCA_m_BNA_",
    "sca_m_elev_basin_tbl_prefix": f"SCA_m_elev_BNA_",
    "sca_m_trend_basin_tbl_prefix": f"SCA_m_trend_BNA_",
    # ----- Monthly (year/month) Statistics Output -----
    "year_month_tbl_export_path": Path("yearMonth_ee"),
    "sca_y_m_basin_tbl_prefix": f"SCA_y_m_BNA_",
    "sca_ym_basin_tbl_prefix": f"SCA_ym_BNA_",
    "sca_ym_elev_basin_tbl_prefix": f"SCA_ym_elev_BNA_",
    "snowline_ym_basin_tbl_prefix": f"snowline_ym_BNA_",
    # ----- Yearly Statistics Output -----
    "year_tbl_export_path": Path("year_ee"),
    "sca_y_basin_tbl_prefix": f"SCA_y_BNA_",
    "sca_y_elev_basin_tbl_prefix": f"SCA_y_elev_BNA_",
    "sca_y_t_area_basin_tbl_prefix": f"SCA_y_t_area_BNA_",
    "sca_y_t_elev_basin_tbl_prefix": f"SCA_y_t_elev_BNA_",
    "snowline_y_basin_tbl_prefix": f"snowline_y_BNA_",
    # ----- stats manifest -----
    "manifest_source": "file",  # file or storage
    "monthly_manifest_path": Path("../manifests/"),
}

## Connect to EE

In [None]:
# Connect to Google Earth Engine
from observatorio_ipa.services import connections

runtime_service_account = connections.GoogleServiceAccount(
    settings["service_credentials_file"].as_posix(),
)
connections.connect_to_gee(runtime_service_account)

### Daily Job Workflow

In [None]:
# Uses the following settings:

# - settings["db_path"]
# - settings["monthly_collection_path"]
# - settings["monthly_image_prefix"]
# - aoi_path = settings["aoi_asset_path"]
# - dem_path = settings["dem_asset_path"]
# - months_list = settings["months_list"]


auto_daily_job = automation.auto_daily_job(settings=settings)

In [None]:
# Identify running jobs

# Limiting to 1 job for testing this is no
db_path = settings["db_path"]

with db.db(db_path) as conn:
    running_jobs = conn.execute(
        """SELECT created_at, id FROM jobs WHERE job_status = 'RUNNING' ORDER BY created_at DESC"""
    ).fetchall()

    for job in running_jobs:
        print(f"Job: {job['id']} - started at {job['created_at']}")

In [None]:
def _print_job_details(conn, job_id):
    job = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
    print(dict(job))

    tasks_by_type_status = conn.execute(
        "SELECT type, state, COUNT(*) AS count FROM exports WHERE job_id = ? GROUP BY type, state",
        (job_id,),
    ).fetchall()
    for _task in tasks_by_type_status:
        print(dict(_task))

In [None]:
with db.db(db_path) as conn:
    _print_job_details(conn, "d258473f-b7ab-4a50-bc98-dbd91e6853e8")

{'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'RUNNING', 'image_export_status': 'PENDING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-21T19:22:39.979791+00:00', 'updated_at': '2025-08-21T19:22:39.979791+00:00'}
{'type': 'image', 'state': 'COMPLETED', 'count': 7}
{'type': 'image', 'state': 'FAILED', 'count': 7}


### Job Update Orchestration Workflow

In [None]:
automation.auto_orchestrate_job_updates(settings=settings)

Error generating report for job d258473f-b7ab-4a50-bc98-dbd91e6853e8: dictionary update sequence element #0 has length 18; 2 is required
Error generating report for job 9ac1ca09-894e-4fa2-adaf-192163caf189: dictionary update sequence element #0 has length 18; 2 is required


Updating status for job: d258473f-b7ab-4a50-bc98-dbd91e6853e8
Current job status: {'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'RUNNING', 'image_export_status': 'PENDING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-21T19:22:39.979791+00:00', 'updated_at': '2025-08-21T19:22:39.979791+00:00'}
Updated job status: {'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'FAILED', 'image_export_status': 'FAILED', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': ' | Cannot verify all exports were created/completed successfully', 'created_at': '2025-08-21T19:22:39.979791+00:00', 'updated_at': '2025-08-26T17:26:15.418968+00:00'}
{'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'FAILED', 'image_export_status': 'FAILED', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': ' | Cannot verify all exports were created/

In [None]:
with db.db(db_path) as conn:
    automation.auto_job_report(conn=conn, job_id="d258473f-b7ab-4a50-bc98-dbd91e6853e8")

{'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'RUNNING', 'image_export_status': 'PENDING', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': None, 'created_at': '2025-08-21T19:22:39.979791+00:00', 'updated_at': '2025-08-21T19:22:39.979791+00:00'}


In [None]:
with db.db(db_path) as conn:
    jobs = conn.execute("""SELECT * FROM jobs """).fetchall()
    for job in jobs:
       print(dict(job))

{'id': 'eb306128-0eae-4ed8-9ea4-5c21baf1a9ec', 'job_status': 'COMPLETED', 'image_export_status': 'COMPLETED', 'stats_export_status': 'COMPLETED', 'report_status': 'COMPLETED', 'email_to': None, 'error': None, 'created_at': '2025-08-20T23:49:04.612341+00:00', 'updated_at': '2025-08-21T01:11:09.657679+00:00'}
{'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'FAILED', 'image_export_status': 'FAILED', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': '| Cannot verify all exports were created/completed successfully | Error generating report for job d258473f-b7ab-4a50-bc98-dbd91e6853e8: dictionary update sequence element #0 has length 18; 2 is required', 'created_at': '2025-08-21T19:22:39.979791+00:00', 'updated_at': '2025-08-26T17:26:15.419950+00:00'}
{'id': '9ac1ca09-894e-4fa2-adaf-192163caf189', 'job_status': 'COMPLETED', 'image_export_status': 'COMPLETED', 'stats_export_status': 'NOT_REQUIRED', 'report_status': 'FAILED', 'email_to': None

In [None]:
with db.db(db_path) as conn:
    conn.execute("""UPDATE jobs set report_status = "PENDING" WHERE id= ? """, ('d258473f-b7ab-4a50-bc98-dbd91e6853e8',))

In [None]:
job_id='d258473f-b7ab-4a50-bc98-dbd91e6853e8'
with db.db(db_path) as conn:
    job = conn.execute("SELECT * FROM jobs WHERE id=? LIMIT 1", (job_id,)).fetchone()
    print(dict(job))

    # Report only if Job has finished and reporting is pending
    if not (
        job["job_status"] in ("COMPLETED", "FAILED")
        and job["report_status"] in ("PENDING")
    ):
        print('return')

{'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8', 'job_status': 'FAILED', 'image_export_status': 'FAILED', 'stats_export_status': 'PENDING', 'report_status': 'PENDING', 'email_to': None, 'error': '| Cannot verify all exports were created/completed successfully | Error generating report for job d258473f-b7ab-4a50-bc98-dbd91e6853e8: dictionary update sequence element #0 has length 18; 2 is required', 'created_at': '2025-08-21T19:22:39.979791+00:00', 'updated_at': '2025-08-26T17:26:15.419950+00:00'}


In [None]:
from pprint import pprint

In [None]:
with db.db(db_path) as conn:
    tasks = conn.execute(
                "SELECT * FROM exports WHERE job_id=? ORDER BY type, state", (job_id,)
            ).fetchall()
    full_job = {"job": dict(job), "tasks": [dict(task) for task in tasks]}
    pprint(full_job)

{'job': {'created_at': '2025-08-21T19:22:39.979791+00:00',
         'email_to': None,
         'error': '| Cannot verify all exports were created/completed '
                  'successfully | Error generating report for job '
                  'd258473f-b7ab-4a50-bc98-dbd91e6853e8: dictionary update '
                  'sequence element #0 has length 18; 2 is required',
         'id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8',
         'image_export_status': 'FAILED',
         'job_status': 'FAILED',
         'report_status': 'PENDING',
         'stats_export_status': 'PENDING',
         'updated_at': '2025-08-26T17:26:15.419950+00:00'},
 'tasks': [{'attempts': 0,
            'created_at': '2025-08-21T19:23:57.341910+00:00',
            'deadline_at': None,
            'error': None,
            'id': '3ae57c56-3f84-49b4-b29d-1ac4694a3ff3',
            'job_id': 'd258473f-b7ab-4a50-bc98-dbd91e6853e8',
            'last_error': None,
            'lease_until': '2025-08-21T20:04:07.323248+0