# Homework Assignment 2

In [304]:
from datetime import date, datetime, timedelta
from pathlib import Path
import random
import os
import cdsapi
import xarray as xr
import hpgeom as hpg
import numpy as np
from scipy.interpolate import griddata
import zarr

### Check necessary modules

In [318]:
try:
    import zarr
except ImportError:
    !pip install zarr
    import zarr

In [320]:
try:
    import hpgeom as hpg
except ImportError:
    !pip install hpgeom
    import hpgeom as hpg

In [322]:
try:
    import cdsapi
except ImportError:
    !pip install cdsapi
    import cdsapi

## Task 1 - Core control flow & mock-processing

### Date Logic

In [72]:
# Generator function to find and store all dates inside of the given range, inclusive the start and end date 
def daterange(start, end):
    
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)

### Status Check

In [294]:
# If necessary, a new directory will be created
BASE_DIR = Path("processed_data")
BASE_DIR.mkdir(exist_ok=True)

# Checks whether a date has already been processed
def is_day_processed(day: date) -> bool:
    # A date is considered processed if a folder exists for it
    return (BASE_DIR / day.isoformat()).exists()

### Mock-Processing

In [40]:
# Mock-processing that simulates download, processing and archiving of daily files
def mock_process_day(day, fail_probability=0.2):
    print(f"→ Processing {day}")

    # Simulating a random error during download
    if random.random() < fail_probability:
        raise RuntimeError("Mock download failed")

    # Save the daily mock file
    day_dir = BASE_DIR / day.isoformat()
    day_dir.mkdir(parents=True, exist_ok=True)

    with open(day_dir / "done.txt", "w") as f:
        f.write(f"Processed on {datetime.now()}\n")

    print(f"✓ {day} successfully processed")

### Find oldest missing day

In [17]:
# Find the oldest missing day of a given date range 
def find_oldest_missing_day(start, end) -> date | None:
    for d in daterange(start, end):
        # Use the established logic
        if not is_day_processed(d):
            return d
    return None

### Find oldest processed day

In [20]:
# Find the oldest fully processed day in the directory
def find_oldest_processed_day(processed_dir, date_format):
    
    oldest_date = None
    
    for entry in os.listdir(processed_dir):
        folder_path = os.path.join(processed_dir, entry)
        if not os.path.isdir(folder_path):
            continue  # Skip files, just looking for folders
        
        try:
            folder_date = datetime.strptime(entry, date_format).date() # Extract date from folder name
        except ValueError:
            # Ignore files that do not match the given date format
            continue

        # Check whether “Done.txt” exists in the folder
        done_file = os.path.join(folder_path, "Done.txt")
        if not os.path.isfile(done_file):
            continue  # Date has not been fully processed
            
        if oldest_date is None or folder_date < oldest_date:
            oldest_date = folder_date

    return oldest_date

### Check dates

In [250]:
def check_dates(
    start_date,
    end_date,
    target_date = None
):
    if end_date and start_date is not None:
        if end_date < start_date:
            raise ValueError("✗ End_date is before start_date")

### Central Workflow

In [238]:
# Central control flow that handles the daily data
def run_daily_workflow(
    start_date = None,
    end_date = None,
    target_date = None, 
    # ensure proper functionaility when script is executed w/o arguments
):

    # If the function is called without start_date, find a suitable start date
    if start_date is None:
        start_date = find_oldest_processed_day(BASE_DIR, "%Y-%m-%d")
    
    # If the function is called without end_date, find a suitable end date
    if end_date is None:
        end_date = date.today()
    
    # Check that the given date range is correct
    check_dates(start_date, end_date)
    
    # Check whether a target date or all days within the range should be processed
    if target_date is not None:
            
        print(f"Start workflow for individual target date: {target_date}")
        try:
            mock_process_day(target_date)
        except Exception as e:
            print(f"✗ Error on {target_date}: {e}")
        return

    print("Start workflow for all missing days")

    while True:
        missing_day = find_oldest_missing_day(start_date, end_date)
        if missing_day is None:
            print("✓ All days are processed")
            break

        try:
            mock_process_day(missing_day)
        except Exception as e:
            print(f"✗ Error on {missing_day}: {e}")
            print("→ Error logged, next day will be processed")

