In [1]:
###########################################
#   This script is not finished !!!!!

import pandas as pd
import numpy as np
import multiprocessing as mp
import os,glob
from scipy.signal import find_peaks

In [None]:
def cleanQ(df):
    """Drop records whose discharge ``Q`` is not a usable number; cast ``Q`` to float32.

    Non-string entries of ``Q`` are kept as they are (missing values included).
    String entries are kept only when they start with a digit *and* parse
    completely as a number, so signed strings, free text and half-numeric
    tokens such as ``'12abc'`` are discarded instead of breaking the final cast.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with a ``Q`` column; every other column is carried along untouched.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not modified) holding the non-string rows
        first, followed by the surviving string rows, with ``Q`` as ``np.float32``.
    """
    # ``astype(bool)`` keeps the mask usable with ``~`` even for an empty frame.
    is_text = df['Q'].apply(lambda value: isinstance(value, str)).astype(bool)
    numeric_rows = df.loc[~is_text, :]
    text_rows = df.loc[is_text, :]

    if text_rows.empty:
        # Nothing to parse: avoid the ``.str`` accessor, which is unavailable
        # on non-string columns.
        cleaned = numeric_rows.copy()
    else:
        parsed = pd.to_numeric(text_rows['Q'], errors='coerce')
        # Must start with a digit (as before) and be numeric as a whole.
        usable = text_rows['Q'].str.match(r'\d+') & parsed.notna()
        text_rows = text_rows.loc[usable, :].assign(Q=parsed[usable].to_numpy())
        cleaned = pd.concat([numeric_rows, text_rows])

    cleaned['Q'] = cleaned['Q'].astype(np.float32)
    return cleaned

def del_unreliableQ(df, max_run=10):
    '''
    Remove suspect discharge records.

    all records are rounded to three decimal places
    observations less than 0 were flagged as suspected
    observations with more than ten consecutive equal values greater than 0 were flagged as suspected

    Parameters
    ----------
    df : pandas.DataFrame
        Needs a ``Q`` column and the observation date, either as a ``date``
        column or as an index named ``date``.
    max_run : int, default 10
        Longest run of identical positive daily values that is still trusted.

    Returns
    -------
    pandas.DataFrame
        Frame indexed by day holding only the retained records (``Q >= 0``).
        Days missing from the input and flagged days are absent. The input
        frame is not modified.

    Notes
    -----
    A run is a stretch of consecutive calendar days with the same rounded
    value; a missing day ends the run. The whole run is removed, including a
    run that reaches the last day of the record. Runs of zeros are kept,
    because the rule only concerns values greater than 0.
    '''
    df = df.loc[df.Q >= 0, :].reset_index()
    df['Q'] = df['Q'].round(3)
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values('date').set_index('date')
    if df.empty:
        # nothing left to screen (and no first/last day to build a calendar from)
        return df

    # put the series on a gap-free daily calendar so "consecutive" means consecutive days
    full_index = pd.date_range(df.index[0], df.index[-1], freq='D')
    df = df.reindex(full_index)

    q = df['Q']
    # a new run starts wherever the value differs from the previous day;
    # NaN != NaN, so every missing day forms its own run and splits its neighbours
    run_id = (q != q.shift()).cumsum()
    run_length = run_id.map(run_id.value_counts())
    suspect = (run_length > max_run) & (q > 0)
    df = df.loc[~suspect, :]

    # must no less than zero (this also removes the days added by the reindex)
    df = df.loc[df.Q >= 0, :]
    return df

