# Running FEISTY with time-series forcing from CESM

This notebook doesn't really need to be a notebook; I'm hoping it can be converted to a script instead.
All the parameter settings are handled in the first (non-`import`) cell, where we read in `feisty-config.TL319_t13.4p2z.001.yml`,
though in the loop over years the start / end dates and file I/O parameters are changed.
This was run on a casper login node with 4 GB of memory, but all data is kept distributed on the dask cluster.

## Imports

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os

import dask
import matplotlib.pyplot as plt
import nc_time_axis  # needed for time series plot for some reason
import numpy as np
import xarray as xr
import yaml
from dask.distributed import Client, wait
from dask_jobqueue import PBSCluster

import feisty

## Configure run

The FEISTY command `config_and_run_from_yaml()` needs a dictionary pointing to forcing streams and initial conditions.
We provide a few `YAML` files containing acceptable configurations.
There are also several parameters controlling how the run is set up.

In [None]:
# Load the run configuration (forcing streams, worker counts, output settings, ...)
feisty_config_file = 'feisty-config.TL319_t13.4p2z.001.yml'
with open(feisty_config_file) as config_stream:
    feisty_config_in = yaml.safe_load(config_stream)

# All history / restart output goes under the current user's scratch space
outdir = os.path.join(os.path.sep, 'glade', 'scratch', os.environ['USER'], 'feisty_output')

In [None]:
def modify_forcing_dict(
    feisty_config,
    start_year,
    nyears,
    outdir='temp_forcing_streams',
    first_year=1980,
    last_year=2020,
):
    """
    Write trimmed copies of the forcing stream files and return a config that uses them.

    The highres run has a large list of files to read for the forcing dataset.
    For every stream file named in ``feisty_config['forcing']['streams']`` this
    function writes a copy into ``outdir`` whose ``files`` entry only lists the
    years ``start_year - 1`` through ``start_year + nyears``, clipped to
    ``[first_year, last_year]``. The n-th entry of a stream's ``files`` list is
    taken to be the forcing file for year ``first_year + n``.

    Parameters
    ----------
    feisty_config : dict
        FEISTY configuration; it is not modified.
    start_year : int
        First year of the run.
    nyears : int
        Number of years in the run.
    outdir : str, optional
        Directory the trimmed stream files are written to; created (including
        any missing parent directories) if it does not exist.
    first_year : int, optional
        Year corresponding to the first entry of each stream's ``files`` list.
    last_year : int, optional
        Last year that may be requested from the forcing dataset.

    Returns
    -------
    dict
        Deep copy of ``feisty_config`` in which ``['forcing']['streams']`` lists
        the newly written stream files.

    Raises
    ------
    KeyError
        If a stream file does not list a forcing file for one of the needed years.
    """

    import copy

    # Years needed by the run, padded by one year on each side and clipped to
    # the span covered by the forcing dataset.
    years = range(max(first_year, start_year - 1), min(last_year, start_year + nyears) + 1)

    # makedirs (rather than mkdir) so a nested outdir works and an existing
    # directory is not an error.
    os.makedirs(outdir, exist_ok=True)

    newfile_list = []
    for stream_file in feisty_config['forcing']['streams']:
        with open(stream_file) as f:
            forcing_dict = yaml.safe_load(f)

        # Map each year to its forcing file, then keep only the years we need
        file_by_year = dict(enumerate(forcing_dict['files'], start=first_year))
        missing = [year for year in years if year not in file_by_year]
        if missing:
            raise KeyError(
                f'{stream_file} has no forcing file for year(s) {missing}; '
                f'it lists {len(file_by_year)} file(s) starting at {first_year}'
            )
        forcing_dict['files'] = [file_by_year[year] for year in years]

        # Same file name as the original stream, but inside outdir
        newfile = os.path.join(outdir, os.path.basename(stream_file))
        with open(newfile, 'w') as outfile:
            yaml.dump(forcing_dict, outfile)
        newfile_list.append(newfile)

    # Leave the caller's configuration untouched
    feisty_config_out = copy.deepcopy(feisty_config)
    feisty_config_out['forcing']['streams'] = newfile_list
    return feisty_config_out

## Set up Dask cluster

Since the data in `ds` is chunked in (`nlat`, `nlon`), we use a `dask` cluster to parallelize the computation.

In [None]:
# Memory requested for each worker job, in GB
mem = 40

# Dashboard links go through the Jupyter proxy
dask.config.set({'distributed.dashboard.link': 'proxy/{port}/status'})

# One single-core, single-process worker per PBS job
pbs_options = {
    'memory': f'{mem} GB',
    'processes': 1,
    'cores': 1,
    'queue': 'casper',
    'walltime': '1:30:00',
    'resource_spec': f'select=1:ncpus=1:mem={mem}GB',
    'log_directory': './dask-logs',
}
cluster = PBSCluster(**pbs_options)

# Request the number of workers named in the configuration and connect to them
cluster.scale(feisty_config_in['num_workers'])
client = Client(cluster)
client

In [None]:
%%time

# Block until enough workers have started: 'min_workers' from the config if it
# is set, but never more than the number of workers that were requested.
requested_workers = feisty_config_in['num_workers']
min_workers = feisty_config_in.get('min_workers', requested_workers)
worker_cnt = int(np.minimum(min_workers, requested_workers))
print(f"Waiting for {worker_cnt} workers (requested {feisty_config_in['num_workers']} total)")
client.wait_for_workers(worker_cnt)

