In [1]:
from dask.distributed import Client, LocalCluster
if __name__ == "__main__":
    cluster=LocalCluster(host="tcp://127.0.0.1:2456",dashboard_address="127.0.0.1:2467",n_workers=4)
    client = Client(cluster)

In [155]:
import numpy as np
import pandas as pd
import xarray as xr
import math
import dask

import skimage.feature
import skimage.segmentation
import scipy.ndimage as ndi

import glob

import multiprocessing
from multiprocessing import Pool, Manager

In [156]:
def pre_process(U,V):
    try:
        U.longitude
        U.latitude
    except:
        V = V.rename({'lon':'longitude'})
        V = V.rename({'lat':'latitude'})
        U = U.rename({'lon':'longitude'})
        U = U.rename({'lat':'latitude'})
        U.longitude
        U.latitude
    try:
        test = U.uas
    except:
        try:
            test = U.ua
            U = U.rename({'ua':'uas'})
            V = V.rename({'va':'vas'})
        except:
            U = U.rename({'uwnd':'uas'})
            V = V.rename({'vwnd':'vas'})
    try:
        test = U.level
    except:
        try:
            test = U.plev
            U = U.rename({'plev':'level'})
            V = V.rename({'plev':'level'})
        except:
            U = U.rename({'height':'level'})
            V = V.rename({'height':'level'})
    return U,V

In [157]:
def front_watershed(U,V,lon,lat):
    print('front_watershed')
    front = U.copy()
    front['uas'] = front.uas*0
    front = front.rename({'uas':'x'})
    front['U2'] = U.uas #now
    front['V2'] = V.vas #now
    front['U1'] = xr.DataArray(np.concatenate([U.uas.values[:2]*np.nan,U.uas.values[:-2]]),dims=("time","latitude", "longitude"), coords={"time":front.time.values,"longitude":lon ,"latitude": lat})
    front['V1'] = xr.DataArray(np.concatenate([V.vas.values[:2]*np.nan,V.vas.values[:-2]]),dims=("time","latitude", "longitude"), coords={"time":front.time.values,"longitude":lon ,"latitude": lat})
    front = front.sel(time=front.time[2:])
    x = xr.where(front['U1'].values>0,front['U1'].values/front['U1'].values,np.nan)
    x = xr.where(front['V1'].values<0,x,np.nan)
    x = xr.where(front['U2'].values>0,x,np.nan)
    x = xr.where(front['V2'].values>0,x,np.nan)
    x = xr.where(front['V2'].values - front['V1'].values >  2.0 ,x,np.nan)
    front['x'] = xr.DataArray(x,dims=("time","latitude", "longitude"), coords={"time":front.time.values,"longitude":lon ,"latitude": lat}) #were above frontal criteria are met
    front['front'] = xr.DataArray(x*np.nan,dims=("time","latitude", "longitude"), coords={"time":front.time.values,"longitude":lon ,"latitude": lat}) #empty for output
    x = np.nan_to_num(x).astype('int64')
    for d in range(2,len(front.time)): #watershed segmentation to label each front
        front_idx = skimage.feature.peak_local_max(x[d], footprint=np.ones((3, 3)),labels=x[d])
        front_mask = np.full_like(x[d], fill_value = False)
        front_mask[tuple(front_idx.T)] = True
        markers = ndi.label(front_mask)[0]
        obj = skimage.segmentation.watershed(-x[d], markers, mask=front_mask)
        z = xr.DataArray(obj, dims=("lat", "lon"), coords={"lon": lon , "lat":lat})
        front['front'][d] = z
    front['front'] = front.front/front.front
    return front

In [158]:
def getfront(frontin,dx='dx',dy='dy',lat='lat',lon='lon'):
    print('getfront')
    front = frontin
    front = front.sel(time=front.time[2:])
    for d in range(len(front.time)):
        winners=[]
        if math.isnan(np.nanmax(front.front[d]))==False:
            for z in range(1,int(np.nanmax(front.front[d]))+1):
                ids =  np.argwhere(front.front[d].values==z)
                latsid = [item[0] for item in ids]
                lonsid = [item[1] for item in ids]
                lats = [lat[i] for i in latsid]
                lons = [lon[i] for i in lonsid]
                mlat = np.mean(lats)
                imlat = np.argmin((lat-mlat)**2) #mean lat
                #area = dy*np.sum(dx[latsi[z]]) #area not sure why though not used again?
                xlen = dx[imlat]*4*np.std(lons)
                ylen = dy*4*np.std(lats)
                length = np.sqrt((ylen**2)+(xlen**2))  #length
                if length>500.0:   #must be greater than 500km
                    winners.append(z)
            front['front'][d] = front.front[d].where(np.isin(front.front[d],winners))
    return front


