In [1]:
import time

import numpy as np
import pandas as pd
import statsmodels.api as sm
import IPython.parallel as p
from IPython.display import clear_output
# Shared time axis: used as the DatetimeIndex of every flux and forcing table read below.
# NOTE(review): absolute, machine-specific path -- consider a configurable data directory.
# NOTE(review): unpickling can execute arbitrary code; only load files you trust.
dates = pd.read_pickle('/Volumes/data/Theo/projects/Budyko_vic/timecode.pcl')

# Column names assigned to the forcing files when they are read (read_table(names=...)).
forcing_columns = ['P','Tmax','Tmin','W']
# Column names assigned to the flux files when they are read (read_table(names=...)).
fluxes_columns = ['y','m','d','ET','R','BF','sm1','sm2','sm3','SWE','Cs','Qs','Ql','Qg','NR','PEText','PETtrc','PETsrc']



In [2]:
# Connect to the IPython.parallel cluster; the engines must already be running.
c = p.Client()

In [3]:
# Display the ids of the engines currently registered with the client.
c.ids

[0, 1, 2, 3]

In [4]:
# Load-balanced view: tasks submitted through it are handed to whichever engine is free.
view = c.load_balanced_view()

In [5]:
%%px
import pandas as pd
import numpy as np
import statsmodels.api as sm
# This cell runs under %%px, so these names are created on every engine.
# They mirror the driver-side globals so that functions dispatched to the
# engines can resolve `dates`, `forcing_columns` and `fluxes_columns`.
# NOTE(review): absolute, machine-specific path -- every engine must be able to read it.
dates = pd.read_pickle('/Volumes/data/Theo/projects/Budyko_vic/timecode.pcl')
forcing_columns = ['P','Tmax','Tmin','W']
fluxes_columns = ['y','m','d','ET','R','BF','sm1','sm2','sm3','SWE','Cs','Qs','Ql','Qg','NR','PEText','PETtrc','PETsrc']

In [6]:
# Lookup table of input files (presumably one row per grid cell -- TODO confirm).
# The `flux` and `forcing` columns hold the paths later passed to pd.read_table,
# and `idx` is the identifier carried through to the results.
files = pd.read_pickle('./forcing_fluxes_filenames_lat_lon_index.df')
# Replace the stored column labels with short names used throughout the notebook.
files.columns = ['exists','flux','forcing','lat','lon','idx']

In [7]:
# Number of rows in the file table, i.e. the number of tasks in a full run.
len(files)

309673

In [8]:
%%px
def wateryear(index):
    """Return the water year that a timestamp belongs to.

    A water year starts on 1 October and is labelled with the calendar year
    in which it ends, so October-December dates are assigned to the next
    calendar year.

    This copy lives in a %%px cell and is therefore defined on the engines.

    Parameters
    ----------
    index : datetime-like
        Any object exposing integer ``year`` and ``month`` attributes
        (e.g. ``datetime.datetime`` or ``pandas.Timestamp``).

    Returns
    -------
    int
        ``index.year + 1`` for October-December, otherwise ``index.year``.
    """
    # Comparing the month is equivalent to comparing against 1 October of the
    # same year, but needs neither the ``pd.datetime`` alias (absent from
    # recent pandas releases) nor a branch that could leave the result unset.
    if index.month >= 10:
        return index.year + 1
    return index.year

In [9]:
def wateryear(index):
    """Return the water year that a timestamp belongs to.

    A water year starts on 1 October and is labelled with the calendar year
    in which it ends, so October-December dates are assigned to the next
    calendar year.

    This is the driver-side (local kernel) definition.

    Parameters
    ----------
    index : datetime-like
        Any object exposing integer ``year`` and ``month`` attributes
        (e.g. ``datetime.datetime`` or ``pandas.Timestamp``).

    Returns
    -------
    int
        ``index.year + 1`` for October-December, otherwise ``index.year``.
    """
    # Comparing the month is equivalent to comparing against 1 October of the
    # same year, but needs neither the ``pd.datetime`` alias (absent from
    # recent pandas releases) nor a branch that could leave the result unset.
    if index.month >= 10:
        return index.year + 1
    return index.year

