In [1]:
import matplotlib.pyplot as plt
import numpy as np
from numpy import ma
import xarray as xr
import geopandas as gpd
import pandas as pd
import xesmf as xe 
import dask

# requires cartopy to be installed
import cartopy.feature as cfeature
import cartopy.io.shapereader as shpreader

import cartopy.crs as ccrs # for projection
import cartopy.feature as cfeature # for map features
from cartopy.util import add_cyclic_point
from matplotlib.axes import Axes
from cartopy.mpl.geoaxes import GeoAxes
#from matplotlib.colors import TwoSlopeNorm
from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
from collections import Counter
import sys
import os
import time

In [2]:
from dask_jobqueue import SLURMCluster

# Scheduler address: pick your own port (the last four digits) somewhere between 7000 and 8000.
SCHEDULER_HOST = '172.22.179.3:7698'
N_WORKERS = 7

# Each SLURM job runs one single-threaded worker process with 100 GB and a 2-hour walltime.
cluster_kwargs = dict(
    cores=1,
    processes=1,
    memory="100GB",
    walltime="02:00:00",
    scheduler_options={'host': SCHEDULER_HOST},
)
cluster = SLURMCluster(**cluster_kwargs)
cluster.scale(N_WORKERS)

In [3]:
from dask.distributed import Client