def del_outlierQ(df, n_std=6, half_window=2):
    '''
        Remove outliers from a daily streamflow series.

        Based on a previously suggested approach for evaluating temperature series (Klein Tank et al., 2009),
        daily streamflow values are declared as outliers if values of log (Q+0.01) are larger or smaller than
        the mean value of log (Q+0.01) plus or minus 6 times the standard deviation of log (Q+0.01) computed for
        that calendar day for the entire length of the series. The mean and standard deviation are computed for
        a 5-day window centred on the calendar day to ensure that a sufficient amount of data is considered.
        The log-transformation is used to account for the skewness of the distribution of daily streamflow values
        and 0.01 was added because the logarithm of zero is undefined. Outliers are flagged as suspect.
        The rationale underlying this rule is that unusually large or small values are often associated with observational issues.
        The 6 standard-deviation threshold is a compromise, aiming at screening out outliers that could come from
        instrument malfunction, while not flagging extreme floods or low flows.

        Parameters
        ----------
        df : pandas.DataFrame
            Frame with a ``Q`` column and a DatetimeIndex. It is not modified.
        n_std : float, default 6
            Half-width of the accepted band in standard deviations.
        half_window : int, default 2
            Days taken on each side of the calendar day (2 gives the 5-day window).

        Returns
        -------
        pandas.DataFrame
            Single column ``Q`` with the retained original values, indexed by an
            ascending index named ``date``. Records with missing ``Q`` are dropped.

        Notes
        -----
        The calendar day is the day of year (1..366) and the window wraps around
        the turn of the year. The bounds are inclusive, so a window without any
        spread (for example a dry season made of zeros) keeps its records.
    '''
    q = df['Q'].dropna()
    if q.empty:
        return pd.DataFrame({'Q': q}).rename_axis('date')

    log_q = np.log(q.to_numpy(dtype=float) + 0.01)
    doy = np.asarray(q.index.dayofyear)

    # lower/upper bound per day of year; slot 0 is unused so that doy indexes directly
    low = np.full(367, np.nan)
    upp = np.full(367, np.nan)
    offsets = np.arange(-half_window, half_window + 1)
    for day in range(1, 367):
        # wrap the window into 1..366 (day 1 borrows from days 365 and 366, ...)
        window = (day + offsets - 1) % 366 + 1
        sample = log_q[np.isin(doy, window)]
        sample = sample[~np.isnan(sample)]
        if sample.size == 0:
            continue
        centre = np.mean(sample)
        spread = np.std(sample)
        low[day] = centre - n_std * spread
        upp[day] = centre + n_std * spread

    # comparisons with NaN are False, so records without usable bounds are dropped
    keep = (log_q >= low[doy]) & (log_q <= upp[doy])
    out = pd.DataFrame({'Q': q.loc[keep]}).sort_index()
    out.index.name = 'date'
    return out

def Eckhardt(Q, alpha=.98, BFI=.80, re=1):
    """
    Baseflow index from a recursive digital filter. Based on Eckhardt, 2004.

    Q : array of discharge measurements (may contain NaN for missing days)
    alpha : filter parameter
    BFI : BFI_max (maximum baseflow index)
    re : accepted for compatibility; multiple passes are not implemented,
         the filter is always run once

    Returns the ratio sum(baseflow) / sum(Q) over the non-missing days, or NaN
    when Q is empty or sums to zero.

    The filter is seeded with the observed discharge on the first day and is
    re-seeded on the first valid day after every gap, so that one missing value
    does not wipe out the baseflow of the rest of the record.
    """
    Q = np.asarray(Q, dtype=float)
    # len() raises TypeError for a scalar. pandas' ``Series.agg`` relies on that
    # to fall back from an element-wise call to a whole-series call.
    n = len(Q)
    if n == 0:
        return np.nan

    f = np.full(n, np.nan)
    for t in range(n):
        if np.isnan(Q[t]):
            continue  # no observation: leave the baseflow undefined for this day
        if t == 0 or np.isnan(f[t-1]):
            f[t] = Q[t]  # (re)start the recursion from the observation
            continue
        # algorithm
        f[t] = ((1 - BFI) * alpha * f[t-1] + (1 - alpha) * BFI * Q[t]) / (1 - alpha * BFI)
        # baseflow cannot exceed total flow
        if f[t] > Q[t]:
            f[t] = Q[t]

    total = np.nansum(Q)
    if total == 0:
        return np.nan
    return np.nansum(f) / total

