Author: K Geil

Date: 05/2023

Description: convert gaez dat files (daily deviation data from pxv files) to netcdf


# Setup

In [None]:
# for computing
import numpy as np
import xarray as xr # for reading/writing netcdf
import dask.array as da
import dask
import pandas as pd # only used for date times

# convenience things
from time import time, sleep
import os
import glob # for system commands
from natsort import natsorted # for alphabetical sorting

# for plotting
import matplotlib.pyplot as plt
# import cartopy.crs as ccrs

# from dask.diagnostics import ProgressBar
# from dask.distributed import progress

In [None]:
# keep the dask worker logs together in one folder under the home directory
homedir = os.environ['HOME']
daskpath = os.path.join(homedir, "dask-worker-space-can-be-deleted")

# exist_ok=True makes re-running this cell a silent no-op when the folder is
# already there; any other failure (e.g. permissions) is still only reported,
# not raised, so the notebook keeps going as before
try:
    os.makedirs(daskpath, exist_ok=True)
except OSError as error:
    print(error)

In [None]:
# ---- locations -------------------------------------------------------------
# your notebook directory location, and the public link to this notebook
# (the link is written into the output file's global attributes)
repo_dir = '/work/hpc/users/kerrie/UN_FAO/repos/py_AEZ_data_prep/'
nb_link = 'https://github.com/kerriegeil/pyAEZ_data_prep/blob/main/global/01_dat_to_nc.ipynb'

# your data directory location
data_dir = '/work/hpc/users/kerrie/UN_FAO/data/'

# the ALOSmask file whose grid matches your pxv/dat data
# (alternative location: '/work/hpc/users/kerrie/UN_FAO/scripts/PXV_script/ALOSmask5m_fill.rst')
# NOTE: data_dir already ends in '/', so this path contains a harmless double slash
maskfile = data_dir + '/orig/DataDownload03152023/ALOSmask5m_fill.rst'

# linux paths of the project directory (included in the file metadata)
source_dirs = [
    'on HPC2 /gri/projects/rgmg/climate/UN_FAO/data_downloads/2023-03-15_DataDownload/',
    'on Orion /work/hpc/users/kerrie/UN_FAO/data/orig/DataDownload03152023/',
]

# ---- what to process -------------------------------------------------------
year = 1980

# value used for missing in the dat files
fillval = -9999.

# ---- metadata for the output data files ------------------------------------
timeattrs = dict(standard_name='time', long_name='time', axis='T')
yattrs = dict(standard_name='latitude', long_name='latitude',
              units='degrees_north', axis='Y')
xattrs = dict(standard_name='longitude', long_name='longitude',
              units='degrees_east', axis='X')

# coordinates are written without a _FillValue attribute
time_encoding = dict(calendar='standard',
                     units='days since 1900-01-01 00:00:00',
                     _FillValue=None)
y_encoding = dict(_FillValue=None)
x_encoding = dict(_FillValue=None)

# Read data from dat file

In [None]:
# every dat file for the chosen year, in natural sort order
dat_pattern = '{0}gaez_dat_files/global_{1}/*{1}*'.format(data_dir, year)
filelist = natsorted(glob.glob(dat_pattern))
filelist

In [None]:
# one row per variable: (name, scale factor, units, long name)
# the scale and units info comes from the file "UnitScaleFactors.txt"
var_table = [
    ('Precip',   1E-5,  'mm/day',    'precipitation'),
    ('Srad',     1000., 'J/m2/day',  'surface short wave radiation'),
    ('Tmin-2m',  0.01,  'degrees C', '2m minimum air temperature'),
    ('Tmax-2m',  0.01,  'degrees C', '2m maximum air temperature'),
    ('Vapr',     0.01,  'hPa',       'vapor pressure'),
    ('Wind-10m', 0.001, 'm/s',       '10m wind speed'),
]

# unzip the table into the four parallel lists used later
varnames, scale_factor, units, long_names = (list(column) for column in zip(*var_table))

