## 2026 EY AI & Data Challenge - TerraClimate Data Extraction Notebook

This notebook demonstrates how to access the TerraClimate dataset. TerraClimate is a dataset of monthly climate and climatic water balance for global terrestrial surfaces from 1958 to the present. These data provide important inputs for ecological and hydrological studies at global scales that require high spatial resolution and time-varying data. All data have monthly temporal resolution and a ~4-km (1/24th degree) spatial resolution. This dataset is provided in Zarr format.

For more information, visit: [TerraClimate overview](https://planetarycomputer.microsoft.com/dataset/terraclimate#overview)
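
To make the "~4-km (1/24th degree)" figure concrete, the sketch below converts the angular resolution into approximate distances. It assumes a spherical-Earth approximation of about 111.32 km per degree of latitude; the function name `grid_cell_size_km` and the example latitude are illustrative and not part of the dataset or its API.

```python
import math

# Assumption: roughly 111.32 km per degree of latitude (spherical approximation).
EARTH_KM_PER_DEGREE = 111.32


def grid_cell_size_km(lat_deg, resolution_deg=1 / 24):
    """Approximate (north-south, east-west) size of one grid cell in km."""
    north_south = resolution_deg * EARTH_KM_PER_DEGREE
    # East-west spacing shrinks with the cosine of the latitude.
    east_west = north_south * math.cos(math.radians(lat_deg))
    return north_south, east_west


ns_km, ew_km = grid_cell_size_km(lat_deg=-30.0)
print(f"{ns_km:.2f} km x {ew_km:.2f} km")
```

At 30 degrees south, which lies inside the study region used later in this notebook, a cell is a little over 4.6 km north to south and about 4 km east to west.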

## Load In Dependencies
The following code installs the required Python libraries (listed in the requirements.txt file) in the Snowflake environment so that the rest of the notebook can run. After running this code for the first time, you must “restart” the kernel so the Python libraries become available in the environment. To do this, open the “Connected” menu above the notebook (next to “Run all”) and select the “restart kernel” link. Subsequent runs of the notebook do not require this restart.

In [None]:
!pip install uv
!uv pip install  -r requirements.txt 
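
After the install cell has run and the kernel has been restarted, a quick check like the one below can confirm that the libraries are importable before continuing. This is a minimal sketch: `missing_packages` is a hypothetical helper, and the module list is taken from the imports used later in this notebook.

```python
import importlib.util


def missing_packages(module_names):
    """Return the module names that cannot be found in the current environment."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# Top-level modules imported later in this notebook.
required = ["numpy", "pandas", "xarray", "scipy",
            "pystac_client", "planetary_computer", "tqdm"]

missing = missing_packages(required)
if missing:
    print("Not importable yet (install, then restart the kernel):", missing)
else:
    print("All required packages are importable.")
```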

In [1]:
import snowflake
from snowflake.snowpark.context import get_active_session
session = get_active_session()

import warnings
warnings.filterwarnings("ignore")

# Data manipulation and analysis
import numpy as np
import pandas as pd

# Multi-dimensional arrays and datasets (e.g., NetCDF, Zarr)
import xarray as xr

from scipy.spatial import cKDTree

# Planetary Computer tools for STAC API access and authentication
import pystac_client
import planetary_computer as pc

from datetime import date
from tqdm import tqdm
import time
import os

## Extracting TerraClimate Data Using API Calls

The API-based method allows us to efficiently access **TerraClimate** data for specific regions and time periods through the [Microsoft Planetary Computer](https://planetarycomputer.microsoft.com/), ensuring scalability and reproducibility of the process.

Through the API, we can extract climate variables such as **Potential Evapotranspiration (PET)**, which represents the atmospheric demand for water. This variable provides important insights into surface moisture balance and helps improve the accuracy of water quality modeling.

This approach ensures consistent, automated retrieval of high-resolution climate data that can be easily integrated with satellite-derived features for comprehensive environmental and hydrological analysis.

### Loading and Mapping TerraClimate Data

This section demonstrates how **TerraClimate climate variables**, such as **Potential Evapotranspiration (PET)**, are loaded and mapped to sampling locations.

- The **load_terraclimate_dataset** function opens the TerraClimate Zarr/NetCDF dataset from the Microsoft Planetary Computer, handling storage options automatically.
- The **filterg** function filters the dataset for the desired time range (2011–2015) and the spatial extent corresponding to the study region. The resulting data is converted into a pandas DataFrame with standardized column names.
- The **assign_nearest_climate** function maps each sampling location to its **nearest TerraClimate grid point** using a KD-tree and assigns the climate variable values corresponding to the closest timestamp.

This workflow ensures efficient, reproducible retrieval of climate variables, while allowing participants to work with pre-extracted CSV files for faster benchmarking and analysis.
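
The two-step matching described above (nearest grid point first, then nearest timestamp at that point) can be illustrated on toy data. The sketch below uses a brute-force search in plain Python rather than the KD-tree used in the notebook, and the coordinates and values are made up for illustration.

```python
from datetime import date

# Toy climate records: (lat, lon, month, value). Values are made up.
climate = [
    (-30.00, 25.00, date(2011, 1, 1), 10.0),
    (-30.00, 25.00, date(2011, 2, 1), 12.0),
    (-30.04, 25.04, date(2011, 1, 1), 20.0),
    (-30.04, 25.04, date(2011, 2, 1), 22.0),
]


def nearest_value(lat, lon, when, records):
    """Pick the closest grid point first, then the closest timestamp there."""
    grid_points = {(r[0], r[1]) for r in records}
    nearest = min(grid_points,
                  key=lambda p: (p[0] - lat) ** 2 + (p[1] - lon) ** 2)
    at_point = [r for r in records if (r[0], r[1]) == nearest]
    best = min(at_point, key=lambda r: abs((r[2] - when).days))
    return best[3]


# A sample taken on 20 January 2011 near the second grid point.
value = nearest_value(-30.03, 25.03, date(2011, 1, 20), climate)
print(value)
```

Here the sample is closest to the grid point at (-30.04, 25.04), and 20 January is closer to 1 February than to 1 January, so the February value at that point is selected.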

In [2]:
def load_terraclimate_dataset(sleep_sec=1):
    # Pause before opening remote dataset to avoid hammering the API
    time.sleep(sleep_sec)
    catalog = pystac_client.Client.open(
        "https://planetarycomputer.microsoft.com/api/stac/v1",
        modifier=pc.sign_inplace,
    )
    collection = catalog.get_collection("terraclimate")
    asset = collection.assets["zarr-abfs"]

    if "xarray:storage_options" in asset.extra_fields:
        ds = xr.open_zarr(
            asset.href,
            storage_options=asset.extra_fields["xarray:storage_options"],
            consolidated=True,
        )
    else:
        ds = xr.open_dataset(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
        )

    return ds
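
Remote catalog requests can fail transiently, so one option is to wrap the loader in a small retry helper. The sketch below is an assumption about how that might look, not part of the original workflow: `call_with_retries` and `flaky` are hypothetical names, and the demonstration uses a stand-in function instead of a real network call.

```python
import time


def call_with_retries(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**n seconds and try again."""
    for attempt in range(attempts):
        try:
            return func()
        except Exception as exc:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * 2 ** attempt
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)


# Stand-in for a remote call that fails twice before succeeding.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "dataset"


# sleep is replaced with a no-op so the demonstration runs instantly.
result = call_with_retries(flaky, sleep=lambda seconds: None)
print(result, calls["n"])
```

In the notebook itself, this could be used as `ds = call_with_retries(load_terraclimate_dataset)`.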

In [3]:
# --- Filtering function (kept identical) ---
def filterg(ds, var):
    ds_2011_2015 = ds[var].sel(time=slice("2011-01-01", "2015-12-31"))

    df_var_append = []
    for i in tqdm(range(len(ds_2011_2015.time))):
        df_var = ds_2011_2015.isel(time=i).to_dataframe().reset_index()
        df_var_filter = df_var[
            (df_var['lat'] > -35.18) & (df_var['lat'] < -21.72) &
            (df_var['lon'] > 14.97) & (df_var['lon'] < 32.79)
        ]
        df_var_append.append(df_var_filter)

    df_var_final = pd.concat(df_var_append, ignore_index=True)
    print(f"Filtering for {var} completed")

    df_var_final['time'] = df_var_final['time'].astype(str)

    # Column mapping
    col_mapping = {"lat": "Latitude", "lon": "Longitude", "time": "Sample Date"}
    df_var_final = df_var_final.rename(columns=col_mapping)

    return df_var_final
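
The output of `filterg` depends only on the dataset and the variable name, not on the sampling rows, yet the extraction cells in this notebook call it again for every batch. A possible optimization is to compute each variable's table once and reuse it. The sketch below shows the idea with a hypothetical stand-in, `expensive_filter`, in place of `filterg(ds, var)`.

```python
calls = []


def expensive_filter(var):
    """Hypothetical stand-in for filterg(ds, var); records each real call."""
    calls.append(var)
    return f"table for {var}"


_cache = {}


def cached_filter(var):
    """Run the expensive extraction once per variable and reuse the result."""
    if var not in _cache:
        _cache[var] = expensive_filter(var)
    return _cache[var]


# Three batches, two variables: the expensive step runs only twice.
for batch in range(3):
    for var in ["ppt", "soil"]:
        cached_filter(var)

print(calls)
```

Reusing the table should be safe with `assign_nearest_climate` as written, since the only change it makes to the climate table is converting `Sample Date` to datetime, which gives the same result when repeated.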

In [4]:
# --- Climate variable assignment function (unchanged logic) ---
def assign_nearest_climate(sa_df, climate_df, var_name):
    """
    Map nearest climate variable values to a new DataFrame 
    containing only the specified variable column.
    """
    sa_coords = np.radians(sa_df[['Latitude', 'Longitude']].values)
    climate_coords = np.radians(climate_df[['Latitude', 'Longitude']].values)

    tree = cKDTree(climate_coords)
    dist, idx = tree.query(sa_coords, k=1)

    nearest_points = climate_df.iloc[idx].reset_index(drop=True)

    sa_df = sa_df.reset_index(drop=True)
    sa_df[['nearest_lat', 'nearest_lon']] = nearest_points[['Latitude', 'Longitude']]

    sa_df['Sample Date'] = pd.to_datetime(sa_df['Sample Date'], dayfirst=True, errors='coerce')
    climate_df['Sample Date'] = pd.to_datetime(climate_df['Sample Date'], dayfirst=True, errors='coerce')

    climate_values = []

    for i in tqdm(range(len(sa_df)), desc=f"Mapping {var_name.upper()} values"):
        sample_date = sa_df.loc[i, 'Sample Date']
        nearest_lat = sa_df.loc[i, 'nearest_lat']
        nearest_lon = sa_df.loc[i, 'nearest_lon']

        subset = climate_df[
            (climate_df['Latitude'] == nearest_lat) &
            (climate_df['Longitude'] == nearest_lon)
        ]

        if subset.empty:
            climate_values.append(np.nan)
            continue

        nearest_idx = (subset['Sample Date'] - sample_date).abs().idxmin()
        climate_values.append(subset.loc[nearest_idx, var_name])

    output_df = pd.DataFrame({var_name: climate_values})

    
    return output_df

### Extracting features for the training dataset

In [5]:
Water_Quality_df = pd.read_csv("water_quality_training_dataset.csv")
display(Water_Quality_df.head(5))

In [6]:
Water_Quality_df.shape

In [7]:
VARS_TO_EXTRACT   = ['ppt', 'soil', 'def', 'aet', 'q']
chunksize         = 200
sleep_between     = 2
output_path       = "terraclimate_new_features_training.csv"
tmp_path          = "/tmp/terraclimate_new_features_training.csv"
progress_path     = "terraclimate_train_progress.txt"
tmp_progress_path = "/tmp/terraclimate_train_progress.txt"

ds = load_terraclimate_dataset()

# ── Resume: read how many rows were already confirmed saved ─────────────────
if os.path.exists(progress_path):
    with open(progress_path) as f:
        rows_done = int(f.read().strip())
    print(f"📂 Resuming from row {rows_done}")
else:
    rows_done = 0
    print("🆕 Starting from scratch")

remaining_df  = Water_Quality_df.iloc[rows_done:].reset_index(drop=True)
total_batches = (len(remaining_df) + chunksize - 1) // chunksize
print(f"📊 Remaining: {len(remaining_df)} rows | Batches: {total_batches}\n")

if len(remaining_df) == 0:
    print("✅ Already complete!")
    Terraclimate_training_df = pd.read_csv(output_path)
else:
    all_dfs = [pd.read_csv(output_path)] if rows_done > 0 else []

    for batch_num, i in enumerate(tqdm(range(0, len(remaining_df), chunksize),
                                       desc="Batches", total=total_batches), start=1):
        chunk = remaining_df.iloc[i : i + chunksize].reset_index(drop=True)

        print(f"\n⏳ Batch {batch_num}/{total_batches}: global rows {rows_done + i} to {rows_done + i + len(chunk) - 1}")
        t0 = time.time()

        try:
            var_dfs = []
            for var in VARS_TO_EXTRACT:
                tc_param = filterg(ds, var)
                result   = assign_nearest_climate(chunk, tc_param, var)
                var_dfs.append(result[var])

            chunk_features = pd.concat(var_dfs, axis=1)
            chunk_features['Latitude']    = chunk['Latitude'].values
            chunk_features['Longitude']   = chunk['Longitude'].values
            chunk_features['Sample Date'] = chunk['Sample Date'].values
            all_dfs.append(chunk_features)

            elapsed = time.time() - t0
            print(f"   ✅ Batch {batch_num} done in {elapsed:.1f}s")

            partial   = pd.concat(all_dfs, ignore_index=True)
            confirmed = rows_done + i + len(chunk)

            partial.to_csv(output_path, index=False)
            partial.to_csv(tmp_path, index=False)

            with open(progress_path, "w") as f:
                f.write(str(confirmed))
            with open(tmp_progress_path, "w") as f:
                f.write(str(confirmed))

            print(f"   💾 {len(partial)} rows saved | progress: {confirmed}")

            session.sql(f"PUT file://{tmp_path} 'snow://workspace/USER$.PUBLIC.\"EY-AI-and-Data-Challenge\"/versions/live/' AUTO_COMPRESS=FALSE OVERWRITE=TRUE").collect()
            session.sql(f"PUT file://{tmp_progress_path} 'snow://workspace/USER$.PUBLIC.\"EY-AI-and-Data-Challenge\"/versions/live/' AUTO_COMPRESS=FALSE OVERWRITE=TRUE").collect()
            print(f"   ☁️  Snowflake updated (batch {batch_num}/{total_batches})")

        except Exception as e:
            print(f"   ❌ Error on batch {batch_num}: {e}")
            # The progress file is the source of truth for resuming; a leftover
            # `confirmed` variable from another cell could report the wrong row.
            print("   ▶️  Re-run this cell to resume from the last saved progress")
            break

        if batch_num < total_batches:
            print(f"   💤 Sleeping {sleep_between}s...")
            time.sleep(sleep_between)

    Terraclimate_training_df = pd.concat(all_dfs, ignore_index=True)
    print(f"\n🎉 Done! Total rows: {len(Terraclimate_training_df)}")
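
The resume logic in the cell above comes down to two pieces: reading a row count from a progress file, and splitting the remaining rows into batches. The sketch below isolates that arithmetic using a temporary file. The helper names `read_progress` and `batch_ranges` are illustrative and do not appear in the notebook.

```python
import os
import tempfile


def read_progress(path):
    """Return the number of rows already processed, or 0 if no file exists."""
    if os.path.exists(path):
        with open(path) as f:
            return int(f.read().strip())
    return 0


def batch_ranges(total_rows, rows_done, chunksize):
    """Yield (start, stop) global row ranges for the rows still to process."""
    for start in range(rows_done, total_rows, chunksize):
        yield start, min(start + chunksize, total_rows)


progress_file = os.path.join(tempfile.mkdtemp(), "progress.txt")
first_run = list(batch_ranges(total_rows=450,
                              rows_done=read_progress(progress_file),
                              chunksize=200))

# Pretend the first batch finished and was checkpointed before an interruption.
with open(progress_file, "w") as f:
    f.write(str(first_run[0][1]))

resumed = list(batch_ranges(total_rows=450,
                            rows_done=read_progress(progress_file),
                            chunksize=200))
print(first_run)
print(resumed)
```

With 450 rows and a chunk size of 200, the first run plans three batches, and the resumed run skips the batch that was already checkpointed.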

In [9]:
# Preview File
display(Terraclimate_training_df.head())

In [None]:
Terraclimate_training_df.to_csv("/tmp/terraclimate_new_features_training.csv", index=False)

In [None]:
session.sql("""
    PUT file:///tmp/terraclimate_new_features_training.csv
    'snow://workspace/USER$.PUBLIC."EY-AI-and-Data-Challenge"/versions/live/'
    AUTO_COMPRESS=FALSE
    OVERWRITE=TRUE
""").collect()

print("File saved! Refresh the browser to see the files in the sidebar")

**Note:** If you're using your own workspace, remember to replace "EY-AI-and-Data-Challenge" with your workspace name in the file path.
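
To avoid editing the workspace name in several places, the PUT statement can be built from a single variable. The sketch below is a minimal illustration: `build_put_statement` is a hypothetical helper and "My-Workspace" is a placeholder name. It reproduces only the statement text used in this notebook.

```python
def build_put_statement(local_path, workspace="EY-AI-and-Data-Challenge"):
    """Build the PUT statement used in this notebook for a given workspace name."""
    stage = f'snow://workspace/USER$.PUBLIC."{workspace}"/versions/live/'
    return f"PUT file://{local_path} '{stage}' AUTO_COMPRESS=FALSE OVERWRITE=TRUE"


stmt = build_put_statement("/tmp/terraclimate_new_features_training.csv",
                           workspace="My-Workspace")
print(stmt)
```

The resulting string could then be passed to `session.sql(stmt).collect()` in the same way as the hard-coded statements.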

### Extracting features for the validation dataset

In [10]:
Validation_df = pd.read_csv('submission_template.csv')
display(Validation_df.head())

In [11]:
Validation_df.shape

In [12]:
VARS_TO_EXTRACT   = ['ppt', 'soil', 'def', 'aet', 'q']
chunksize         = 200
sleep_between     = 2
val_output_path   = "terraclimate_new_features_validation.csv"
val_tmp_path      = "/tmp/terraclimate_new_features_validation.csv"
progress_path     = "terraclimate_val_progress.txt"
tmp_progress_path = "/tmp/terraclimate_val_progress.txt"

# ── Resume: read how many rows were already confirmed saved ─────────────────
if os.path.exists(progress_path):
    with open(progress_path) as f:
        rows_done = int(f.read().strip())
    print(f"📂 Resuming from row {rows_done}")
else:
    rows_done = 0
    print("🆕 Starting from scratch")

remaining_df  = Validation_df.iloc[rows_done:].reset_index(drop=True)
total_batches = (len(remaining_df) + chunksize - 1) // chunksize
print(f"📊 Remaining: {len(remaining_df)} rows | Batches: {total_batches}\n")

if len(remaining_df) == 0:
    print("✅ Already complete!")
    Terraclimate_validation_df = pd.read_csv(val_output_path)
else:
    val_all_dfs = [pd.read_csv(val_output_path)] if rows_done > 0 else []

    for batch_num, i in enumerate(tqdm(range(0, len(remaining_df), chunksize),
                                       desc="Batches", total=total_batches), start=1):
        chunk = remaining_df.iloc[i : i + chunksize].reset_index(drop=True)

        print(f"\n⏳ Batch {batch_num}/{total_batches}: global rows {rows_done + i} to {rows_done + i + len(chunk) - 1}")
        t0 = time.time()

        try:
            var_dfs = []
            for var in VARS_TO_EXTRACT:
                tc_param = filterg(ds, var)
                result   = assign_nearest_climate(chunk, tc_param, var)
                var_dfs.append(result[var])

            chunk_features = pd.concat(var_dfs, axis=1)
            chunk_features['Latitude']    = chunk['Latitude'].values
            chunk_features['Longitude']   = chunk['Longitude'].values
            chunk_features['Sample Date'] = chunk['Sample Date'].values
            val_all_dfs.append(chunk_features)

            elapsed = time.time() - t0
            print(f"   ✅ Batch {batch_num} done in {elapsed:.1f}s")

            partial   = pd.concat(val_all_dfs, ignore_index=True)
            confirmed = rows_done + i + len(chunk)

            partial.to_csv(val_output_path, index=False)
            partial.to_csv(val_tmp_path, index=False)

            with open(progress_path, "w") as f:
                f.write(str(confirmed))
            with open(tmp_progress_path, "w") as f:
                f.write(str(confirmed))

            print(f"   💾 {len(partial)} rows saved | progress: {confirmed}")

            session.sql(f"PUT file://{val_tmp_path} 'snow://workspace/USER$.PUBLIC.\"EY-AI-and-Data-Challenge\"/versions/live/' AUTO_COMPRESS=FALSE OVERWRITE=TRUE").collect()
            session.sql(f"PUT file://{tmp_progress_path} 'snow://workspace/USER$.PUBLIC.\"EY-AI-and-Data-Challenge\"/versions/live/' AUTO_COMPRESS=FALSE OVERWRITE=TRUE").collect()
            print(f"   ☁️  Snowflake updated (batch {batch_num}/{total_batches})")

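        except Exception as e:
            print(f"   ❌ Error on batch {batch_num}: {e}")
            # The progress file is the source of truth for resuming; a leftover
            # `confirmed` variable from the training cell could report the wrong row.
            print("   ▶️  Re-run this cell to resume from the last saved progress")
            break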

        if batch_num < total_batches:
            print(f"   💤 Sleeping {sleep_between}s...")
            time.sleep(sleep_between)

    Terraclimate_validation_df = pd.concat(val_all_dfs, ignore_index=True)
    print(f"\n🎉 Done! Total rows: {len(Terraclimate_validation_df)}")

In [13]:
Terraclimate_validation_df['Latitude'] = Validation_df['Latitude']
Terraclimate_validation_df['Longitude'] = Validation_df['Longitude']
Terraclimate_validation_df['Sample Date'] = Validation_df['Sample Date']
Terraclimate_validation_df = Terraclimate_validation_df[['Latitude', 'Longitude', 'Sample Date', 'ppt', 'soil', 'def', 'aet', 'q']]
Terraclimate_validation_df.to_csv('terraclimate_new_features_validation.csv', index=False)

In [14]:
# Preview File
display(Terraclimate_validation_df.head())

In [None]:
Terraclimate_validation_df.to_csv("/tmp/terraclimate_new_features_validation.csv", index=False)

In [None]:
session.sql("""
    PUT file:///tmp/terraclimate_new_features_validation.csv
    'snow://workspace/USER$.PUBLIC."EY-AI-and-Data-Challenge"/versions/live/'
    AUTO_COMPRESS=FALSE
    OVERWRITE=TRUE
""").collect()

print("File saved! Refresh the browser to see the files in the sidebar")

**Note:** If you're using your own workspace, remember to replace "EY-AI-and-Data-Challenge" with your workspace name in the file path.