In [12]:
# Pick one row of the file table as inputs for a single local trial of interannual().
num = 200
fl = files.flux[num]
fr = files.forcing[num]
idx = files.idx[num]

def interannual(fl,fr,idx):
    """Regress the annual runoff ratio on mean melt-day snowmelt for one file pair.

    The flux file (tab-separated) and the forcing file (space-separated) are
    read and indexed by the module-level ``dates``. Daily snowmelt is taken as
    the day-to-day decrease in ``SWE`` (increases are set to 0). Values are
    then aggregated per water year, the first and last water years being
    discarded as incomplete. For each remaining year the runoff ratio
    ``Q/P = (sum(R) + sum(BF)) / sum(P)`` is regressed (OLS with an intercept)
    on the mean snowmelt over the days on which melt occurred.

    Parameters
    ----------
    fl : str
        Path of the flux file.
    fr : str
        Path of the forcing file.
    idx : object
        Identifier of the file pair; returned unchanged.

    Returns
    -------
    tuple
        ``(idx, RR_pval, RR_rsqrd, RR_slope, RR_intercept)``: the F-test
        p-value, adjusted R-squared, slope and intercept of the regression.
        The four statistics are NaN when fewer than 10 water years have both
        a runoff ratio and a mean snowmelt value.

    Notes
    -----
    Relies on names from the enclosing namespace: ``pd``, ``np``, ``sm``
    (statsmodels.api), ``dates``, ``fluxes_columns``, ``forcing_columns`` and
    ``wateryear``. When the function is dispatched through ``view.map`` these
    have to be available on the engines as well.
    """
    # load the flux file
    flux = pd.read_table(fl, sep='\t', names=fluxes_columns)
    flux.index = pd.DatetimeIndex(dates)

    # load the forcing file
    force = pd.read_table(fr, sep=' ', names=forcing_columns)
    force.index = pd.DatetimeIndex(dates)

    # both tables share the same dates, so the water year is computed once
    water_years = force.index.map(wateryear)
    force['wateryear'] = water_years
    flux['wateryear'] = water_years

    # compute snowmelt: positive day-to-day loss of SWE, with 0 for the first day
    swe = flux.SWE.values  # .values instead of the removed Series.as_matrix()
    smelt = np.append(0, swe[:-1] - swe[1:])
    smelt[smelt < 0] = 0  # accumulation events are not melt
    flux['sm'] = smelt

    # annual totals; the first and last water years are incomplete and dropped
    force_sum = force.groupby(by='wateryear').sum()[1:-1]
    flux_sum = flux.groupby(by='wateryear').sum()[1:-1]

    # Mean over melt days only. A water year without any melt day has no group
    # here, so a positional [1:-1] slice could discard a complete year instead
    # of the incomplete edge years; align on the complete years by label.
    melt_days = flux.loc[flux['sm'] > 0, :]
    flux_mean = melt_days.groupby(by='wateryear').mean().reindex(flux_sum.index)

    flux_sum['Q'] = flux_sum.R + flux_sum.BF  # discharge for each water year
    flux_sum['Q_P'] = flux_sum.Q / force_sum.P  # runoff ratio for each water year

    # pair predictor and response by water year and keep only complete pairs
    data = pd.DataFrame({'sm': flux_mean.sm, 'Q_P': flux_sum.Q_P})
    paired = data.dropna().astype(float)

    # too few years for a meaningful regression
    if len(paired) < 10:
        return idx, np.nan, np.nan, np.nan, np.nan

    y = paired.Q_P
    X = sm.add_constant(paired['sm'])
    results = sm.OLS(y, X).fit()

    RR_pval = results.f_pvalue
    RR_rsqrd = results.rsquared_adj
    # explicit positional access: entry 0 is the constant, entry 1 the slope
    RR_slope = results.params.iloc[1]
    RR_intercept = results.params.iloc[0]

    return idx, RR_pval, RR_rsqrd, RR_slope, RR_intercept

