## UKCP 18 rainfall processing test notebook

### Import required packages and local routines 

- import local routine: `convert_ll2str.py` 

In [1]:
import os
import numpy as np
import pandas as pd
import xarray as xr
import dask
import dask.config
import cftime
import timeit
import convert_ll2str as c2str
import json
import requests

from base64 import b64encode
from datetime import datetime, timezone
from getpass import getpass
from netCDF4 import Dataset
from urllib.parse import urlparse
from pathlib import Path

### Defining variables

- Calculating 3-h rolling precipitation totals 
- Calculating data for 13 water companies 

In [2]:
year = 2035
month = 9
ensemble_member_id = 4

In [3]:
HOME = str(Path.home())

mask_nc_filename = "UKWC_Cleaned_land-cpm_uk_2.2km.nc"
csv_filename = "YearsMonths_byBinCounts_Rand_OtherYears.csv"
projection_id = 2 # 2021-2040 time slice
var_id = 'pr'

BINS = {3: [2, 6, 10, 15, 20, 30, 40, 60, 80, 110, 140, 175, 215, 265]}

# these values represent the rainfall thresholds for each interval (1-h, 3-h, 6-h) corresponding to different RPs 
RPS = {3: [29, 35, 44, 49, 57]}

rp_years = [5, 10, 30, 50, 100]

global OUTPUT_PATH
global PROJECTION_ID

INPUT_PATH = '/mnt/c/Users/samhardy/OneDrive - JBA Group/2024s1475_RED_UP/ukcp_data/'
SAVE_PATH = '/mnt/c/Users/samhardy/OneDrive - JBA Group/2024s1475_RED_UP/'
NUMBER_OF_WC = 1

accum_duration_start = {3: 22}
MASK = None

remove_items = ['ensemble_member_id', 'grid_latitude_bnds', 'grid_longitude_bnds',
                'time_bnds','rotated_latitude_longitude', 'year', 'yyyymmddhh', 'ensemble_member']
squeeze_coords = ["bnds", "ensemble_member"]

## Code for retrieving UKCP data using an API 

In [4]:
"""
remote_nc_with_token.py
===================

Python script for reading a NetCDF file remotely from the CEDA archive. It demonstrates fetching
and using a download token to authenticate access to CEDA Archive data, as well as how to load
and subset the Dataset from a stream of data (diskless), without having to download the whole file.

Pre-requisites:

 - Python3.x
 - Python libraries (installed by Pip):

```
netCDF4
```

Usage:

```
$ python remote_nc_with_token.py <url> <var_id>
```

Example:

```
$ URL=https://dap.ceda.ac.uk/badc/ukcp18/data/marine-sim/skew-trend/rcp85/skewSurgeTrend/latest/skewSurgeTrend_marine-sim_rcp85_trend_2007-2099.nc
$ VAR_ID=skewSurgeTrend

$ python remote_nc_with_token.py $URL $VAR_ID
```

You will be prompted to provide your CEDA username and password the first time the script is run and
again if the token cached from a previous attempt has expired.

"""

# URL for the CEDA Token API service
TOKEN_URL = "https://services-beta.ceda.ac.uk/api/token/create/"
# Location on the filesystem to store a cached download token
TOKEN_CACHE = os.path.expanduser(os.path.join("~", ".cedatoken"))


def load_cached_token():
    """
    Read the token back out from its cache file.

    Returns a tuple containing the token and its expiry timestamp
    """

    # Read the token back out from its cache file
    try:
        with open(TOKEN_CACHE, "r") as cache_file:
            data = json.loads(cache_file.read())

            token = data.get("access_token")
            expires = datetime.strptime(data.get("expires"), "%Y-%m-%dT%H:%M:%S.%f%z")
            return token, expires

    except FileNotFoundError:
        return None, None


