In [13]:
import concurrent.futures
from multiprocessing import shared_memory
import sys

import matplotlib
import numpy as np
from tqdm import tqdm
import xarray as xr

climate_indices_home_path = "/home/james/git/climate_indices"
if climate_indices_home_path not in sys.path:
    sys.path.append(climate_indices_home_path)
from climate_indices import compute, indices, utils

%matplotlib inline

Open the precipitation dataset as an xarray Dataset object.

In [14]:
# path of the input precipitation NetCDF file
netcdf_prcp = "/home/james/data/nclimgrid/nclimgrid_lowres_prcp.nc"
ds_prcp = xr.open_dataset(netcdf_prcp)

Get the precipitation data and transpose the array so that the time dimension is the inner-most (last) axis:

In [15]:
# order the dimensions so that indexing with [lat, lon] yields the
# full time series of a single grid cell
prcp_dims = ('lat', 'lon', 'time')
da_prcp = ds_prcp['prcp'].transpose(*prcp_dims)

Create a NumPy array backed by shared memory and copy the precipitation data into it:

In [10]:
# allocate a shared memory block large enough to hold the precipitation values
shm_prcp = shared_memory.SharedMemory(
    create=True,
    size=da_prcp.data.nbytes,
)

# view the block as a NumPy array with the precipitation array's shape and
# dtype, then fill it with the precipitation values
shared_prcp = np.ndarray(
    shape=da_prcp.shape,
    dtype=da_prcp.dtype,
    buffer=shm_prcp.buf,
)
shared_prcp[...] = da_prcp.values

# make the DataArray use the shared memory array as its underlying data
da_prcp.data = shared_prcp

In [11]:
# record the name of the precipitation shared memory block and display it
shm_name_prcp = shm_prcp.name
shm_name_prcp

'psm_44add675'

In [43]:
# calibration period, time scales, and periodicity used by the cells below
calibration_year_initial = 1900
calibration_year_final = 2000
scales = [1]  # , 2, 3, 6, 9, 12, 24]
periodicity = compute.Periodicity.monthly

# first year of the precipitation record
initial_year = int(da_prcp['time'][0].dt.year)

# shape of a gamma fitting parameter array: one value per grid cell
# for each of the 12 calendar months
period_times = 12
total_lats, total_lons = da_prcp.shape[:2]
fitting_shape = (total_lats, total_lons, period_times)

Define a function that can be used to compute the gamma fitting parameters for a particular month scale:

In [44]:
def compute_gammas_gridcell(
    arguments: dict,
):
    """
    Compute the gamma fitting parameters for a single lat/lon grid cell.

    This is the worker function mapped over all grid cells by
    ``compute_gammas``. It attaches to the shared memory blocks named in
    ``arguments``, reads the cell's precipitation time series, and writes
    the fitted alpha and beta values back into the shared fitting arrays.

    :param arguments: dictionary with the following keys:
        'lat_index', 'lon_index': indices of the grid cell
        'shm_name_prcp': name of the shared memory block holding precipitation
        'prcp_shape', 'prcp_dtype': shape and dtype of the precipitation array
        'shm_name_alpha', 'shm_name_beta': names of the shared memory blocks
            holding the float64 alpha and beta fitting arrays
        'fitting_shape': shape of the alpha and beta fitting arrays
        'scale': number of time steps over which values are scaled
        'initial_year': first year of the precipitation record
        'calibration_year_initial', 'calibration_year_final': calibration period
        'periodicity': compute.Periodicity of the precipitation record
    :return: None, results are written into the shared fitting arrays
    """
    lat_index = arguments['lat_index']
    lon_index = arguments['lon_index']
    periodicity = arguments['periodicity']

    # take a private copy of this grid cell's precipitation values so that
    # the shared memory block can be released before the (slow) fitting
    shm_prcp = shared_memory.SharedMemory(name=arguments['shm_name_prcp'])
    prcp = np.ndarray(
        arguments['prcp_shape'],
        dtype=arguments['prcp_dtype'],
        buffer=shm_prcp.buf,
    )
    values = prcp[lat_index, lon_index].copy()
    # the array view must be dropped before close(), otherwise the buffer
    # is still exported and close() raises a BufferError
    del prcp
    shm_prcp.close()

    # skip over this grid cell if all NaN values, the shared fitting
    # arrays are pre-filled with NaN so nothing needs to be written
    if np.all(np.isnan(values)):
        return

    # convolve to scale
    scaled_values = compute.scale_values(
        values,
        scale=arguments['scale'],
        periodicity=periodicity,
    )

    # compute the fitting parameters on the scaled data
    cell_alphas, cell_betas = compute.gamma_parameters(
        scaled_values,
        data_start_year=arguments['initial_year'],
        calibration_start_year=arguments['calibration_year_initial'],
        calibration_end_year=arguments['calibration_year_final'],
        periodicity=periodicity,
    )

    # write this grid cell's parameters into the shared fitting arrays
    fitted = (
        (arguments['shm_name_alpha'], cell_alphas),
        (arguments['shm_name_beta'], cell_betas),
    )
    for shm_name, cell_values in fitted:
        shm_fitting = shared_memory.SharedMemory(name=shm_name)
        fitting = np.ndarray(
            arguments['fitting_shape'],
            dtype=np.float64,
            buffer=shm_fitting.buf,
        )
        fitting[lat_index, lon_index] = cell_values
        del fitting
        shm_fitting.close()


