# PISM TERRA for AGU 2025

In [None]:
import s3fs
from pathlib import Path
import xarray as xr
import matplotlib.pylab as plt
import re
from functools import partial
from collections import OrderedDict
import numpy as np
import matplotlib as mpl
import warnings
import requests
from urllib.request import urlopen

In [None]:
def preprocess_config(
    ds,
    regexp: str = "id_(.+?)_",
    dim: str = "exp_id",
    drop_vars: list[str] | None = None,
    drop_dims: list[str] | tuple[str, ...] = ("nv4",),
) -> xr.Dataset:
    """
    Add an experiment identifier and a tidy ``pism_config`` variable to a dataset.

    If ``dim`` is not yet a dimension of ``ds``, the identifier is extracted from the
    source filename (``ds.encoding["source"]``) with ``regexp`` and added as a new
    length-1 dimension. If ``dim`` already exists, its current labels are reused.

    The attributes of the ``pism_config`` variable are then turned into a 2-D
    ``pism_config`` DataArray with dimensions ``("pism_config_axis", dim)``:
    attributes whose names end in ``_doc``, ``_type``, ``_units``, ``_option`` or
    ``_choices`` are skipped, ``geometry.front_retreat.prescribed.file`` defaults to
    ``"false"`` when absent, and the remaining entries are sorted by key. The original
    ``pism_config`` / ``run_stats`` variables and their axes are dropped before merging.

    Parameters
    ----------
    ds : xarray.Dataset
        The input dataset to be processed. It must contain a ``pism_config`` variable.
    regexp : str, optional
        Regular expression whose first group captures the experiment identifier in the
        filename, by default "id_(.+?)_".
    dim : str, optional
        The name of the experiment dimension, by default "exp_id". If the dimension
        already exists it is expected to have length 1, because the configuration is
        stored as a single column.
    drop_vars : list[str] | None, optional
        Variable names to drop from the result, by default None (drop nothing).
    drop_dims : list[str] | tuple[str, ...], optional
        Dimension names to drop from the result, by default ("nv4",).

    Returns
    -------
    xarray.Dataset
        The processed dataset with the experiment dimension, the reshaped
        ``pism_config`` variable, and the requested variables and dimensions dropped.

    Raises
    ------
    AssertionError
        If the regular expression does not match any part of the filename.
    KeyError
        If ``ds.encoding`` has no ``"source"`` entry (only consulted when ``dim`` is
        missing) or ``ds`` has no ``pism_config`` variable.
    """

    if dim in ds.dims:
        # The experiment dimension is already present: reuse its labels rather than
        # parsing the filename (previously this path hit an undefined ``m_id``).
        exp_ids = ds[dim].values
    else:
        source = ds.encoding["source"]
        match = re.search(regexp, source)
        if match is None:
            # Raised explicitly (instead of ``assert``) so the check survives ``python -O``;
            # the exception type is kept for existing callers.
            raise AssertionError(f"regexp {regexp!r} does not match source {source!r}")
        raw_id = match.group(1)
        m_id: str | int
        try:
            m_id = int(raw_id)
        except (TypeError, ValueError):
            # Non-numeric identifiers are kept as strings.
            m_id = str(raw_id)
        ds = ds.expand_dims(dim)
        ds[dim] = [m_id]
        exp_ids = [m_id]

    # Attributes with these suffixes are skipped; only the remaining entries are kept.
    excluded_suffixes = ("_doc", "_type", "_units", "_option", "_choices")
    config = {k: v for k, v in ds["pism_config"].attrs.items() if not k.endswith(excluded_suffixes)}
    # Guarantee this key exists so every dataset yields the same set of config keys for it.
    config.setdefault("geometry.front_retreat.prescribed.file", "false")

    config_sorted = OrderedDict(sorted(config.items()))
    pc_keys = np.array(list(config_sorted.keys()))
    pc_vals = np.array(list(config_sorted.values()))

    pism_config = xr.DataArray(
        pc_vals.reshape(-1, 1),  # one column: (n_config_keys, 1)
        dims=["pism_config_axis", dim],
        coords={"pism_config_axis": pc_keys, dim: exp_ids},
        name="pism_config",
    )
    stripped = ds.drop_vars(["pism_config", "run_stats"], errors="ignore").drop_dims(
        ["pism_config_axis", "run_stats_axis"], errors="ignore"
    )
    ds = xr.merge([stripped, pism_config])

    vars_to_drop = [] if drop_vars is None else drop_vars
    return ds.drop_vars(vars_to_drop, errors="ignore").drop_dims(drop_dims, errors="ignore")