def get_token():
    """
    Fetches a download token, either from a cache file or
    from the token API using CEDA login credentials.

    Returns an active download token
    """

    # Check the cache file to see if we already have an active token
    token, expires = load_cached_token()

    # If no token has been cached or the token has expired, we get a new one
    now = datetime.now(timezone.utc)
    if not token or expires < now:

        if not token:
            print(f"No previous token found at {TOKEN_CACHE}. ", end="")
        else:
            print(f"Token at {TOKEN_CACHE} has expired. ", end="")
        print("Generating a fresh token...")

        print("Please provide your CEDA username: ", end="")
        username = input()
        password = getpass(prompt="CEDA user password: ")

        credentials = b64encode(f"{username}:{password}".encode("utf-8")).decode(
            "ascii"
        )
        headers = {
            "Authorization": f"Basic {credentials}",
        }
        response = requests.request("POST", TOKEN_URL, headers=headers)
        if response.status_code == 200:

            # The token endpoint returns JSON
            response_data = json.loads(response.text)
            token = response_data["access_token"]

            # Store the JSON data in the cache file for future use
            with open(TOKEN_CACHE, "w") as cache_file:
                cache_file.write(response.text)

        else:
            print("Failed to generate token, check your username and password.")

    else:
        print(f"Found existing token at {TOKEN_CACHE}, skipping authentication.")

    return token, expires


def open_datasets(urls: list[str],
                  download_token=None
                  ):
    """ 
    Open a list of NetCDF datasets from specified URLs. 
    """

    datasets = []
    headers = None

    if download_token:
        headers = {"Authorization": f"Bearer {download_token}"}

    for url in urls:
        response = requests.request("GET", url, headers=headers, stream=True)
        if response.status_code != 200:
            print(
                f"Failed to fetch data. The response from the server was {response.status_code}"
            )
            return
        
        filename = os.path.basename(urlparse(url).path)
        print(f"Opening Dataset from file {filename} ...")
        datasets.append(Dataset(filename, memory=response.content))

    return datasets


def initiate_opendap_multiple_files(urls: list[str], 
                                    var_id: str
                                    ) -> xr.Dataset:
    """ 
    Initiate an API call to download UKCP18 data for multiple year-month selections 

    Returns an xarray.dataset, concatenated if necessary and chunked using dask to reduce memory usage
    """
    token, expires = get_token()
    if token:
        # Now that we have a valid token, we can attempt to open the Dataset from a URL.
        # This will only work if the token is associated with a CEDA user that has been granted
        # access to the data (i.e. if they can already download the file in a browser).
        # 
        print(f"Fetching information about variable '{var_id}':")
        if token:
            print((
                f"Using download token '{token[:5]}...{token[-5:]}' for authentication."
                f" Token expires at: {expires}."
            ))
        else:
            print("No DOWNLOAD_TOKEN found in environment.")

        nc_datasets = open_datasets(urls, download_token=token)

        xarray_datasets = []
        for nc_data in nc_datasets:
            if nc_data is None:
                continue

            ds = xr.open_dataset(xr.backends.NetCDF4DataStore(nc_data), chunks={"time": 30})
            xarray_datasets.append(ds)

        # combine datasets using xarray if required 
        if len(xarray_datasets) > 1:
            combined_ds = xr.concat(xarray_datasets, dim="time").chunk({"time": 30})
            return combined_ds
        elif xarray_datasets:
            return xarray_datasets[0]
        else:
            print("No datasets were opened!")
            return None


## Main code block

In [5]:
def get_dry_days(da_precip: xr.Dataset,
                 year: int, 
                 month: int, 
                 member_id: int, 
                 wcid: int):
    """
    Dry day counts from UKCP18 daily precipitation data (xr.ds)
    Calls `save_dry_counts` to write data out to csv file 
    """

    with dask.config.set(**{'array.slicing.split_large_chunks': False}):
        da_day_dry = da_precip.where(da_precip < 0.1)
        da_dry_count = da_day_dry.count(dim="time")
        save_dry_counts(da_dry_count, year, month, member_id, wcid)

        if month == 9:
            start9 = cftime.Datetime360Day(year, month, 15, 0, 30, 0)
            ds_month9 = da_precip.where(da_precip['time'] <= start9, drop=True)
            da_day_dry9 = ds_month9.where(ds_month9 < 0.1)
            da_dry_count9 = da_day_dry9.count(dim="time")
            save_dry_counts(da_dry_count9, year, 13, member_id, wcid)