# which var to process (0-based index of filelist), i.e. v=0 processes Precip
# NOTE(review): filelist is naturally sorted while this table lists Tmin before
# Tmax -- confirm that filelist[v] really is the file for varnames[v]
v = 2

print('Processing variable =', varnames[v])

In [None]:
datfile = filelist[v]

# filelist is naturally sorted but varnames is hand-ordered, so the same index
# is not guaranteed to point at the same variable in both lists
# NOTE(review): assumes the dat file names contain the variable name -- confirm
if varnames[v] not in os.path.basename(datfile):
    print('WARNING:', os.path.basename(datfile), 'does not contain', varnames[v],
          '- check that the order of filelist matches the order of varnames')

# read the whole file as a list of lines (line endings removed); the context
# manager closes the file handle as soon as the read is done
with open(datfile) as f:
    temp = f.read().splitlines()

ilatilon = temp[0::2]  # the lines with the lat/lon indices (every other line)
data = temp[1::2]      # the lines with the data (every other line)

In [None]:
# split every coordinate line once; the first field becomes ilat, the second ilon
latlon_fields = [line.split() for line in ilatilon]
ilat = np.array([int(fields[0]) for fields in latlon_fields]).astype('int16')
ilon = np.array([int(fields[1]) for fields in latlon_fields]).astype('int16')

# parse the data lines into a 2-D integer numpy array, takes 30-60s
data2D = np.loadtxt(data, dtype='int16')
nt = data2D.shape[1]

print('data dimensions:', data2D.shape[0],
      'rows (each row represents a different grid cell) by', nt,
      'cols (each col represents a day of the year)')

print('data min max:', data2D.min(), data2D.max())

In [None]:
# peek at a single grid cell (row 407); the slice keeps the result 2-D
test = data2D[407:408]
test

In [None]:
# count the entries of the sample row that carry the missing-data flag;
# uses the configured fillval rather than repeating the literal -9999
missing = np.where(test == fillval, 1, 0)
missing.sum()

# Get grid info from mask file

In [None]:
# open the mask raster, drop the length-1 dimensions and keep only the data variable
mask = xr.open_dataset(maskfile, engine='rasterio').squeeze()['band_data']

# drop_vars replaces the deprecated DataArray.drop for removing a coordinate
mask = mask.drop_vars('band')

mask

In [None]:
# grid size and coordinate values come from the mask
ny, nx = mask.shape
yvals = mask.y.data.astype('float32')
xvals = mask.x.data.astype('float32')

# create a time dimension covering every day of the chosen year
# NOTE: this rebinds the name `time` (imported from the time module at the top
# of the notebook) to a DatetimeIndex; the cells below rely on that
time = pd.date_range(str(year)+'-01-01', str(year)+'-12-31', freq='D')

# sometimes leap day is deleted out of the dataset, if so fix up time to match
# (selecting by month/day is safe in non-leap years too, where there is no Feb 29)
if len(time) != nt:
    is_leap_day = (time.month == 2) & (time.day == 29)
    time = time[~is_leap_day]

# fail here with a clear message rather than later inside a dask worker
if len(time) != nt:
    raise ValueError('time axis has {} days for {} but the dat file has {} values per grid cell'
                     .format(len(time), year, nt))

print('global data dimensions:',ny,'latitudes by',nx,'longitudes by',nt,'days')

we now have the metadata needed (lat, lon, time info) to create an xarray data array to store global data

# Start a compute cluster with dask distributed 

if you started jupyter notebook session with many nodes/cores, use LocalCluster below

if you started jupyter notebook session with few cores, use SLURMCluster below


- setting 1 worker to a full node of cores, then scaling up is the way to go

- the default settings, with fewer cores/threads per worker and more workers, take much longer to compute

- for SLURMCluster we have to wait until the workers connect. You can log into Orion and run `squeue -u username` to see if your workers are stuck in pending due to lack of priority or nodes being down. DON'T BUILD THE TASKLIST OR EXECUTE THE COMPUTATION UNTIL YOU SEE ALL WORKERS UP AND RUNNING (output from client under cluster info)

