# This notebook processes, evaluates, and plots Clear Air Turbulence indices for BARPA-R and BARRA-R 
# VWS

# import packages

In [1]:
import xarray as xr
import glob
import intake
import numpy as np
import seaborn as sns
from scipy import stats
import os
import xesmf as xe
import inspect
import calendar
import pandas as pd
from turbulence_AUSCAT.cat_indices import calc_turbulence_indices, windspeed, VWS
from xarray.groupers import SeasonResampler

import warnings
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

import logging
logging.getLogger("flox").setLevel(logging.WARNING)

from plotting_maps.acs_plotting_maps import plot_acs_hazard_multi, plot_acs_hazard, plot_data, cmap_dict, regions_dict
from matplotlib import colors, cm
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt

from dask.diagnostics import ProgressBar
# ProgressBar().register()

import dask
from dask.distributed import Client
# ensure lots of resources are requested. Eg 28 cores
# client = Client(threads_per_worker=120, n_workers=8)
# client = Client(threads_per_worker=4, n_workers=2) 
# client = Client(threads_per_worker=6, n_workers=2)

# client

# Choose index to process and evaluate

In [2]:
mid_lat_slice = slice(-50,-25)
lon_slice = slice(90,195)

baseline_time_range = np.arange(1990,2009+1)
baseline_time_slice = slice("1990", "2009")

P=200
p=P
p0 = P-50
p1 = P+50
P0 = 1000
step_size=  0.1545


In [3]:
turbulence_index_list = ["VWS"]
turbulence_index = turbulence_index_list[0]

# Prepare BARRA-R data for baseline evaluation

In [4]:
# ds = xr.open_dataset("/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_2000.nc")
# ds

In [5]:
ds_target = xr.open_dataset("/g/data/py18/BARPA/output/CMIP6/DD/AUS-15/BOM/ACCESS-CM2/historical/r4i1p1f1/BARPA-R/v1-r1/6hr/ua200/v20231001/ua200_AUS-15_ACCESS-CM2_historical_r4i1p1f1_BOM_BARPA-R_v1-r1_6hr_196001-196012.nc")\
.drop_vars("pressure").chunk({"time":120, "lat":-1, "lon":-1})


In [6]:
%%time
# best with one thread per worker
VARU = "wnd_ucmp"
VARV = "wnd_vcmp"
VART = "air_temp"
VARZ = "geop_ht"

barra_vars = {"wnd_ucmp":"ua", "wnd_vcmp": "va", "air_temp":"ta", "geop_ht":"zg"}

# start and end for BARRA-R specifically
lat_min, lat_max, lon_min, lon_max = (ds_target["lat"].values[0], ds_target["lat"].values[-1], 
                                      ds_target["lon"].values[0], ds_target["lon"].values[-1])
regridder=None

def _preprocess(ds):
    ds = ds.drop_vars( ["latitude_longitude"],)\
            .sel({"latitude": slice(lat_min-0.2 , lat_max+0.2), "longitude":slice(lon_min-0.2 , lon_max+0.2)})\
            .sel({"pressure":[100, 150, 200, 250, 300,]},  method = "nearest")
            # .sel({"pressure":[p0,p,p1]},  method = "nearest")
    try:
        ds = ds.drop_vars(["forecast_period", "forecast_reference_time",])
    except:
        pass
    return ds  

