In [1]:
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
from math import nan
from pathlib import Path
import dask
import yaml
import os
import utils
import warnings
import gc
warnings.filterwarnings('ignore')

### Output path where the input4MIPS global integrals will sit

In [2]:
outpath="/glade/campaign/cgd/cas/islas/python_savs/CESM_forcings_check/input4MIPS/"

### Specify project number 

In [3]:
project='P04010022'

### Set up the dask cluster

In [4]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client
dask.config.set({"distributed.scheduler.worker_saturation":1.0})
dask.config.set({"optimization.fuse.active": False})
dask.config.set({
    "distributed.worker.memory.target": 0.6,
    "distributed.worker.memory.spill": 0.7,
    "distributed.worker.memory.pause": 0.8,
    "distributed.worker.memory.terminate": 0.95,
})

cluster = PBSCluster(
    cores = 1,
    memory = '30GB',
    processes = 1,
    queue = 'casper',
    local_directory = '/glade/derecho/scratch/islas/dask_tmp/',
    resource_spec = 'select=1:ncpus=1:mem=30GB',
    project=project,
    walltime='02:00:00',
    interface='mgt')

# scale up
#cluster.scale(24)
cluster.adapt(minimum=1, maximum=12)

# change your urls to the dask dashboard so that you can see it
dask.config.set({'distributed.dashboard.link':'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'})

# Setup your client
client = Client(cluster)

In [12]:
cluster

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/islas/proxy/37509/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.18.206.69:39785,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/islas/proxy/37509/status,Total threads: 0
Started: 1 minute ago,Total memory: 0 B


In [29]:
with open('emissions_master_list.yaml') as f:
    masterlist = yaml.safe_load(f)

In [30]:
basepath="/glade/campaign/cesm/cesmdata/input4MIPs_raw/input4MIPs/CMIP7/CMIP/" # input4MIPS location
for species, info in masterlist["species"].items():

    for sector in info["sectors"]:
        print('***'+species+'***')
        print(sector['name'])

        # Set up sector portion of output filename
        sector_out = sector['name'].replace(" + ","_")

        # Add sub_species to the filename where relevant
        if ('sub_species' in sector):
            sector_out = sector['sub_species']+'_'+sector_out
            
        
        # source data location
        source = sector['source']

        # Read in the netcdf files from the source data location
        if isinstance(source, str): # if you only have one source
            varname = sector['varname']
            sourcedir=basepath+source
            ncfiles = sorted([str(p) for p in Path(sourcedir).rglob("*.nc")])
            dat = xr.open_mfdataset(ncfiles, combine="by_coords", parallel=True, chunks={'time':120})
        elif isinstance(source, list): # if you have a list of source files
            alldat=[]
            varname = sector['varname'][0]
            for isource in np.arange(0,len(source),1):
                sourcedir=basepath+source[isource]
                thisvar = sector['varname'][isource]
                ncfiles = sorted([str(p) for p in Path(sourcedir).rglob("*.nc")])
                dat = xr.open_mfdataset(ncfiles, combine="by_coords", parallel=True, chunks={'time':120})
                alldat.append(dat[thisvar])
            alldat = xr.concat(alldat, dim='voc_type')
            if "scalefac_mole" not in sector:
                alldat = alldat.sum('voc_type')
            dat = xr.merge([alldat, dat.lon_bnds, dat.lat_bnds, dat.time_bnds, dat.sector_bnds])
        
        # Try summing over levels
        try:
            dat = dat.sum('level')
        except:
            pass
        

        # Sum up sectors if needed
        if "sectoritems" in sector:
            items = sector["sectoritems"]

            if items == "all":
                print('all')
                dat[varname] = dat[varname].sum('sector')
            elif isinstance(items, list):
                print('list')
                dat[varname] = dat[varname].sel(sector=items).sum('sector')

        # Scale the mass or the moles if needed
        if "scalefac_mass" in sector:
            dat[varname] = dat[varname]*sector['scalefac_mass']

        if "scalefac_mole" in sector:
            if isinstance(source, list): # if you have a list of source files
                print('????')
                mw = sector['mw']
                mwsource = sector['mwsource']
                alldat=[]
                for i in np.arange(0,dat.voc_type.size,1):
                    alldat.append(utils.scale_mole(dat[varname].isel(voc_type=i), mwsource[i], mw, sector['scalefac_mole']))
                alldat = xr.concat(alldat, dim='voc_type')
                alldat = alldat.sum('voc_type')
                dat = xr.merge([alldat, dat.lon_bnds, dat.lat_bnds, dat.time_bnds, dat.sector_bnds])
            else:
                mw = sector['mw']
                mwsource = sector['mwsource']
                dat[varname] = utils.scale_mole(dat[varname], mwsource, mw, sector['scalefac_mole'])        
        
        # rename longitude --> lon and latitude --> lat if it's an issue
        try:
            dat = dat.rename({'longitude':'lon', 'latitude':'lat'})
        except:
            pass

        # rename bounds if it's an issue
        try: 
            dat = dat.rename({'bnds':'bound'})
        except:
            pass
        
        # Calculate annual mean
        dat_am = utils.calcannualmean(dat[varname])

        # Calculate the global integral in Tg/yr
        dat_glob = utils.convert_kgm2s_to_Tg(dat_am, dat.lon_bnds.isel(time=0), dat.lat_bnds.isel(time=0))

        # Assign attributes
        dat_glob = dat_glob.rename('emiss')
        dat_glob = dat_glob.assign_attrs({'sector':sector['name'], 'units':'Tg', 'standard_name': species})

        # Output to file
        out = dat_glob.compute()
        out.to_netcdf(outpath+species+'_'+sector_out+'.nc')

        dat.close()
        del out
        del dat_glob
        del dat_am
        del dat
        gc.collect()
        
        