def pick(files, pattern):
    """
    Return the first entry of ``files`` whose base name contains ``pattern``.

    Only the final path component is searched, so a match in a parent
    directory name does not count. Returns ``None`` when nothing matches.
    """
    matching = (candidate for candidate in files if pattern in Path(candidate).name)
    return next(matching, None)

@xr.register_dataset_accessor("utils")
class UtilsMethods:
    """
    Custom helper methods for xarray Datasets.

    The class is registered as the ``utils`` Dataset accessor, so its methods are
    called as ``ds.utils.<method>()``.

    Parameters
    ----------
    xarray_obj : xr.Dataset
        The xarray Dataset the accessor is bound to.
    """

    def __init__(self, xarray_obj: xr.Dataset):
        """
        Store the Dataset the accessor operates on.

        Parameters
        ----------
        xarray_obj : xr.Dataset
            The xarray Dataset the accessor is bound to.
        """
        self._obj = xarray_obj

    def init(self):
        """
        Do nothing.

        This method is needed to work with joblib Parallel.
        """

    def drop_nonnumeric_vars(self, errors: str = "ignore") -> xr.Dataset:
        """
        Return the Dataset without its non-numeric data variables.

        A data variable is kept only if its dtype is a sub-dtype of ``np.number``
        (as judged by ``np.issubdtype``); all other data variables are dropped.

        Parameters
        ----------
        errors : {'ignore', 'raise'}, optional
            Forwarded to ``Dataset.drop_vars``.
            If 'ignore', suppress error and only drop existing variables.
            If 'raise', raise an error if any of the variables are not found in the dataset.
            Default is 'ignore'.

        Returns
        -------
        xarray.Dataset
            A new xarray Dataset with only numeric data variables.

        Examples
        --------
        >>> import xarray as xr
        >>> data = xr.Dataset({
        ...     'temperature': (('x', 'y'), [[15.5, 16.2], [14.8, 15.1]]),
        ...     'humidity': (('x', 'y'), [[80, 85], [78, 82]]),
        ...     'location': (('x', 'y'), [['A', 'B'], ['C', 'D']])
        ... })
        >>> numeric_data = data.utils.drop_nonnumeric_vars()
        >>> list(numeric_data.data_vars)
        ['temperature', 'humidity']
        """
        dataset = self._obj
        to_drop = []
        for name in dataset.data_vars:
            if np.issubdtype(dataset[name].dtype, np.number):
                continue
            to_drop.append(name)

        return dataset.drop_vars(to_drop, errors=errors)


In [None]:
# Lower and upper quantile bounding the plotted ensemble envelope.
percentiles = [0.05, 0.95]
# Width of that envelope in percent; shown in the legend label of the plot.
percentile_range = 100 * (percentiles[-1] - percentiles[0])

# Font size applied through mpl.rc_context when plotting.
fontsize = 6

# Opacities for simulation and observation artists.
sim_alpha, obs_alpha = 0.6, 1.0

# Two-colour palettes: simulations, observations, and (presumably) the historical period.
sim_cmap = ["#CC6677", "#882255"]
obs_cmap = ["0.8", "0.9"]
hist_cmap = ["#a6cee3", "#1f78b4"]

In [None]:
import hyp3_sdk as sdk
import s3fs
from urllib.request import urlopen

# S3 bucket under which each job's files live, in a folder named after the job id.
PISM_CLOUD_BUCKET = "hyp3-pism-cloud-test-contentbucket-zs9dctrqrlvx"

user_id = "aaschwanden"

RGI_IDS = [
    "RGI2000-v7.0-C-01-09429",  # Malaspina
    "RGI2000-v7.0-C-01-04374",  # Wrangell Mountains
    "RGI2000-v7.0-C-01-14907",  # TBD
]

