# Averages daily nldas soilMoisture
- from hourly nc files 

In [1]:
import dask
import xarray as xr
import numpy as np
import pandas as pd
import datetime
import os 
import glob

In [3]:
def select_soilm(ds):
    '''
    preprocess each dataset to keep only the SoilM_0_100cm variable
    inputs:
    ds
    '''
    return ds[['SoilM_0_100cm']]
    
def average_daily_soilm(date, model, nldas_path, out_path, log_path):
    """
    inputs:
    - date
    - model: Vic, mos, or noah (nldas lsms)
    - nldas_path: where hourly nldas files are stored
    - out_path: where to save files
    - log_path: where to log exceptions 
    """
    
    yymmdd = date.strftime("%Y%m%d")
    output_file = f"{out_path}/NLDAS_{model}0125_H.A{date_strf}.nc"
    
    # Skip processing if output already exists
    #if os.path.isfile(output_file):
    #    return None

    # get all hourly files for the day (pattern must match the file naming convention)
    files = sorted(glob.glob(f"{nldas_path}/NLDAS_{model}0125_H.A{date_strf}*"))
    
    # Check if there are at least 23 files, otherwise log and exit
    if len(files) < 23:
        with open(f"{log_path}/NLDAS_{model}0125_H_{date_strf}.txt", "w") as f:
            f.write(f"Only {len(files)} files found")
        return None
    
    try:
        
        ds = xr.concat([xr.open_dataset(f) for f in files], dim="time")
        ds = ds[['SoilM_0_100cm']]
        
        daily_soilm = ds['SoilM_0_100cm'].resample(time="1D").mean(skipna=True)
        daily_soilm.to_netcdf(output_file)
    
    except Exception as e:
        with open(f"{log_path}/NLDAS_{model}0125_H_{date_strf}.txt", "w") as f:
            f.write(str(e))
        return None
    

def day_iteration(year, model, nldas_path, out_path, log_path):
    '''
    given some year & nldas file path, iterates through each day
    inputs:
    year
    file_path_base
    '''
    start_dt = datetime.datetime(year, 1, 1, 0, 0)
    end_dt = datetime.datetime(year, 12, 31, 0, 0)
    year_downloads_dir = (f"{downloads_dir}/{year}")
    os.makedirs(year_downloads_dir, exist_ok=True)
    
    while start_dt <= end_dt:
        average_hourly(start_dt, model, nldas_path, year_downloads_dir, log_path)
        start_dt += datetime.timedelta(days=1)

## Inputs

In [1]:
NLDAS_lsm = 'VIC'
nldas_path = f"/storage/group/pches/default/public/NLDAS/{NLDAS_lsm}/hourly/"
downloads_dir = f"/storage/home/cta5244/work/pyWBM_yield_data/{NLDAS_lsm}_daily/"
log_path = f"/storage/home/cta5244/work/pyWBM_yield_data/{NLDAS_lsm}_daily/log_path/"
start_year, end_year = 1979, 2025

## dask delayed 

In [5]:
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    account="open",
    cores=1,
    memory="4GiB",
    walltime="03:00:00",
    local_directory="/scratch/$USER/dask-workers"
)
cluster.scale(jobs=10)
print(cluster.dashboard_link)

http://146.186.150.11:8787/status


2025-03-14 22:42:31,291 - distributed.scheduler - ERROR - Task day_iteration-c8ab9df3-ed1d-4652-93f6-27c41e62eb5b marked as failed because 4 workers died while trying to run it


In [20]:
from dask.distributed import Client

client = Client(cluster)

In [21]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://146.186.150.11:8787/status,

0,1
Dashboard: http://146.186.150.11:8787/status,Workers: 10
Total threads: 10,Total memory: 40.00 GiB

0,1
Comm: tcp://146.186.150.11:36245,Workers: 10
Dashboard: http://146.186.150.11:8787/status,Total threads: 10
Started: Just now,Total memory: 40.00 GiB

0,1
Comm: tcp://10.6.8.41:45813,Total threads: 1
Dashboard: http://10.6.8.41:39091/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.41:34799,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-dh7_ji4u,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-dh7_ji4u

0,1
Comm: tcp://10.6.8.37:33711,Total threads: 1
Dashboard: http://10.6.8.37:38505/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.37:33677,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-2h5g7w3g,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-2h5g7w3g

0,1
Comm: tcp://10.6.8.32:35705,Total threads: 1
Dashboard: http://10.6.8.32:44935/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.32:44185,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-pi3arv2b,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-pi3arv2b

0,1
Comm: tcp://10.6.8.31:41763,Total threads: 1
Dashboard: http://10.6.8.31:41833/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.31:36121,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-kstpuk5n,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-kstpuk5n

0,1
Comm: tcp://10.6.8.44:37125,Total threads: 1
Dashboard: http://10.6.8.44:39369/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.44:37313,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-fyo9yn63,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-fyo9yn63

0,1
Comm: tcp://10.6.8.26:39699,Total threads: 1
Dashboard: http://10.6.8.26:45199/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.26:35419,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-1_tmwysn,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-1_tmwysn

0,1
Comm: tcp://10.6.8.13:34523,Total threads: 1
Dashboard: http://10.6.8.13:43821/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.13:35665,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-g1_52o2k,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-g1_52o2k

0,1
Comm: tcp://10.6.8.33:41791,Total threads: 1
Dashboard: http://10.6.8.33:34879/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.33:38981,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-7friftb4,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-7friftb4

0,1
Comm: tcp://10.6.8.39:37359,Total threads: 1
Dashboard: http://10.6.8.39:34747/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.39:45951,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-ykqqo9w_,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-ykqqo9w_

0,1
Comm: tcp://10.6.8.39:37099,Total threads: 1
Dashboard: http://10.6.8.39:38329/status,Memory: 4.00 GiB
Nanny: tcp://10.6.8.39:34201,
Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-auaib40s,Local directory: /scratch/cta5244/dask-workers/dask-scratch-space/worker-auaib40s


In [1]:
results = []
for year in np.arange(start_year, end_year, 1):
    out = dask.delayed(day_iteration)(year=year, model=NLDAS_lsm, nldas_path=nldas_path, downloads_dir=downloads_dir, log_path=log_path)
    results.append(out)

NameError: name 'np' is not defined

In [23]:
results = dask.compute(*results)

KilledWorker: Attempted to run task 'day_iteration-c8ab9df3-ed1d-4652-93f6-27c41e62eb5b' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.6.8.39:42551. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.