In [None]:
# ---- option 1: LocalCluster (uncomment to use) ----
# from dask.distributed import Client,LocalCluster
# cluster=LocalCluster(n_workers=1,
#                     threads_per_worker=20)


# ---- option 2: SLURMCluster ----
# when there's plenty of idle nodes choose cores=20, mem=180GB, then scale to 5
# when there's no idle nodes choose cores=10, mem=90GB, then scale to 10
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# resources requested for ONE job; more jobs are requested later with cluster.scale
slurm_settings = dict(
    # actual cores (not logical), can't be more than a single node has (20 for Orion)
    cores=10,
    # one process per job
    processes=1,
    # cores x mem/core
    memory="90GB",
    # for Orion MSU jobs (not NOAA)
    queue="400p48h",
    # should be the same for everyone
    local_directory='$TMPDIR',
    # set this as short as possible or you'll have to go in and kill your workers
    walltime="00:05:00",
    # worker logs go to the folder created at the top of the notebook
    log_directory=daskpath,
)
cluster = SLURMCluster(**slurm_settings)

In [None]:
# ---- option 1: LocalCluster (uncomment to use) ----
# client=Client(cluster)
# cluster.scale(4)  # adjust this to scale for however many cores are available in your notebook session
# sleep(2)
# client

# ---- option 2: SLURMCluster ----
client = Client(cluster)

# request 10 jobs, each with the resources defined in cluster
cluster.scale(jobs=10)

# SLURM takes a comparatively long time to fully connect to the workers,
# so wait (sometimes up to a full minute is required)
sleep(45)

# don't start computing below until your workers show up in this output
client

In [None]:
# if the workers didn't all show up in the output above, re-run this cell
# (it just displays `client`) until they do
client

In [None]:
# use this to stop a localCluster or restart kernel
# don't open multiple clusters
# client.close()

# Use dask delayed parallel computing to put data onto global grid

loop thru each latitude to create a chunk of the global data array, where grid cells without data are filled with nan

inside the loop we issue calls to functions that do the heavier computing tasks

we delay those functions to create a dask task graph of length ny=2160 and then set off all the tasks to compute

In [None]:
# function to build a chunk of data for grids where data is present

@dask.delayed
def build_full_lat(ixs,data,y,x,t,fv,sf,dtype='float32'):
    """Build one latitude chunk of the global grid from the cells that have data.

    Parameters
    ----------
    ixs : 1-D integer array
        0-based x (longitude) index of each row of `data`.
    data : 2-D array, one row per grid cell, one column per time step
        Raw (unscaled) values for the grid cells of this latitude.
    y : 1-D array
        y coordinate of the chunk; the data is written at the first y
        position (the caller passes a single latitude).
    x : 1-D array
        x coordinates of the full grid row.
    t : 1-D array-like
        time coordinate; its length must equal the number of columns of `data`.
    fv : number
        Value that marks missing data in `data`.
    sf : number
        Scale factor the raw values are multiplied by.
    dtype : str or numpy dtype, optional
        dtype of the returned array. The default is float32 because float16
        (max 65504, 11-bit significand) cannot hold large scaled values such
        as 30000*1000 and cannot represent most integers above 2048 exactly.

    Returns
    -------
    xarray.DataArray
        Array with dims (y, x, time); NaN where there is no data or where the
        raw value equals `fv`, raw value times `sf` elsewhere.
    """
    # mask and scale on a float64 copy of the raw values, so the fill-value
    # test is exact (it does not depend on how fv rounds in a narrow dtype)
    cells = np.array(data, dtype='float64', ndmin=2)
    cells[cells == fv] = np.nan
    cells *= sf

    # start from an all-NaN chunk and drop each cell's series at its x index
    block = np.full((len(y), len(x), len(t)), np.nan, dtype=dtype)
    block[0, np.asarray(ixs, dtype='int64'), :] = cells

    # use the t argument for the time coordinate, not a notebook global
    return xr.DataArray(block,
                        dims=['y','x','time'],
                        coords={'y':('y',y),'x':('x',x),'time':('time',t)})

In [None]:
# function to build a chunk of data for grids where data is not present

