# Process gridded observation data into timeseries

## 2025/03/21

The criterion for considering regions unobserved (>10% missing data) is reasonable, but the impact of this threshold on the results should be discussed.

The data availability threshold influences our results by determining the “start year” in which observations are considered complete running into the future. This influences both the trend at any given year (since it may start earlier or later with a different availability threshold) and the envelope of internal variability (since a longer and earlier beginning trend has less internal variability). The estimate the impact of our threshold on the results, we have recalculated the start date with more (5%) and less (30%) stringent thresholds. The change in record start years is now included as a supplementary figure (Figure S??). Overall, we see that the influence of the availability threshold on the start year is small (<X years) in most regions.


__1. Process the gridded temperature data into timeseries for each observational product.__

Output is a dataArray for each model with dimensions of time and IPCC region containing a time series of the TAS variable.


Use this tool:  

https://github.com/IPCC-WG1/Atlas/blob/main/notebooks/reference-regions_Python.ipynb

For now, I will create my code for the CESM1 and MPI models so that it can be generalized easily. I can pull some code from my climatetrend_uncertainty repository (climatetrend_uncertainty/initial_code/PIC_timeseries_preproc.ipynb).

## Code!

In [1]:
import numpy as np
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
import os
import glob
import time

import xagg as xa
import geopandas as gpd
import regionmask

from dask_jobqueue import PBSCluster
from dask.distributed import Client
import dask

regionmask.__version__

%matplotlib inline

__Observational Large Ensembles.__

General directory in Nathan's scratch: /glade/scratch/lenssen/data_for_jonah/

NASA GISTEMP: GISTEMP_2x2  GISTEMP_5x5

HadCRUT5: HadCRUT5

In [2]:
gistemp_2x2_dir = '/glade/derecho/scratch/lenssen/data4jonah/GISTEMP_Ensemble_Aug/' # I don't want to use this
gistemp_dir = "/glade/derecho/scratch/lenssen/data4jonah/GISTEMP_Ensemble_Aug_5x5/"
hadcrut5_dir = '/glade/derecho/scratch/lenssen/data4jonah/HadCRUT5_Ensemble_Aug/'
dcent_unfilled_dir = "/glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS/DCENT/rawdata/DCENT_ensemble/"

### Collect file paths.

#### Collect GISTEMP file paths.

In [3]:
gistemp_files = glob.glob('%s/ensembleChunk_5x5_????.nc' % gistemp_dir)
gistemp_files.sort()

#### Collect HadCRUT5 file paths.

In [4]:
hadcrut5_files = glob.glob('%s/HadCRUT.5.0.2.0.analysis.anomalies.*.nc' % hadcrut5_dir)
hadcrut5_files.sort()

#### Collect DCENT (unfilled) file paths.

In [5]:
dcent_unfilled_files = glob.glob('%s/DCENT_ensemble_1850_2023_member_???.nc' % dcent_unfilled_dir)
dcent_unfilled_files.sort()

#### BEST file path:

In [6]:
best_files = "/glade/u/home/jonahshaw/w/obs/BEST/Land_and_Ocean_LatLong1.nc"

### Load and process timeseries according to IPCC Region designations.

Mask data based on availability.

### 2. Do masking for each dataset

Variable is "tempAnom". "record" coordinate will allow for easier concatenation.

In [7]:
gistemp_tas_var = 'tas'
hadcrut5_tas_var = 'tas'
dcent_unfiled_tas_var = "temperature"
best_tas_var = "temperature"

### Loop over observation files and compute the regional means.

#### GISTEMP