with Client(threads_per_worker=1, n_workers=30) as client:
    # retry loop because sometimes this loop fails (randomly?) and can be resolved by simply retrying
    max_retries = 10
    retry_count = 0
    while retry_count < max_retries:
        try:
            ds = None
            for VAR in [VARU, VARV, VARZ, VART]:
                regridder = None
                for year in baseline_time_range:
                    new_file = f"/scratch/v46/gt3409/BARRA-R/TMP_{barra_vars[VAR]}_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
                    if os.path.exists(new_file):
                        print(f"File '{new_file}' already exists.")
                        continue
                        
                    filelist = glob.glob(f"/g/data/cj37/BARRA/BARRA_R/v1/analysis/prs/{VAR}/{year}/*/{VAR}-an-prs-PT0H-BARRA_R-v1-*T*00Z.sub.nc")
                    filelist.sort()
                
                    print(f"open_mfdataset {VAR} {year}...")
                    ds = xr.open_mfdataset(filelist, preprocess=_preprocess, combine="nested", concat_dim="time", 
                                           parallel=True, decode_timedelta=False)\
                            .rename({"latitude":"lat", "longitude":"lon", VAR:barra_vars[VAR]})\
                            .chunk({"pressure":1, "time":120, "lat":-1, "lon":-1})
    
                    if regridder is None:
                        print("calculate regridder...")
                        regridder = xe.Regridder(ds, ds_target, "bilinear",)
                    print(f"regrid and compute {year}...")
                    ds_regridded = regridder(ds)
                    # for plvl in [p0, p, p1]:
                    new_file = f"/scratch/v46/gt3409/BARRA-R/TMP_{barra_vars[VAR]}_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_{year}.nc"                   
                    try:
                        print("saving netcdf...")
                        # ds_regridded.sel({"pressure":plvl}, method="nearest").to_netcdf(new_file)
                        ds_regridded.to_netcdf(new_file)
                        print(f"Saved {new_file}")
                        
                        # check
                        ds = xr.open_dataset(new_file)
                        nulls = ds.isel({"pressure":0, "lat":0, "lon":0})[barra_vars[VAR]].isnull().sum().values
                        print(f"nulls = {nulls} for {new_file}")
                        if nulls == 0:
                            pass
                        else:
                            os.remove(new_file)
                            print(f"nulls found. Removed file: {new_file}")
                            break    
                    except:
                        # os.remove(new_file)
                        print(f"Problem saving {new_file}")
                        continue
                    ds.close()
                    ds_regridded.close()  
                
        except:
            retry_count += 1
            print(f"Error. Retry count = {retry_count}")
        break
    else:
        print(f"Operation failed after {max_retries} attempts.")
            
                    

  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml
  import pynvml


File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1990.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1991.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1992.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1993.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1994.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1995.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1996.nc' already exists.
File '/scratch/v46/gt3409/BARRA-R/TMP_ua_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1997.nc' already exists.
File '/scratch/v

In [7]:
# test for nans

In [8]:
# %%time
# for VAR in [VARU, VARV, VARZ, VART]:
#     print(VAR)
#     for year in baseline_time_range:
#         file = f"/scratch/v46/gt3409/BARRA-R/TMP_{barra_vars[VAR]}_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
#         print(year)
#         ds = xr.open_dataset(file)
#         nulls = ds.isel({"pressure":0, "lat":0, "lon":0})[barra_vars[VAR]].isnull().sum().values
#         if nulls == 0:
#             pass
#         else:
#             print(f"nulls = {nulls} for {file}. Removing ...")
#             os.remove(file)
        

# Identify available BARPA-R experiments

In [9]:
# get information of available datasets

cat_name = "barpa"
col = intake.open_esm_datastore(f"/g/data/lp01/collections/py3.9_dev/nci-{cat_name}.json")

var_list = [f"{var}{pressure}" for var in ["ta", "ua", "va", "zg"] for pressure in [P-50, P, P+50]]

table_id = "6hr"
scenarios = ["historical","ssp126", "ssp370", "ssp585", "evaluation"]

# change this query to select a subset of the data you are interested in
query = dict(variable_id = var_list,
             table_id = table_id,
             experiment_id = scenarios,
            )

cat = col.search(**query)
cat.unique()

