In [1]:
import fsspec

fs = fsspec.filesystem('gs')
fs.ls('gs://gcp-public-data-arco-era5/co/')

['gcp-public-data-arco-era5/co/model-level-moisture.zarr',
 'gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2',
 'gcp-public-data-arco-era5/co/model-level-wind.zarr',
 'gcp-public-data-arco-era5/co/model-level-wind.zarr-v2',
 'gcp-public-data-arco-era5/co/single-level-forecast.zarr',
 'gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2',
 'gcp-public-data-arco-era5/co/single-level-reanalysis.zarr',
 'gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2',
 'gcp-public-data-arco-era5/co/single-level-surface.zarr',
 'gcp-public-data-arco-era5/co/single-level-surface.zarr-v2']

In [2]:
import xarray

ds = xarray.open_zarr(
    'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3',
    chunks={'time': 48},
    storage_options=dict(token='anon'),
)["2m_temperature"].sel(time=slice('1990-01-01', '2024-11-01'))

ds_for = xarray.open_zarr(
    'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3',
    chunks={'time': 48},
    storage_options=dict(token='anon'),
)["maximum_2m_temperature_since_previous_post_processing"].sel(time=slice('1990-01-01', '2024-11-01'))

print(ds_for)

<xarray.DataArray 'maximum_2m_temperature_since_previous_post_processing' (
                                                                           time: 305376,
                                                                           latitude: 721,
                                                                           longitude: 1440)>
dask.array<getitem, shape=(305376, 721, 1440), dtype=float32, chunksize=(48, 721, 1440), chunktype=numpy.ndarray>
Coordinates:
  * latitude   (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude  (longitude) float32 0.0 0.25 0.5 0.75 ... 359.0 359.2 359.5 359.8
  * time       (time) datetime64[ns] 1990-01-01 ... 2024-11-01T23:00:00
Attributes:
    long_name:   Maximum temperature at 2 metres since previous post-processing
    short_name:  mx2t
    units:       K


In [3]:
def lon_to_360(dlon: float) -> float:
  return ((360 + (dlon % 360)) % 360)

champaign_coords = {"min_lat": 40.07651334790849, "max_lat": 40.3265133, "min_long": lon_to_360(-88.38088352464067), "max_long": lon_to_360(-88.0608835)}

champaign_ds = ds.where(
    (ds.longitude > champaign_coords["min_long"]) & (ds.latitude > champaign_coords["min_lat"]) &
    (ds.longitude < champaign_coords["max_long"]) & (ds.latitude < champaign_coords["max_lat"]),
    drop=True
)

champaign_spatially_expanded_ds = ds.where (
    (ds.longitude > champaign_coords["min_long"] - 0.25) & (ds.latitude > champaign_coords["min_lat"] - 0.25) &
    (ds.longitude < champaign_coords["max_long"] + 0.25) & (ds.latitude < champaign_coords["max_lat"] + 0.25),
    drop=True
)

champaign_ds_for = ds_for.where(
    (ds_for.longitude > champaign_coords["min_long"]) & (ds_for.latitude > champaign_coords["min_lat"]) &
    (ds_for.longitude < champaign_coords["max_long"]) & (ds_for.latitude < champaign_coords["max_lat"]),
    drop=True
)

champaign_spatially_expanded_ds_for = ds_for.where (
    (ds_for.longitude > champaign_coords["min_long"] - 0.25) & (ds_for.latitude > champaign_coords["min_lat"] - 0.25) &
    (ds_for.longitude < champaign_coords["max_long"] + 0.25) & (ds_for.latitude < champaign_coords["max_lat"] + 0.25),
    drop=True
)

champaign_ds_for

Unnamed: 0,Array,Chunk
Bytes,1.16 MiB,192 B
Shape,"(305376, 1, 1)","(48, 1, 1)"
Count,53026 Tasks,6362 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.16 MiB 192 B Shape (305376, 1, 1) (48, 1, 1) Count 53026 Tasks 6362 Chunks Type float32 numpy.ndarray",1  1  305376,

Unnamed: 0,Array,Chunk
Bytes,1.16 MiB,192 B
Shape,"(305376, 1, 1)","(48, 1, 1)"
Count,53026 Tasks,6362 Chunks
Type,float32,numpy.ndarray


In [4]:
import numpy as np
import pandas as pd

def grid_point_analysis(ds, lat, long, date):
    today_temp = ds.sel(time=date, latitude = lat, longitude = long).values
    
    historical_data = ds.sel(time=ds.time.dt.dayofyear.isin(range(date.dayofyear - 10, date.dayofyear + 11)))
    hist_values = historical_data.values.flatten()

    if today_temp < np.percentile(hist_values, 5):
        return 'Extremely Low'
    elif today_temp > np.percentile(hist_values, 95):
        return 'Extremely High'
    else:
        return 'Normal'

In [None]:
import dask
import xarray as xr

# Ensure datasets are backed by Dask arrays
# Adjust chunk sizes based on available memory and data structure
champaign_ds_c = champaign_ds.chunk({'time': 100, 'latitude': 10, 'longitude': 10})
champaign_ds_for_c = champaign_ds_for.chunk({'time': 100, 'latitude': 10, 'longitude': 10})

# Compute the difference lazily
difference = champaign_ds_c - champaign_ds_for_c

# Apply masks lazily (still retains metadata)
positive_diff = difference.where(difference > 0)
negative_diff = difference.where(difference < 0)

# Trigger parallel computation
positive_count = positive_diff.count().compute()  # This will utilize parallel processing
negative_count = negative_diff.count().compute()

print(f"Positive Differences: {positive_count.values}")
print(f"Negative Differences: {negative_count.values}")


In [None]:
import pandas as pd
import numpy as np
from multiprocessing import Pool

# Define a function to compute the mask for a chunk of data
def compute_extreme_negative_mask(chunk):
    chunk['extreme_negative_mask'] = (chunk['2m_temperature'] - chunk['maximum_2m_temperature_since_previous_post_processing']) < extreme_diff_threshold
    return chunk[chunk['extreme_negative_mask']]

# Split the data into chunks
num_chunks = 4  # Number of parallel processes
chunks = np.array_split(champaign_df, num_chunks)

# Create a multiprocessing pool
with Pool(num_chunks) as pool:
    results = pool.map(compute_extreme_negative_mask, chunks)

# Combine the results
extreme_negative_actuals_df = pd.concat(results)