In [8]:
def create_ipccregion_timeseries_xagg(
    ds_filepath:str,
    ds_var:str,
    model_str:str,
    cesm=False,
    read_wm=True,
    write_wm=True,
    new_times=None,
):
    
    '''
    Compute timeseries for all IPCC AR6 regions when given a simple model output file.
    Now using xagg to appropriately weight gridcells that fall partly within a region!
    '''
    # Load data
    ds = xr.open_dataset(ds_filepath)
    
    try:
        ds = ds.rename({"latitude":"lat", "longitude":"lon"})
    except:
        pass
    
    # Correct time if CESM
    if cesm:
        ds  = fix_cesm_time(ds)
    
    if new_times is not None:
        ds["time"] = new_times

    da = ds[ds_var]

    xagg_dir = "/glade/u/home/jonahshaw/w/trend_uncertainty/nathan/xagg_resources"
    xa.set_options(rgrd_alg='bilinear',nan_to_zero_regridding=False)

    if (read_wm and os.path.exists(os.path.join(xagg_dir, f'wm_{model_str}'))):
        # Load weightmap
        weightmap = xa.read_wm(os.path.join(xagg_dir, f'wm_{model_str}'))
    else:
        # Load IPCC region shp file:
        ipcc_wgi_regions_shp = "IPCC-WGI-reference-regions-v4.shp"
        gdf = gpd.read_file(os.path.join(xagg_dir, ipcc_wgi_regions_shp))
                
        # Compute weights for entire grid. Assuming lat, lon, time dimension on input
        area_weights = np.cos(np.deg2rad(da.lat)).broadcast_like(da.isel(time=0).squeeze())
        
        weightmap = xa.pixel_overlaps(da, gdf, weights=area_weights)
        # Save the weightmap for later:
        if write_wm:
            weightmap.to_file(os.path.join(xagg_dir, f'wm_{model_str}'))

    # Aggregate
    with xa.set_options(silent=True):
        aggregated = xa.aggregate(da, weightmap)
    # aggregated = xa.aggregate(da, weightmap)
    
    # Convert to an xarray dataset
    aggregated_ds = aggregated.to_dataset()
    # Change xarray formatting to match previous file organization.
    aggregated_ds = aggregated_ds.set_coords(("Continent", "Type", "Name", "Acronym")).rename({"poly_idx": "RegionIndex", "Name": "RegionName", "Acronym": "RegionAbbrev"})
        
    return aggregated_ds

In [9]:
def mask_IPCC_byavailability(
    filepath: str,
    var: str,
    regions: regionmask.Regions,
    masking_threshold: float=0.9,
    ufunc=None,
    new_times=None,
    verbose=False,
):
    """
    Function for masking observational record according to data availability.
    True on the output means that region exceeds the threshold.

    Args:
        filepath (str): Path to file to load
        var (str): String identifier of variable to mask by availabiliy
        masking_threshold (float, optional): Fraction of region needed to avoid masking. Defaults to 0.9.
        ufunc (function, optional): Function to apply to the loaded DataArray. Defaults to None.

    Returns:
        region_masks (xr.DataArray): Boolean mask for each IPCC region
    """

    ds = xr.open_dataset(filepath)

    # Added for BEST, hope it works for others.
    try:
        ds = ds.rename({"latitude":"lat", "longitude":"lon"})
    except:
        pass

    da = ds[var]
    if ufunc is not None:
        da = ufunc(da)

    if new_times is not None:
        da["time"] = new_times

    mask    = regions.mask(da.isel(time=0))

    # Get unique region indices
    reg     = np.unique(mask.values)
    reg     = reg[~np.isnan(reg)]

    # for metadata: find abbreviations of all regions that were selected
    abbrevs = regions[reg].abbrevs
    names   = regions[reg].names

    # Compute weights for entire grid
    weights_spatial = np.cos(np.deg2rad(da.lat)).broadcast_like(da.isel(time=0)) # assuming 'lat' used consistently
    weights_spatiotemporal = np.cos(np.deg2rad(da.lat)).broadcast_like(da) # assuming 'lat' used consistently

    # Iterate over regions and compute a weighted time series
    region_masks_list = []
    for i,_abbrev,_name in zip(reg, abbrevs, names):
        if verbose:
            print(f"Region {i} of {len(reg)}")
        _region_weight = weights_spatial.where(mask == i).sum(dim=["lat", "lon"]) # Total weight of the region
        _available_region_weights = weights_spatiotemporal.where((mask == i) & (~np.isnan(da))).sum(dim=["lat", "lon"]) # Weight of the unmasked portion of the region
        
        _region_mask = (_available_region_weights / _region_weight) > masking_threshold
        _region_mask = _region_mask.assign_coords(RegionIndex=i).expand_dims("RegionIndex")
        _region_mask = _region_mask.assign_coords(RegionName=_name)
        _region_mask = _region_mask.assign_coords(RegionAbbrev=_abbrev)
        
        region_masks_list.append(_region_mask)
    region_masks = xr.concat(region_masks_list, dim="RegionIndex")
    region_masks.name = "mask"
    
    return region_masks


def gistemp_5x5_preproc(ds):
    
    try:
        ds = ds.rename({"latitude":"lat", "longitude":"lon"})
    except:
        pass
    
    return ds
    # return ds.rename({"latitude":"lat", "longitude":"lon"})