In [13]:
# Local trial on the single file pair chosen above; the tuple is (idx, p-value, adj. R^2, slope, intercept).
interannual(fl,fr,idx)

(201,
 0.71075578981185006,
 -0.076730744702182108,
 -0.0046604710922694892,
 0.20073342295241361)

In [26]:
def compwflux(fl,fr,idx,huc8=None):
    """Compute per-water-year mean snowmelt, rain and water input for one file pair.

    The flux file (tab-separated) and the forcing file (space-separated) are
    read and indexed by the module-level ``dates``. Daily snowmelt is the
    day-to-day decrease in ``SWE`` (increases are set to 0). Daily rain is
    ``P * (1 - coef)`` where ``coef = -Tmin / |Tmax - Tmin|`` limited to
    [0, 1]. ``wflux`` is rain plus snowmelt. Means are taken per water year
    over the days with snowmelt or rain, for the complete water years only
    (the first and last water years are dropped).

    Parameters
    ----------
    fl : str
        Path of the flux file.
    fr : str
        Path of the forcing file.
    idx : object
        Identifier of the file pair; repeated once per water year in the output.
    huc8 : object, optional
        Second label repeated once per water year in the output (presumably a
        HUC8 watershed code -- TODO confirm). It was previously read inside
        the function without ever being passed in, which raised
        ``UnboundLocalError``; it is now an optional argument.

    Returns
    -------
    tuple of lists
        ``(wyears, idx, huc8, sm, rain, wflux)``, all of the same length
        (one entry per complete water year). A year with no snowmelt or rain
        day has NaN in ``sm``, ``rain`` and ``wflux``.

    Notes
    -----
    Relies on names from the enclosing namespace: ``pd``, ``np``, ``dates``,
    ``fluxes_columns``, ``forcing_columns`` and ``wateryear``. When the
    function is dispatched through ``view.map`` these have to be available on
    the engines as well.
    """
    flux = pd.read_table(fl, sep='\t', names=fluxes_columns)
    flux.index = pd.DatetimeIndex(dates)

    force = pd.read_table(fr, sep=' ', names=forcing_columns)
    force.index = pd.DatetimeIndex(dates)

    # both tables share the same dates, so the water year is computed once
    water_years = force.index.map(wateryear)
    force['wateryear'] = water_years
    flux['wateryear'] = water_years

    # compute snowmelt: positive day-to-day loss of SWE, with 0 for the first day
    swe = flux.SWE.values  # .values instead of the removed Series.as_matrix()
    smelt = np.append(0, swe[:-1] - swe[1:])
    smelt[smelt < 0] = 0  # accumulation events are not melt
    flux['sm'] = smelt

    # compute rainfall: coef is the share of precipitation not counted as rain
    # NOTE(review): coef is NaN when Tmax == Tmin == 0 (0/0); such days get NaN rain.
    coef = (0. - force.Tmin) / np.abs(force.Tmax - force.Tmin)
    coef[coef < 0] = 0.
    coef[coef > 1] = 1.
    force['rain'] = force.P.values * (1 - coef)
    force['wflux'] = force.rain + flux.sm

    # complete water years: the first and last ones are dropped
    flux_sum = flux.groupby(by='wateryear').sum()[1:-1]
    wyears = list(flux_sum.index)
    n = len(wyears)

    # Means over days with any water input. A water year without such a day
    # has no group, so a positional [1:-1] slice could drop the wrong years
    # and yield lists of a different length than `wyears`; align by label.
    wet_days = (flux.sm > 0) | (force.rain > 0)
    flux_mean = flux.loc[wet_days, :].groupby(by='wateryear').mean().reindex(flux_sum.index)
    force_mean = force.loc[wet_days, :].groupby(by='wateryear').mean().reindex(flux_sum.index)

    # convert output variables to plain lists of equal length
    idx_rep = list(np.repeat(idx, n))
    huc8_rep = list(np.repeat(huc8, n))
    melt_mean = list(flux_mean.sm)
    rain_mean = list(force_mean.rain)
    wflux_mean = list(force_mean.wflux)

    return wyears, idx_rep, huc8_rep, melt_mean, rain_mean, wflux_mean

