# Extracting subsets of ERA5 data to a smaller region to save on memory

In [1]:
%matplotlib inline
import sys

import numpy as np
import netCDF4 as nc
import numpy.matlib
import datetime
import xarray as xr
from scipy import interpolate
from numpy import ma
from scipy import stats
import scipy.io as sio
import pickle as pickle
from sklearn import linear_model
import numpy.ma as ma
import matplotlib.patches as mpatches
from shapely.geometry.polygon import LinearRing

import scipy as sp
import pandas as pd

import time

from copy import copy 

# Plotting
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import ticker

from matplotlib.ticker import FormatStrFormatter
from mpl_toolkits.axes_grid1.axes_divider import HBoxDivider
import mpl_toolkits.axes_grid1.axes_size as Size
from mpl_toolkits.axes_grid1 import make_axes_locatable

# OS interaction
import os
import sys
import cftime

import cartopy.crs as ccrs
from cartopy.util import add_cyclic_point

from IPython.display import display
from IPython.display import HTML
import IPython.core.display as di # Example: di.display_html('<h3>%s:</h3>' % str, raw=True)

import matplotlib.colors as mcolors

import glob
import dask
import dask.bag as db

from scipy import interpolate

import statsmodels.stats.multitest as multitest

from matplotlib.gridspec import GridSpec
from cartopy.crs import EqualEarth, PlateCarree

In [2]:
# Input/output locations. Both can be overridden through environment variables
# so the notebook is not tied to one user's directory layout; when the
# variables are unset the original hard-coded paths are used unchanged.
# Root directory searched for the ERA5 source files.
era5_path = os.environ.get('ERA5_PATH', '/glade/campaign/collections/rda/data/ds633.0/')
# Directory used for the regional subset files.
out_path = os.environ.get('ERA5_OUT_PATH', '/glade/u/home/zcleveland/ERA5_analysis/ERA5_dsw/')

In [3]:
# Maps each variable short name to the archive subdirectory fragment used
# when globbing for its files. Register further variables here.
var_directories = {'lsp': 'accumu'}

# Bounding box for the desert southwest.
# Latitude bounds are given from 40 down to 20; presumably the ERA5 latitude
# coordinate is stored in descending order -- TODO confirm against the files.
lat_upper, lat_lower = 40, 20
# Longitude bounds are expressed on a 0-360 scale.
lon_lower, lon_upper = 240, 260

lat_range = slice(lat_upper, lat_lower)
lon_range = slice(lon_lower, lon_upper)

In [4]:
# Helper: split a YYYYMM date into its numeric parts
def _split_yyyymm(date):
    """Return ``(year, month)`` as integers for a YYYYMM value (int or str)."""
    text = f'{date}'
    return int(text[:4]), int(text[4:])


# Function to extract and process data for a single variable
def process_variable(variable, start_date, end_date, write_output=False):
    """Open the files for one variable over a date range and subset them.

    Parameters
    ----------
    variable : str
        Key into ``var_directories``; also used in the file glob pattern and
        in the output filename.
    start_date, end_date : int or str
        Inclusive range bounds in YYYYMM form (e.g. ``198001``).
    write_output : bool, optional
        When True, write the subset to ``out_path`` as NetCDF. Defaults to
        False, matching the previous behaviour where the write was disabled.

    Returns
    -------
    tuple or None
        ``(combined_data, ds_sub)`` -- the dataset opened from all matching
        files and its ``lat_range``/``lon_range`` selection -- or ``None``
        when the request is skipped (invalid or too-long range, output file
        already present, unknown variable, or no matching files).
    """
    start_year, start_month = _split_yyyymm(start_date)
    end_year, end_month = _split_yyyymm(end_date)

    # Count calendar months between the bounds. Subtracting the raw YYYYMM
    # integers is wrong across a year boundary (199001 - 198912 == 89).
    month_span = (end_year - start_year) * 12 + (end_month - start_month)
    if month_span < 0:
        print(f'end_date {end_date} is before start_date {start_date}. Skipping...\n')
        return
    if month_span > 11:
        print('Time range greater than 1 year. Skipping...\n ')
        print(f'start_date: {start_date}\n end_date: {end_date}')
        return

    # Define output filename
    out_filename = f'{variable}_{start_date}_{end_date}_dsw.nc'
    out_file_path = os.path.join(out_path, out_filename)

    # Check if the output file already exists
    if os.path.exists(out_file_path):
        print(f'File {out_filename} already exists. Skipping...\n')
        return

    start_time = time.time()
    print(f'Processing variable: {variable}\n')
    # Get the subdirectory for the variable
    subdirectory = var_directories.get(variable)
    if subdirectory is None:
        print(f'No subdirectory: {variable}. Skipping...\n')
        return
    print(subdirectory)

    # Find files for the variable in the specified directory. Months are
    # compared numerically: comparing unpadded strings ('19804' vs '198005')
    # wrongly excludes single-digit months.
    first_month = start_year * 100 + start_month
    last_month = end_year * 100 + end_month
    files = []
    for year in range(start_year, end_year + 1):
        for month in range(1, 13):
            if not first_month <= year * 100 + month <= last_month:
                continue
            pattern = f'{era5_path}/*{subdirectory}*/{year}{month:02d}/*_{variable}.*.nc'
            files += glob.glob(pattern)
    files.sort()

    # Report the number of files found for sanity check
    print(f'{len(files)} number of files\n')
    if not files:
        # Must be checked before opening: open_mfdataset fails on an empty list.
        print(f'No files found for variable: {variable}\n')
        return

    with dask.config.set(**{'array.slicing.split_large_chunks': True}):
        # Open every matching file as one dataset
        temp_time = time.time()
        print('opening datasets')
        combined_data = xr.open_mfdataset(files)
        elapsed_time = time.time() - temp_time
        print(f'\rElapsed Time: {elapsed_time:.2f} seconds\n')

        # Subset to the configured region
        temp_time = time.time()
        print('subsetting data')
        ds_sub = combined_data.sel(latitude=lat_range, longitude=lon_range, drop=True)
        elapsed_time = time.time() - temp_time
        print(f'\rElapsed Time: {elapsed_time:.2f} seconds\n')

        # Save the subset to a NetCDF file (opt-in)
        if write_output:
            temp_time = time.time()
            print('writing data to NetCDF')
            ds_sub.to_netcdf(out_file_path)
            elapsed_time = time.time() - temp_time
            print(f'\rElapsed Time: {elapsed_time:.2f} seconds\n')
        else:
            print('write_output is False; not writing NetCDF\n')

    # Calculate total time taken
    total_time = time.time() - start_time
    print(f'Done\nProcessed {variable} in {total_time:.2f} seconds\n')
    # nbytes reports the size of the data; sys.getsizeof only measures the
    # Python wrapper object.
    print(f'data is {combined_data.nbytes} bytes')
    return combined_data, ds_sub