***so4_a1***
SO2smoothed


In [28]:
cluster.close()

In [22]:
mw = sector['mw']
mwsource = sector['mwsource']
alldat=[]
for i in np.arange(0,dat.voc_type.size,1):
    alldat.append(utils.scale_mole(dat.isel(voc_type=i), mwsource[i], mw, sector['scalefac_mole']))
alldat = xr.concat(alldat, dim='voc_type')
alldat = alldat.sum('voc_type')
#dat = xr.merge([alldat, dat.lon_bnds, dat.lat_bnds, dat.time_bnds, dat.sector_bnds])

In [24]:
dat

Unnamed: 0,Array,Chunk
Bytes,9.52 GiB,29.66 MiB
Shape,"(3, 3288, 360, 720)","(1, 120, 180, 360)"
Dask graph,336 chunks in 48 graph layers,336 chunks in 48 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 9.52 GiB 29.66 MiB Shape (3, 3288, 360, 720) (1, 120, 180, 360) Dask graph 336 chunks in 48 graph layers Data type float32 numpy.ndarray",3  1  720  360  3288,

Unnamed: 0,Array,Chunk
Bytes,9.52 GiB,29.66 MiB
Shape,"(3, 3288, 360, 720)","(1, 120, 180, 360)"
Dask graph,336 chunks in 48 graph layers,336 chunks in 48 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,36.12 MiB,6.59 MiB
Shape,"(3288, 720, 2)","(600, 720, 2)"
Dask graph,6 chunks in 19 graph layers,6 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 36.12 MiB 6.59 MiB Shape (3288, 720, 2) (600, 720, 2) Dask graph 6 chunks in 19 graph layers Data type float64 numpy.ndarray",2  720  3288,

Unnamed: 0,Array,Chunk
Bytes,36.12 MiB,6.59 MiB
Shape,"(3288, 720, 2)","(600, 720, 2)"
Dask graph,6 chunks in 19 graph layers,6 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,18.06 MiB,3.30 MiB
Shape,"(3288, 360, 2)","(600, 360, 2)"
Dask graph,6 chunks in 19 graph layers,6 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 18.06 MiB 3.30 MiB Shape (3288, 360, 2) (600, 360, 2) Dask graph 6 chunks in 19 graph layers Data type float64 numpy.ndarray",2  360  3288,