# Function to identify and delete the smaller sample within 5 days
def delete_smaller_sample(df):
    """Thin a date-indexed series so that neighbouring entries are more than 5 days apart.

    Whenever two neighbours are 5 days apart or less, the smaller one is
    dropped (the later one on a tie). Passes are repeated until no close pair
    is left. The series is modified in place and also returned.
    """
    gaps = (df.index[1:] - df.index[:-1]).days
    while not (gaps > 5).all():
        too_close = gaps <= 5
        earlier_days = df.index[:-1][too_close]
        later_days = df.index[1:][too_close]
        for first, second in zip(earlier_days, later_days):
            # the earlier member may already have lost against its own predecessor
            if first in df.index:
                loser = first if df.loc[first] < df.loc[second] else second
                df.drop(loser, inplace=True)
        gaps = (df.index[1:] - df.index[:-1]).days
    return df

# Function to calculate lag time between rainfall peak and discharge peak
def calc_lagT(pr_dis, p = 95):
    """Mean lag in days between heavy-rainfall peaks and the discharge peak that follows.

    pr_dis : series whose last index level unstacks into the columns ``pr`` and ``dis``
    p : percentile of wet-day rainfall (days with pr >= 0.1) used as the
        minimum height of a rainfall peak

    Returns a Series with
      lagT       : mean lag over the rainfall peaks that got a response (NaN if none)
      noResRatio : share of rainfall peaks without a response (1 when there is no rainfall peak)

    A rainfall peak counts as unanswered when no discharge peak follows it, or
    when another rainfall peak occurs before (or on) the next discharge peak.
    """
    wide = pr_dis.unstack(level=-1)

    # local maxima: rainfall above the wet-day percentile, discharge above zero
    wet_days = wide.loc[wide.pr >= 0.1, 'pr']
    rain_threshold = wet_days.quantile(p / 100)
    rain_pos, _ = find_peaks(wide.pr.values, height=rain_threshold)
    flow_pos, _ = find_peaks(wide.dis.values, height=0)

    # keep only the larger peak of any pair lying within five days
    rain_peaks = delete_smaller_sample(wide.iloc[rain_pos, :].pr)
    flow_peaks = delete_smaller_sample(wide.iloc[flow_pos, :].dis)

    # pair each rainfall peak with the first discharge peak on or after it
    lags = []
    unanswered = 0
    for rain_day in rain_peaks.index:
        later_flow = flow_peaks.loc[flow_peaks.index >= rain_day]
        if later_flow.shape[0] == 0:
            unanswered += 1
            continue
        flow_day = later_flow.index[0]
        interrupted = (rain_peaks.index > rain_day) & (rain_peaks.index <= flow_day)
        if interrupted.any():
            unanswered += 1
            continue
        lags.append((flow_day - rain_day).days)

    n_rain_peaks = rain_peaks.shape[0]
    no_response_ratio = 1 if n_rain_peaks == 0 else unanswered / n_rain_peaks
    return (pd.Series([np.mean(lags), no_response_ratio], index = ['lagT','noResRatio',]))