def get_bin_counts(df_prcp: pd.DataFrame, 
                   year: int, 
                   month: int, 
                   member_id: int, 
                   duration: int, 
                   wcid):
    """
    Calculate rainfall counts for specified bins relevant to the chosen event duration (e.g. 1-h, 3-h, 6-h)
    """
    bins = BINS[duration]
    filename = os.path.join(OUTPUT_PATH, f"Rainfall_bin_counts_{duration}h_ens{member_id}_proj{PROJECTION_ID}.csv")
    cols = ['Projection_slice_ID', 'Member', 'Year', 'Month', 'WCID', 'Bin counts']

    total_count = []

    for i in range(1, len(bins)):
        df_count = df_prcp[(df_prcp['pr_sum'] > bins[i - 1]) & (df_prcp['pr_sum'] < bins[i])]
        total_count.append(df_count.shape[0])
    data_list = [PROJECTION_ID, member_id, year, month, wcid, total_count]
    total_count_df = pd.DataFrame([data_list], columns=cols)
    save(filename, total_count_df)

    data_list = None
    total_count = None
    total_count_df = None

    if month == 9:
        df_prcp_copy = df_prcp
        df_prcp_copy.insert(0, 'Time', df_prcp_copy.index)
        start_sept_date = cftime.Datetime360Day(year, month, 15, 0, 30, 0)
        df_prcp_copy = df_prcp_copy[df_prcp_copy['Time'] <= start_sept_date]
        for i in range(1, len(bins)):
            df_count = df_prcp_copy[(df_prcp_copy['pr_sum'] > bins[i - 1]) & (df_prcp_copy['pr_sum'] < bins[i])]
            total_count.append(df_count.shape[0])
        data_list = [PROJECTION_ID, member_id, year, month, wcid, total_count]
        total_count_df = pd.DataFrame([data_list], columns=cols)
        save(filename, total_count_df)

        data_list = None
        total_count = None
        total_count_df = None


def save_dry_counts(ds_dry_days: xr.Dataset, 
                    year: int, 
                    month: int, 
                    member_id: int, 
                    wcid: int):
    """ 
    Save dry day count data to a csv file 
    Called by `get_dry_days`
    """
    filename = os.path.join("./", f"Dry_days_counts_ens{member_id}_proj{PROJECTION_ID}.csv")

    ds_mask = ds_dry_days.mean()
    dry_list = [PROJECTION_ID, member_id, year, month, wcid, ds_mask.pr.values.tolist()]
    cols = ['Projection_slice_ID', 'Member', 'Year', 'Month', 'WCID', 'Mean dry day counts']
    df = pd.DataFrame([dry_list], columns=cols)
    save(filename, df)


def get_file_name(year: int, 
                  month: int , 
                  ensemble_member: int
                  ) -> str:
    """ 
    Return string for UKCP file name specific to a month, year and ensemble member
    """
    start_date = f"{year:04d}{month:02d}01"
    url=f"https://dap.ceda.ac.uk/badc/ukcp18/data/land-cpm/uk/2.2km/rcp85/{ensemble_member:02d}/pr/1hr/v20210615/"
    file_main = f"pr_rcp85_land-cpm_uk_2.2km_{ensemble_member:02d}_1hr_{start_date}"
    file_name = os.path.join(url, file_main + f"-{year:04d}{month:02d}30.nc")

    return file_name


