# Postprocess PLUMBER data

In [95]:
%pylab inline
import os
import time
import glob
import operator
import xarray as xr
import itertools
import scipy.stats
import pandas as pd
import matplotlib as mpl
from pprint import pprint
from fastdtw import fastdtw
from functools import reduce
from scipy.special import psi, inv_boxcox
from jupyterthemes import jtplot
from sklearn.neighbors import NearestNeighbors

jtplot.style('grade3', fscale=1.3)
jtplot.figsize(x=18, y=10)
mpl.rcParams['figure.figsize'] = (18, 10)

MM_PER_M = 1000
SEC_PER_MIN = 60
MIN_PER_HOUR = 60
HOUR_PER_DAY = 24
DAY_PER_YEAR = 365
# Timestep is 1 hour in our case
SEC_PER_TIMESTEP = SEC_PER_MIN * MIN_PER_HOUR

user = os.environ['USER']
sites = ['Amplero', 'Blodgett', 'Bugac', 'ElSaler', 'ElSaler2', 'Espirra', 'FortPeck', 
         'Harvard', 'Hesse', 'Howard', 'Howlandm', 'Hyytiala', 'Kruger', 'Loobos', 'Merbleue',
         'Mopane', 'Palang', 'Sylvania', 'Tumba', 'UniMich']
pals_headers = ['d1', 'd2', 'd3', 'Ws', 'Ta', 'Rn', 'Rh', 'PP', 
                'Qe', 'Qh', 'NEE', 'SM1', 'SM2']
obs_path_template = '/pool0/data/'+user+'/PLUMBER_data/sites/{}/observations/**/*.nc'
bench_path_template = '/pool0/data/'+user+'/PLUMBER_data/sites/{}/benchmark/**/*.nc'
spreadsheet_template = '/pool0/data/'+user+'/PLUMBER_data/PLUMBER/data/pals_data/spreadsheets/*.csv'
extracted_template = '/pool0/data/'+user+'/PLUMBER_data/PLUMBER/data/pals_data/extracted/{}.txt'
obs_files = {s: glob.glob(obs_path_template.format(s), recursive=True) for s in sites}
bench_files = {s: glob.glob(bench_path_template.format(s), recursive=True) for s in sites}
spreadsheets = glob.glob(spreadsheet_template, recursive=True)
extracted_files = {s: pd.read_csv(extracted_template.format(s), names=pals_headers, delim_whitespace=True) for s in sites}

Populating the interactive namespace from numpy and matplotlib


`%matplotlib` prevents importing * from pylab and numpy
  "\n`%matplotlib` prevents importing * from pylab and numpy"


In [98]:
def preprocess(ds):
    return ds.squeeze(drop=True).drop(['latitude', 'longitude'])

def postprocess(site):
    var_dict = {'Qle': 'latent_heat',
                'Qh': 'sensible_heat',
                'SWdown': 'shortwave',
                'Tair': 'temperature',
                'pptrate': 'precipitation',
                'soil_moisture': 'soil_moisture'}
    df = extracted_files[site][~extracted_files[site]['d1'].isnull()]
    ds = xr.open_mfdataset(obs_files[site], preprocess=preprocess)
    df.index = ds.time
    ds['soil_moisture'] = df['SM1'].to_xarray() + df['SM2'].to_xarray()
    ds['pptrate'] *= SEC_PER_TIMESTEP
    return ds.rename(var_dict)[list(var_dict.values())]

data = {s: postprocess(s) for s in sites}

In [151]:
def raw_data(data_array):
    """Unpacks data into a flat array"""
    eps = 1e-12
    return data_array.values.flatten() + eps * np.random.random(data_array.shape)


def good_inds(x, strict=False):
    """Remove invalid data"""
    if strict:
        return np.where(np.logical_and(x > 1e-10, x < 3000))[0]
    return np.where(np.logical_and(x > -3000, x < 3000))[0]

def good_data(x, strict=False):
    return x[good_inds(x, strict=strict)]

def nearest_distances(X, k=1):
    """Compute distances to kth nearest neighbors"""
    knn = NearestNeighbors(n_neighbors=k, metric='chebyshev')
    knn.fit(X)
    d, _ = knn.kneighbors(X)
    return d[:, -1]