In [10]:
def aggregateandmask_wrapper(
    ds_filepath:str,
    save_filepath:str,
    ds_var:str,
    model_str:str,
    realization:int,
    regions: regionmask.Regions=regionmask.defined_regions.ar6.all,
    masking_threshold: float=0.9,
    ufunc=None,
    new_times=None,
    verbose=False,
):
    aggregated_ds = create_ipccregion_timeseries_xagg(
        ds_filepath=ds_filepath,
        ds_var=ds_var,
        model_str=model_str,
        new_times=new_times,
    )

    region_masks = mask_IPCC_byavailability(
        filepath=ds_filepath,
        var=ds_var,
        regions=regions,
        masking_threshold=masking_threshold,
        ufunc=ufunc,
        new_times=new_times,
        verbose=verbose,
    )

    ipcc_regions_maskedavail = aggregated_ds.where(region_masks)
    
    # Add "realization" coordinate so concatenation is easier.
    ipcc_regions_maskedavail = ipcc_regions_maskedavail.assign_coords(realization=realization).expand_dims("realization")
    ipcc_regions_maskedavail.to_netcdf(path=save_filepath)


In [11]:
save_dir = '/glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS/'

#### GISTEMP

Use 5% (95% availability) threshold.

In [12]:
%%time

model_subdir = 'GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95'

if not os.path.exists(os.path.join(save_dir,model_subdir)):
    os.makedirs(os.path.join(save_dir,model_subdir))

# New time dimension to apply to correct drift from skipping leap years. JKS test.
new_times = pd.date_range('1880-01-16', '2020-12-31', freq='1ME') + pd.tseries.offsets.Day(-15)

# Variable to select and operate over.
_ds_var = gistemp_tas_var

tasks = []


for i,_ds_filepath in enumerate(gistemp_files):

    filename = _ds_filepath.split('/')[-1]
    _outfilepath = '%s/%s/%s' % (save_dir,model_subdir,filename)
    
    if os.path.exists(_outfilepath):
        print('Skipping %s' % _outfilepath)
        continue

    tasks.append(dask.delayed(aggregateandmask_wrapper)(
        ds_filepath=_ds_filepath,
        save_filepath=_outfilepath,
        ds_var=_ds_var,
        model_str="GISTEMP_5x5",
        realization=i+1,
        regions=regionmask.defined_regions.ar6.all,
        masking_threshold=0.95,
        ufunc=gistemp_5x5_preproc,
        new_times=new_times,
    ))

    # if i == 2: break
# dask.compute(*tasks)
    
    

Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95/ensembleChunk_5x5_0001.nc
Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95/ensembleChunk_5x5_0002.nc
Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95/ensembleChunk_5x5_0003.nc
Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95/ensembleChunk_5x5_0004.nc
Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95/ensembleChunk_5x5_0005.nc
Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.95/ensembleChunk_5x5_0006.nc
Skipping /glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//GISTEMP_5x5/20240820/xag

In [18]:
# Launch a Dask cluster using PBSCluster
cluster = PBSCluster(cores    = 1,
                    memory   = '8GB',
                    queue    = 'casper',
                    walltime = '00:15:00',
                    project  = 'UCUC0007',
                    )
cluster.scale(jobs=16)
client = Client(cluster)

dask.compute(*tasks)

client.shutdown()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39817 instead


Use 30% (70% availability) threshold.

In [14]:
%%time

model_subdir = 'GISTEMP_5x5/20240820/xagg_correctedtime/threshold_0.70'

if not os.path.exists(os.path.join(save_dir,model_subdir)):
    os.makedirs(os.path.join(save_dir,model_subdir))

# New time dimension to apply to correct drift from skipping leap years. JKS test.
new_times = pd.date_range('1880-01-16', '2020-12-31', freq='1ME') + pd.tseries.offsets.Day(-15)

# Variable to select and operate over.
_ds_var = gistemp_tas_var

tasks = []



for i,_ds_filepath in enumerate(gistemp_files):

    filename = _ds_filepath.split('/')[-1]
    _outfilepath = '%s/%s/%s' % (save_dir,model_subdir,filename)
    
    if os.path.exists(_outfilepath):
        print('Skipping %s' % _outfilepath)
        continue

    tasks.append(dask.delayed(aggregateandmask_wrapper)(
        ds_filepath=_ds_filepath,
        save_filepath=_outfilepath,
        ds_var=_ds_var,
        model_str="GISTEMP_5x5",
        realization=i+1,
        regions=regionmask.defined_regions.ar6.all,
        masking_threshold=0.70,
        ufunc=gistemp_5x5_preproc,
        new_times=new_times,
    ))

    # if i == 2: break