def get_start_year(year: int, 
                   month: int, 
                   hour: int):
    """ 
    This function provides a buffer around the selected date
    Starts the analysis on the 30th of the previous month
    (i.e. 30th June 1981 if the user chose July 1981)
    """
    if year != 1980:
        year1 = year #1981
        month1 = month - 1 #6
        if month == 1:
            year1 = year - 1
            month1 = 12
        start = cftime.Datetime360Day(year1, month1, 30, hour, 0, 0)
    else:
        month = 12
        start = cftime.Datetime360Day(year, month, 1, 0, 30, 0)

    return start


def check_dir(file_name: str):
    """ 
    check if a directory exists, and create one if not 
    """
    directory = os.path.dirname(file_name)
    if not os.path.exists(directory):
        os.makedirs(directory)


def save(file_name: str, 
         df: pd.DataFrame
         ):
    """ 
    save pandas dataframe as a csv 
    """
    check_dir(file_name)
    if os.path.isfile(file_name):
        df.to_csv(file_name, mode='a', header=False, index=False, float_format="%.2f")
    else:
        df.to_csv(file_name, mode='a', index=False, float_format="%.2f")


def rolling_window_sum(ds: xr.Dataset, 
                       window_size: int
                       ) -> xr.Dataset:
    """
    rolling window calculation for an xr.ds by defined window size (1-h, 3-h, 6-h, etc)
    """
    print(f"Starting calculation of rolling {str(window_size)}-h accumulated precip!")
    ds_window = ds.rolling(time=window_size, min_periods=window_size).construct("new").sum("new", skipna=True)
    print(f"Finished calculating rolling {str(window_size)}-h accumulated precip!")

    return ds_window

## Run the notebook 

In [6]:
year = 2035
month = 9
member_id = 4

def run_notebook_functions(INPUT_PATH: str, 
                           mask_nc_filename: str, 
                           projection_id: int, 
                           member_id: int,
                           month: int,
                           year: int): 
    """ 
    Run specified functions to process UKCP18 daily precipitation data for a given date (month,year), ensemble member and time slice 
    """
    global PROJECTION_ID
    PROJECTION_ID = projection_id
    profile_selected_month = pd.read_csv(os.path.join(INPUT_PATH, "YearsMonths_byBinCounts_Rand_OtherYears.csv"))

    mask_nc = os.path.join(INPUT_PATH, mask_nc_filename)
    mask_orig = xr.open_dataset(mask_nc)
    mask_1D = mask_orig.stack(location=("grid_latitude", "grid_longitude"))

    projection_profile = profile_selected_month[profile_selected_month['Projection_slice_ID']
                                                == projection_id]
    
    # call_main function 
    df_row = projection_profile[(projection_profile['Month'] == month) & (projection_profile['Year'] == year)]
    if df_row.empty:
        print(f"No data found for Month: {month}, Year: {year}")
    
    year = int(df_row['Year'].iloc[0])
    month = int(df_row['Month'].iloc[0])

    year1 = year
    month1 = month - 1
    if month == 1:
        year1 = year - 1
        month1 = 12

    file_name = get_file_name(year, month, member_id)
    pre_file_name = get_file_name(year1, month1, member_id)

    if (year==1980 and month==12) or (year==2020 and month==12) or (year==2060 and month==12):
        infile = [file_name]
    else:
        infile = [pre_file_name, file_name]

    with initiate_opendap_multiple_files(infile,var_id) as ds:
        print("Finished reading in data from Opendap!")

        with dask.config.set(**{'array.slicing.split_large_chunks': True}):
            ds = ds.stack(location=("grid_latitude", "grid_longitude"))
            ids = c2str.get_cell_ids(ds.location.values)
            ds.coords['location_id'] = ('location', ids)
            ds = ds.where(ds.bnds == 0, drop=True)
            for item in remove_items:
                del ds[item]
            for item in squeeze_coords:
                ds = ds.squeeze(item)

            ds = ds.where(mask_1D["WCID"] >= 0, drop=True)

            starttime = timeit.default_timer()

            print(f"Working on water company {str(0)}")
            ds_mask = ds.where(mask_1D.WCID == 0, drop=True)
            duration = 3; start_hour = 22
            start = get_start_year(year, month, start_hour)
            precip = ds_mask.where(ds['time'] >= start, drop=True)
            print(f"Calculating {str(duration)}-h accumulated precip, starting at {str(start_hour)}Z")

            ds_window = rolling_window_sum(precip, duration)
            ds_window = ds_window.rename({"pr": "pr_sum"})
            ds_window = ds_window.assign(pr=precip.pr)
            ds_window['time'] = ds_window["time"].dt.strftime("%Y-%m-%d %H:%M")
            df_window = ds_window.to_dataframe()
            df_window.index = df_window.index.droplevel(['grid_latitude', 'grid_longitude'])
            
    return precip, df_window