# Job names are "<RGI id><campaign>".
campaign = "_era5_ec2_1year"
JOB_NAMES = [f"{rgi_id}{campaign}" for rgi_id in RGI_IDS]

# Collect every PISM_TERRA_EXECUTE job of this user that carries one of the job names.
hyp3 = sdk.HyP3("https://pism-cloud-test.asf.alaska.edu")
jobs = sdk.Batch()
for job_name in JOB_NAMES:
    jobs += hyp3.find_jobs(name=job_name, user_id=user_id, job_type="PISM_TERRA_EXECUTE")


# Map RGI id -> S3 prefix of a succeeded job. A later succeeded job with the same
# RGI id overwrites the earlier entry.
s3_ids = {}
for job in jobs:
    if job.job_type != "PISM_TERRA_EXECUTE":
        continue

    # Strip the campaign suffix from the job name to recover the RGI id.
    rgi_id = job.name.partition(campaign)[0]
    print(rgi_id)
    print("-" * 80)
    print(job.status_code)
    if job.status_code == "SUCCEEDED":
        s3_ids[rgi_id] = f"s3://{PISM_CLOUD_BUCKET}/{job.job_id}/"
    # Disabled: print the last line of the job's first log file.
    # if job.logs is not None and len(job.logs) > 0:
    #     url = job.logs[0]
    #     with urlopen(url) as f:
    #         print(f.read().decode().splitlines()[-1])
    print("\n")

In [None]:
# Display `job`, i.e. whichever job the most recently executed `for job in jobs` loop left bound.
job


In [None]:
# Anonymous (unsigned) S3 access; use anon=False if the bucket requires credentials.
fs = s3fs.S3FileSystem(anon=True)

scalar_files = []
spatial_files = []

# For each succeeded job, locate its clipped spatial file and its field-summed file.
for rgi_id, s3_id in s3_ids.items():
    print(rgi_id, s3_id)
    prefix = f"{s3_id}{rgi_id}/output/spatial/"
    files = fs.ls(prefix)  # 'bucket/key' style strings, hence the "s3://" prepended below

    if (spatial_file := pick(files, "clipped_spatial_")) is not None:
        spatial_files.append("s3://" + spatial_file)
    if (scalar_file := pick(files, "fldsum_spatial_")) is not None:
        scalar_files.append("s3://" + scalar_file)


# Open all scalar files as one dataset; preprocess_config tags each file with its exp_id.
scalar_preprocess = partial(preprocess_config, drop_vars=["wall_clock_time"])
scalar_ds = xr.open_mfdataset(
    scalar_files,
    preprocess=scalar_preprocess,
    parallel=True,
    engine="h5netcdf",
)

# Anomalies relative to the first time step, numeric variables only.
initial_state = scalar_ds.isel(time=0).utils.drop_nonnumeric_vars()
normalized_ds = scalar_ds.utils.drop_nonnumeric_vars() - initial_state

# Lower bound, median and upper bound across the ensemble (exp_id) dimension.
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
    sim_quantiles = {
        q: normalized_ds.quantile(q, dim="exp_id", skipna=True)
        for q in (percentiles[0], 0.5, percentiles[1])
    }


In [None]:
# Toggle for drawing the ensemble median on top of the envelope.
add_median = True
with mpl.rc_context({"font.size": fontsize}):

    p_var = "thk"
    fig, ax = plt.subplots(1, 1, figsize=(6.4, 3.2))

    # Shaded band between the lower and upper quantile.
    sim_ci = ax.fill_between(
        sim_quantiles[0.5].time,
        sim_quantiles[percentiles[0]][p_var],
        sim_quantiles[percentiles[1]][p_var],
        color=sim_cmap[0],
        alpha=sim_alpha,
        lw=0,
        label=f"({percentile_range:.0f}% credibility interval)",
    )
    sim_cis = [sim_ci]

    if add_median:
        median = sim_quantiles[0.5][p_var]
        median.plot(ax=ax, color=sim_cmap[1], lw=1, ls="solid", add_legend=False)

    # Legend without a visible frame.
    legend = ax.legend(handles=sim_cis, loc="lower left")
    legend_frame = legend.get_frame()
    legend_frame.set_linewidth(0.0)
    legend_frame.set_alpha(0.0)

    # NOTE(review): rgi_id is whatever the last loop that bound it left behind — confirm
    # this is the glacier the title should name.
    ax.set_title(rgi_id)