def calc_hs(df):
    '''Compute hydrologic signatures for one catchment.

    df should be a dataframe with five columns: date, pr, dis, lat, and darea.
    ``dis`` is divided by ``darea`` before anything else, ``lat`` (first row)
    only selects the hydrological year: October-September when lat >= 0,
    July-June otherwise. The input frame is not modified.

    Returns a Series with the entries
        q_mean, runoff_ratio, slope_fdc, Q5, Q95, Q10_50, high_q_freq,
        low_q_freq, zero_q_freq, cv, high_q_dur, low_q_dur, BFI,
        lagT, noResRatio, FI, p_mean, stream_elas, hfd_mean

    Raises ValueError when one of the required columns is missing.

    Notes
    -----
    * high/low flow days are those above 9 times / below 0.2 times the median
      of the positive flows; the ``*_freq`` entries are day counts and the
      ``*_dur`` entries are mean lengths of consecutive such days.
    * FI, p_mean, stream_elas and hfd_mean are annual statistics. They use
      complete hydrological years only: the record is cropped to the first
      hydrological-year start on or after its first day and to the last
      hydrological-year end on or before its last day, and days are grouped
      by their actual hydrological year.
    '''
    required = ['date', 'pr', 'dis', 'lat', 'darea']
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError('the input df misses columns: ' + ', '.join(missing))

    # work on a copy so the caller keeps its raw discharge values
    df = df.copy()

    # transform streamflow to specific discharge
    df['dis'] = df.dis.values / df.darea.values
    lat = df.lat.values[0]
    df = df.drop(columns=['darea','lat']).set_index('date')

    # gap-free daily calendar; missing days become NaN
    newtime = pd.date_range(df.index.values[0], df.index.values[-1], freq = 'D')
    df = df.reindex(newtime)

    # discharge quantile
    Q = df.loc[df.dis>0,'dis'].quantile([.05, .1, .5, .95])
    high_thres = Q.loc[0.5] * 9
    low_thres = Q.loc[0.5] * 0.2

    # event duration
    def mean_event_duration(flag):
        # Mean length of the runs of consecutive True days (0 when there is none).
        flag = np.asarray(flag, dtype=bool)
        if not flag.any():
            return 0.0
        # the first day gets label 0 so that it belongs to the run it starts
        run_id = np.hstack([0, (np.diff(flag.astype(int)) != 0).cumsum()])
        return float(pd.Series(run_id[flag]).value_counts().mean())
    high_q_dur = mean_event_duration(df['dis'] > high_thres)
    low_q_dur = mean_event_duration(df['dis'] < low_thres)

    # calculate some hydrologic signatures
    HS = pd.Series([
        # Mean daily runoff
        df['dis'].mean(),
        # runoff ratio
        df['dis'].mean() / df['pr'].mean(),
        # slope of the flow duration curve
        (np.log(df.loc[df.dis>0,'dis'].quantile(.33)) - np.log(df.loc[df.dis>0,'dis'].quantile(.66))) / (0.66-0.33),
        # runoff Q5 and Q95
        Q.loc[0.05],
        Q.loc[0.95],
        # ratio of Q10 to Q50 to indicate groundwater
        Q.loc[0.1] / Q.loc[0.5],
        # frequency of high flows, low flows, and zero flows
        (df['dis']>high_thres).sum(),
        (df['dis']<low_thres).sum(),
        (df['dis']==0).sum(),
        # variability coefficient
        df['dis'].std() / df['dis'].mean(),
        # event duration
        high_q_dur, low_q_dur,
        # BFI (called directly: Series.agg would first try the function on every single value)
        Eckhardt(df['dis']),
    ], index = [
        'q_mean', 'runoff_ratio', 'slope_fdc', 'Q5', 'Q95', 'Q10_50', 'high_q_freq', 'low_q_freq', 'zero_q_freq', 'cv', 'high_q_dur', 'low_q_dur', 'BFI',
    ])

    # calculate lag time between peak rainfall and peak discharge
    df_combine = df.reset_index().rename(columns={'index':'date'}).melt(id_vars = 'date')
    df_combine = df_combine.rename(columns={'variable':'name'})
    df_combine = df_combine.set_index(['date','name'])
    df_lagT = df_combine.agg(calc_lagT).T.reset_index()
    HS = pd.concat([HS, df_lagT[['lagT','noResRatio']].squeeze()])

    # transform dataframe to hydrological-year cycle and then calculate some HS
    # generally, 1 October to 30 September in the Northern Hemisphere, 1 July to 30 June in the Southern Hemisphere (https://glossary.ametsoc.org/wiki/Water_year)
    if lat >= 0:
        start,end = 10,9
    else:
        start,end = 7,6
    first_day, last_day = df.index[0], df.index[-1]
    # first hydrological-year start that the record fully covers
    hy1 = pd.Timestamp(year=first_day.year, month=start, day=1)
    if hy1 < first_day:
        hy1 = pd.Timestamp(year=first_day.year + 1, month=start, day=1)
    # last hydrological-year end that the record fully covers (both end months have 30 days)
    hy2 = pd.Timestamp(year=last_day.year, month=end, day=30)
    if hy2 > last_day:
        hy2 = pd.Timestamp(year=last_day.year - 1, month=end, day=30)
    df = df.loc[(df.index>=hy1)&(df.index<=hy2),:]

    # label every day with the year in which its hydrological year ends
    water_year = np.asarray(df.index.year) + (np.asarray(df.index.month) >= start)

    # calculate mean annual rainfall and flashiness index
    p_mean0 = df.groupby(water_year)['pr'].sum().mean()
    def funcs(x):
        a = np.abs(x.diff()).sum()
        b = x.sum()
        if b == 0 or np.isinf(b):
            return np.nan
        else:
            return a/b
    FI0 = df.groupby(water_year)['dis'].apply(funcs).mean()
    HS = pd.concat([HS, pd.Series([FI0,p_mean0], index = ['FI','p_mean'])])

    # calculate other hydrologic signatures
    # 1. stream_elas
    mq_tot = df.dis.mean()
    mp_tot = df.pr.mean()
    mq = df.groupby(water_year)['dis'].mean()
    mp = df.groupby(water_year)['pr'].mean()
    dp = mp - mp_tot
    dq = mq - mq_tot
    stream_elas0 = ((dq/mq_tot)/(dp/mp_tot)).median()
    HS = pd.concat([HS, pd.Series([stream_elas0], index = ['stream_elas'])])

    # 2. hfd_mean: position within the hydrological year at which half of the annual flow has passed
    hfd_mean0 = df.groupby(water_year)['dis'].apply(lambda x: np.abs(x.cumsum() - x.sum()*0.5).argmin())
    hfd_mean0 = hfd_mean0.where(hfd_mean0<365)
    hfd_mean0 = hfd_mean0.mean(skipna=True)
    HS = pd.concat([HS, pd.Series([hfd_mean0], index = ['hfd_mean'])])

    return (HS)