def compute_gammas(
    da_precip: xr.DataArray,
    scale: int,
    calibration_year_initial,
    calibration_year_final,
    periodicity: compute.Periodicity,
) -> (xr.DataArray, xr.DataArray):
    """
    Compute the gamma distribution fitting parameters (alpha and beta) for
    every grid cell of a precipitation array, at a particular scale.

    The grid cells are processed in parallel by a process pool, with the
    precipitation values and the fitting parameters exchanged between the
    processes through shared memory. The shared memory blocks are created
    here and are closed and unlinked before returning, the returned
    DataArrays hold ordinary (non-shared) copies of the results.

    :param da_precip: precipitation values, the first two axes are used as
        (lat, lon) and the array must have "lat", "lon", and "time" coordinates
    :param scale: number of time steps over which the values are scaled
    :param calibration_year_initial: first year of the calibration period
    :param calibration_year_final: final year of the calibration period
    :param periodicity: monthly or daily
    :return: two DataArrays (alpha, beta) with dimensions
        (lat, lon, month) for monthly or (lat, lon, day) for daily periodicity
    :raises ValueError: if the periodicity is neither monthly nor daily
    """
    if periodicity == compute.Periodicity.monthly:
        period_times = 12
        gamma_time_coord = "month"
    elif periodicity == compute.Periodicity.daily:
        period_times = 366
        gamma_time_coord = "day"
    else:
        raise ValueError(f"Unsupported periodicity: {periodicity}")

    initial_year = int(da_precip['time'][0].dt.year)
    precip = da_precip.values
    total_lats = precip.shape[0]
    total_lons = precip.shape[1]
    fitting_shape = (total_lats, total_lons, period_times)
    fitting_dtype = np.dtype(np.float64)
    fitting_nbytes = int(np.prod(fitting_shape)) * fitting_dtype.itemsize
    gamma_coords = {
        "lat": da_precip['lat'],
        "lon": da_precip['lon'],
        gamma_time_coord: range(period_times),
    }

    segments = []
    shared_prcp = shared_alphas = shared_betas = None
    try:
        # create a shared memory array for precipitation
        # and copy the precipitation data into it
        shm_prcp = shared_memory.SharedMemory(create=True, size=precip.nbytes)
        segments.append(shm_prcp)
        shared_prcp = np.ndarray(precip.shape, dtype=precip.dtype, buffer=shm_prcp.buf)
        shared_prcp[...] = precip

        # create NaN-filled shared memory arrays for the gamma
        # distribution alpha and beta fitting parameters
        shm_alpha = shared_memory.SharedMemory(create=True, size=fitting_nbytes)
        segments.append(shm_alpha)
        shared_alphas = np.ndarray(fitting_shape, dtype=fitting_dtype, buffer=shm_alpha.buf)
        shared_alphas[...] = np.nan
        shm_beta = shared_memory.SharedMemory(create=True, size=fitting_nbytes)
        segments.append(shm_beta)
        shared_betas = np.ndarray(fitting_shape, dtype=fitting_dtype, buffer=shm_beta.buf)
        shared_betas[...] = np.nan

        # build one arguments dictionary per grid cell
        arguments_list = [
            {
                'lat_index': lat_index,
                'lon_index': lon_index,
                'shm_name_prcp': shm_prcp.name,
                'prcp_shape': precip.shape,
                'prcp_dtype': precip.dtype,
                'shm_name_alpha': shm_alpha.name,
                'shm_name_beta': shm_beta.name,
                'fitting_shape': fitting_shape,
                'scale': scale,
                'initial_year': initial_year,
                'calibration_year_initial': calibration_year_initial,
                'calibration_year_final': calibration_year_final,
                'periodicity': periodicity,
            }
            for lat_index in range(total_lats)
            for lon_index in range(total_lons)
        ]

        # use a ProcessPoolExecutor to fit the grid cells in parallel,
        # the list() call drains the map so that all cells are completed
        # (and any worker exception is raised) before the pool is closed
        with concurrent.futures.ProcessPoolExecutor() as executor:
            list(
                tqdm(
                    executor.map(compute_gammas_gridcell, arguments_list),
                    total=len(arguments_list),
                    desc="Computing gamma fitting parameters",
                )
            )

        # copy the results out of shared memory before it is released
        alphas = shared_alphas.copy()
        betas = shared_betas.copy()

    finally:
        # drop the array views first, a shared memory block cannot
        # be closed while an array still references its buffer
        shared_prcp = shared_alphas = shared_betas = None
        for segment in segments:
            segment.close()
            segment.unlink()

    alpha_attrs = {
        'description': 'shape parameter of the gamma distribution (also referred to as the concentration) ' + \
        f'computed from the {scale}-month scaled precipitation values',
    }
    da_alpha = xr.DataArray(
        data=alphas,
        coords=gamma_coords,
        dims=tuple(gamma_coords.keys()),
        name=f"alpha_{str(scale).zfill(2)}",
        attrs=alpha_attrs,
    )
    beta_attrs = {
        'description': '1 / scale of the distribution (also referred to as the rate) ' + \
        f'computed from the {scale}-month scaled precipitation values',
    }
    da_beta = xr.DataArray(
        data=betas,
        coords=gamma_coords,
        dims=tuple(gamma_coords.keys()),
        name=f"beta_{str(scale).zfill(2)}",
        attrs=beta_attrs,
    )

    return da_alpha, da_beta