In [None]:
# Display the normalized dataset (numeric variables minus their value at the first time step).
normalized_ds

In [None]:
from pathlib import Path
from moviepy import ImageClip, CompositeVideoClip
from moviepy.video import fx as vfx

# Build a short intro animation: a still DEM background, a cloud layer that fades in,
# drifts and fades out, and the PISM logo (with a drop shadow) fading in on top.
# The result is written as an MP4 and its last frame as a PNG.

# NOTE(review): machine-specific absolute path; adjust before running elsewhere.
base = Path("/Users/andy") / "Google Drive" / "My Drive" / "Projects" / "terra"

BG_FILE    = base / "figures" / "ak_dem_8x.png"
CLOUD_FILE = base / "figures" / "clouds.jpg"
LOGO_FILE  = base / "figures" / "pism_logo.png"

duration    = 10.0      # total video length (s)

cloud_start = 0.5       # when clouds start appearing
cloud_in    = 2.0       # cloud fade-in duration
cloud_hold  = 4.0       # how long clouds stay fully visible
cloud_out   = 3.0       # cloud fade-out duration

logo_start  = 2.2       # when logo starts to appear
logo_in     = 1.5       # logo fade-in duration

# ---- background ----
bg = ImageClip(str(BG_FILE)).with_duration(duration)

# Force even width/height for H.264 / yuv420p
w_even = bg.w if bg.w % 2 == 0 else bg.w - 1
h_even = bg.h if bg.h % 2 == 0 else bg.h - 1
bg = bg.resized(new_size=(w_even, h_even))

# ---- clouds layer ----
# The cloud clip lasts fade-in + hold + fade-out seconds, starting at cloud_start.
cloud = (
    ImageClip(str(CLOUD_FILE))
    .resized(width=bg.w * 1.3)      # a bit larger than frame
    .with_duration(cloud_in + cloud_hold + cloud_out)
    .with_start(cloud_start)        # appear over the DEM
    .with_opacity(0.9)
)

def cloud_pos(t):
    # Return the top-left position that keeps the cloud image centred on a point
    # drifting 60 px right and 20 px up over the clip's duration.
    # t is local time since cloud_start
    x = bg.w / 2 + 60 * (t / cloud.duration)   # drift slightly right
    y = bg.h / 2 - 20 * (t / cloud.duration)   # and slightly up
    return (x - cloud.w / 2, y - cloud.h / 2)

cloud = cloud.with_position(cloud_pos)
cloud = cloud.with_effects([vfx.CrossFadeIn(cloud_in), vfx.CrossFadeOut(cloud_out)])

# ---- logo base clip (no position yet) ----
logo_base = (
    ImageClip(str(LOGO_FILE))
    .resized(height=int(bg.h * 0.15))          # scale relative to frame
    .with_duration(duration - logo_start)
    .with_start(logo_start)
)

# Where you want the logo on screen
logo_pos = (100, 100)

# ---- shadow for logo ----
shadow_offset = (8, 8)  # pixels (x, y) offset of the shadow

# NOTE(review): the shadow is the logo image itself at 40% opacity (not darkened or blurred).
shadow = (
    logo_base
    .resized(height=int(150))          # fixed 150 px height; replaces the frame-relative size set on logo_base
    .with_position((logo_pos[0] + shadow_offset[0],
                    logo_pos[1] + shadow_offset[1]))
    .with_opacity(0.4)                    # semi-transparent
    .with_effects([vfx.CrossFadeIn(logo_in)])  # fade in with the logo
)

# ---- actual logo layer (on top of shadow) ----
logo = (
    logo_base
    .resized(height=int(150))          # fixed 150 px height, same as the shadow
    .with_position(logo_pos)
    .with_effects([vfx.CrossFadeIn(logo_in)])  # rise out of cloud
)

# ---- composite & export ----
# Layer order is back to front: background, clouds, shadow, logo.
final = CompositeVideoClip([bg, cloud, shadow, logo], size=bg.size)