## Task 2 - Flexible ERA5 donwload routine

### ERA5 configuration dictionary

In [85]:
# This dictionary is used to easily adapt the donwload routine to other parameter values as stated in the assignment.
# No hardcoded settings within the download routine of the core workflow  

ERA5_CONFIG = {
    "dataset": "reanalysis-era5-pressure-levels",
    "format": "netcdf",
    "variable": ["specific_humidity"],
    "pressure_levels": [975, 900, 800, 500, 300],
    "times": ["00:00", "06:00", "12:00", "18:00"],
    "filename": "era5_humidity.nc",
}

### Download ERA5 data

In [93]:
# Downloads ERA5 humidity data for a single given day
def download_era5_humidity(day):

    # Save the daily data
    day_dir = BASE_DIR / day.isoformat()
    day_dir.mkdir(exist_ok=True)

    target_file = day_dir / "era5_humidity.nc"

    # CDS API request
    c = cdsapi.Client()
    request = {
        "product_type": "reanalysis",
        "variable": ERA5_CONFIG["variable"],
        "pressure_level": [str(p) for p in ERA5_CONFIG["pressure_levels"]],
        "year": day.strftime("%Y"),
        "month": day.strftime("%m"),
        "day": day.strftime("%d"),
        "time": ERA5_CONFIG["times"],
        "format": ERA5_CONFIG["format"],
    }

    print(f"↓ Download ERA5 humidity data for {day}")
    c.retrieve(
        "reanalysis-era5-pressure-levels",
        request,
        str(target_file),
    )

    # Flag as processed
    (day_dir / "Done.txt").write_text("OK\n")

### Central workflow with ERA5 download

In [240]:
# Central control flow that handles the daily ERA5 data
def run_daily_era5_workflow(
    start_date = None,
    end_date = None,
    target_date = None, 
    # ensure proper functionaility when script is executed w/o arguments
):

    # If the function is called without start_date, find a suitable start date
    if start_date is None:
        start_date = find_oldest_processed_day(BASE_DIR, "%Y-%m-%d")
        
    # If the function is called without end_date, find a suitable end date    
    if end_date is None:
        end_date = date.today()
        
    # Check that the given date range is correct
    check_dates(start_date, end_date)
        
    # Check whether a target date or all days within the range should be processed       
    if target_date is not None:
        
        print(f"Start workflow for individual day: {target_date}")
        try:
            download_era5_humidity(target_date)
        except Exception as e:
            print(f"✗ Error on {target_date}: {e}")
        return

    print("Start workflow for all missing days")

    while True:
        missing_day = find_oldest_missing_day(start_date, end_date)
        if missing_day is None:
            print("✓ All days are processed")
            break

        try:
            download_era5_humidity(missing_day)
        except Exception as e:
            print(f"✗ Error on {missing_day}: {e}")
            print("→ Error logged, next day will be processed")

## Task 3 - Interpolation

### Interpolate daily data batches 