@dask.delayed
def build_empty_lat(y,x,t):
    """Build an all-NaN chunk of the global grid for a latitude with no data.

    Parameters
    ----------
    y : 1-D array
        y coordinate of the chunk (the caller passes a single latitude).
    x : 1-D array
        x coordinates of the full grid row.
    t : 1-D array-like
        time coordinate.

    Returns
    -------
    xarray.DataArray
        float16 array of NaN with dims (y, x, time).
    """
    # use the t argument for the time coordinate, not a notebook global
    block = np.full((len(y), len(x), len(t)), np.nan, dtype='float16')
    return xr.DataArray(block,
                        dims=['y','x','time'],
                        coords={'y':('y',y),'x':('x',x),'time':('time',t)})

now we write a normal not-delayed loop to call the delayed functions and collect all the delayed tasks into a list called tasklist

In [None]:
tasklist=[]

# parallelize by latitude (latitude loop); ilat/ilon in the dat file are 1-based
for iy in range(ny):
    indices=np.flatnonzero(ilat==iy+1) # which data rows belong to this latitude

    # test the NUMBER of matches: np.any(indices) is False when the only
    # matching row is row 0, which would drop that latitude's data
    if indices.size:
        # lazy call to func returns a task
        result=build_full_lat((ilon[indices]-1),data2D[indices,:],yvals[iy:iy+1],xvals,time,fillval,scale_factor[v])
    else:
        result=build_empty_lat(yvals[iy:iy+1],xvals,time)

    tasklist.append(result) # collect list of compute tasks

dask.compute starts the parallel computing and pulls all the results down from the workers into a tuple of arrays

note: the use of *tasklist means output will be a tuple of len ny=2160 of arrays, without * the output will be len 1

In [None]:
# run every task in tasklist on the cluster and collect the results
output = dask.compute(*tasklist)

# look at 1 array in the results
output[0]

now concat all the arrays into a single large array

In [None]:
# stack the per-latitude chunks along y to form the global array
bigarr = xr.concat(output, dim='y')
# bigarr  # uncomment to display the array

# Write netcdf file

first add appropriate metadata, then write compressed file

In [None]:
# metadata for the variable itself
varattrs = dict(standard_name=varnames[v], long_name=long_names[v], units=units[v])
bigarr.name = varnames[v]
bigarr.attrs = varattrs

# metadata for each coordinate
for coord_name, coord_attrs in (('y', yattrs), ('x', xattrs), ('time', timeattrs)):
    bigarr[coord_name].attrs = coord_attrs

# wrap in a dataset and record where the data and this code came from
ds = bigarr.to_dataset().assign_attrs({'source_data': source_dirs,
                                       'source_code': nb_link})

print('bigarr is', bigarr.nbytes*1E-6, 'MB')

ds

let's take a look at part of the data

In [None]:
# plot June 1st of the processed year for a sub-region
# (date built from `year` so the cell still works when year is changed)
bigarr.sel(y=slice(50,23),x=slice(-90,-65),time=str(year)+'-06-01').plot()

In [None]:
# compress the data variable and store it as float32
var_encoding = dict(zlib=True, dtype='float32')

# per-variable encodings: the coordinates plus the data variable
nc_encoding = {
    'y': y_encoding,
    'x': x_encoding,
    'time': time_encoding,
    varnames[v]: var_encoding,
}

outfile = '{}gaez_nc_files/{}_DailyDev_{}_5m.nc'.format(data_dir, varnames[v], year)

# writing with compression will take a few minutes
# if we want the file even smaller we could encode scale and offset
ds.to_netcdf(outfile, encoding=nc_encoding)

# Read the netcdf file

In [None]:
# re-open the file that was just written to check its contents
nc_path = '{}gaez_nc_files/{}_DailyDev_{}_5m.nc'.format(data_dir, varnames[v], year)
var = xr.open_dataset(nc_path)
var

In [None]:
# same sub-region plot as before, now from the file on disk
# (date built from `year` so the cell still works when year is changed)
var[varnames[v]].sel(y=slice(50,23),x=slice(-90,-65),time=str(year)+'-06-01').plot()