# dask.compute(*tasks)
    
    

CPU times: user 19.7 ms, sys: 15.8 ms, total: 35.5 ms
Wall time: 55.1 ms


In [18]:
# Launch a Dask cluster using PBSCluster
cluster = PBSCluster(cores    = 1,
                    memory   = '8GB',
                    queue    = 'casper',
                    walltime = '00:15:00',
                    project  = 'UCUC0007',
                    )
cluster.scale(jobs=16)
client = Client(cluster)

dask.compute(*tasks)

client.shutdown()



#### HadCRUT5

In [19]:
# Arbitrary function to apply

def hadcrut5_preproc(da:xr.DataArray):
    
    try:
        da = da.rename({"latitude":"lat", "longitude":"lon"})
    except:
        pass
    
    return da

Apply the 95% (5%) threshold

In [20]:
%%time

model_subdir = 'HadCRUT5/20240820/xagg/threshold_0.95/'

if not os.path.exists(os.path.join(save_dir,model_subdir)):
    os.makedirs(os.path.join(save_dir,model_subdir))

# Variable to select and operate over.
_ds_var = hadcrut5_tas_var

tasks = []

for i,_ds_filepath in enumerate(hadcrut5_files):

    filename = _ds_filepath.split('/')[-1]
    _realization = filename.split(".")[-2]
    _outfilepath = '%s/%s/%s' % (save_dir,model_subdir,filename)
    
    if os.path.exists(_outfilepath):
        print('Skipping %s' % _outfilepath)
        continue

    tasks.append(dask.delayed(aggregateandmask_wrapper)(
        ds_filepath=_ds_filepath,
        save_filepath=_outfilepath,
        ds_var=_ds_var,
        model_str="HadCRUT5",
        realization=_realization,
        regions=regionmask.defined_regions.ar6.all,
        masking_threshold=0.95,
        ufunc=hadcrut5_preproc,
    ))

CPU times: user 15.7 ms, sys: 10.9 ms, total: 26.7 ms
Wall time: 29.6 ms


In [22]:
_outfilepath

'/glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//HadCRUT5/20240820/xagg/threshold_0.95//HadCRUT.5.0.2.0.analysis.anomalies.99.nc'

In [23]:
# Launch a Dask cluster using PBSCluster
cluster = PBSCluster(cores    = 1,
                    memory   = '8GB',
                    queue    = 'casper',
                    walltime = '00:15:00',
                    project  = 'UCUC0007',
                    )
cluster.scale(jobs=16)
client = Client(cluster)

dask.compute(*tasks)

client.shutdown()



Apply the 70% (30%) threshold

In [24]:
%%time

model_subdir = 'HadCRUT5/20240820/xagg/threshold_0.70/'

if not os.path.exists(os.path.join(save_dir,model_subdir)):
    os.makedirs(os.path.join(save_dir,model_subdir))

# Variable to select and operate over.
_ds_var = hadcrut5_tas_var

tasks = []

for i,_ds_filepath in enumerate(hadcrut5_files):

    filename = _ds_filepath.split('/')[-1]
    _realization = filename.split(".")[-2]
    _outfilepath = '%s/%s/%s' % (save_dir,model_subdir,filename)
    
    if os.path.exists(_outfilepath):
        print('Skipping %s' % _outfilepath)
        continue

    tasks.append(dask.delayed(aggregateandmask_wrapper)(
        ds_filepath=_ds_filepath,
        save_filepath=_outfilepath,
        ds_var=_ds_var,
        model_str="HadCRUT5",
        realization=_realization,
        regions=regionmask.defined_regions.ar6.all,
        masking_threshold=0.70,
        ufunc=hadcrut5_preproc,
    ))

CPU times: user 16.6 ms, sys: 16.7 ms, total: 33.4 ms
Wall time: 37.3 ms


In [27]:
# Launch a Dask cluster using PBSCluster
cluster = PBSCluster(cores    = 1,
                    memory   = '8GB',
                    queue    = 'casper',
                    walltime = '00:15:00',
                    project  = 'UCUC0007',
                    )
cluster.scale(jobs=16)
client = Client(cluster)

dask.compute(*tasks)

client.shutdown()



#### BEST