ds_precip, df_precip = run_notebook_functions(INPUT_PATH, mask_nc_filename, projection_id, member_id, month, year)

Found existing token at /home/shardy08/.cedatoken, skipping authentication.
Fetching information about variable 'pr':
Using download token 'eyJhb...A_LBQ' for authentication. Token expires at: 2024-12-19 14:00:40.166194+00:00.
Opening Dataset from file pr_rcp85_land-cpm_uk_2.2km_04_1hr_20350801-20350830.nc ...
Opening Dataset from file pr_rcp85_land-cpm_uk_2.2km_04_1hr_20350901-20350930.nc ...
Finished reading in data from Opendap!
Working on water company 0
Calculating 3-h accumulated precip, starting at 22Z
Starting calculation of rolling 3-h accumulated precip!
Finished calculating rolling 3-h accumulated precip!


### `get_bin_counts` function

In [7]:
def str_to_cftime360(time_str: str) -> cftime:
    """ 
    Apply string to cftime360 conversion to each item in an iterable 
    Turn '2024-09-15 00:30' into '2024-09-15-00-30' and then split by '-'
    Final result: ['2024', '09', '15', '00', '30']
    """
    year, month, day, hour, minute = map(int, time_str.replace(":", "-").replace(" ", "-").split("-"))
    return cftime.Datetime360Day(year, month, day, hour, minute)

In [8]:
df_prcp_copy = df_precip
df_prcp_copy.insert(0, 'Time', df_prcp_copy.index)
df_prcp_copy['Time'] = df_prcp_copy['Time'].apply(str_to_cftime360)
mid_sept_date = cftime.Datetime360Day(year, month, 15, 0, 30, 0)
start_sept_date = cftime.Datetime360Day(year, month, 1, 0, 30, 0)
df_prcp_copy = df_prcp_copy[(df_prcp_copy['Time'] >= start_sept_date) & (df_prcp_copy['Time'] <= mid_sept_date)]

In [11]:
df_prcp_copy

Unnamed: 0_level_0,Time,grid_latitude,grid_longitude,latitude,longitude,month_number,location_id,pr_sum,pr
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2035-09-01 00:30,2035-09-01 00:30:00,2.840050,358.130737,55.297592,-5.780537,9,UK_00284N_35813E,0.000012,5.973412e-06
2035-09-01 00:30,2035-09-01 00:30:00,2.840050,358.150757,55.298496,-5.745451,9,UK_00284N_35815E,0.000679,6.582984e-04
2035-09-01 00:30,2035-09-01 00:30:00,2.840050,358.170746,55.299389,-5.710418,9,UK_00284N_35817E,0.005860,5.760635e-03
2035-09-01 00:30,2035-09-01 00:30:00,2.860050,358.130737,55.317571,-5.782135,9,UK_00286N_35813E,0.000010,3.315604e-06
2035-09-01 00:30,2035-09-01 00:30:00,2.860050,358.150757,55.318475,-5.747032,9,UK_00286N_35815E,0.000369,3.553440e-04
...,...,...,...,...,...,...,...,...,...
2035-09-15 00:30,2035-09-15 00:30:00,8.062849,360.650757,60.556943,-1.189136,9,UK_00806N_36065E,0.168826,1.276793e-01
2035-09-15 00:30,2035-09-15 00:30:00,8.062849,360.670746,60.556575,-1.148880,9,UK_00806N_36067E,1.098274,8.586697e-01
2035-09-15 00:30,2035-09-15 00:30:00,8.062849,360.690735,60.556195,-1.108626,9,UK_00806N_36069E,0.000004,1.027349e-06
2035-09-15 00:30,2035-09-15 00:30:00,8.062849,360.710754,60.555804,-1.068311,9,UK_00806N_36071E,0.000026,2.433108e-05