Define a function that can be used to compute the SPI for a particular month scale:

In [45]:
def compute_spi_gamma(
        da_precip: xr.DataArray,
        da_alpha: xr.DataArray,
        da_beta: xr.DataArray,
        scale: int,
        periodicity: compute.Periodicity,
) -> xr.DataArray:
    """
    Compute SPI (gamma distribution) for every grid cell of a precipitation
    array, using pre-computed gamma fitting parameters.

    NOTE: the calibration period is not a parameter of this function, it is
    read from the notebook-level variables ``calibration_year_initial`` and
    ``calibration_year_final``, which must be defined before calling.

    :param da_precip: precipitation values, the first two axes are used as
        (lat, lon) and the array must have a "time" coordinate
    :param da_alpha: gamma alpha fitting parameters, indexed as [lat, lon]
    :param da_beta: gamma beta fitting parameters, indexed as [lat, lon]
    :param scale: number of time steps over which the values are scaled
    :param periodicity: periodicity of the precipitation record
    :return: DataArray of SPI values with the same coordinates and
        dimensions as the precipitation array, NaN for all-NaN grid cells
    """
    initial_year = int(da_precip['time'][0].dt.year)
    total_lats = da_precip.shape[0]
    total_lons = da_precip.shape[1]
    spi = np.full(shape=da_precip.shape, fill_value=np.nan)

    # pull out the underlying arrays once rather than
    # indexing into the DataArrays for every grid cell
    precip = da_precip.values
    alphas = da_alpha.values
    betas = da_beta.values

    for lat_index in range(total_lats):
        for lon_index in range(total_lons):

            # get the values for the lat/lon grid cell
            values = precip[lat_index, lon_index]

            # skip over this grid cell if all NaN values
            if np.all(np.isnan(values)):
                continue

            gamma_parameters = {
                "alphas": alphas[lat_index, lon_index],
                "betas": betas[lat_index, lon_index],
            }

            # compute the SPI
            spi[lat_index, lon_index] = \
                indices.spi(
                    values,
                    scale=scale,
                    distribution=indices.Distribution.gamma,
                    data_start_year=initial_year,
                    calibration_year_initial=calibration_year_initial,
                    calibration_year_final=calibration_year_final,
                    periodicity=periodicity,
                    fitting_params=gamma_parameters,
                )

    # use the scalar time values in the description, formatting
    # the DataArray elements directly would embed their full repr
    time_values = da_precip['time'].values
    time_initial = time_values[0]
    time_final = time_values[-1]

    # build a DataArray for this scale's SPI
    da_spi = xr.DataArray(
        data=spi,
        coords=da_precip.coords,
        dims=da_precip.dims,
        name=f"spi_gamma_{str(scale).zfill(2)}",
    )
    da_spi.attrs = {
        'description': f'SPI ({scale}-{periodicity} gamma) computed from monthly precipitation ' + \
            f'data for the period {time_initial} through {time_final} using a ' + \
            f'calibration period from {calibration_year_initial} through {calibration_year_final}',
        'valid_min': -3.09,
        'valid_max': 3.09,
        'long_name': f'{scale}-{periodicity} SPI(gamma)',
        'calibration_year_initial': calibration_year_initial,
        'calibration_year_final': calibration_year_final,
    }

    return da_spi