Why this take so long?
/glade/u/home/jonahshaw/w/trend_uncertainty/nathan/OBS_LENS//BEST/20250320/xagg/Land_and_Ocean_LatLong1.nc

Apply the 5% (95%) threshold.

_Having issues here?_  
_available_region_weights = weights_spatiotemporal.where((mask == i) & (~np.isnan(da))).sum(dim=["lat", "lon"]) # Weight of the unmasked portion of the region

In [63]:
%%time

model_subdir = 'BEST/20250320/xagg/threshold_0.95/'

if not os.path.exists(os.path.join(save_dir,model_subdir)):
    os.makedirs(os.path.join(save_dir,model_subdir))

best_newtimes = pd.date_range('1850-01-16', '2024-12-31', freq='1ME') + pd.tseries.offsets.Day(-15)

# Variable to select and operate over.
_ds_var = best_tas_var
_ds_filepath = best_files

filename = _ds_filepath.split('/')[-1]
_realization = 1
_outfilepath = '%s/%s/%s' % (save_dir,model_subdir,filename)
tasks = []

if os.path.exists(_outfilepath):
    print('Skipping %s' % _outfilepath)

else:
    tasks.append(dask.delayed(aggregateandmask_wrapper)(
        ds_filepath=_ds_filepath,
        save_filepath=_outfilepath,
        ds_var=_ds_var,
        model_str="BEST",
        realization=_realization,
        regions=regionmask.defined_regions.ar6.all,
        masking_threshold=0.95,
        new_times=best_newtimes
    ))

CPU times: user 24.5 ms, sys: 0 ns, total: 24.5 ms
Wall time: 59.1 ms


In [66]:
# Launch a Dask cluster using PBSCluster
cluster = PBSCluster(cores    = 1,
                    memory   = '32GB',
                    queue    = 'casper',
                    walltime = '00:15:00',
                    project  = 'UCUC0007',
                    )
cluster.scale(jobs=1)
client = Client(cluster)

dask.compute(*tasks)

client.shutdown()



BEST 0.70 threshold

In [71]:
%%time

model_subdir = 'BEST/20250320/xagg/threshold_0.70/'

if not os.path.exists(os.path.join(save_dir,model_subdir)):
    os.makedirs(os.path.join(save_dir,model_subdir))

best_newtimes = pd.date_range('1850-01-16', '2024-12-31', freq='1ME') + pd.tseries.offsets.Day(-15)

# Variable to select and operate over.
_ds_var = best_tas_var
_ds_filepath = best_files

filename = _ds_filepath.split('/')[-1]
_realization = 1
_outfilepath = '%s/%s/%s' % (save_dir,model_subdir,filename)

tasks = []

if os.path.exists(_outfilepath):
    print('Skipping %s' % _outfilepath)

else:
    tasks.append(dask.delayed(aggregateandmask_wrapper)(
        ds_filepath=_ds_filepath,
        save_filepath=_outfilepath,
        ds_var=_ds_var,
        model_str="BEST",
        realization=_realization,
        regions=regionmask.defined_regions.ar6.all,
        masking_threshold=0.70,
        new_times=best_newtimes
    ))

CPU times: user 19.4 ms, sys: 4.73 ms, total: 24.1 ms
Wall time: 173 ms


In [72]:
# Launch a Dask cluster using PBSCluster
cluster = PBSCluster(cores    = 1,
                    memory   = '32GB',
                    queue    = 'casper',
                    walltime = '00:15:00',
                    project  = 'UCUC0007',
                    )
cluster.scale(jobs=1)
client = Client(cluster)

dask.compute(*tasks)

client.shutdown()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 33827 instead


Clean-up dask workers

In [83]:
working_dir = "/glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/"
daskworker_list = glob.glob(f"{working_dir}/dask-worker.????????")

In [87]:
import subprocess

for file_path in daskworker_list:
    print(file_path)
    try:
        subprocess.run(['rm', '-f', file_path], check=True)
        print(f"Removed: {file_path}")
    except subprocess.CalledProcessError as e:
        print(f"Error removing {file_path}: {e}")
    # break
        

/glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169023
Removed: /glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169023
/glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4168980
Removed: /glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4168980
/glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169031
Removed: /glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169031
/glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169019
Removed: /glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169019
/glade/u/home/jonahshaw/Scripts/git_repos/internalvar-vs-obsunc/preprocess_obs3/dask-worker.o4169071
Removed: /glade/u/home/jonahshaw/Scripts/git_repos/inte