# read rainfall
def readPr(fname):
    """Read one meteo CSV and return its precipitation columns in date-by-station layout.

    The file needs an ``ohdb_id`` column; the columns whose names are digits
    followed by ``_P`` are kept, and the first eight characters of each name
    are parsed as a ``YYYYMMDD`` date.

    Returns a frame with one column per ``ohdb_id`` plus a ``date`` column,
    one row per kept input column. The file name is printed as a progress message.
    """
    df = pd.read_csv(fname)
    df = df.set_index('ohdb_id')
    # raw string: '\d' in a normal string literal is an invalid escape sequence
    df = df.loc[:,df.columns.str.match(r'\d+_P$')].T.reset_index()
    df['date'] = pd.to_datetime(df['index'].str[:8], format = '%Y%m%d')
    df = df.drop(columns=['index'])
    print(fname)
    return (df)
# Read every daily-meteo CSV in parallel and stack the per-file tables row-wise.
fnames = glob.glob('../data_mswx/*daily_meteo*csv')
# the context manager releases the worker processes once the files are read
with mp.Pool(8) as pool:
    df_pr = pool.map(readPr, fnames)
df_pr = pd.concat(df_pr)
print(df_pr.shape)

# Station attributes; `main` looks up `gritDarea` and `ohdb_latitude` by `ohdb_id`.
df_attr = pd.read_csv('../data/basin_attributes.csv')

