## UKCP 18 rainfall processing

- Author: Sam Hardy (modified from Changgui Wang's original Python code)

- Import Python code from `ukcp18_download_and_processing`

In [1]:
import os
import xarray as xr
from ukcp18_download_and_processing import call_api_and_process_ukcp_data
from netCDF4 import Dataset
from urllib.parse import urlparse

### Define variables

In [2]:
year = 2021
month = 3
month1 = 2
ensemble_member_id = 1
projection_id = 2 # 2021-2040 time slice
var_id = 'pr'
ukcp_url=f'https://dap.ceda.ac.uk/badc/ukcp18/data/land-cpm/uk/2.2km/rcp85/01/{var_id}/1hr/v20210615/{var_id}_rcp85_land-cpm_uk_2.2km_01_1hr_{year}{month}01-{year}{month}30.nc'
ukcp_local=[f'/mnt/metdata/2024s1475/InputData/{var_id}_rcp85_land-cpm_uk_2.2km_01_1hr_{year}{month:02d}01-{year}{month:02d}30.nc',
            f'/mnt/metdata/2024s1475/InputData/{var_id}_rcp85_land-cpm_uk_2.2km_01_1hr_{year}{month1:02d}01-{year}{month1:02d}30.nc']
out_file_path='./'
config_file_path=f'/mnt/metdata/2024s1475/InputData/config.json'

### Run the Python code using functions

In [3]:
nc_datasets = []

for filename in ukcp_local:
    nc_datasets.append(Dataset(filename))

xarray_datasets = []
for nc_data in nc_datasets:
    if nc_data is None:
        continue
    ds = xr.open_dataset(xr.backends.NetCDF4DataStore(nc_data), chunks={"time": 30})
    xarray_datasets.append(ds)

# combine datasets using xarray if required, and re-chunk
if len(xarray_datasets) > 1:
    combined_ds = xr.concat(xarray_datasets, dim="time").chunk({"time": 60})
elif xarray_datasets:
    xarray_datasets = xarray_datasets[0]
else:
    print("No datasets were opened!")

In [4]:
from typing import Dict
import dask
import xarray as xr
import dask.config
import timeit
import json
import convert_ll2str as c2str
from typing import Dict
from ukcp18_processing_functions import get_dry_days, get_month_total, get_start_year, rolling_window_sum, get_bin_counts

def load_config(config_path: str):
    with open(config_path, 'r') as file: 
        config = json.load(file)
        return config 

def process_ukcp_data(ds: xr.Dataset,
                      output_file_path: str, 
                      config: Dict,
                      year: int, 
                      month: int,
                      member_id: int,
                      proj_id: str,
                      mask_1D: xr.Dataset):
    """
    Read UKCP18 (climate model) daily precipitation data 

    input_file_path: path to input file url (API)
    output_file_path: path to output files
    year: year to analyse (e.g. 2035)
    month: month to analyse (e.g. 8 for August)
    var_id: cf-compliant variable ID (e.g. pr, tas)
    member_id: ensemble member ID (01, 04, 05, ..., 15)
    mask_1D: xarray Dataset containing 1D land-ocean mask 
    """

    with dask.config.set(**{'array.slicing.split_large_chunks': True}):
        ds = ds.stack(location=("grid_latitude", "grid_longitude"))
        ids = c2str.get_cell_ids(ds.location.values)
        ds.coords['location_id'] = ('location', ids)
        ds = ds.where(ds.bnds == 0, drop=True)
        for item in config["remove_items"]:
            del ds[item]
        for item in config["squeeze_coords"]:
            ds = ds.squeeze(item)

        ds = ds.where(mask_1D["WCID"] >= 0, drop=True)

        starttime = timeit.default_timer()

        for wcid in range(config["NUMBER_OF_WC"]):
            print(f"Working on water company {str(wcid)}")
            ds_mask = ds.where(mask_1D.WCID == wcid, drop=True)

            print(f"Starting dry days calculation...")
            get_dry_days(ds_mask, 
                            year, 
                            month,
                            member_id, 
                            wcid, 
                            proj_id, 
                            output_file_path)
            
            print(f"Starting total rainfall calculation...")
            get_month_total(ds_mask, 
                            year, 
                            month, 
                            member_id, 
                            wcid, 
                            proj_id, 
                            output_file_path)

            for duration_str, start_hour in config["accum_duration_start"].items():
                duration = int(duration_str)
                start = get_start_year(year, month, start_hour)
                precip = ds_mask.where(ds['time'] >= start, drop=True)

                print(f"Calculating {str(duration)}-h accumulated precip, starting at {str(start_hour)}Z")
                if duration > 1:
                    ds_window = rolling_window_sum(precip, duration)
                    ds_window = ds_window.rename({"pr": "pr_sum"})
                    ds_window = ds_window.assign(pr=precip.pr)
                    ds_window['time'] = ds_window["time"].dt.strftime("%Y-%m-%d %H:%M")
                    df_window = ds_window.to_dataframe()
                    df_window.index = df_window.index.droplevel(['grid_latitude', 'grid_longitude'])

                else:
                    ds_window = precip
                    ds_window = ds_window.rename({"pr": "pr_sum"})
                    ds_window = ds_window.assign(pr=precip.pr)
                    ds_window['time'] = ds_window["time"].dt.strftime("%Y-%m-%d %H:%M")
                    df_window = ds_window.to_dataframe()
                    df_window = df_window[df_window['month_number'] == month]
                    df_window.index = df_window.index.droplevel(['grid_latitude', 'grid_longitude'])

                print("Starting bin counts calculation!")
                get_bin_counts(df_window, 
                                year,
                                month, 
                                member_id, 
                                duration, 
                                wcid, 
                                output_file_path,
                                proj_id, 
                                config)

                precip = None

        print("This code took :", timeit.default_timer() - starttime," (s) to run...")

        ds.close()

In [5]:
config = load_config(config_file_path)
mask_ds = xr.open_dataset(config["mask_nc_filename"])
mask_1D = mask_ds.stack(location=("grid_latitude", "grid_longitude"))

process_ukcp_data(combined_ds, 
                  out_file_path,
                  config,
                  year, 
                  month, 
                  ensemble_member_id, 
                  projection_id, 
                  mask_1D)

Working on water company 0
Starting dry days calculation...
Starting total rainfall calculation...
Calculating 1-h accumulated precip, starting at 23Z


KeyboardInterrupt: 