final.write_videofile(
    base / "animation" / "ak_cloud_logo.mp4",
    fps=30,
    codec="libx264",
    audio=False,
    ffmpeg_params=["-pix_fmt", "yuv420p"],
)

# Save the last frame as PNG. fps repeats the value passed to write_videofile above,
# so t is one frame before the end of the clip.
fps = 30
final.save_frame(
    base / "animation" / "ak_cloud_logo.png",
    t=final.duration - 1.0 / fps,
)


In [None]:
# IPython help syntax (not plain Python): show the documentation of bg.resized.
bg.resized?

In [None]:
# urllib has no handler for the "s3" URL scheme, so urlopen("s3://...") raises
# URLError (unknown url type). Read the object through s3fs instead.
with s3fs.S3FileSystem().open("s3://pism-cloud-data/terra/era5_ec2_1year.toml", "r") as f:
    pism_config_text = f.read()
print(pism_config_text)

In [None]:
from copy import deepcopy
from pathlib import Path

import hyp3_sdk as sdk
import s3fs


# S3 bucket under which each job's files live, in a folder named after the job id.
PISM_CLOUD_BUCKET = "hyp3-pism-cloud-test-contentbucket-zs9dctrqrlvx"

# Template for an ensemble-preparation job. The commented-out "name" and "rgi_id"
# entries show example values for the per-glacier fields.
STAGE_TEMPLATE = {
    # "name": "RGI2000-v7.0-C-01-09429_era5_agu_1year",
    "job_type": "PISM_TERRA_PREP_ENSEMBLE",
    "job_parameters": {
        # "rgi_id": "RGI2000-v7.0-C-01-09429",
        "rgi_gpkg": "s3://pism-cloud-data/terra/rgi.gpkg",
        "pism_config": "s3://pism-cloud-data/terra/era5_ec2_1year.toml",
        "run_template": "s3://pism-cloud-data/terra/ec2.j2",
        "uq_config": "s3://pism-cloud-data/terra/era5_agu.toml",
    },
}

# Template for an execution job. The commented-out "name", "ensemble_job_id" and
# "run_script" entries show example values for the per-run fields.
EXECUTE_TEMPLATE = {
    # "name": "RGI2000-v7.0-C-01-09429_era5_agu_1year",
    "job_type": "PISM_TERRA_EXECUTE",
    "job_parameters": {
        # "ensemble_job_id": "042ffcdc-2134-4b18-b1af-b22fdf7cbb52",
        # "run_script": "RGI2000-v7.0-C-01-09429/run_scripts/submit_g400m_RGI2000-v7.0-C-01-09429_id_0_1978-01-01_1979-01-01.sh"
    },
}

# Glaciers to process, identified by their RGI v7.0 ids.
RGI_IDS = [
    "RGI2000-v7.0-C-01-09429",  # Malaspina
    "RGI2000-v7.0-C-01-04374",  # Wrangell Mountains
    "RGI2000-v7.0-C-01-14907",  # TBD
]


def get_run_scripts(job: sdk.Job) ->  list[str]:
    """
    List the run scripts of a job, as paths relative to the job's bucket folder.

    Lists ``<bucket>/<job_id>/<rgi_id>/run_scripts`` anonymously via s3fs, where
    ``rgi_id`` is read from ``job.job_parameters["rgi_id"]``.

    Parameters
    ----------
    job : sdk.Job
        Job whose ``job_id`` and ``job_parameters["rgi_id"]`` locate the scripts.

    Returns
    -------
    list[str]
        One ``/``-separated path per listed file, relative to ``<bucket>/<job_id>/``.

    Raises
    ------
    KeyError
        If the job parameters contain no ``"rgi_id"``.
    ValueError
        If a listed file does not lie under ``<bucket>/<job_id>/``.
    """
    # S3 keys always use "/" separators. The OS-dependent Path would emit "\\" on
    # Windows and corrupt the returned keys, so use PurePosixPath on every platform.
    from pathlib import PurePosixPath

    job_root = f"{PISM_CLOUD_BUCKET}/{job.job_id}"
    fs = s3fs.S3FileSystem(anon=True)
    files = fs.ls(f"{job_root}/{job.job_parameters['rgi_id']}/run_scripts")
    return [str(PurePosixPath(file).relative_to(job_root)) for file in files]