: 

In [9]:
# duration = 3
# wcid = 0
# bins = BINS[duration]
# filename = os.path.join("./", f"Rainfall_bin_counts_{duration}h_ens{member_id}_proj{PROJECTION_ID}.csv")
# cols = ['Projection_slice_ID', 'Member', 'Year', 'Month', 'WCID', 'Bin counts']
# total_count = []

# for i in range(1, len(bins)):
#     df_count = df_precip[(df_precip['pr_sum'] > bins[i - 1]) & (df_precip['pr_sum'] < bins[i])]
#     total_count.append(df_count.shape[0])
# data_list = [PROJECTION_ID, member_id, year, month, wcid, total_count]
# total_count_df = pd.DataFrame([data_list], columns=cols)
# save(filename, total_count_df)

# data_list = None
# total_count = None
# total_count_df = None

# if month == 9:
#     df_prcp_copy = df_precip
#     df_prcp_copy.insert(0, 'Time', df_prcp_copy.index)
#     start_sept_date = cftime.Datetime360Day(year, month, 15, 0, 30, 0)
#     df_prcp_copy = df_prcp_copy[df_prcp_copy['Time'] <= start_sept_date]
#     for i in range(1, len(bins)):
#         df_count = df_prcp_copy[(df_prcp_copy['pr_sum'] > bins[i - 1]) & (df_prcp_copy['pr_sum'] < bins[i])]
#         total_count.append(df_count.shape[0])
#     data_list = [PROJECTION_ID, member_id, year, month, wcid, total_count]
#     total_count_df = pd.DataFrame([data_list], columns=cols)
#     save(filename, total_count_df)

### `get_dry_days` function

In [10]:
# ds_month = ds_precip.where(ds_precip['time'].dt.month == month, drop=True)
# daily_precip = ds_precip['pr'].resample(time="D").sum()
# da_dry_days = daily_precip.where(daily_precip < 0.1)
# da_dry_day_count = da_dry_days.count(dim="time")

# if month == 9:
#     start_sept = cftime.Datetime360Day(year, month, 15, 0, 30, 0)
#     ds_sept = ds_precip.where((ds_precip['time'] <= start_sept) & (ds_precip['time'].dt.month == month), drop=True)
#     daily_precip = ds_sept['pr'].resample(time="D").sum()
#     da_dry_days_sept = daily_precip.where(daily_precip < 0.1)
#     da_dry_day_count_sept = da_dry_days_sept.count(dim="time")
#     #save_dry_counts(da_dry_day_count_sept, year, 13, member_id, 0)

# filename = os.path.join("./", f"Dry_days_counts_ens{member_id}_proj{PROJECTION_ID}.csv")

# ds_mask = da_dry_day_count_sept.mean()
# dry_list = [PROJECTION_ID, member_id, year, month, 0, ds_mask.values.tolist()]
# cols = ['Projection_slice_ID', 'Member', 'Year', 'Month', 'WCID', 'Mean dry day counts']
# df = pd.DataFrame([dry_list], columns=cols)
# save(filename, df)