In [159]:
def get_files():
    models = glob.glob("/terra/data/cmip5/global/rcp85/*")
    avail={}
    for model in models:
        uas = glob.glob(str(model)+"/r1i1p1/3hr/native/uas*")
        vas = glob.glob(str(model)+"/r1i1p1/3hr/native/vas*")
        try:
            test = uas[0]
            avail[model.split('/')[-1]] = {'uas':uas,'vas':vas}
        except:
             pass
    return avail

In [160]:
def compute(dict_in ,key):
    print(key)
    U = xr.open_mfdataset(dict_in['uas'])
    V = xr.open_mfdataset(dict_in['vas'])
    U,V = pre_process(U,V)
    U = U.sel(time=slice('2030-01-01', '2045-12-01')) #1950 to last available
    V = V.sel(time=slice('2030-01-01', '2045-12-01'))
    U = U.sel(latitude = slice(-75,-15))
    U = xr.concat([U.sel(longitude = slice(0,30)),U.sel(longitude = slice(320,360))],dim='longitude')
    V = V.sel(latitude = slice(-75,-15))
    V = xr.concat([V.sel(longitude = slice(0,30)),V.sel(longitude = slice(320,360))],dim='longitude')
    V.coords['longitude'] = (V.coords['longitude'] + 180) % 360 - 180
    U.coords['longitude'] = (U.coords['longitude'] + 180) % 360 - 180
    with dask.config.set(**{'array.slicing.split_large_chunks': True}):
        U = U.sortby(U.longitude).load()
        V = V.sortby(U.longitude).load()
    lon = U.longitude.values
    nlon = len(lon)
    lat = U.latitude.values
    nlat = len(lat)
    front = front_watershed(U,V,lon,lat)
    U.close()
    V.close()
    dx = np.cos(lat*math.pi/180.0)*2*math.pi*6370/360*(lon[-1]-lon[1])/(nlon-1)
    dy = ((lat[2]-lat[1])/180.0)*6370*math.pi
    front = getfront(front,dx,dy,lat,lon)
    front = front.fillna(0)
    front = front.front.resample(time = 'QS-DEC').sum()
    front.to_netcdf(path='Files/'+str(key)+'_seasonal_fronts.nc', mode='w')
    return front

In [161]:
avail = get_files()

In [162]:
for model in avail:
    try:
        compute(avail[model], model)
    except:
        print(model, '- failed')

GFDL-CM3
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


GFDL-ESM2G
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


IPSL-CM5A-LR
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


GFDL-ESM2M
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


IPSL-CM5A-MR
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


In [163]:
files = ['CMIP5/output1/MIROC/MIROC-ESM/rcp85/3hr/atmos/3hr/r1i1p1/', 
'CMIP5/output1/MIROC/MIROC-ESM-CHEM/rcp85/3hr/atmos/3hr/r1i1p1/',
'CMIP5/output1/MIROC/MIROC5/rcp85/3hr/atmos/3hr/r1i1p1/',
'CMIP5/output1/CNRM-CERFACS/CNRM-CM5/rcp85/3hr/atmos/3hr/r1i1p1/ ',
'CMIP5/output1/MRI/MRI-CGCM3/rcp85/3hr/atmos/3hr/r1i1p1/',
'CMIP5/output1/MRI/MRI-ESM1/rcp85/3hr/atmos/3hr/r1i1p1/',
'CMIP5/output1/LASG-IAP/FGOALS-s2/rcp85/3hr/atmos/3hr/r1i1p1/', 
'CMIP5/output1/BNU/BNU-ESM/rcp85/3hr/atmos/3hr/r1i1p1/',
'CMIP5/output1/CMCC/CMCC-CM/rcp85/3hr/atmos/3hr/r1i1p1/']
path  = '/terra/data/incoming/cmip5/'
others = {}
for file in files:
    model = file.split('/')[3]
    uas = glob.glob(str(path)+str(file)+"uas/*")
    vas = glob.glob(str(path)+str(file)+"vas/*")
    others[model] = {'uas':uas,'vas':vas}