hyp3 = sdk.HyP3("https://pism-cloud-test.asf.alaska.edu")

# --- Disabled: submit one ensemble-preparation job per glacier and wait for them ---
# prepared_jobs = []
# for rgi in RGI_IDS:
#     job_dict = deepcopy(STAGE_TEMPLATE)
#     job_dict['name'] = f'{rgi}_{Path(job_dict["job_parameters"]["pism_config"]).stem}'
#     job_dict['job_parameters']['rgi_id'] = rgi
#     prepared_jobs.append(job_dict)

# jobs = hyp3.submit_prepared_jobs(prepared_jobs)
# jobs = hyp3.watch(jobs)


# --- Disabled: submit one execute job per run script of every prepared ensemble ---
# prepared_jobs = []
# for job in jobs:
#     run_scripts = get_run_scripts(job)
#     for script in run_scripts:
#         job_dict = deepcopy(EXECUTE_TEMPLATE)
#         job_dict['name'] = job.name
#         job_dict['job_parameters']['ensemble_job_id'] = job.job_id
#         job_dict['job_parameters']['run_script'] = script
#         prepared_jobs.append(job_dict)

# jobs += hyp3.submit_prepared_jobs(prepared_jobs)
# jobs = hyp3.watch(jobs)

# job_names = {job.name for job in jobs}
# print(job_names)

# Instead of submitting, look up the already-submitted execute jobs by name.
hyp3 = sdk.HyP3("https://pism-cloud-test.asf.alaska.edu")
jobs = sdk.Batch()
for job_name in {
    "RGI2000-v7.0-C-01-14907_era5_ec2_1year",
    "RGI2000-v7.0-C-01-09429_era5_ec2_1year",
    "RGI2000-v7.0-C-01-04374_era5_ec2_1year",
}:
    jobs += hyp3.find_jobs(name=job_name, user_id="aaschwanden", job_type="PISM_TERRA_EXECUTE")

# ----------------------------------------------------------------------------------------------------------------------

# Watch the jobs and keep the batch that hyp3.watch returns.
jobs = hyp3.watch(jobs)

# Report where each execute job's files live and which run script it used.
print("Path to job files:")
for job in jobs:
    if job.job_type != "PISM_TERRA_EXECUTE":
        continue
    print(f"{job.name}: s3://{PISM_CLOUD_BUCKET}/{job.job_id}/")
    print(f"    Run Script: {job.job_parameters['run_script']}")

In [None]:
import moviepy

In [None]:
# Look up this user's execute jobs for the three glaciers by job name.
hyp3 = sdk.HyP3("https://pism-cloud-test.asf.alaska.edu")
jobs = sdk.Batch()
for job_name in {
    "RGI2000-v7.0-C-01-14907_era5_ec2_1year",
    "RGI2000-v7.0-C-01-09429_era5_ec2_1year",
    "RGI2000-v7.0-C-01-04374_era5_ec2_1year",
}:
    jobs += hyp3.find_jobs(name=job_name, user_id="aaschwanden", job_type="PISM_TERRA_EXECUTE")

In [None]:
# Report where each execute job's files live and which run script it used.
print("Path to job files:")
for job in jobs:
    if job.job_type != "PISM_TERRA_EXECUTE":
        continue
    print(f"{job.name}: s3://{PISM_CLOUD_BUCKET}/{job.job_id}/")
    print(f"    Run Script: {job.job_parameters['run_script']}")

In [None]:
# Display the `logs` attribute of whichever job the last `for job in jobs` loop left bound.
job.logs

In [None]:
# Display the `logs` attribute of the currently bound `job` (same expression as the cell above).
job.logs

In [None]:
import s3fs

# Default (non-anonymous) S3 access.
fs = s3fs.S3FileSystem()

# Print the TOML configuration line by line and remember its last line.
with fs.open("s3://pism-cloud-data/terra/era5_ec2_1year.toml", "r") as f:
    last = None
    for line in f:
        # Each line read from the file ends with its own newline; strip it so
        # print() does not put a blank line after every line of output.
        print(line.rstrip("\n"))
        last = line.rstrip()

# Last line of the file without trailing whitespace (None if the file is empty).
print(last)