activity_id                                                    [BARPA-R]
institution_id                                                     [BOM]
version                                                      [v20231001]
variable_id            [ta150, ta200, ta250, ua150, ua200, ua250, va1...
table_id                                                           [6hr]
source_id              [ACCESS-CM2, ACCESS-ESM1-5, CESM2, CMCC-ESM2, ...
experiment_id           [historical, ssp126, ssp370, ssp585, evaluation]
member_id                      [r4i1p1f1, r6i1p1f1, r11i1p1f1, r1i1p1f1]
grid_label                                                      [AUS-15]
time_range             [196001-196012, 196101-196112, 196201-196212, ...
path                   [/g/data/py18/BARPA/output/CMIP6/DD/AUS-15/BOM...
derived_variable_id                                                   []
dtype: object

In [10]:
cat_df_max = cat.df.groupby(["variable_id", "experiment_id", "source_id", "member_id"]).max().reset_index()
# cat_df_max["index"] = cat_df_max.variable_id + "_" + cat_df_max.experiment_id + "_" + cat_df_max.source_id + "_" + cat_df_max.member_id
cat_df_max["index"] = [f'{cat_df_max.iloc[i]["experiment_id"]}_{cat_df_max.iloc[i]["source_id"]}_{cat_df_max.iloc[i]["member_id"]}' for i in np.arange(len(cat_df_max))]
cat_df_max = cat_df_max.set_index("index")


# indices for evaluation, historical and future groups. These will share time ranges
i_evaluation = cat_df_max.loc[cat_df_max["experiment_id"].isin(["evaluation"])].index
i_historical = cat_df_max.loc[cat_df_max["experiment_id"].isin(["historical"])].index
i_future = cat_df_max.loc[cat_df_max["experiment_id"].isin(["ssp126", "ssp370", "ssp585"])].index

In [11]:
list_evaluation = ['evaluation_BARRA-R_r1i1p1f1',
                   'evaluation_ERA5_r1i1p1f1',
                  ]

list_historical = ['historical_ACCESS-CM2_r4i1p1f1', 
                   'historical_ACCESS-ESM1-5_r6i1p1f1',
                   'historical_CESM2_r11i1p1f1', 
                   'historical_CMCC-ESM2_r1i1p1f1',
                   'historical_EC-Earth3_r1i1p1f1',
                   'historical_MPI-ESM1-2-HR_r1i1p1f1',
                   'historical_NorESM2-MM_r1i1p1f1',
                  ]

list_ssp126 = [
                 'ssp126_ACCESS-CM2_r4i1p1f1', 
                 'ssp126_ACCESS-ESM1-5_r6i1p1f1',
                 'ssp126_CESM2_r11i1p1f1',
                 'ssp126_CMCC-ESM2_r1i1p1f1',
                 'ssp126_EC-Earth3_r1i1p1f1',
                 'ssp126_MPI-ESM1-2-HR_r1i1p1f1',
                 'ssp126_NorESM2-MM_r1i1p1f1',
              ]

list_ssp370 = ['ssp370_ACCESS-CM2_r4i1p1f1',
                 'ssp370_ACCESS-ESM1-5_r6i1p1f1',
                 'ssp370_CESM2_r11i1p1f1',
                 'ssp370_CMCC-ESM2_r1i1p1f1',
                 'ssp370_EC-Earth3_r1i1p1f1',
                 'ssp370_MPI-ESM1-2-HR_r1i1p1f1',
                 'ssp370_NorESM2-MM_r1i1p1f1',
              ]

list_ssp585 = ['ssp585_ACCESS-CM2_r4i1p1f1',
                 'ssp585_EC-Earth3_r1i1p1f1']

list_future = list_ssp126 + list_ssp370 + list_ssp585

# Calculate indices from standard variables

In [12]:
def dti_preprocess(ds):
    ds = ds.drop_vars(["crs"])
    # ds = ds.dropna("time") # too slow
    ds = ds.convert_calendar("standard")
    try:
        ds = ds.expand_dims({"pressure":[int(ds.variable_id[2:])]})
    except:
        pass
    return ds
    
def delayed_turbulence_index(turbulence_index=None,
                             year=None,
                             path = "/g/data/py18/BARPA/output/CMIP6/DD/AUS-15/BOM",
                             source_id=None,
                             experiment_id=None,
                             member_id=None,
                             table_id = "6hr", 
                             version="v20231001", 
                             P=250,
                             outfile = None,
                             ):
    """Use this function to compute the turbulence indices in a delayed routine"""
    # get a list of the variables needed for calculating this turbulence index (var_list)
    var_list = [f"{var}{pressure}" for var in ["ta", "ua", "va", "zg"] for pressure in [P-50, P, P+50]]
    dict_variables = {"t":"ta","u":"ua","v":"va","z":"zg",
                      "p":int(P),"p0":int(P-50),"p1":int(P+50),}
    inverted_dict = {value: key for key, value in dict_variables.items()}

    def _filename(var, pressure, source_id, experiment_id):
        VAR = f"{dict_variables[var]}{dict_variables[pressure]}"
        if source_id == "BARRA-R" and experiment_id == "evaluation":
            filename = f"/scratch/v46/gt3409/BARRA-R/TMP_{dict_variables[var]}_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
        else:
            filename = f"{path}/{source_id}/{experiment_id}/{member_id}/BARPA-R/v1-r1/{table_id}/{VAR}/{version}/\
{VAR}_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}01-{year}12.nc"
        return filename

    def _open_it(file):
        ds = xr.open_dataset(file, decode_times=True, chunks ={"time":-1, "lat":-1, "lon":-1})\
            .drop_vars(["crs"])\
            .astype("float32")\
            .convert_calendar("standard")

        VAR=list(ds.variables)[0]
        ds = ds.rename({VAR: inverted_dict[''.join(char for char in VAR if char.isalpha())]})
        try:
            ds = ds.expand_dims("pressure")
        except:
            pass
        ds["pressure"] = ds["pressure"].astype("int")
        return ds

    turbulence_index_vars = set()
    turbulence_index_vars.update(list(inspect.signature(globals()[turbulence_index]).parameters.keys()))
    
    params = turbulence_index_vars.intersection(["t", "u", "v", "z"])
    plvls = turbulence_index_vars.intersection(["p0", "p", "p1", "P0"])
    if len(plvls)==0:
        plvls=set(["p"])
    
    var_list = [f"{dict_variables[var]}{dict_variables[pressure]}" for var in list(params) for pressure in list(plvls)]

    
    ds = xr.merge([xr.concat([_open_it(_filename(var, pressure, source_id, experiment_id)).sel({"pressure":dict_variables[pressure]}, method="nearest") 
                              for pressure in list(plvls)], 
                             dim="pressure") 
                   for var in list(params)], join='outer')

    
    # delayed write to file. Compute outside this function
    
    # lazy calculate indices
    ds = calc_turbulence_indices(ds, which= turbulence_index, p=P, u="u", v="v", t="t", z="z",)

    if outfile is None:
        outfile = f"/scratch/v46/gt3409/TMP_{turbulence_index}/TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc",

    ds_to_write = ds[[turbulence_index]].sel({"time":str(year)})
    try:
        ds_to_write = ds_to_write.sel({"pressure":P}, method="nearest",)
    except:
        pass
    delayed_write = ds_to_write.to_netcdf(outfile, mode="a", compute=False,)
    return delayed_write

In [13]:
# calculate temporary turbulence index for baselines

# best with one worker, many threads for barra
# use 28 cores 4 threads, 2 workers


# dict_years = {"evaluation":(1990, 2009), 
#               "historical":(1979, 2014), 
#               "future":(2015, 2100)}
dict_model_index = {"evaluation":["evaluation_BARRA-R_r1i1p1f1"], 
                    "historical":list_historical,
                    "future":list_future}


## Calculate index for BARRA-R

In [14]:
%%time

scenario ="evaluation"
for run in dict_model_index[scenario]:
    experiment_id, source_id, member_id = run.split("_")        

    p99_file = f"/scratch/v46/gt3409/turbulence_AUSCAT/{turbulence_index}-{P}hPa-monthly-freq-above-p99_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
    percentile_file = f"/scratch/v46/gt3409/turbulence_AUSCAT/{turbulence_index}-{P}hPa-monthly-percentiles_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
    if os.path.exists(p99_file) and os.path.exists(percentile_file):
        print(f"Files '{p99_file} and {percentile_file}' already exist.")
        continue
    
    with dask.config.set({
        "distributed.scheduler.locks.lease-timeout": "300s",
        "distributed.scheduler.locks.lease-validation-interval": "30s",
    }):

        with Client(threads_per_worker=4, n_workers=1) as client:
            delayed_list=[]
            for year in baseline_time_range:
                os.makedirs(f"/scratch/v46/gt3409/TMP_{turbulence_index}", exist_ok=True)
                new_file = f"/scratch/v46/gt3409/TMP_{turbulence_index}/TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
                if os.path.exists(new_file):
                    print(f"File '{new_file}' already exists.")
                    continue
                
                print(f"Making {new_file}")
                delayed_list.append(delayed_turbulence_index(turbulence_index=turbulence_index,
                                                             year=year,
                                                             source_id=source_id,
                                                             experiment_id=experiment_id,
                                                             member_id=member_id,
                                                             P=P,
                                                             outfile=new_file,
                                                            ))
            print("compute ... ")
            dask.compute(*delayed_list)

  import pynvml


File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1990.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1991.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1992.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1993.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1994.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1995.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr_1996.nc' already exists.
File '/scratch/v46/gt3409/TMP_VWS/TMP_VWS-200hPa_AUS-15_BARRA-R_evaluation_r1i1p1f1

In [15]:
# scenario ="evaluation"
# for run in dict_model_index[scenario]:
#     experiment_id, source_id, member_id = run.split("_")     
    
#     for year in baseline_time_range:
#         file = f"/scratch/v46/gt3409/TMP_{turbulence_index}/TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
#         ds = xr.open_dataset(file)
#         nulls = ds.isel({"lat":100, "lon":100})[turbulence_index].isnull().sum().values
#         if nulls == 0:
#             pass
#         else:
#             print(f"nulls = {nulls} for {file}")
#             os.remove(file)

## Define MOG p99 from BARRA-R


In [16]:
p99 = 0.00978766

In [17]:
%%time
# 20 year mid lat box p99 MOG

# calculate p99, for windspeed-250hPa = 75.92640943
# Determine threshold MOG from evaluation dataset
run = "evaluation_BARRA-R_r1i1p1f1"
experiment_id, source_id, member_id = run.split("_")

filelist = [f"/scratch/v46/gt3409/TMP_{turbulence_index}/TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc" 
 for year in baseline_time_range]

def _preprocess(ds, q=[0.99],):
    return ds.sel(lat=mid_lat_slice, lon=lon_slice).chunk({"time":-1, "lat":-1, "lon":-1}).quantile(q, dim=["time", "lat", "lon"])

ds = xr.open_mfdataset(filelist, decode_times=True, preprocess=_preprocess, combine="nested", concat_dim="time")
# ds = ds.compute()

nulls = ds[turbulence_index].isnull().sum().values
assert nulls==0,  print(f"nulls = {nulls}...")

p99 = ds.mean("time")[turbulence_index].values
p99


CPU times: user 56.3 s, sys: 47 s, total: 1min 43s
Wall time: 3min 4s


array([0.00978766])

# Now run all the calculations to make the files you need
## Frequency above p99
## and quantiles for evaluation

In [18]:
%%time
# for all scenarios, calculate the index, then calculate the monthly frequency above the BARRA-R baseline 99th percentile
# for baseline periods, also calculate the percentiles 
# output saved to /scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles
# and /scratch/v46/gt3409/{turbulence_index}/{P}hPa/freq-above-p99
# TMP files removed 

# best with one worker, many threads for barra
# use 28 cores 4 threads, 2 workers

dict_years = {"evaluation":(1990, 2009), 
              "historical":(1979, 2014), 
              "future":(2015, 2100)}
dict_model_index = {"evaluation":["evaluation_BARRA-R_r1i1p1f1"], 
                    "historical":list_historical,
                    "future":list_future}

def p99freq_preprocess(ds):
    """Calculate annual frequency of exceeding p99 threshold"""
    return (ds>p99).resample({"time":"ME"},).mean(["time"], skipna=True)

def quantiles_preprocess(ds):
    """For evaluation from historical, calculate the monthly values of 1st to 99th quantiles within the mid lat box"""
    ds = ds.convert_calendar("standard")
    # rechunk such that there are as many chunks as there are years, 
    ds = ds.chunk({"time": -1, "lat": -1, "lon": -1})
    ds = ds.sel(lat=mid_lat_slice, lon=lon_slice)\
            .resample(time="ME")\
            .apply(lambda ds: ds.quantile(np.arange(0,1,0.01), dim=["lat", "lon", "time"]))
    return ds


# with Client(threads_per_worker=4, n_workers=2) as client:
#     for scenario in [ "historical", "future", "evaluation", ]:
#         # start_year, end_year = dict_years[scenario]
#         for run in dict_model_index[scenario]:
#             experiment_id, source_id, member_id = run.split("_")   
#             os.makedirs(f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa", exist_ok=True)
#             os.makedirs(f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/freq-above-p99", exist_ok=True)
#             p99_filename = f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/freq-above-p99/\
# {turbulence_index}-{P}hPa-monthly-freq-above-p99_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
            
#             if scenario =="historical" or scenario=="evaluation":
#                 frequency = "monthly"
#                 perc_filename = f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles/\
# {turbulence_index}-{P}hPa-{frequency}-percentiles_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
#                 perc_filename_exists = os.path.exists(perc_filename)
#             else:
#                 perc_filename_exists = True
            
#             if os.path.exists(p99_filename) and perc_filename_exists:
#                 print(f"{p99_filename} already exists, skipping {run}")
#                 continue
    
#             delayed_list=[]
#             start_year, end_year = dict_years[scenario]
#             for year in np.arange(start_year, end_year+1):
#                 tmp_file = f"/scratch/v46/gt3409/TMP_{turbulence_index}/\
# TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
#                 if os.path.exists(tmp_file):
#                     print(f"File '{tmp_file}' already exists.")
#                 else:
#                     print(f"Making {tmp_file}")
#                     delayed_list.append(delayed_turbulence_index(turbulence_index=turbulence_index,
#                                                           year=year,
#                                                           source_id=source_id,
#                                                           experiment_id=experiment_id,
#                                                           member_id=member_id,
#                                                                  P=P,
#                                                                  outfile=tmp_file
#                                                                 ))
                    
#             if len(delayed_list)>0:
#                 print("compute ... ")
#                 dask.compute(*delayed_list)
    
#             # compute freq
#             # multi years
#             filelist = [f"/scratch/v46/gt3409/TMP_{turbulence_index}/\
# TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc" 
#                         for year in np.arange(start_year, end_year+1)]
            
#             if os.path.exists(p99_filename):
#                 print(f"File '{p99_filename}' already exists.")
                
#             else:
#                 # calculate monthly freq above p99
#                 ds =  xr.open_mfdataset(filelist,
#                                        preprocess=p99freq_preprocess, 
#                                        combine="nested", 
#                                        concat_dim="time",
#                                        )\
#                     .assign_coords({"run":run})\
#                     .to_netcdf(p99_filename, compute=True)
#                 print(f"made {p99_filename}")
    
#             if scenario =="historical" or scenario=="evaluation":
#                 # calculate quantiles for evaluation period only
#                 frequency = "monthly"
#                 os.makedirs(f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles", exist_ok=True)
#                 perc_filename = f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles/\
# {turbulence_index}-{P}hPa-{frequency}-percentiles_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
#                 if os.path.exists(perc_filename):
#                     print(f"File '{perc_filename}' already exists.")
#                 else:
#                     filelist_baseline = [f"/scratch/v46/gt3409/TMP_{turbulence_index}/\
# TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
#                                 for year in baseline_time_range]
                    
#                     ds = xr.open_mfdataset(filelist_baseline, 
#                                            combine="nested", 
#                                            concat_dim="time",
#                                            preprocess=quantiles_preprocess,
#                                           )\
#                             .assign_coords({"run":run})
#                     null= ds.isnull()[turbulence_index].sum().values
#                     # assert null==0, print(f"Number of null values: {null} in {run}.")
                    
#                     try:
#                         ds.to_netcdf(perc_filename, compute=True)
#                         print(f"Made '{perc_filename}'")
#                     except Exception as e:
#                         print(f"Error in {run}: {e}")
               
                    
    
#             # check that freq calculated correctly
#             # then delete all in filelist
#             for file in filelist:
#                 if os.path.exists(file):
#                     os.remove(file)
#                     print(f"File removed: {file}")
#                 else:
#                     print(f"File does not exist: {file}")
#                     pass
    


CPU times: user 8 μs, sys: 13 μs, total: 21 μs
Wall time: 35.5 μs


In [19]:
def run_p99freq_and_quantiles(scenario):
    """expects scenario as one of  [ "historical", "future", "evaluation", ]"""
    for run in dict_model_index[scenario]:
        experiment_id, source_id, member_id = run.split("_")   
        os.makedirs(f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa", exist_ok=True)
        os.makedirs(f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/freq-above-p99", exist_ok=True)
        p99_filename = f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/freq-above-p99/\
{turbulence_index}-{P}hPa-monthly-freq-above-p99_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
        
        if scenario =="historical" or scenario=="evaluation":
            frequency = "monthly"
            perc_filename = f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles/\
{turbulence_index}-{P}hPa-{frequency}-percentiles_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
            perc_filename_exists = os.path.exists(perc_filename)
        else:
            perc_filename_exists = True
        
        if os.path.exists(p99_filename) and perc_filename_exists:
            print(f"{p99_filename} already exists, skipping {run}")
            continue

        delayed_list=[]
        start_year, end_year = dict_years[scenario]
        for year in np.arange(start_year, end_year+1):
            tmp_file = f"/scratch/v46/gt3409/TMP_{turbulence_index}/\
TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
            if os.path.exists(tmp_file):
                print(f"File '{tmp_file}' already exists.")
            else:
                print(f"Making {tmp_file}")
                delayed_list.append(delayed_turbulence_index(turbulence_index=turbulence_index,
                                                      year=year,
                                                      source_id=source_id,
                                                      experiment_id=experiment_id,
                                                      member_id=member_id,
                                                             P=P,
                                                             outfile=tmp_file
                                                            ))
                
        if len(delayed_list)>0:
            print("compute ... ")
            dask.compute(*delayed_list)

        # compute freq
        # multi years
        filelist = [f"/scratch/v46/gt3409/TMP_{turbulence_index}/\
TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc" 
                    for year in np.arange(start_year, end_year+1)]
        
        if os.path.exists(p99_filename):
            print(f"File '{p99_filename}' already exists.")
            
        else:
            # calculate monthly freq above p99
            ds =  xr.open_mfdataset(filelist,
                                   preprocess=p99freq_preprocess, 
                                   combine="nested", 
                                   concat_dim="time",
                                   )\
                .assign_coords({"run":run})\
                .to_netcdf(p99_filename, compute=True)
            print(f"made {p99_filename}")

        if scenario =="historical" or scenario=="evaluation":
            # calculate quantiles for evaluation period only
            frequency = "monthly"
            os.makedirs(f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles", exist_ok=True)
            perc_filename = f"/scratch/v46/gt3409/{turbulence_index}/{P}hPa/percentiles/\
{turbulence_index}-{P}hPa-{frequency}-percentiles_AUS-15_{run}_BOM_BARPA-R_v1-r1_6hr.nc"
            if os.path.exists(perc_filename):
                print(f"File '{perc_filename}' already exists.")
            else:
                filelist_baseline = [f"/scratch/v46/gt3409/TMP_{turbulence_index}/\
TMP_{turbulence_index}-{P}hPa_AUS-15_{source_id}_{experiment_id}_{member_id}_BOM_BARPA-R_v1-r1_6hr_{year}.nc"
                            for year in baseline_time_range]
                
                ds = xr.open_mfdataset(filelist_baseline, 
                                       combine="nested", 
                                       concat_dim="time",
                                       preprocess=quantiles_preprocess,
                                      )\
                        .assign_coords({"run":run})
                null= ds.isnull()[turbulence_index].sum().values
                # assert null==0, print(f"Number of null values: {null} in {run}.")
                
                try:
                    ds.to_netcdf(perc_filename, compute=True)
                    print(f"Made '{perc_filename}'")
                except Exception as e:
                    print(f"Error in {run}: {e}")
           
                

        # check that freq calculated correctly
        # then delete all in filelist
        for file in filelist:
            if os.path.exists(file):
                os.remove(file)
                print(f"File removed: {file}")
            else:
                print(f"File does not exist: {file}")
                pass
    return


In [None]:
with Client(threads_per_worker=4, n_workers=1) as client:
    for scenario in [ "evaluation",]:
        run_p99freq_and_quantiles(scenario)

with Client(threads_per_worker=4, n_workers=2) as client:
    for scenario in [ "historical", "future",]:
        run_p99freq_and_quantiles(scenario)

  import pynvml


/scratch/v46/gt3409/VWS/200hPa/freq-above-p99/VWS-200hPa-monthly-freq-above-p99_AUS-15_evaluation_BARRA-R_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr.nc already exists, skipping evaluation_BARRA-R_r1i1p1f1


  import pynvml
  import pynvml


/scratch/v46/gt3409/VWS/200hPa/freq-above-p99/VWS-200hPa-monthly-freq-above-p99_AUS-15_historical_ACCESS-CM2_r4i1p1f1_BOM_BARPA-R_v1-r1_6hr.nc already exists, skipping historical_ACCESS-CM2_r4i1p1f1
/scratch/v46/gt3409/VWS/200hPa/freq-above-p99/VWS-200hPa-monthly-freq-above-p99_AUS-15_historical_ACCESS-ESM1-5_r6i1p1f1_BOM_BARPA-R_v1-r1_6hr.nc already exists, skipping historical_ACCESS-ESM1-5_r6i1p1f1
/scratch/v46/gt3409/VWS/200hPa/freq-above-p99/VWS-200hPa-monthly-freq-above-p99_AUS-15_historical_CESM2_r11i1p1f1_BOM_BARPA-R_v1-r1_6hr.nc already exists, skipping historical_CESM2_r11i1p1f1
/scratch/v46/gt3409/VWS/200hPa/freq-above-p99/VWS-200hPa-monthly-freq-above-p99_AUS-15_historical_CMCC-ESM2_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr.nc already exists, skipping historical_CMCC-ESM2_r1i1p1f1
/scratch/v46/gt3409/VWS/200hPa/freq-above-p99/VWS-200hPa-monthly-freq-above-p99_AUS-15_historical_EC-Earth3_r1i1p1f1_BOM_BARPA-R_v1-r1_6hr.nc already exists, skipping historical_EC-Earth3_r1i1p1f1
/scratch/v4

2026-01-22 21:56:46,100 - tornado.application - ERROR - Uncaught exception GET /individual-workers-memory/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='gadi-cpu-bdw-0602.gadi.nci.org.au:5338', method='GET', uri='/individual-workers-memory/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.12/lib/python3.11/site-packages/tornado/websocket.py", line 965, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.12/lib/python3.11/site-packages/tornado/web.py", line 3375, in wrapper
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/g/data/xp65/public/apps/med_conda/envs/analysis3-25.12/lib/python3.11/site-packages/bokeh/server/views/ws.py", line 149, in open
    raise ProtocolEr