In [36]:
#test batch
# Rows 200-299 of the file table, for a quick trial of the parallel map.
# NOTE(review): the "full calculation" cell rebinds fl/fr/idx, so whichever of
# the two cells ran last decides what the map below processes.
num = 200
num2 = 300
fl = files.flux[num:num2]
fr = files.forcing[num:num2]
idx = files.idx[num:num2]

In [14]:
#full calculation
# Use every row of the file table as input to the parallel maps below.
fl = files.flux
fr = files.forcing
idx = files.idx

## Run the Interannual Calculation

In [15]:
# Submit one interannual() task per (flux, forcing, idx) triple; `res` is polled below until ready.
res = view.map(interannual,fl,fr,idx)

In [16]:
import ProgressBar as pb

In [17]:
# Progress bar constructed with the number of inputs (presumably the total task count -- TODO confirm).
# NOTE(review): this rebinds `p`, which until now was the IPython.parallel
# module alias; re-running `c = p.Client()` after this cell will fail.
p = pb.ProgressBar(len(fl))

In [18]:
# Poll the asynchronous map until every task has finished, redrawing the bar
# with the number of completed tasks.
while not res.ready():
    p.animate(res.progress)
    time.sleep(0.5)  # avoid a busy-wait that pins a core and floods the output
# One last redraw: the loop exits as soon as the map is ready, so without it
# the bar stays at the count seen on the previous poll.
p.animate(res.progress)

[****************100%******************]  309666 of 309673 complete


In [19]:
# Transpose the list of per-task result tuples into one tuple per output field.
# NOTE(review): this rebinds `idx` from the files.idx column to a tuple of the returned ids.
idx,RR_pval,RR_rsqrd,RR_slope,RR_intercept = zip(*res.result) # unzip the results

In [20]:
# Persist the regression outputs as named arrays in a compressed .npz archive.
np.savez_compressed('./RR_interannual_slope.npz', idx=idx,RR_pval=RR_pval,RR_rsqrd=RR_rsqrd,RR_slope=RR_slope,RR_intercept=RR_intercept)

## Run the Water Flux Calculation

In [28]:
# Submit one compwflux() task per input row; `res` replaces the earlier interannual result handle.
# NOTE(review): `huc8` is not assigned in any cell visible in this notebook --
# it must be defined (one value per input row) before this cell is run.
res = view.map(compwflux,fl,fr,idx,huc8)

In [29]:
# Number of tasks completed so far.
res.progress

12

In [30]:
# Poll the water-flux map, showing the number of completed tasks on a single
# line that is refreshed twice per second.
while not res.ready():
    time.sleep(0.5)  # `pause` is not imported anywhere in this notebook; use the stdlib sleep
    clear_output(wait=True)  # `wait` is a boolean: hold the old output until the new one arrives
    print(res.progress)

1507


In [35]:
# Transpose the list of per-task result tuples into one tuple per output field.
# NOTE(review): this rebinds `sm`, which until now was the statsmodels.api
# alias; calling interannual() locally after this cell will fail.
wyears,idx,huc8,sm,rain,wflux = zip(*res.result) # unzip the results

In [36]:
# Flatten every result field from (tasks x water years) to a single 1-D array.

# Unpacking the shape requires a rectangular result: one row per task, each
# row holding the same number of water years.
m, n = np.shape(wyears)
flat_len = m * n
wyears = np.asarray(wyears).reshape(flat_len)
sm = np.asarray(sm).reshape(flat_len)
rain = np.asarray(rain).reshape(flat_len)
wflux = np.asarray(wflux).reshape(flat_len)
idx = np.asarray(idx).reshape(flat_len)
huc8 = np.asarray(huc8).reshape(flat_len)

In [38]:
# Persist the flattened water-flux outputs as named arrays in a compressed .npz archive.
np.savez_compressed('./WSC_HUC8_interannual_wflux.npz', wyears=wyears,sm=sm,rain=rain,wflux=wflux,idx=idx,huc8=huc8)