def marginal_neighbors(X, R):
    """Compute number of neighbors in R-radius ball"""
    knn = NearestNeighbors(metric='chebyshev')
    knn.fit(X)
    return np.array([len(knn.radius_neighbors(p.reshape(1, -1), r)[0][0])
                     for p, r in zip(X, R)])


def entropy_sk(X, k=False):
    """Compute entropy H(X1, X2,..., Xn)"""
    try:
        n, d = X.shape
    except Exception as e:
        X = X.reshape(-1, 1)
        n, d = X.shape
    if not k:
        k = 10#int(np.sqrt(n))
    eps = 1e-12
    r = nearest_distances(X, k) + eps * np.random.random(n)
    n, d = X.shape
    ent = d * np.log(np.mean(r)) + psi(n) - psi(k) + d * np.log(2)
    return ent


def mutual_info(x, y, k=False):
    """Compute I(X; Y)"""
    n = len(x)
    if not k:
        k = 10#int(np.sqrt(n))
    eps = 1e-12
    r = nearest_distances(np.array([x, y]).T, k) + eps * np.random.random(n)
    n_x = marginal_neighbors(x.reshape(-1,1), r)
    n_y = marginal_neighbors(y.reshape(-1,1), r)
    psi_x = psi(n_x)
    psi_y = psi(n_y)
    return (psi(n) + psi(k) - (1./k) - np.mean(psi_x + psi_y))


def conditional_mutual_info(X, k=False):
    """Compute I(X1; X2 | X3:n)"""
    xz = np.array([X[0], *X[2:]]).T
    yz = np.array([X[1], *X[2:]]).T
    z = np.array(X[2:]).T
    d, n = X.shape
    if not k:
        k = 10
    eps = 1e-12
    r = nearest_distances(X.T, k) + eps * np.random.random(n)
    n_xz = marginal_neighbors(xz, r)
    n_yz = marginal_neighbors(yz, r)
    n_z = marginal_neighbors(z, r)
    psi_xz = psi(n_xz)
    psi_yz = psi(n_yz)
    psi_z = psi(n_z)
    return psi(k) - np.mean(psi_xz + psi_yz - psi_z)
    #return (psi(k) - (2./k) - np.mean(-psi_z + psi_xz + psi_yz - (1/n_xz) - (1/n_yz)))

def transfer_entropy(X: np.array, lag: int, window: int, sample_size: int=3000) -> float:
    """
    Computes TE(X2_t; X1_{t-lag} | X2_{t-lag})

    Args:
        X: A list containing multiple timeseries.
           Example: [evap, precip] will compute
           the information transferred from precip to evap,
        lag: The time-offset to consider (in timesteps).

    Returns:
        The value of the equation given above.  Small values
        denote less of a connection.  Negative values are
        possible, but should be thrown out - as they are
        numerical artifacts.
    """
    assert len(X) == 2
    # Subsample data and put it together with combination list
    sample_size = np.min([sample_size, len(X[0])-lag-window])
    idx = np.random.choice(np.arange(lag+window, len(X[0])), sample_size)
    cmis = []
    for w in range(window):
        ans = [X[1][idx]]
        ans.append(X[0][idx-w-lag])
        ans.append(X[1][idx-1])
        cmis.append(conditional_mutual_info(np.array(ans)))
    return np.sum(cmis)
   

def conditional_transfer_entropy(X: np.array, lag: int, window: int, sample_size: int=3000) -> float:
    """
    Computes TE(X1_t; X2_{t-lag} | X3:n_{t-lag}, X1_{t-lag})

    Args:
        X: A list containing multiple timeseries.
           Example: [evap, precip, runoff] will compute
           the information transferred from precip to evap,
           eliminating any effects from runoff.
        lag: The time-offset to consider (in days).

    Returns:
        The value of the equation given above.  Small values
        denote less of a connection.  Negative values are
        possible, but should be thrown out - as they are
        numerical artifacts.
    """
    # Need at least 3 variables to compute the TE
    assert len(X) > 2
    # Subsample data and put it together with combination list
    sample_size = np.min([sample_size, len(X[0])-lag-window])
    idx = np.random.choice(np.arange(lag+window, len(X[0])), sample_size)
    cmis = []
    for w in range(window):
        ans = [X[1][idx]]
        ans.append(X[0][idx-w-lag])
        for v in X[2:]:
            ans.append(v[idx-w-lag])
        ans.append(X[1][idx-1])
        cmis.append(conditional_mutual_info(np.array(ans)))
    return np.sum(cmis)