## Run the model

In [None]:
%%time

start_year = 1981
nyears = 1

# Trim the forcing streams once; every year in the loop reuses this config
feisty_config = modify_forcing_dict(feisty_config_in, start_year, nyears)

# History and restart datasets live in sibling directories under outdir
hist_dir = os.path.join(outdir, 'hist')
rest_dir = os.path.join(outdir, 'rest')

ds_list = []
for year in range(start_year, start_year + nyears):
    print(f'Configuring FEISTY for {year}...')

    # Simulation window (the full-year end date is currently disabled)
    feisty_config['start_date'] = f'{year}-01-01'
    # feisty_config['end_date'] = f'{year}-12-31'
    feisty_config['end_date'] = f'{year}-01-05'

    # Start from the previous year's restart file if one has been written;
    # otherwise keep whatever initial conditions the config already has
    previous_restart = f'highres.{year-1}-12-31.zarr'
    if os.path.exists(os.path.join(rest_dir, previous_restart)):
        feisty_config['initial_conditions'] = {
            'root_dir': rest_dir,
            'ic_file': previous_restart,
        }

    # Output locations for this year's history and restart files
    output_settings = feisty_config['output']
    output_settings['hist_dir'] = hist_dir
    output_settings['hist_file'] = f'highres.{year}.zarr'
    output_settings['rest_dir'] = rest_dir
    output_settings['rest_file'] = f'highres.{year}-12-31.zarr'

    # map_blocks lets us run in parallel over our dask cluster
    print(f'Running FEISTY for year {year}...')
    ds_list.append(feisty.config_and_run_from_yaml(feisty_config))

# Stitch the per-year results together along time and show the biomass variable
ds_out = xr.concat(ds_list, dim='time')
ds_out["biomass"]

<!-- ### Plotting -->

Plot `biomass` over time at a single grid column,
then do the same for `fish_yield`.

In [None]:
%%time

# Indices of the grid column shown in the time series
nlat = 91
nlon = 3175

fig, ax = plt.subplots(1, 1, figsize=(8, 4))

# Pick the column once, then draw one line per group on the same axes
biomass_column = ds_out['biomass'].isel(nlat=nlat, nlon=nlon)
group_names = ds_out.group.data
for group in group_names:
    biomass_column.sel(group=group).plot(ax=ax)

ax.set_ylim([5e-7, 50])
ax.set_yscale("log")
ax.set_title("python")
# Legend sits outside the axes, to the right
ax.legend(group_names, bbox_to_anchor=(1.025, 0.5), loc=6)
fig.suptitle(f"biomass at ({nlat}, {nlon})");

In [None]:
%%time

# Indices of the grid column shown in the time series
nlat = 91
nlon = 3175

fig, ax = plt.subplots(1, 1, figsize=(8, 4))

# Pick the column once, then draw one line per fish that has any positive yield
yield_column = ds_out['fish_yield'].isel(nlat=nlat, nlon=nlon)
plotted_fish = []
for fish in ds_out.fish.data:
    fish_series = yield_column.sel(fish=fish)
    # Skip series whose largest (non-NaN) value is not positive
    if np.nanmax(fish_series.values) <= 0:
        continue
    plotted_fish.append(fish)
    fish_series.plot(ax=ax)

ax.set_ylim([1e-6, 1e-2])
ax.set_yscale("log")
# Legend only names the fish that were actually drawn
ax.legend(plotted_fish, bbox_to_anchor=(1.025, 0.5), loc=6)
fig.suptitle(f"Fish yield at ({nlat}, {nlon})");

In [None]:
# %%time

# Maximum biomass of the first group (index 0) at the first time index
ds_out["biomass"].isel(time=0, group=0).max().values

In [None]:
# %%time

# Plot biomass of the first group (index 0) at time index 180.
# NOTE(review): the run cell sets end_date to January 5th, so index 180 may be
# out of range unless the full-year end_date is restored — confirm.
ds_out["biomass"].isel(time=180, group=0).plot()

In [None]:
# %%time

# Plot biomass of the first group (index 0) at time index 180 of the last year.
# Assumes 365 time steps per simulated year — TODO confirm against the output.
ds_out["biomass"].isel(time=(nyears - 1) * 365 + 180, group=0).plot()

In [None]:
# %%time

# Plot biomass of the first group (index 0) at the final time index
ds_out["biomass"].isel(time=-1, group=0).plot()

In [None]:
# Open the 1980-12-31 restart dataset from the 'rest' directory under outdir and display it.
# NOTE(review): the path ends in .zarr but no engine is passed to open_dataset —
# confirm the installed xarray selects the zarr backend for this path.
ds_test = xr.open_dataset(f'{outdir}/rest/highres.1980-12-31.zarr')
ds_test

In [None]:
# Count the NaN entries in the restart file's 'bent_ic' variable
np.sum(np.isnan(ds_test['bent_ic']))

In [None]:
# Count the NaN entries in the restart file's 'fish_ic' variable
np.sum(np.isnan(ds_test['fish_ic']))

In [None]:
# 3237441 * 8 == 25899528, so this evaluates to exactly 8.0.
# NOTE(review): presumably the ratio of the two NaN counts computed above — confirm
# which quantity each number came from.
25899528 / 3237441