def main(ohdb_id):
    """Compute the hydrologic signatures of one station.

    Reads the station's daily discharge CSV, cleans it (`cleanQ`,
    `del_unreliableQ`, `del_outlierQ`), joins it with the station's rainfall
    column from the module-level `df_pr`, attaches drainage area and latitude
    from the module-level `df_attr`, and passes the result to `calc_hs`.

    ohdb_id : station identifier; it names the discharge file and the rainfall column

    Returns the Series produced by `calc_hs`, named after the station.
    The station id is printed as a progress message.
    """
    df_pr0 = df_pr[['date',ohdb_id]].rename(columns={ohdb_id:'pr'})
    df_dis = pd.read_csv(f'../../data/OHDB/OHDB_v0.2.3/OHDB_data/discharge/daily/{ohdb_id}.csv')
    df_dis['date'] = pd.to_datetime(df_dis['date'])
    # read
    df_dis = cleanQ(df_dis)
    # quality check
    df_dis = del_unreliableQ(df_dis)
    # delete outliers
    df_dis = del_outlierQ(df_dis).reset_index().rename(columns={'index':'date'})
    df = df_pr0.merge(df_dis, on = 'date')
    df = df.sort_values('date',ascending=True).rename(columns={'Q':'dis'})
    df['darea'] = df_attr.loc[df_attr.ohdb_id==ohdb_id,'gritDarea'].values[0]
    df['lat'] = df_attr.loc[df_attr.ohdb_id==ohdb_id,'ohdb_latitude'].values[0]

    # TODO: the seasonal event table
    # ('../data/dis_OHDB_seasonal4_Qmin7_Qmax7_1982-2023.csv', columns Qmax7date / Qmin7date)
    # was loaded here but never used, and the selection listed an empty column
    # name, which raised KeyError for every station. When the event dates are
    # wired in, read the table once at module level instead of once per station.

    HS = calc_hs(df)
    HS.name = ohdb_id
    print(ohdb_id)
    return (HS)

if __name__ == '__main__':
    # One task per station; the context manager releases the workers afterwards.
    with mp.Pool(12) as pool:
        HS = pool.map(main, df_attr.ohdb_id.values)
    # one column per station -> one row per station, station id in the 'index' column
    HS = pd.concat(HS, axis = 1)
    HS = HS.T.reset_index()
    HS.to_csv('../data/time_varying_hydrologic_signatures.csv', index = False)

In [4]:
# Load the seasonal Qmax7/Qmin7 event table (one row per station, season and year,
# judging by the columns displayed below) and show it for inspection.
df_event = pd.read_csv('../data/dis_OHDB_seasonal4_Qmin7_Qmax7_1982-2023.csv')
df_event

Unnamed: 0,season,year,countDay,Qmax7,Qmin7,Qmax7date,Qmin7date,ohdb_id
0,DJF,1995,84,22.264286,4.037143,1995-12-17,1995-02-20,OHDB_009000845
1,DJF,1996,91,34.954285,11.611429,1996-02-10,1996-12-29,OHDB_009000845
2,DJF,1997,90,41.197143,2.850000,1997-01-26,1997-12-31,OHDB_009000845
3,DJF,1998,90,25.871428,1.758571,1998-12-07,1998-02-07,OHDB_009000845
4,DJF,1999,90,43.407144,13.960000,1999-12-27,1999-12-05,OHDB_009000845
...,...,...,...,...,...,...,...,...
1512589,SON,2018,91,0.837286,0.195714,2018-11-28,2018-09-30,OHDB_004000197
1512590,SON,2019,91,0.495429,0.037286,2019-11-04,2019-09-26,OHDB_004000197
1512591,SON,2020,91,0.798571,0.107571,2020-10-30,2020-10-06,OHDB_004000197
1512592,SON,2021,91,11.773143,0.209571,2021-09-05,2021-10-31,OHDB_004000197