Unnamed: 0,Array,Chunk
Bytes,18.06 MiB,3.30 MiB
Shape,"(3288, 360, 2)","(600, 360, 2)"
Dask graph,6 chunks in 19 graph layers,6 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,51.38 kiB,1.88 kiB
Shape,"(3288, 2)","(120, 2)"
Dask graph,28 chunks in 13 graph layers,28 chunks in 13 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 51.38 kiB 1.88 kiB Shape (3288, 2) (120, 2) Dask graph 28 chunks in 13 graph layers Data type object numpy.ndarray",2  3288,

Unnamed: 0,Array,Chunk
Bytes,51.38 kiB,1.88 kiB
Shape,"(3288, 2)","(120, 2)"
Dask graph,28 chunks in 13 graph layers,28 chunks in 13 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,411.00 kiB,75.00 kiB
Shape,"(3288, 8, 2)","(600, 8, 2)"
Dask graph,6 chunks in 19 graph layers,6 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 411.00 kiB 75.00 kiB Shape (3288, 8, 2) (600, 8, 2) Dask graph 6 chunks in 19 graph layers Data type float64 numpy.ndarray",2  8  3288,

Unnamed: 0,Array,Chunk
Bytes,411.00 kiB,75.00 kiB
Shape,"(3288, 8, 2)","(600, 8, 2)"
Dask graph,6 chunks in 19 graph layers,6 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [23]:
alldat

Unnamed: 0,Array,Chunk
Bytes,3.17 GiB,29.66 MiB
Shape,"(3288, 360, 720)","(120, 180, 360)"
Dask graph,112 chunks in 75 graph layers,112 chunks in 75 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 3.17 GiB 29.66 MiB Shape (3288, 360, 720) (120, 180, 360) Dask graph 112 chunks in 75 graph layers Data type float32 numpy.ndarray",720  360  3288,

Unnamed: 0,Array,Chunk
Bytes,3.17 GiB,29.66 MiB
Shape,"(3288, 360, 720)","(120, 180, 360)"
Dask graph,112 chunks in 75 graph layers,112 chunks in 75 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,36.12 MiB,6.59 MiB
Shape,"(3288, 720, 2)","(600, 720, 2)"
Dask graph,6 chunks in 41 graph layers,6 chunks in 41 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 36.12 MiB 6.59 MiB Shape (3288, 720, 2) (600, 720, 2) Dask graph 6 chunks in 41 graph layers Data type float64 numpy.ndarray",2  720  3288,

Unnamed: 0,Array,Chunk
Bytes,36.12 MiB,6.59 MiB
Shape,"(3288, 720, 2)","(600, 720, 2)"
Dask graph,6 chunks in 41 graph layers,6 chunks in 41 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,18.06 MiB,3.30 MiB
Shape,"(3288, 360, 2)","(600, 360, 2)"
Dask graph,6 chunks in 41 graph layers,6 chunks in 41 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 18.06 MiB 3.30 MiB Shape (3288, 360, 2) (600, 360, 2) Dask graph 6 chunks in 41 graph layers Data type float64 numpy.ndarray",2  360  3288,

Unnamed: 0,Array,Chunk
Bytes,18.06 MiB,3.30 MiB
Shape,"(3288, 360, 2)","(600, 360, 2)"
Dask graph,6 chunks in 41 graph layers,6 chunks in 41 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,411.00 kiB,75.00 kiB
Shape,"(3288, 8, 2)","(600, 8, 2)"
Dask graph,6 chunks in 41 graph layers,6 chunks in 41 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 411.00 kiB 75.00 kiB Shape (3288, 8, 2) (600, 8, 2) Dask graph 6 chunks in 41 graph layers Data type float64 numpy.ndarray",2  8  3288,

Unnamed: 0,Array,Chunk
Bytes,411.00 kiB,75.00 kiB
Shape,"(3288, 8, 2)","(600, 8, 2)"
Dask graph,6 chunks in 41 graph layers,6 chunks in 41 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
