## Welcome to your notebook.


#### Run this cell to connect to your GIS and get started:

In [1]:
from arcgis.gis import GIS
gis = GIS("home")

In [2]:
!pip install earthengine-api geemap --q

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)


[33mDEPRECATION: Loading egg at /opt/conda/lib/python3.11/site-packages/tflite_model_maker-0.3.4-py3.11.egg is deprecated. pip 25.1 will enforce this behaviour change. A possible replacement is to use pip for package installation. Discussion can be found at https://github.com/pypa/pip/issues/12330[0m[33m
[0m

In [3]:
import ee
import os
import numpy as np
import pandas as pd
from datetime import datetime
from datetime import timedelta
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from arcgis.features import FeatureLayerCollection

In [4]:
# Replace with your actual values
SERVICE_ACCOUNT = 'gee-daily-weather@sre-2025.iam.gserviceaccount.com'
KEY_PATH = '/arcgis/home/data/key/sre-2025-2b6af5c825e3.json'

# Authenticate and initialize Earth Engine
credentials = ee.ServiceAccountCredentials(SERVICE_ACCOUNT, KEY_PATH)
ee.Initialize(credentials)

In [5]:
# yesterday = (datetime.now() - timedelta(days=1)).date()

@dataclass
class Config:

    grid_asset_path: str = "projects/ee-thaimunhoz98/assets/MS_basin" #location in my GEE Assets

    # DataSource Collection names and configurations
    precipitation_dataset = ["precipitation", "RTMA", "NOAA/NWS/RTMA", "ACPC01", "hourly"]
    temperature_mean_dataset = ["temperature_mean", "RTMA", "NOAA/NWS/RTMA", "TMP", "hourly", "celsius"]
    temperature_min_dataset = ["temperature_min", "RTMA", "NOAA/NWS/RTMA", "TMP", "hourly", "celsius"]
    temperature_max_dataset = ["temperature_max", "RTMA", "NOAA/NWS/RTMA", "TMP", "hourly", "celsius"]
    humidity_dataset = ["humidity", "RTMA", "NOAA/NWS/RTMA", "SPFH", "hourly"]
    wind_dataset = ["wind", "RTMA", "NOAA/NWS/RTMA", "WIND", "hourly"]
    solar_rad_dataset = ["solar_rad", "GRIDMET", "IDAHO_EPSCOR/GRIDMET", "srad", "daily"]
    soil_moisture_dataset =["soil_moisture", "SMAP", "NASA/SMAP/SPL4SMGP/007", "sm_surface", "hourly"]
    evapo_dataset = ["evapotranspiration", "SMAP","NASA/SMAP/SPL4SMGP/007", "land_evapotranspiration_flux", "hourly"]


In [6]:
def split_collection(feature_collection, batch_size):
    ''' Split grid into more than one collection to handle GEE data limit'''
    size = feature_collection.size().getInfo()
    return [
        feature_collection.toList(batch_size, i)
        for i in range(0, size, batch_size)
    ]

def load_asset(asset_path):
    '''Load the full grid from a GEE asset'''
    grid_fc = ee.FeatureCollection(asset_path)
    return grid_fc

def celsius_to_fahrenheit(image, input_band, output_band="LST_F"):
    ''' Convert Celsius degrees temperature to Fahrenheit'''
    if output_band is None:
        output_band = input_band.replace("TMP", "LST_F")
    lst_f = image.expression(
        "(x * (9 / 5)) + 32",
        {"x": image.select(input_band)}
    ).rename(output_band)
    return image.addBands(lst_f)

def kelvin_to_fahrenheit(image, input_band, output_band="LST_F"):
    lst_f = image.expression(
        "((x - 273.15) * (9 / 5)) + 32",
        {
            "x": image.select(input_band)
        }
    ).rename(output_band)
    return image.addBands(lst_f)

def export_table(input_results, var_name, band_name, output_file):
    ''' Export results from GEE as a table to feed the Dashboard'''
    flat_data = input_results[0]
    df = pd.json_normalize(flat_data)
    try:
        result_table = df.drop(columns=['geometry.type', 'geometry.coordinates']).rename(columns={band_name: var_name})
    except:
        print(result_table)
        result_table = df.rename(columns={band_name: var_name})
    #result_table = pd.DataFrame(input_results).rename(columns={band_name: var_name})
    result_table['date_str'] = result_table['date'].dt.strftime('%m/%d/%Y')

    result_table = result_table.drop_duplicates(subset=['TARGET_FID', 'date'], keep='last')

    new_order = ['date', 'date_str', 'TARGET_FID', var_name]
    result_table = result_table[new_order]
    result_table.to_csv(output_file)

def return_last_day(gpd_file_path):
    ''' Read a shapefile and return the last day of available data'''
    gdf = gpd.read_file(gpd_file_path)
    return gdf.columns[-2]

In [7]:
def run_historical(variable, grid_fc, start_date, end_date, collection_name, band_name, temporal_resolution, data_source, input_unit):

    ''' Access data from collection and return a table related to each grid cell in the fishnet '''

    date_list = pd.date_range(start=start_date, end=end_date, freq='D')

    #grid_fc = toolbox.gdf2ee(grid_shapefile)

    all_results = []

    for i in np.arange(0, len(date_list) - 1):

        if variable == "precipitation" or variable == "evapotranspiration":
            if temporal_resolution == "hourly":
                dataset = ee.ImageCollection(collection_name) \
                    .filterDate(date_list[0], date_list[1]) \
                    .select(band_name) \
                    .sum()
            else:
                dataset = ee.ImageCollection(collection_name) \
                    .filterDate(date_list[0], date_list[1]) \
                    .select(band_name) \
                    .first()

        elif variable == "temperature_min":
            dataset = ee.ImageCollection(collection_name) \
                    .filterDate(date_list[0], date_list[1]) \
                    .select(band_name) \
                    .min()
        elif variable == "temperature_max":
            dataset = ee.ImageCollection(collection_name) \
                    .filterDate(date_list[0], date_list[1]) \
                    .select(band_name) \
                    .max()
        else:
            if temporal_resolution == "hourly":
                dataset = ee.ImageCollection(collection_name) \
                    .filterDate(date_list[0], date_list[1]) \
                    .select(band_name) \
                    .mean()
            else:
                dataset = ee.ImageCollection(collection_name) \
                    .filterDate(date_list[0], date_list[1]) \
                    .select(band_name) \
                    .first()
        dataset = ee.Image(dataset)

        # Check for null image
        info = dataset.getInfo()
        if not info or 'bands' not in info:
            print(f"[Error] Null image encountered for {variable} on {date_list[i]}. Skipping...")
            return None  # Signal the main loop to break or skip


        if input_unit is not None:
            if input_unit == "celsius":
                dataset = celsius_to_fahrenheit(dataset, input_band=band_name).select("LST_F")
            elif input_unit == "kelvin":
                dataset = kelvin_to_fahrenheit(dataset, input_band=band_name).select("LST_F")

        def make_compute_mean(dataset):
            def compute_mean(feature):
                mean = dataset.reduceRegion(
                    reducer=ee.Reducer.mean(),
                    geometry=feature.geometry(),
                    scale=9000,
                    maxPixels=1e9
                )
                return feature.set(mean)
            return compute_mean

        try:
            compute_mean = make_compute_mean(dataset)
            reduced = grid_fc.map(compute_mean).getInfo()

            print(f"Starting export for date {date_list[i]}")
            for feat in reduced['features']:
                props = feat['properties']
                props['date'] = date_list[i]
                props['geometry'] = feat['geometry']
                all_results.append(props)

        except Exception as e:
            print(f"Temp [Warning] Date {date_list[i]} failed on full grid_fc: {e}")
            grid_batches = split_collection(grid_fc, batch_size=500)

            def process_batch(batch):
                try:
                    compute_mean = make_compute_mean(dataset)
                    grid_fc_batch = ee.FeatureCollection(batch)
                    reduced = grid_fc_batch.map(compute_mean).getInfo()

                    results = []
                    if reduced and 'features' in reduced:
                        for feat in reduced['features']:
                            props = feat['properties']
                            props['date'] = date_list[i]
                            props['geometry'] = feat['geometry']
                            results.append(props)
                    return results
                except Exception as err:
                    print(f"[Error] Failed batch for date {date_list[i]}: {err}")

            with ThreadPoolExecutor(max_workers=5) as executor:
                futures = [executor.submit(process_batch, batch) for batch in grid_batches]
                for future in as_completed(futures):
                    try:
                        result = future.result()
                        all_results.extend(result)
                    except:
                        break

        for item in all_results:
            if "LST_F" in item and variable not in item:
                item[variable] = item.pop("LST_F")

    return all_results

In [8]:
# === CONFIG & DATE SETUP ===
config = Config()
tracking_file = "/arcgis/home/data/date_check/last_run_bulk.csv"

# Load the last run date
if os.path.exists(tracking_file):
    df_out = pd.read_csv(tracking_file)
    df_out['last_run_date'] = pd.to_datetime(df_out['last_run_date'])
    latest_date = df_out['last_run_date'].max().date()
    start_date = latest_date + timedelta(days=1)
else:
    # First run: manually define the start date
    start_date = pd.to_datetime("2023-02-03").date()

# Manually define the end date (not inclusive)
end_date = pd.to_datetime(pd.Timestamp.today()).date()

print(f"Running from {start_date} to {end_date - timedelta(days=1)}")

# === DATASETS & GRID SETUP ===
datasets = [
    (*config.solar_rad_dataset, None),
    (*config.humidity_dataset, None),
    (*config.wind_dataset, None),
    (*config.precipitation_dataset, None),
    config.temperature_max_dataset,
    config.temperature_min_dataset,
    config.temperature_mean_dataset
]

grid_fc = load_asset(config.grid_asset_path)

# === DAILY LOOP ===
current = start_date
while current < end_date:
    print(f"\n=== Processing {current} ===")
    time1 = time.time()

    skip_day = False

    for variable, data_source, collection_name, band_name, temporal_resolution, input_unit in datasets:
        print(f"Running {variable} for {current}")
        result = run_historical(
            variable=variable,
            grid_fc=grid_fc,
            start_date=current,
            end_date=current + timedelta(days=1),
            collection_name=collection_name,
            band_name=band_name,
            temporal_resolution=temporal_resolution,
            data_source=data_source,
            input_unit=input_unit
        )

        if result is None:
            print(f"Skipping day {current} due to null image.")
            skip_day = True
            break  # Skip the rest of the variables

        output_table = f"/arcgis/home/data/first_day/{variable}.csv"
        try:
            export_table([result], variable, band_name, output_table)
        except Exception as e:
            print(f"[Error] Failed to export {variable} on {current}: {e}")
            skip_day = True
            break

    # If we successfully ran all variables, merge and upload
    if not skip_day:
        # === MERGE ===
        variables = [
            "humidity", "precipitation", "solar_rad",
            "temperature_max", "temperature_mean", "temperature_min", "wind"
        ]
        base_path = "/arcgis/home/data/first_day"
        merged_df = pd.read_csv(os.path.join(base_path, f"{variables[0]}.csv"))
        for var in variables[1:]:
            df = pd.read_csv(os.path.join(base_path, f"{var}.csv"))
            merged_df = pd.merge(merged_df, df, on="TARGET_FID", how="outer", suffixes=('', '_dup'))
            merged_df = merged_df.loc[:, ~merged_df.columns.str.endswith('_dup')]
        merged_df.drop(columns=['Unnamed: 0', 'date_str'], errors='ignore', inplace=True)
        merged_df.to_csv("/arcgis/home/data/first_day/merged_output.csv", index=False)
    
        # === PUSH TO FEATURE TABLE ===
        item_id = "e7c7151adfd549239c31f9cc2363831c"
        item = gis.content.get(item_id)
        flc = FeatureLayerCollection.fromitem(item)
        related_table_combined = flc.tables[0]
    
        df_combined = pd.read_csv("/arcgis/home/data/first_day/merged_output.csv")
        df_combined['date'] = pd.to_datetime(df_combined['date'], errors='coerce')
        df_combined['date'] = df_combined['date'].apply(lambda dt: dt.replace(hour=8) if pd.notnull(dt) else dt)
    
        features_combined = [{"attributes": rec} for rec in df_combined.to_dict(orient='records')]
    
        def parallel_batch_edit_features(layer, features, label="", batch_size=500, max_workers=5, max_retries=3):
            batches = [features[i:i + batch_size] for i in range(0, len(features), batch_size)]
            def upload_batch(batch_index, batch, label):
                for attempt in range(1, max_retries + 1):
                    try:
                        result = layer.edit_features(adds=batch)
                        if result.get("addResults", []):
                            print(f"[Success] {label} Uploaded batch {batch_index + 1}/{len(batches)} on attempt {attempt}")
                            return result
                        else:
                            raise Exception(f"No addResults: {result}")
                    except Exception as e:
                        print(f"[Retry {attempt}] {label} Batch {batch_index + 1} failed: {e}")
                        time.sleep(2)
                print(f"[Failed] {label} Batch {batch_index + 1} permanently failed.")
                return None
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(upload_batch, i, b, label) for i, b in enumerate(batches)]
                for f in as_completed(futures): f.result()
    
        parallel_batch_edit_features(related_table_combined, features_combined, label="combined")
    
        # === UPDATE DATE TRACKER ===
        last_date = df_combined['date'].max()
    
        if os.path.exists(tracking_file):
            df_out = pd.read_csv(tracking_file)
            df_out['last_run_date'] = pd.to_datetime(df_out['last_run_date'])
            if last_date not in df_out['last_run_date'].values:
                df_out = pd.concat([df_out, pd.DataFrame({'last_run_date': [last_date]})], ignore_index=True)
        else:
            df_out = pd.DataFrame({'last_run_date': [last_date]})
    
        df_out.sort_values('last_run_date', inplace=True)
        df_out.to_csv(tracking_file, index=False)
    
        print(f"✅✅✅✅✅ Appended {last_date.date()} to {tracking_file}")
        print(f"⏱️  Elapsed time for {current}: {time.time() - time1:.2f} seconds")
    else:
        print(f"⏭️  Skipped {current} due to null input data.")
    
    # Always increment date
    current += timedelta(days=1)


Running from 2025-07-27 to 2025-07-29

=== Processing 2025-07-27 ===
Running solar_rad for 2025-07-27




Running humidity for 2025-07-27




Running wind for 2025-07-27




Running precipitation for 2025-07-27




Running temperature_max for 2025-07-27




Running temperature_min for 2025-07-27




Running temperature_mean for 2025-07-27




[Success] combined Uploaded batch 1/83 on attempt 1


[Success] combined Uploaded batch 2/83 on attempt 1
[Success] combined Uploaded batch 5/83 on attempt 1
[Success] combined Uploaded batch 3/83 on attempt 1
[Success] combined Uploaded batch 4/83 on attempt 1


[Success] combined Uploaded batch 6/83 on attempt 1
[Success] combined Uploaded batch 9/83 on attempt 1
[Success] combined Uploaded batch 8/83 on attempt 1


[Success] combined Uploaded batch 10/83 on attempt 1
[Success] combined Uploaded batch 7/83 on attempt 1


[Success] combined Uploaded batch 11/83 on attempt 1


[Success] combined Uploaded batch 13/83 on attempt 1
[Success] combined Uploaded batch 14/83 on attempt 1
[Success] combined Uploaded batch 12/83 on attempt 1


[Success] combined Uploaded batch 15/83 on attempt 1


[Success] combined Uploaded batch 17/83 on attempt 1
[Success] combined Uploaded batch 16/83 on attempt 1
[Success] combined Uploaded batch 18/83 on attempt 1
[Success] combined Uploaded batch 19/83 on attempt 1


[Success] combined Uploaded batch 20/83 on attempt 1


[Success] combined Uploaded batch 24/83 on attempt 1
[Success] combined Uploaded batch 21/83 on attempt 1


[Success] combined Uploaded batch 23/83 on attempt 1
[Success] combined Uploaded batch 22/83 on attempt 1


[Success] combined Uploaded batch 25/83 on attempt 1


[Success] combined Uploaded batch 27/83 on attempt 1
[Success] combined Uploaded batch 26/83 on attempt 1
[Success] combined Uploaded batch 29/83 on attempt 1


[Success] combined Uploaded batch 28/83 on attempt 1


[Success] combined Uploaded batch 30/83 on attempt 1
[Success] combined Uploaded batch 31/83 on attempt 1


[Success] combined Uploaded batch 33/83 on attempt 1
[Success] combined Uploaded batch 34/83 on attempt 1
[Success] combined Uploaded batch 32/83 on attempt 1


[Success] combined Uploaded batch 35/83 on attempt 1


[Success] combined Uploaded batch 36/83 on attempt 1
[Success] combined Uploaded batch 37/83 on attempt 1


[Success] combined Uploaded batch 38/83 on attempt 1
[Success] combined Uploaded batch 39/83 on attempt 1


[Success] combined Uploaded batch 40/83 on attempt 1


[Success] combined Uploaded batch 42/83 on attempt 1
[Success] combined Uploaded batch 41/83 on attempt 1


[Success] combined Uploaded batch 43/83 on attempt 1
[Success] combined Uploaded batch 44/83 on attempt 1


[Success] combined Uploaded batch 45/83 on attempt 1
[Success] combined Uploaded batch 46/83 on attempt 1
[Success] combined Uploaded batch 47/83 on attempt 1


[Success] combined Uploaded batch 48/83 on attempt 1
[Success] combined Uploaded batch 49/83 on attempt 1


[Success] combined Uploaded batch 52/83 on attempt 1
[Success] combined Uploaded batch 50/83 on attempt 1
[Success] combined Uploaded batch 53/83 on attempt 1


[Success] combined Uploaded batch 51/83 on attempt 1


[Success] combined Uploaded batch 54/83 on attempt 1


[Success] combined Uploaded batch 55/83 on attempt 1
[Success] combined Uploaded batch 56/83 on attempt 1
[Success] combined Uploaded batch 57/83 on attempt 1


[Success] combined Uploaded batch 58/83 on attempt 1


[Success] combined Uploaded batch 59/83 on attempt 1


[Success] combined Uploaded batch 60/83 on attempt 1
[Success] combined Uploaded batch 61/83 on attempt 1


[Success] combined Uploaded batch 62/83 on attempt 1


[Success] combined Uploaded batch 63/83 on attempt 1


[Success] combined Uploaded batch 65/83 on attempt 1


[Success] combined Uploaded batch 66/83 on attempt 1


[Success] combined Uploaded batch 64/83 on attempt 1
[Success] combined Uploaded batch 67/83 on attempt 1


[Success] combined Uploaded batch 68/83 on attempt 1


[Success] combined Uploaded batch 69/83 on attempt 1
[Success] combined Uploaded batch 70/83 on attempt 1


[Success] combined Uploaded batch 73/83 on attempt 1
[Success] combined Uploaded batch 72/83 on attempt 1
[Success] combined Uploaded batch 71/83 on attempt 1


[Success] combined Uploaded batch 74/83 on attempt 1
[Success] combined Uploaded batch 75/83 on attempt 1


[Success] combined Uploaded batch 78/83 on attempt 1
[Success] combined Uploaded batch 76/83 on attempt 1


[Success] combined Uploaded batch 77/83 on attempt 1


[Success] combined Uploaded batch 80/83 on attempt 1


[Success] combined Uploaded batch 79/83 on attempt 1
[Success] combined Uploaded batch 81/83 on attempt 1
[Success] combined Uploaded batch 82/83 on attempt 1


[Success] combined Uploaded batch 83/83 on attempt 1


✅✅✅✅✅ Appended 2025-07-27 to /arcgis/home/data/date_check/last_run_bulk.csv
⏱️  Elapsed time for 2025-07-27: 1349.02 seconds

=== Processing 2025-07-28 ===
Running solar_rad for 2025-07-28


[Error] Null image encountered for solar_rad on 2025-07-28 00:00:00. Skipping...
Skipping day 2025-07-28 due to null image.
⏭️  Skipped 2025-07-28 due to null input data.

=== Processing 2025-07-29 ===
Running solar_rad for 2025-07-29
[Error] Null image encountered for solar_rad on 2025-07-29 00:00:00. Skipping...
Skipping day 2025-07-29 due to null image.
⏭️  Skipped 2025-07-29 due to null input data.