In [None]:
# Loop through variables in var_directories and process each one.
# NOTE: 198001-201912 is longer than the one-year limit enforced inside
# process_variable, so the call below is skipped and returns None; use a
# range of at most 12 months to get data back.
for var in var_directories:
    result = process_variable(var, 198001, 201912)
    if result is None:
        # process_variable returns None whenever it skips a request, so the
        # result cannot be unpacked unconditionally.
        print(f'{var}: no data returned\n')
        continue
    var_data, sub_data = result

In [42]:
# Display the full-domain dataset (first element of process_variable's result)
var_data

Unnamed: 0,Array,Chunk
Bytes,1.30 TiB,1.49 GiB
Shape,"(28736, 12, 721, 1440)","(32, 12, 721, 1440)"
Count,2832 Tasks,944 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.30 TiB 1.49 GiB Shape (28736, 12, 721, 1440) (32, 12, 721, 1440) Count 2832 Tasks 944 Chunks Type float32 numpy.ndarray",28736  1  1440  721  12,

Unnamed: 0,Array,Chunk
Bytes,1.30 TiB,1.49 GiB
Shape,"(28736, 12, 721, 1440)","(32, 12, 721, 1440)"
Count,2832 Tasks,944 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,112.25 kiB,128 B
Shape,"(28736,)","(32,)"
Count,2832 Tasks,944 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 112.25 kiB 128 B Shape (28736,) (32,) Count 2832 Tasks 944 Chunks Type int32 numpy.ndarray",28736  1,

Unnamed: 0,Array,Chunk
Bytes,112.25 kiB,128 B
Shape,"(28736,)","(32,)"
Count,2832 Tasks,944 Chunks
Type,int32,numpy.ndarray


In [43]:
# Display the regional subset (second element of process_variable's result)
sub_data

Unnamed: 0,Array,Chunk
Bytes,8.43 GiB,9.61 MiB
Shape,"(28736, 12, 81, 81)","(32, 12, 81, 81)"
Count,3776 Tasks,944 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 8.43 GiB 9.61 MiB Shape (28736, 12, 81, 81) (32, 12, 81, 81) Count 3776 Tasks 944 Chunks Type float32 numpy.ndarray",28736  1  81  81  12,

Unnamed: 0,Array,Chunk
Bytes,8.43 GiB,9.61 MiB
Shape,"(28736, 12, 81, 81)","(32, 12, 81, 81)"
Count,3776 Tasks,944 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,112.25 kiB,128 B
Shape,"(28736,)","(32,)"
Count,2832 Tasks,944 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 112.25 kiB 128 B Shape (28736,) (32,) Count 2832 Tasks 944 Chunks Type int32 numpy.ndarray",28736  1,

Unnamed: 0,Array,Chunk
Bytes,112.25 kiB,128 B
Shape,"(28736,)","(32,)"
Count,2832 Tasks,944 Chunks
Type,int32,numpy.ndarray


In [5]:
# check file: open one previously written subset from the configured output
# directory (built from out_path rather than repeating the directory literal)
filecheck = xr.open_dataset(os.path.join(out_path, 'lsp_198901_199001_dsw.nc'))
filecheck