In [210]:
# Interpolates daily ERA5 data into a HEALPix grid of nside 8 and 16 using the hpgeom module
def interpolate_to_healpix(day, nside_list=[8, 16]):

    print("Start interpolation...")
    
    day_dir = BASE_DIR / day.isoformat()
    input_file = day_dir / "era5_humidity.nc"

    if not input_file.exists():
        raise FileNotFoundError(f"Missing ERA5 file for {day}")

    ds = xr.open_dataset(input_file)

    time_dim = "valid_time"
    level_dim = "pressure_level"
    q = ds["q"]
    lats = ds["latitude"].values
    lons = ds["longitude"].values

    print("...step 1...")
    
    lon_grid, lat_grid = np.meshgrid(lons, lats)
    src_points = np.column_stack([lat_grid.ravel(), lon_grid.ravel()])

    for nside in nside_list:
        
        print(f"...nside {nside}...")
        
        npix = hpg.nside_to_npixel(nside)
        pixels = np.arange(npix)

        theta, phi = hpg.pixel_to_angle(nside, pixels)
        hp_lats = 90.0 - np.degrees(theta)
        hp_lons = np.degrees(phi)
        tgt_points = np.column_stack([hp_lats, hp_lons])

        hp_q = np.empty(
            (q.sizes[time_dim], q.sizes[level_dim], npix)
        )

        for t in range(q.sizes[time_dim]):
            print(f"...t {t}...")
            for lev in range(q.sizes[level_dim]):
                print(f"...lev {lev}...")
                values = q.isel({time_dim: t, level_dim: lev}).values.ravel()

                interp = griddata(
                    src_points, values, tgt_points, method="linear"
                )

                nan_mask = np.isnan(interp)
                if np.any(nan_mask):
                    interp[nan_mask] = griddata(
                        src_points,
                        values,
                        tgt_points[nan_mask],
                        method="nearest"
                    )

                hp_q[t, lev, :] = interp

        out_ds = xr.Dataset(
            data_vars={
                "q": ((time_dim, level_dim, "pixels"), hp_q)
            },
            coords={
                time_dim: ds[time_dim],
                level_dim: ds[level_dim],
                "pixels": pixels,
            },
            attrs={"healpix_nside": nside},
        )

        out_file = day_dir / f"healpix_n{nside}.nc"
        out_ds.to_netcdf(out_file)

        print(f"✓ {day}: interpolated to HEALPix NSIDE={nside}")

### Central Workflow with ERA5 download and interpolation

In [258]:
def run_daily_era5_interpolation_workflow(
    start_date = None,
    end_date = None,
    target_date = None, 
    # ensure proper functionaility when script is executed w/o arguments
    nside_list=[8,16],
):

    # If the function is called without start_date, find a suitable start date
    if start_date is None:
        start_date = find_oldest_processed_day(BASE_DIR, "%Y-%m-%d")
    
    # If the function is called without end_date, find a suitable end date
    if end_date is None:
        end_date = date.today()
    
    # Check that the given date range is correct
    check_dates(start_date, end_date)
        
    # Check whether a target date or all days within the range should be processed
    if target_date is not None:
        
        print(f"Start workflow for individual day: {target_date}")
        try:
            download_era5_humidity(target_date)
            interpolate_to_healpix(target_date, nside_list=nside_list)  # HEALPix Interpolation
        except Exception as e:
            print(f"✗ Error on {target_date}: {e}")
        return

    print("Start workflow for all missing days")

    while True:
        missing_day = find_oldest_missing_day(start_date, end_date)
        if missing_day is None:
            print("✓ All days are processed")
            break

        try:
            download_era5_humidity(missing_day)
            interpolate_to_healpix(missing_day, nside_list=nside_list)
        except Exception as e:
            print(f"✗ Error on {missing_day}: {e}")
            print("→ Error logged, next day will be processed")

## Task 4 - Chunking and save as Zarr

### Chunking and save to Zarr

In [285]:
ZARR_DIR = Path("processed_data_zarr")
ZARR_DIR.mkdir(exist_ok=True)

# Stores the interpolated HEALPix data for one day in Zarr
def save_to_zarr(day, nside):
    day_dir = BASE_DIR / day.isoformat()
    input_file = day_dir / f"healpix_n{nside}.nc"
    
    if not input_file.exists():
        raise FileNotFoundError(f"No interpolated file for {day} NSIDE={nside} found")
    
    ds = xr.open_dataset(input_file)
    
    # Dynamisches Chunking basierend auf den vorhandenen Dimensionen
    # Defining of a chunking strategy, 1 day per chunk
    chunk_dict = {}
    for dim in ['time', 'level', 'pixels']:
        if dim in ds.dims:
            chunk_dict[dim] = 1 if dim == 'time' else ds.dims[dim]

    ds_chunked = ds.chunk(chunk_dict)
    
    zarr_dir = Path("processed_data_zarr") / f"healpix_n{nside}.zarr"
    zarr_dir.mkdir(parents=True, exist_ok=True)
    
    # Speichern (Append möglich, falls Zarr schon existiert)
    ds_chunked.to_zarr(zarr_dir, mode='a')  # 'a' = append / update
    print(f"✓ Saved day {day} NSIDE={nside} to Zarr: {zarr_dir}")