In [164]:
others['CNRM-CM5']['uas'] = glob.glob('/terra/data/incoming/cmip5/CMIP5/output1/CNRM-CERFACS/CNRM-CM5/rcp85/3hr/atmos/3hr/r1i1p1/uas/*')
others['CNRM-CM5']['vas'] = glob.glob('/terra/data/incoming/cmip5/CMIP5/output1/CNRM-CERFACS/CNRM-CM5/rcp85/3hr/atmos/3hr/r1i1p1/vas/*')

In [167]:
for model in others:
    try:
        compute(others[model], model)
    except:
        print(model, '- failed')

MIROC-ESM
front_watershed


  x = xr.where(front['U1'].values>0,front['U1'].values/front['U1'].values,np.nan)


getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


MIROC-ESM-CHEM
front_watershed


  x = xr.where(front['U1'].values>0,front['U1'].values/front['U1'].values,np.nan)


getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


MIROC5
front_watershed


  x = xr.where(front['U1'].values>0,front['U1'].values/front['U1'].values,np.nan)


getfront
CNRM-CM5


  dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  return array(a, dtype, copy=False, order=order)
  dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  return array(a, dtype, copy=False, order=order)


CNRM-CM5 - failed
MRI-CGCM3
front_watershed
getfront
MRI-ESM1
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


FGOALS-s2
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


BNU-ESM
front_watershed
getfront


  if math.isnan(np.nanmax(front.front[d]))==False:


CMCC-CM
front_watershed


  x = xr.where(front['U1'].values>0,front['U1'].values/front['U1'].values,np.nan)


getfront


In [171]:
xr.open_mfdataset(others['CNRM-CM5']['uas'])

  dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  return array(a, dtype, copy=False, order=order)


Unnamed: 0,Array,Chunk
Bytes,3.57 MiB,228.38 kiB
Shape,"(233736, 2)","(14616, 2)"
Count,60 Tasks,16 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 3.57 MiB 228.38 kiB Shape (233736, 2) (14616, 2) Count 60 Tasks 16 Chunks Type object numpy.ndarray",2  233736,

Unnamed: 0,Array,Chunk
Bytes,3.57 MiB,228.38 kiB
Shape,"(233736, 2)","(14616, 2)"
Count,60 Tasks,16 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,456.52 MiB,28.55 MiB
Shape,"(233736, 128, 2)","(14616, 128, 2)"
Count,64 Tasks,16 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 456.52 MiB 28.55 MiB Shape (233736, 128, 2) (14616, 128, 2) Count 64 Tasks 16 Chunks Type float64 numpy.ndarray",2  128  233736,

Unnamed: 0,Array,Chunk
Bytes,456.52 MiB,28.55 MiB
Shape,"(233736, 128, 2)","(14616, 128, 2)"
Count,64 Tasks,16 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,913.03 MiB,57.09 MiB
Shape,"(233736, 256, 2)","(14616, 256, 2)"
Count,64 Tasks,16 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 913.03 MiB 57.09 MiB Shape (233736, 256, 2) (14616, 256, 2) Count 64 Tasks 16 Chunks Type float64 numpy.ndarray",2  256  233736,

Unnamed: 0,Array,Chunk
Bytes,913.03 MiB,57.09 MiB
Shape,"(233736, 256, 2)","(14616, 256, 2)"
Count,64 Tasks,16 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,28.53 GiB,1.78 GiB
Shape,"(233736, 128, 256)","(14616, 128, 256)"
Count,48 Tasks,16 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 28.53 GiB 1.78 GiB Shape (233736, 128, 256) (14616, 128, 256) Count 48 Tasks 16 Chunks Type float32 numpy.ndarray",256  128  233736,

Unnamed: 0,Array,Chunk
Bytes,28.53 GiB,1.78 GiB
Shape,"(233736, 128, 256)","(14616, 128, 256)"
Count,48 Tasks,16 Chunks
Type,float32,numpy.ndarray


In [166]:
client.close()