# Attach a client to the SLURM cluster; the bare last line renders its summary
# (dashboard link, worker count, memory).
client = Client(address=cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 7
Total threads: 7,Total memory: 651.91 GiB

0,1
Comm: tcp://172.22.179.3:7698,Workers: 7
Dashboard: /proxy/8787/status,Total threads: 7
Started: Just now,Total memory: 651.91 GiB

0,1
Comm: tcp://172.22.178.77:32889,Total threads: 1
Dashboard: /proxy/34046/status,Memory: 93.13 GiB
Nanny: tcp://172.22.178.77:41850,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-vbczt0an,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-vbczt0an

0,1
Comm: tcp://172.22.179.35:44756,Total threads: 1
Dashboard: /proxy/36287/status,Memory: 93.13 GiB
Nanny: tcp://172.22.179.35:32918,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-073hlv2d,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-073hlv2d

0,1
Comm: tcp://172.22.179.38:34921,Total threads: 1
Dashboard: /proxy/38299/status,Memory: 93.13 GiB
Nanny: tcp://172.22.179.38:38330,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-u6ti5m_b,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-u6ti5m_b

0,1
Comm: tcp://172.22.179.37:44901,Total threads: 1
Dashboard: /proxy/33336/status,Memory: 93.13 GiB
Nanny: tcp://172.22.179.37:39484,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-d4aqih47,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-d4aqih47

0,1
Comm: tcp://172.22.179.31:46229,Total threads: 1
Dashboard: /proxy/38295/status,Memory: 93.13 GiB
Nanny: tcp://172.22.179.31:35200,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-fpa6oxin,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-fpa6oxin

0,1
Comm: tcp://172.22.179.39:44367,Total threads: 1
Dashboard: /proxy/41303/status,Memory: 93.13 GiB
Nanny: tcp://172.22.179.39:46371,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-zhymiieq,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-zhymiieq

0,1
Comm: tcp://172.22.178.83:44227,Total threads: 1
Dashboard: /proxy/40096/status,Memory: 93.13 GiB
Nanny: tcp://172.22.178.83:45255,
Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-yrls31k4,Local directory: /data/keeling/a/ctavila2/tmp/dask-scratch-space/worker-yrls31k4


In [4]:
models = ["BNU-ESM", "CNRM-CM5", "CSIRO-Mk3-6-0", "CanESM2", "GFDL-ESM2G", "GFDL-ESM2M", "HadGEM2-CC365", "HadGEM2-ES365", "IPSL-CM5A-LR",
         "IPSL-CM5A-MR", "IPSL-CM5B-LR", "MIROC-ESM-CHEM", "MIROC-ESM", "MIROC5", "bcc-csm1-1-m", "MRI-CGCM3", "bcc-csm1-1", "inmcm4"]

# Target grid for regridding. This is an arbitrary NEX-GDDP-CMIP6 FWI file: coarsened_all only
# reads its `lat`/`lon` coordinates, so keep just the coordinates and close the file. That avoids
# holding the file open all session and shipping its data variables into every dask task.
# The name GMFD is kept because later cells pass it as the `GMFD=` argument.
# TODO (original note): MRI-CGCM3 and bcc-csm-1 still need to be processed.
filein = "/data/keeling/a/davidcl2/d/nex-gddp-cmip6/FWI_NorthAmerica/INM-CM4-8/historical/FWI_NorthAmerica_day_INM-CM4-8_historical_r1i1p1f1_gr1_2013.nc"
with xr.open_dataset(filein) as grid_file:
    GMFD = grid_file.drop_vars(list(grid_file.data_vars)).load()

In [5]:
# Inputs, analysis period, output location and lon/lat crop box used by coarsened_all.
# Crop longitudes are in [-180, 180]; they are compared after the 0-360 -> -180..180 shift.
MACA_FWI_DIR = "/data/keeling/a/davidcl2/d/MACA/FWI_RHavg"
COARSENED_DIR = "/data/keeling/a/ctavila2/5_Model_Data_FWI/MACA gridmet CMIP5/Coarsened/testdask"
PERIOD_START = '1979-01-01'
PERIOD_END = '2010-12-31'
LON_MIN, LON_MAX = -124.766667, -114.016667
LAT_MIN, LAT_MAX = 32.025, 41.983333


def _maca_fwi_paths(model):
    """Return one model's MACA FWI file paths in time order.

    These are the six 5-year historical files (1975_1979 ... 2000_2004), the
    single-year 2005 historical file, and the 2006-2010 RCP8.5 file.
    """
    historical = f"{MACA_FWI_DIR}/historical/macav2metdata_fwi_{model}_r1i1p1_historical_"
    paths = [f"{historical}{year}_{year + 4}_CONUS_daily.nc" for year in range(1975, 2004, 5)]
    paths.append(f"{historical}2005_2005_CONUS_daily.nc")
    paths.append(f"{MACA_FWI_DIR}/rcp85/macav2metdata_fwi_{model}_r1i1p1_rcp85_2006_2010_CONUS_daily.nc")
    return paths


def _load_day_window(path, start_day, end_day):
    """Load the days of one file inside the analysis period and the day-of-year window.

    A day is kept if it lies in PERIOD_START..PERIOD_END and its day-of-year is in
    [start_day, end_day], inclusive. The selection is made before loading, so only
    the kept days are read. The file is closed on return.
    """
    with xr.open_dataset(path) as source:
        in_period = source.sel(time=slice(PERIOD_START, PERIOD_END))
        day_of_year = in_period["time"].dt.dayofyear
        in_window = ((day_of_year >= start_day) & (day_of_year <= end_day)).values
        # .load() must run while the file is still open.
        return in_period.isel(time=in_window).load()


def coarsened_all(model, GMFD, start_day, end_day):
    """Regrid one MACA model's FWI onto the GMFD grid, crop it, and write it to netCDF.

    Steps:
    1. Load the PERIOD_START..PERIOD_END days whose day-of-year lies in
       [start_day, end_day] (inclusive) from the historical and RCP8.5 files.
       Every in-window day is kept, even one that is entirely NaN.
    2. Regrid conservatively with xESMF onto GMFD's lat/lon.
    3. Shift longitudes > 180 by -360.
    4. Keep the LON_MIN..LON_MAX / LAT_MIN..LAT_MAX box.

    Parameters
    ----------
    model : str
        Model name as it appears in the MACA file names (e.g. "GFDL-ESM2G").
    GMFD : xarray.Dataset
        Any dataset with 1-D ``lat`` and ``lon``. Only those coordinates are used,
        as the regridding target.
    start_day, end_day : int
        Inclusive day-of-year bounds of the seasonal window.

    Returns
    -------
    str
        Path of the written netCDF file.
    """
    call_start = time.time()

    # Subset each file before concatenating. xr.concat loads non-dask data into memory,
    # so concatenating the full 1975-2010 record first is far more memory-hungry.
    seasonal = [_load_day_window(path, start_day, end_day) for path in _maca_fwi_paths(model)]
    concatenated_ds = xr.concat(seasonal, dim='time')
    print(f"{model}: loaded {concatenated_ds.sizes['time']} days in {time.time() - call_start:.1f} s")

    target_grid = xr.Dataset(
        {
            "lat": (["lat"], GMFD.lat.data, {"units": "degrees_north"}),
            "lon": (["lon"], GMFD.lon.data, {"units": "degrees_east"}),
        }
    )
    regridder = xe.Regridder(concatenated_ds, target_grid, "conservative")
    regridded = regridder(concatenated_ds)
    print(f"{model}: regridded in {time.time() - call_start:.1f} s")

    # Shift longitudes above 180 into negative degrees, keeping the coordinate's attributes,
    # so the crop box can be expressed in [-180, 180].
    lon_values = regridded["lon"].values
    regridded = regridded.assign_coords(
        lon=("lon", np.where(lon_values > 180, lon_values - 360, lon_values), regridded["lon"].attrs)
    ).sortby("lon")

    # Crop lat and lon independently. This selects the same cells as a 2-D where(..., drop=True)
    # without broadcasting a mask onto every variable.
    lon_in_box = ((regridded.lon >= LON_MIN) & (regridded.lon <= LON_MAX)).values
    lat_in_box = ((regridded.lat >= LAT_MIN) & (regridded.lat <= LAT_MAX)).values
    cropped = regridded.isel(lon=lon_in_box, lat=lat_in_box)

    output_path = (
        f"{COARSENED_DIR}/macav2metdata_fwi_{model}"
        "_r1i1p1_historical_1979_2010_CONUS_daily_common_gridDASK.nc"
    )
    cropped.to_netcdf(output_path)

    # Disabled trend analysis, kept for reference. Its results were appended to the global lists
    # trendline_array / deviation_array, which are commented out in this notebook.
    #   spatial_aggregate = cropped.mean(dim=['time'])
    #   yearly_mean = cropped.groupby('time.year').mean(dim='time')
    #   poly_coeffs = yearly_mean.FWI.polyfit(dim="year", deg=1).rename({"polyfit_coefficients": "FWI"})
    #   poly_fits = xr.polyval(coord=yearly_mean["year"], coeffs=poly_coeffs)
    #   trendline = poly_coeffs.sel(degree=1).FWI
    #   trendline_values = trendline * yearly_mean["year"] + poly_coeffs.sel(degree=0).FWI  # m*x + b per cell
    #   residuals = yearly_mean.FWI - trendline_values
    #   standard_deviation_residuals = residuals.std(dim='year')
    #   fwi_benchmark = yearly_mean.std(dim='year')

    print("iteration " + model)
    return output_path
    





In [6]:
#coarsened_all(model="inmcm4", GMFD=GMFD, start_day=152, end_day=304)

In [7]:
#for model in models[15:]:
#    coarsened_all(model=model, GMFD=GMFD, start_day=152, end_day=304)

In [6]:
# Inclusive day-of-year window passed to coarsened_all. 152..304 is Jun 1..Oct 31 in
# non-leap years; in leap years the same numbers cover May 31..Oct 30.
SEASON_START_DAY = 152
SEASON_END_DAY = 304

# One lazy task per entry in `models` (all of them); nothing runs until dask.compute.
delayed = []
for model in models:
    delayed.append(
        dask.delayed(coarsened_all)(
            model=model, GMFD=GMFD, start_day=SEASON_START_DAY, end_day=SEASON_END_DAY
        )
    )

In [7]:
# Show the lazy task handles (one per entry in `models`); nothing has been computed yet.
delayed

[Delayed('coarsened_all-7e585a71-a9bb-46c5-9b61-c62f9dd46338'),
 Delayed('coarsened_all-a6a237ba-f824-47cd-b411-835ee751231f'),
 Delayed('coarsened_all-4ca1118f-a8d1-4316-a47a-c48cbbe6ba3e'),
 Delayed('coarsened_all-d4542794-4782-40c5-8afa-3141b35bed6e'),
 Delayed('coarsened_all-ed5595e5-83f0-4d52-9488-7b0818ab0727'),
 Delayed('coarsened_all-f0d75a39-4075-4b43-a0d3-b86ce594044e'),
 Delayed('coarsened_all-c9ba9088-bed0-423d-a7c1-8c2ceca91723'),
 Delayed('coarsened_all-42312b9f-6f8d-4cf2-be7b-1dc283dc3467'),
 Delayed('coarsened_all-4b1e6107-acf7-4068-9167-83423c1ff8ca'),
 Delayed('coarsened_all-b43b70d4-084c-4de7-af33-e9442a36b1a0'),
 Delayed('coarsened_all-bd2e6b2f-55f3-4328-ba01-1afffb36ca4d'),
 Delayed('coarsened_all-176e473b-1858-4404-955a-db8185553f02'),
 Delayed('coarsened_all-b08118f5-5c39-4887-b93a-705ac772cdcd'),
 Delayed('coarsened_all-b9d9c7d4-57e5-42f9-a122-e021cc2318fc'),
 Delayed('coarsened_all-1ef130c8-d5fe-4821-aa12-1a966bb3413b'),
 Delayed('coarsened_all-24868534-d66f-4e

In [8]:
# Run every per-model task on the SLURM workers through `client`, passed explicitly.
# Each task writes its own netCDF file; `results` holds coarsened_all's return values in `models` order.
# NOTE(review): the recorded run died with KilledWorker (workers died mid-task), presumably from
# memory exhaustion. Check the worker logs before raising memory or lowering concurrency.
results = dask.compute(*delayed, scheduler=client)

2023-11-12 20:54:50,566 - distributed.scheduler - ERROR - Couldn't gather keys {'coarsened_all-c9ba9088-bed0-423d-a7c1-8c2ceca91723': [], 'coarsened_all-56311453-f555-405c-93f5-eda4e14839e0': [], 'coarsened_all-4ca1118f-a8d1-4316-a47a-c48cbbe6ba3e': [], 'coarsened_all-a6a237ba-f824-47cd-b411-835ee751231f': [], 'coarsened_all-f0d75a39-4075-4b43-a0d3-b86ce594044e': [], 'coarsened_all-1ef130c8-d5fe-4821-aa12-1a966bb3413b': [], 'coarsened_all-7e585a71-a9bb-46c5-9b61-c62f9dd46338': [], 'coarsened_all-d4542794-4782-40c5-8afa-3141b35bed6e': [], 'coarsened_all-24868534-d66f-4e55-bd15-9986ab068d52': [], 'coarsened_all-b9d9c7d4-57e5-42f9-a122-e021cc2318fc': [], 'coarsened_all-42312b9f-6f8d-4cf2-be7b-1dc283dc3467': [], 'coarsened_all-176e473b-1858-4404-955a-db8185553f02': []} state: ['processing', 'processing', 'processing', 'processing', 'processing', 'processing', 'processing', 'processing', 'processing', 'processing', 'processing', 'processing'] workers: []
NoneType: None
2023-11-12 20:54:50,5

KilledWorker: Attempted to run task coarsened_all-f0d75a39-4075-4b43-a0d3-b86ce594044e on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://172.22.178.67:35822. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [None]:
#df = pd.concat(results)

In [54]:
#annual_mean_boundaries

In [None]:
# Map the time-mean FWI of one coarsened model. The map shows county lines, state lines, lakes
# and national borders, with the ocean masked in white.
PLOT_MODEL = "GFDL-ESM2G"
coarsened_path = (
    "/data/keeling/a/ctavila2/5_Model_Data_FWI/MACA gridmet CMIP5/Coarsened/testdask/"
    f"macav2metdata_fwi_{PLOT_MODEL}_r1i1p1_historical_1979_2010_CONUS_daily_common_gridDASK.nc"
)
# spatial_aggregate was previously only defined inside a disabled block of coarsened_all, so it
# did not exist here on a fresh kernel. Rebuild it as the time mean of the file coarsened_all writes.
# TODO confirm this is the intended source for the figure.
with xr.open_dataset(coarsened_path) as coarsened:
    spatial_aggregate = coarsened.mean(dim="time").load()

counties = list(shpreader.Reader('/data/keeling/a/ctavila2/2_maps/county_shapefile/countyl010g.shp').geometries())
COUNTIES = cfeature.ShapelyFeature(counties, ccrs.PlateCarree())
gdf_ocean = gpd.read_file('/data/keeling/a/ctavila2/4_Model_Trendline/shape/ne_10m_ocean.shp')

fig, ax = plt.subplots(figsize=(8, 8), subplot_kw={'projection': ccrs.PlateCarree()})
spatial_aggregate.FWI.plot(ax=ax, transform=ccrs.PlateCarree(), cmap='coolwarm', vmin=0, vmax=65)

ax.set_extent([-125, -113, 30, 45], crs=ccrs.PlateCarree())
ax.add_feature(cfeature.NaturalEarthFeature('cultural', 'admin_1_states_provinces_lines', '110m', edgecolor='gray', facecolor='none'))
ax.coastlines()
ax.add_feature(cfeature.LAKES)
ax.add_feature(COUNTIES, linewidth=0.8, alpha=0.5, facecolor='none', edgecolor='black')
ax.add_feature(cfeature.BORDERS, linewidth=2, edgecolor='white')

gl = ax.gridlines(draw_labels=True, color='black', alpha=0.5, linestyle='--')
gl.xlabel_style = {'size': 10, 'color': 'black'}
gl.ylabel_style = {'size': 10, 'color': 'black'}

# Drawn last so the white ocean polygons cover the field over water.
gdf_ocean.plot(ax=ax, color='white', edgecolor='white', linewidth=1.0)

ax.set_title(f'CMIP5 {PLOT_MODEL} Average FWI', fontsize=16)
# Explicit .png: matplotlib would append it anyway to an extension-less name.
fig.savefig(f'/data/keeling/a/ctavila2/5_Model_Data_FWI/MACA gridmet CMIP5/CMIP5_{PLOT_MODEL} Average FWI 1979 to 2010 mxtmp rhavg.png')
    