def mutual_info_analysis_full_lagged(ds, out_name, lag=0, window=3, sample_size=10000, conditional=True):
    """
    Compute the mutual information or transfer entropy of variables of
    interest for a dataset.  
    """
    precip = raw_data(ds['precipitation'])
    precip += 1e-12*np.random.rand(len(precip))
    temp = raw_data(ds['temperature'])
    soil_moist = raw_data(ds['soil_moisture'])
    lat_heat = raw_data(ds['latent_heat'])
    sen_heat = raw_data(ds['sensible_heat'])
    swrad = raw_data(ds['shortwave'])
    
    names = ['precipitation', 'temperature', 'soil_moisture', 'latent_heat', 'sensible_heat', 'shortwave']
    varlist = [precip, temp, soil_moist, lat_heat, sen_heat, swrad]
    gi_list = [good_inds(v) for v in varlist]
    #gi_list.append(good_inds(precip, strict=True))
    good_data = reduce(np.intersect1d, gi_list)
    varlist = np.array([v[good_data] for v in varlist])
   
    # Calculate all needed variable combinations
    mapping = {n: d for n, d in zip(names, varlist)}
    permutations = [list(l) for l in list(itertools.permutations(names, 2))]
    for combo in permutations:
        n = [n for n in names if n not in combo ]
        [combo.append(nn) for nn in n]   
        
    # Subsample data and put it together with combination list
    analysis_sets = []
    for combo in permutations:
        analysis_sets.append([mapping[c] for c in combo])
    
    # Compute scores
    scores = []
    for c, s in zip(permutations, analysis_sets):
        if conditional:
            scores.append(conditional_transfer_entropy(s, lag, window, sample_size=sample_size))
        else:
            scores.append(transfer_entropy(s[0:2], lag, window, sample_size=sample_size))
       
    # Reformat into a nice dataframe, save it, and return
    df = pd.DataFrame(columns=names, index=names)
    for link, score in zip(permutations, scores):
        if score < 1e-6:
            score = 0
        d = {'name_x': link[0], 'name_y': link[1], 'value': score}
        df.loc[link[0], link[1]] = score
    for name, var in zip(names, varlist):
        e_tot = entropy_sk(var)
        #print('computing unexplained entropy for {}: {}'.format(name, e_tot))
        e_tot -= df[name].sum(skipna=True)
        df[name][name] = e_tot
    df.to_csv('../data/{}.csv'.format(out_name))
    return df


In [153]:
mutual_info_analysis_full_lagged(data['Mopane'], 'mopane_observations_0_10', lag=119*24, window=12, sample_size=1000, conditional=True)

1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000


Unnamed: 0,precipitation,temperature,soil_moisture,latent_heat,sensible_heat,shortwave
precipitation,3.78371,0.0,0.0,0.0,0.0,0.0
temperature,0.0,3.34244,0.0,0.000491342,0.0370549,0.0
soil_moisture,0.0,0.0,2.19132,0.0,0.0,0.0
latent_heat,0.0,0.0,0.0,6.13421,0.0,0.0
sensible_heat,0.0,0.0,0.0,0.168036,6.2258,0.292496
shortwave,0.0,0.0,0.0,0.00404034,0.122138,6.74118


In [106]:
n_sm = {s: len(good_inds(raw_data(data[s]['soil_moisture']))) for s in sites}
good_sites = [k for k, v in n_sm.items() if v > 0]
print(good_sites)

['Amplero', 'Blodgett', 'ElSaler', 'ElSaler2', 'FortPeck', 'Hesse', 'Hyytiala', 'Kruger', 'Loobos', 'Mopane', 'Sylvania']


In [109]:
lag = 0
win = 10
df_dict = {}
for s in good_sites:
    print(s)
    df_dict[s] = mutual_info_analysis_full_lagged(data[s], '{}_obs_{}_{}'.format(s, lag, win), lag=lag, window=win)
    

Amplero
Blodgett
ElSaler
ElSaler2
FortPeck
Hesse
Hyytiala
Kruger
Loobos
Mopane
Sylvania


In [118]:
entropy_sk(data['Mopane']['precipitation'].values)

3.77328995292681

In [133]:
transfer_entropy(np.array([data['Mopane']['soil_moisture'].values, data['Mopane']['latent_heat'].values]), lag=0, window=5, sample_size=5000)

0.51111021907726739