### Central workflow with ERA5 download, interpolation, chunking and saving as Zarr

In [288]:
# 
def run_daily_era5_interpolation_workflow_zarr(
    start_date=None,
    end_date=None,
    target_date=None,
    # ensure proper functionaility when script is executed w/o arguments
    nside_list=[8],
):

    # If the function is called without start_date, find a suitable start date
    if start_date is None:
        start_date = find_oldest_processed_day(BASE_DIR, "%Y-%m-%d")
    
    # If the function is called without end_date, find a suitable end date
    if end_date is None:
        end_date = date.today()
        
    # Check that the given date range is correct
    check_dates(start_date, end_date)
    
    # Check whether a target date or all days within the range should be processed
    if target_date is not None:
        print(f"Start workflow for individual day: {target_date}")
        try:
            download_era5_humidity(target_date)
            interpolate_to_healpix(target_date, nside_list=nside_list)
            
            for nside in nside_list:
                save_to_zarr(target_date, nside)

        except Exception as e:
            print(f"✗ Error on {target_date}: {e}")
        return

    # Workflow when a date range is downloaded
    print("Start workflow for all missing days")

    while True:
        missing_day = find_oldest_missing_day(start_date, end_date)
        if missing_day is None:
            print("✓ All days are processed")
            break

        try:
            download_era5_humidity(missing_day)
            interpolate_to_healpix(missing_day, nside_list=nside_list)

            for nside in nside_list:
                save_to_zarr(missing_day, nside)

        except Exception as e:
            print(f"✗ Error on {missing_day}: {e}")
            print("→ Error logged, next day will be processed")


## Testing

In [279]:
# Test ERA5 download routine with interpolation

run_daily_era5_interpolation_workflow(
    #start_date=date(2024, 12, 1),
    #end_date=date(2024, 12, 1),
    target_date=date(2024, 12, 1),
)

FileNotFoundError: [WinError 3] Das System kann den angegebenen Pfad nicht finden: 'processed_data'

In [296]:
run_daily_era5_interpolation_workflow_zarr(
    target_date=date(2024, 12, 2),
)

Start workflow for individual day: 2024-12-02


2026-01-20 09:13:30,075 INFO Request ID is 1712da6d-ebdd-4d37-9e6f-780077b608aa
2026-01-20 09:13:30,124 INFO status has been updated to accepted


↓ Download ERA5 humidity data for 2024-12-02


2026-01-20 09:14:19,893 INFO status has been updated to running
2026-01-20 09:14:45,578 INFO status has been updated to successful


791846caa92b6f74f6d77e87ce88151f.nc:   0%|          | 0.00/33.3M [00:00<?, ?B/s]

Start interpolation...
...step 1...
...nside 8...
...t 0...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
...t 1...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
...t 2...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
...t 3...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
✓ 2024-12-02: interpolated to HEALPix NSIDE=8
✗ Error on 2024-12-02: Expected an integer or an iterable of integers. Got None instead.


In [325]:
run_daily_era5_interpolation_workflow_zarr(
    start_date=date(2024, 12, 1),
    end_date=date(2024, 12, 5),
    target_date=date(2024, 12, 3),
)

Start workflow for individual day: 2024-12-03
↓ Download ERA5 humidity data for 2024-12-03


2026-01-20 10:12:03,289 INFO Request ID is 65979b48-6cf7-4d43-ab48-b8e6730562ee
2026-01-20 10:12:03,361 INFO status has been updated to accepted
2026-01-20 10:12:24,517 INFO status has been updated to successful


e74b3502a99cbf498d1589e6e42a1f52.nc:   0%|          | 0.00/33.3M [00:00<?, ?B/s]

Start interpolation...
...step 1...
...nside 8...
...t 0...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
...t 1...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
...t 2...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
...t 3...
...lev 0...
...lev 1...
...lev 2...
...lev 3...
...lev 4...
✓ 2024-12-03: interpolated to HEALPix NSIDE=8
✗ Error on 2024-12-03: Expected an integer or an iterable of integers. Got None instead.