Copy the attributes from the precipitation dataset that will be applicable to the corresponding gamma fitting parameters and SPI datasets:

In [46]:
# global attributes of the precipitation dataset that also
# apply to the gamma fitting parameters and SPI datasets
attrs_to_copy = [
    'Conventions',
    'ncei_template_version',
    'naming_authority',
    'standard_name_vocabulary',
    'institution',
    'geospatial_lat_min',
    'geospatial_lat_max',
    'geospatial_lon_min',
    'geospatial_lon_max',
    'geospatial_lat_units',
    'geospatial_lon_units',
]
# the precipitation dataset is named ds_prcp in this notebook
global_attrs = {key: value for (key, value) in ds_prcp.attrs.items() if key in attrs_to_copy}

Compute the gamma fitting parameters for all scales and add these into a Dataset that we'll write to NetCDF:

In [None]:
%%time
if periodicity == compute.Periodicity.monthly:
    period_times = 12
    gamma_time_coord = "month"
elif periodicity == compute.Periodicity.daily:
    period_times = 366
    gamma_time_coord = "day"
ds_gamma = xr.Dataset(
    coords={"lat": ds.lat, "lon": ds.lon, gamma_time_coord: range(period_times)},
    attrs=global_attrs,
)
for scale in scales:
    var_name_alpha = f"alpha_{str(scale).zfill(2)}"
    var_name_beta = f"beta_{str(scale).zfill(2)}"
    da_alpha, da_beta = compute_gammas(
        da_prcp,
        scale,
        calibration_year_initial,
        calibration_year_final,
        periodicity,
    )
    ds_gamma[f"alpha_{str(scale).zfill(2)}"] = da_alpha
    ds_gamma[f"beta_{str(scale).zfill(2)}"] = da_beta
    
netcdf_gamma = '/home/james/data/nclimgrid/nclimgrid_lowres_gamma.nc'
ds_gamma.to_netcdf(netcdf_gamma)

Compute the SPI using the pre-computed gamma fitting parameters for all scales and add these into an SPI(gamma) Dataset that we'll write to NetCDF:

In [None]:
%%time
ds_spi = xr.Dataset(
    coords=ds.coords,
    attrs=global_attrs,
)
for scale in scales:
    var_name_alpha = f"alpha_{str(scale).zfill(2)}"
    var_name_beta = f"beta_{str(scale).zfill(2)}"
    da_spi = compute_spi_gamma(
        da_prcp: xr.DataArray,
        ds_gamma[f'alpha_{str(scale).zfill(2)}'],
        ds_gamma[f'beta_{str(scale).zfill(2)}'],
        scale,
        periodicity,
    )
    ds_spi[f"spi_gamma_{str(scale).zfill(2)}"] = da_spi
    
netcdf_spi = '/home/james/data/nclimgrid/nclimgrid_lowres_spi_gamma.nc'
ds_spi.to_netcdf(netcdf_spi)

Plot a time step to validate that the SPI values look reasonable:

In [None]:
# plot the first computed scale, only the scales listed
# in `scales` have a variable in the SPI dataset
ds_spi[f"spi_gamma_{str(scales[0]).zfill(2)}"].isel(time=500).plot();