In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.filterwarnings('ignore')
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)
from tensorflow.contrib import legacy_seq2seq
from tensorflow.contrib.rnn import RNNCell
import argparse
import yaml
import os
import pandas as pd
import logging
import numpy as np
import pickle
import scipy.sparse as sp
import time
import matplotlib.pyplot as plt
import keras
import numpy.testing as npt
import copy
import datetime
import tensorflow.contrib.slim as slim
import inspect
import sys
import tqdm
import seaborn as sns
from sklearn import preprocessing
from scipy.sparse import linalg
from pprint import pformat
from importlib import reload
import cmocean
import matplotlib.transforms as mtrans
from lib.AMSGrad import AMSGrad
import gc
sns.set(rc={'figure.figsize':(40, 6)})

In [None]:
def add_simple_summary(writer, names, values, global_step):
    for name, value in zip(names, values):
        summary = tf.Summary()
        summary_value = summary.value.add()
        summary_value.simple_value = value
        summary_value.tag = name
        writer.add_summary(summary, global_step)
def get_total_trainable_parameter_size():
    total_parameters = 0
    for variable in tf.trainable_variables():
        total_parameters += np.product([x.value for x in variable.get_shape()])
    return total_parameters
def load_pickle(pickle_file):
    try:
        with open(pickle_file, 'rb') as f:
            pickle_data = pickle.load(f)
    except UnicodeDecodeError as e:
        with open(pickle_file, 'rb') as f:
            pickle_data = pickle.load(f, encoding='latin1')
    except Exception as e:
        print('Unable to load data ', pickle_file, ':', e)
        raise
    return pickle_data
# logger
def config_logging(log_dir, log_filename='info.log', level=logging.INFO):
    # Add file handler and stdout handler
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # Create the log directory if necessary.
    try:
        os.makedirs(log_dir)
    except OSError:
        pass
    file_handler = logging.FileHandler(os.path.join(log_dir, log_filename))
    file_handler.setFormatter(formatter)
    file_handler.setLevel(level=level)
    # Add console handler.
    console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(console_formatter)
    console_handler.setLevel(level=level)
    logging.basicConfig(handlers=[file_handler, console_handler], level=level)
def get_logger(log_dir, name, log_filename='info.log', level=logging.INFO):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Add file handler and stdout handler
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(os.path.join(log_dir, log_filename))
    file_handler.setFormatter(formatter)
    # Add console handler.
    console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(console_formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    # Add google cloud log handler
    logger.info('Log directory: %s', log_dir)
    return logger
# matrix transformation
def calculate_normalized_laplacian(adj):
    """
    # L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2
    # D = diag(A 1)
    :param adj:
    :return:
    """
    adj = sp.coo_matrix(adj)
    d = np.array(adj.sum(1))
    d_inv_sqrt = np.power(d, -0.5).flatten()
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
    normalized_laplacian = sp.eye(adj.shape[0]) - adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()
    return normalized_laplacian
def calculate_random_walk_matrix(adj_mx):
    adj_mx[adj_mx < 0] = 0
    adj_mx = sp.coo_matrix(adj_mx)
    d = np.array(adj_mx.sum(1))
    d_inv = np.power(d, -1).flatten()
    d_inv[np.isinf(d_inv)] = 0.
    d_mat_inv = sp.diags(d_inv)
    random_walk_mx = d_mat_inv.dot(adj_mx).tocoo()
    return random_walk_mx
def calculate_reverse_random_walk_matrix(adj_mx):
    return calculate_random_walk_matrix(np.transpose(adj_mx))
def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True):
    adj_mx[adj_mx < 0] = 0
    if undirected:
        adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])
    L = calculate_normalized_laplacian(adj_mx)
    if lambda_max is None:
        lambda_max, _ = linalg.eigsh(L, 1, which='LM')
        lambda_max = lambda_max[0]
    L = sp.csr_matrix(L)
    M, _ = L.shape
    I = sp.identity(M, format='csr', dtype=L.dtype)
    L = (2 / lambda_max * L) - I
    return L.astype(np.float32)

In [None]:
# Classes of scaler
class MinMaxScaler:
    def __init__(self, M=None, ran=(0,1)):
        if M is not None:
            _M = M
            self.fit(_M)
    def fit(self, M):
        _M = M
        self._min = np.min(_M, axis=0)
        self._max = np.max(_M, axis=0)
        self._num_features = _M.shape[-1]
        assert len(self._min) == self._num_features
        assert len(self._max) == self._num_features
        assert np.all(self._max - self._min > 1e-6)
    def transform(self ,M):
        _M = M
        _M_scale = (_M - self._min) / (self._max - self._min)
        return _M_scale
    def inverse_transform(self, M):
        _M = M
        _M_scaleback = (self._min + _M * (self._max - self._min) ) 
        return _M_scaleback
class StandardScaler:
    def __init__(self, M=None):
        if M is not None:
            _M = M
            self.fit(_M)
    def fit(self, M):
        _M = M
        self._mean = np.mean(_M, axis=0)
        self._std = np.std(_M, axis=0)
        self._num_features = _M.shape[-1]
    def transform(self ,M):
        _M = M
        _M_scale = (_M - self._mean) / self._std
        return _M_scale
    def inverse_transform(self, M):
        _M = M
        _M_scaleback = (_M * self._std + self._mean) 
        return _M_scaleback

In [None]:
#Loss function
def masked_mae_loss_transform(T, scaler):
    T_transpose = tf.transpose(T, perm=[0, 1, 3, 2])
    T_inv = scaler.inverse_transform(T_transpose)
    T_transpose_back = tf.transpose(T_inv, perm=[0, 1, 3, 2])
    return T_transpose_back
def masked_mse_tf(preds, labels, null_val=np.nan):
    if np.isnan(null_val):
        mask = ~tf.is_nan(labels)
    else:
        mask = tf.not_equal(labels, null_val)
    mask = tf.cast(mask, tf.float32)
    mask /= tf.reduce_mean(mask)
    mask = tf.where(tf.is_nan(mask), tf.zeros_like(mask), mask)
    loss = tf.square(tf.subtract(preds, labels))
    loss = loss * mask
    loss = tf.where(tf.is_nan(loss), tf.zeros_like(loss), loss)
    return tf.reduce_mean(loss)
def masked_mae_tf(preds, labels, null_val=np.nan):
    if np.isnan(null_val):
        mask = ~tf.is_nan(labels)
    else:
        mask = tf.not_equal(labels, null_val)
    mask = tf.cast(mask, tf.float32)
    mask /= tf.reduce_mean(mask)
    mask = tf.where(tf.is_nan(mask), tf.zeros_like(mask), mask)
    loss = tf.abs(tf.subtract(preds, labels))
    loss = loss * mask
    loss = tf.where(tf.is_nan(loss), tf.zeros_like(loss), loss)
    return tf.reduce_mean(loss)
def masked_rmse_tf(preds, labels, null_val=np.nan):
    return tf.sqrt(masked_mse_tf(preds=preds, labels=labels, null_val=null_val))
def masked_rmse_np(preds, labels, null_val=np.nan):
    return np.sqrt(masked_mse_np(preds=preds, labels=labels, null_val=null_val))
def masked_mse_np(preds, labels, null_val=np.nan):
    with np.errstate(divide='ignore', invalid='ignore'):
        if np.isnan(null_val):
            mask = ~np.isnan(labels)
        else:
            mask = np.not_equal(labels, null_val)
        mask = mask.astype('float32')
        mask /= np.mean(mask)
        rmse = np.square(np.subtract(preds, labels)).astype('float32')
        rmse = np.nan_to_num(rmse * mask)
        return np.mean(rmse)
def masked_mae_np(preds, labels, null_val=np.nan):
    with np.errstate(divide='ignore', invalid='ignore'):
        if np.isnan(null_val):
            mask = ~np.isnan(labels)
        else:
            mask = np.not_equal(labels, null_val)
        mask = mask.astype('float32')
        mask /= np.mean(mask)
        mae = np.abs(np.subtract(preds, labels)).astype('float32')
        mae = np.nan_to_num(mae * mask)
        return np.mean(mae)
def masked_mape_np(preds, labels, null_val=np.nan):
    with np.errstate(divide='ignore', invalid='ignore'):
        if np.isnan(null_val):
            mask = ~np.isnan(labels)
        else:
            mask = np.not_equal(labels, null_val)
        mask = mask.astype('float32')
        mask /= np.mean(mask)
        mape = np.abs(np.divide(np.subtract(preds, labels).astype('float32'), labels))
        mape = np.nan_to_num(mask * mape)
        return np.mean(mape)

def masked_mse_loss(scaler, null_val):
    assert 0
    def loss(preds, labels):
        if scaler:
            preds = scaler.inverse_transform(preds)
            labels = scaler.inverse_transform(labels)
        return masked_mse_tf(preds=preds, labels=labels, null_val=null_val)

    return loss
def masked_rmse_loss(scaler, null_val):
    assert 0
    def loss(preds, labels):
        if scaler:
            preds = scaler.inverse_transform(preds)
            labels = scaler.inverse_transform(labels)
        return masked_rmse_tf(preds=preds, labels=labels, null_val=null_val)

    return loss
def masked_mae_loss(null_val, scalers=None, multiple=False):
    def loss(preds, labels):
        if scalers and multiple:
            num_scalers = len(scalers)
            preds_per_source = tf.split(value=preds, num_or_size_splits=multiple, axis=-2)
            labels_per_source = tf.split(value=labels, num_or_size_splits=multiple, axis=-2)
            pred_array, label_array = [], []
            for i, scaler in enumerate(scalers):
                # transpose axis 2(num_nodes) and axis 3(num_features_per_nodes) before inverse transform
                pred = masked_mae_loss_transform(preds_per_source[i], scaler)
                label = masked_mae_loss_transform(labels_per_source[i], scaler)
                pred_array.append(pred)
                label_array.append(label)
            preds = tf.concat(values=pred_array, axis=-2)
            labels = tf.concat(values=label_array, axis=-2)
#             preds = pred_array[0]
#             labels = label_array[0]
        mae = masked_mae_tf(preds=preds, labels=labels, null_val=null_val)
        return mae

    return loss

def calculate_metrics(df_pred, df_test, null_val):
    mape = masked_mape_np(preds=df_pred.as_matrix(), labels=df_test.as_matrix(), null_val=null_val)
    mae = masked_mae_np(preds=df_pred.as_matrix(), labels=df_test.as_matrix(), null_val=null_val)
    rmse = masked_rmse_np(preds=df_pred.as_matrix(), labels=df_test.as_matrix(), null_val=null_val)
    return mae, mape, rmse

In [None]:
def create_dir_input_output_if_not_exists(create=False, **configs):
    dp_input = configs.get('input_dir', './data')
    dp_output = configs.get('output_dir', './results')
    dp_input = os.path.join(dp_input, '')
    dp_output = os.path.join(dp_output, '')
    if create == True:
        create_dir_if_not_exists(dp_input, recursive=True)
        create_dir_if_not_exists(dp_output, recursive=True)
    return dp_input, dp_output
def create_dir_config_preprocessed_models_if_not_exists(create=False, **configs):
    lags = configs.get('lags', 3)
    horizon = configs.get('horizon', 3)
    scaler_type = configs.get('Standard', 'Standard')
    matrix_type = configs.get('matrix_type', 'Kernel')

    dp_output_config = dp_output + \
                os.path.join('config_{}_{}_{}_{}'.format(lags, horizon, scaler_type, matrix_type), '')
    if create == True:
        create_dir_if_not_exists(dp_output_config)

    dp_preprocessed = dp_output_config + \
                os.path.join('preprocessed', '')
    dp_models = dp_output_config + \
                os.path.join('models', '')
    if create == True:
        create_dir_if_not_exists(dp_preprocessed)
        create_dir_if_not_exists(dp_models)
    return dp_output_config, dp_preprocessed, dp_models

In [None]:
# Load Data
class DataLoader(object):
    def __init__(self, xs, ys, batch_size, pad_with_last_sample=True, shuffle=False):
        self.batch_size = batch_size
        self.current_ind = 0
        if pad_with_last_sample:
            num_padding = (batch_size - (len(xs) % batch_size)) % batch_size
            x_padding = np.repeat(xs[-1:], num_padding, axis=0)
            y_padding = np.repeat(ys[-1:], num_padding, axis=0)
            xs = np.concatenate([xs, x_padding], axis=0)
            ys = np.concatenate([ys, y_padding], axis=0)
        self.size = len(xs)
        self.num_batch = int(self.size // self.batch_size)
        if shuffle:
            permutation = np.random.permutation(self.size)
            xs, ys = xs[permutation], ys[permutation]
        self.xs = xs
        self.ys = ys
    def get_iterator(self):
        self.current_ind = 0

        def _wrapper():
            while self.current_ind < self.num_batch:
                start_ind = self.batch_size * self.current_ind
                end_ind = min(self.size, self.batch_size * (self.current_ind + 1))
                x_i = self.xs[start_ind: end_ind, ...]
                y_i = self.ys[start_ind: end_ind, ...]
                yield (x_i, y_i)
                self.current_ind += 1

        return _wrapper()
def get_dp_preprocessed(**config):
    lags = configs.get('lags', 3)
    horizon = configs.get('horizon', 3)
    scaler_type = configs.get('Standard', 'Standard')
    matrix_type = configs.get('matrix_type', 'Kernel')
    dp_input, dp_output = create_dir_input_output_if_not_exists(**configs)
    dp_output_config = dp_output + \
            os.path.join('config_{}_{}_{}_{}'.format(lags, horizon, scaler_type, matrix_type), '')
    dp_preprocessed = dp_output_config + \
            os.path.join('preprocessed', '')
    return dp_preprocessed
def load_pickle(pickle_file):
    try:
        with open(pickle_file, 'rb') as f:
            pickle_data = pickle.load(f)
    except UnicodeDecodeError as e:
        with open(pickle_file, 'rb') as f:
            pickle_data = pickle.load(f, encoding='latin1')
    except Exception as e:
        print('Unable to load data ', pickle_file, ':', e)
        raise
    return pickle_data
def load_matrix(**configs):
    matrixs, id_map_idxs = [], []
    dp_preprocessed = get_dp_preprocessed(**configs)
    signals = configs.get('signals', None)
    for signal in signals:
        dp_signal = dp_preprocessed + \
                os.path.join(signal, '')
        fp_matrix = dp_signal + 'matrix.pkl'
        matrix, id_map_idx = load_pickle(fp_matrix)
        matrixs.append(matrix)
        id_map_idxs.append(id_map_idx)
    return matrixs, id_map_idxs
def load_scaler(fp, scaler_type):
    scaler = load_pickle(fp)
    if scaler_type == 'MinMax':
        scaler._min = scaler._min.values
        scaler._max = scaler._max.values
    if scaler_type == 'Standard':
        scaler._mean = scaler._mean.values
        scaler._std = scaler._std.values
    return scaler
def load_signals(**configs):
    batch_size = configs.get('batch_size', 64)
    test_batch_size = configs.get('test_batch_size', 64)
    signals = configs.get('signals', None)
    lags = configs.get('lags', 3)
    horizon = configs.get('horizon', 3)
    scaler_type = configs.get('scaler', 'Standard')
    matrix_type = configs.get('matrix_type', 'Kernel')
    
    data_heter = {}
    for category in ['train', 'val', 'test']:
        data_heter['x_' + category] = []
        data_heter['y_' + category] = []
        
    scalers, datas, l_num_nodes = [], [], []
    
    for signal in signals:
        data = {}
        dp_preprocessed = get_dp_preprocessed(**configs)
        dp_signal = dp_preprocessed + \
                os.path.join(signal, '')
        fp_scaler = dp_signal + \
                '{}.pkl'.format(scaler_type)
        scaler = load_scaler(fp_scaler, scaler_type)
        for category in ['train', 'val', 'test']:
            fp_cat = dp_signal + \
                '{}.npz'.format(category)
            data_cat = np.load(fp_cat)
            data['x_' + category] = data_cat['x']
            data['y_' + category] = data_cat['y']
            data_heter['x_' + category].append(data['x_' + category])
            data_heter['y_' + category].append(data['y_' + category])
        
        scalers.append(scaler)
        datas.append(data)
        l_num_nodes.append(data_cat['x'].shape[-2])


    for category in ['train', 'val', 'test']:    
        data_heter['x_' + category] = np.concatenate(data_heter['x_' + category], axis=-2)
        data_heter['y_' + category] = np.concatenate(data_heter['y_' + category], axis=-2)

        
    data_heter['train_loader'] = DataLoader(data_heter['x_train'], data_heter['y_train'], batch_size, shuffle=True)
    data_heter['val_loader'] = DataLoader(data_heter['x_val'], data_heter['y_val'], test_batch_size, shuffle=False)
    data_heter['test_loader'] = DataLoader(data_heter['x_test'], data_heter['y_test'], test_batch_size, shuffle=False)
    datas.append(data_heter)
    return datas, scalers, l_num_nodes
def load_config(fp_configs):
    assert os.path.isfile(fp_configs), '{} is not a regular file'.format(fp_configs)
    with open(fp_configs) as f:
        configs = yaml.load(f, Loader=yaml.FullLoader)
    return configs

In [None]:
class MGCell_Heter(RNNCell):
    def call(self, inputs, **kwargs):
        pass
    def compute_output_shape(self, input_shape):
        pass
    def __init__(self, num_units, matrixs, l_num_nodes, gc_step=2, num_proj=None,
                 activation=tf.nn.tanh, reuse=None, filter_type="random_walk", use_gc_for_ru=True):
        super(MGCell_Heter, self).__init__(_reuse=reuse)
        self._num_units = num_units
        self.l_num_nodes = l_num_nodes
        self._num_all_nodes = sum(l_num_nodes)
        self._num_signals = len(l_num_nodes)
        self._num_proj = num_proj
        self._gc_step = gc_step
        self._activation = activation
        self._output_sizes = [x * self._num_units for x in self.l_num_nodes]
        self._supports = []
        self._use_gc_for_ru = use_gc_for_ru
        for i, adj in enumerate(matrixs):
            adj = adj.astype('float32')
            supports = []
            new_supports = []
            if filter_type == "laplacian":
                supports.append(calculate_scaled_laplacian(adj, lambda_max=None))
            elif filter_type == "random_walk":
                supports.append(calculate_random_walk_matrix(adj).T)
            elif filter_type == "dual_random_walk":
                supports.append(calculate_random_walk_matrix(adj).T)
                supports.append(calculate_random_walk_matrix(adj.T).T)
            else:
                supports.append(calculate_scaled_laplacian(adj))
            for support in supports:
                new_supports.append(self._build_sparse_matrix(support))
            self._supports.append(new_supports)
        self.interS_weights = []
        self.interS_bias = []
        with tf.variable_scope("MG_interS_Weight", reuse=tf.AUTO_REUSE):
            for i in range(self._num_signals):
                dtype = 'float32'
                rmself_state_shape = self._num_all_nodes - self.l_num_nodes[i]
                S_state_shape = self.l_num_nodes[i]
                interS_weights = tf.get_variable('interS_weights_{}'.format(i),
                    shape=[rmself_state_shape, S_state_shape],
                    dtype=dtype,
                    initializer=tf.contrib.layers.xavier_initializer()
                )
                interS_bias = tf.get_variable('self_bias_{}'.format(i),
                    shape=S_state_shape,
                    dtype=dtype,
                    initializer=tf.contrib.layers.xavier_initializer()
                )
                self.interS_weights.append(interS_weights)
                self.interS_bias.append(interS_bias)

        with tf.variable_scope("projection", reuse=tf.AUTO_REUSE):
            self.proj_w = tf.get_variable('w',
                    shape=[self._num_units, 1],
                    dtype=dtype,
                    initializer=tf.contrib.layers.xavier_initializer()
            )
    @staticmethod
    def _build_sparse_matrix(L):
        L = L.tocoo()
        indices = np.column_stack((L.row, L.col))
        L = tf.SparseTensor(indices, L.data, L.shape)
        return tf.sparse_reorder(L)
    @property
    def state_size(self):
        state_size = self._num_all_nodes * self._num_units
        return state_size
    @property
    def output_size(self):
        output_size = self._num_all_nodes * self._num_units
        if self._num_proj is not None:
            output_size = self._num_all_nodes * self._num_proj
        return output_size

    def __call__(self, inputs, state, scope=None):
        if inputs.get_shape() == state.get_shape():
            split_inputs = tf.split(inputs, [x * self._num_units for x in self.l_num_nodes], axis=-1)
        else:
            split_inputs = tf.split(inputs, self.l_num_nodes, axis=-1)
        split_state = tf.split(state, [x * self._num_units for x in self.l_num_nodes], axis=-1)
        with tf.variable_scope(scope or "MG_He_cell"):
            with tf.variable_scope("gates"):  # Reset gate and update gate.
                output_size = [2 * self._num_units] * self._num_signals
                fn = self._gconv
                output_split = []
                for i in range(self._num_signals):
                    inputs_i = split_inputs[i]
                    state_i = split_state[i]
                    output_size_i = output_size[i]
                    
                    value = tf.nn.sigmoid(fn(inputs_i, state_i, output_size_i, bias_start=1.0, si=i))
                    value = tf.reshape(value, (-1, self.l_num_nodes[i], output_size_i))
                    r, u = tf.split(value=value, num_or_size_splits=2, axis=-1)
                    r = tf.reshape(r, (-1, self.l_num_nodes[i] * self._num_units))
                    u = tf.reshape(u, (-1, self.l_num_nodes[i] * self._num_units))
                    with tf.variable_scope("candidate"):
                        c = self._gconv(inputs_i, r * state_i, self._num_units, si=i)
                        if self._activation is not None:
                            c = self._activation(c)
                    
                    output_i = new_state_i = u * state_i + (1 - u) * c
                    output_split.append(output_i)
                    
            outputs = []
            for i in range(self._num_signals):
                curS = output_split[i]
                rmself_output_split = output_split[:i] + output_split[i+1:]
                rmself_output = tf.concat(rmself_output_split, axis=-1)
                
                batch_size = rmself_output.shape[0]
                rmself_reshape = tf.reshape(rmself_output, shape=[batch_size, -1, self._num_units])
                rmself_proj = tf.matmul(rmself_reshape, self.proj_w)
                rmself_proj_squeeze = tf.squeeze(rmself_proj)
                toCurS = tf.matmul(rmself_proj_squeeze, self.interS_weights[i])
                toCurS = tf.nn.bias_add(toCurS, self.interS_bias[i])
                #toCurS = tf.nn.softmax(toCurS)
                toCurS = tf.nn.sigmoid(toCurS)
                toCurS_expand = tf.expand_dims(toCurS, axis=-1)
                toCurS_expand_invproj = tf.matmul(toCurS_expand, tf.transpose(self.proj_w))
                toCurS_invshape = tf.reshape(toCurS_expand_invproj, shape=[batch_size, -1])

                T1 = curS
                # T1 = tf.math.multiply(self.S_weights[i], T1)
                # T1 = tf.nn.bias_add(T1, self.S_bias[i])
                T2 = toCurS_invshape
                T = tf.add(T1, T2)
                T = T1
                outputs.append(T)
                
            output = new_state = tf.concat(outputs, axis=-1)
            
            if self._num_proj is not None:
                w = self.proj_w
                batch_size = inputs.get_shape()[0].value
                output = tf.reshape(new_state, shape=(-1, self._num_units))
                output = tf.reshape(tf.matmul(output, w), shape=(batch_size, -1))
                #output = tf.nn.relu(output)
        return output, new_state

    @staticmethod
    def _concat(x, x_):
        x_ = tf.expand_dims(x_, 0)
        return tf.concat([x, x_], axis=0)
    def _gconv(self, inputs, state, output_size, bias_start=0.0, si=-1):
        assert si != -1
        batch_size = inputs.get_shape()[0].value
        inputs = tf.reshape(inputs, (batch_size, self.l_num_nodes[si], -1))
        state = tf.reshape(state, (batch_size, self.l_num_nodes[si], -1))
        inputs_and_state = tf.concat([inputs, state], axis=-1)
        input_size = inputs_and_state.get_shape()[-1].value
        dtype = inputs.dtype

        x = inputs_and_state
        x0 = tf.transpose(x, perm=[1, 2, 0])  # (num_nodes, total_arg_size, batch_size)
        x0 = tf.reshape(x0, shape=[self.l_num_nodes[si], input_size * batch_size])
        x = tf.expand_dims(x0, axis=0)
        
        scope = tf.get_variable_scope()
        with tf.variable_scope(scope):
            if self._gc_step == 0:
                pass
            else:
                for support in self._supports[si]:
                    x1 = tf.sparse_tensor_dense_matmul(support, x0)
                    x = self._concat(x, x1)

                    for k in range(2, self._gc_step + 1):
                        x2 = 2 * tf.sparse_tensor_dense_matmul(support, x1) - x0
                        x = self._concat(x, x2)
                        x1, x0 = x2, x1

            num_matrices = len(self._supports[si]) * self._gc_step + 1  # Adds for x itself.
            x = tf.reshape(x, shape=[num_matrices, self.l_num_nodes[si], input_size, batch_size])
            x = tf.transpose(x, perm=[3, 1, 2, 0])  # (batch_size, num_nodes, input_size, order)
            x = tf.reshape(x, shape=[batch_size * self.l_num_nodes[si], input_size * num_matrices])
            
            weights = tf.get_variable(
                'weights_{}'.format(si), [input_size * num_matrices, output_size], dtype=dtype,
                initializer=tf.contrib.layers.xavier_initializer())
            x = tf.matmul(x, weights)  # (batch_size * self._num_nodes, output_size)
            biases = tf.get_variable("bias_{}".format(si), [output_size], dtype=dtype,
                                     initializer=tf.constant_initializer(bias_start, dtype=dtype))
            x = tf.nn.bias_add(x, biases)
        return tf.reshape(x, [batch_size, self.l_num_nodes[si] * output_size])

In [None]:
class MGModel(object):
    def __init__(self, matrixs, is_training, batch_size, **configs):
        self._loss = None
        self._mae = None
        self._train_op = None
        cl_decay_steps = int(configs.get('cl_decay_steps', 1000))
        filter_type = configs.get('filter_type', 'random_walk')
        gc_step = int(configs.get('gc_step', 2))
        horizon = int(configs.get('horizon', 3))
        lags = int(configs.get('lags', 3))
        num_rnn_layers = int(configs.get('num_rnn_layers', 2))
        num_units = int(configs.get('num_rnn_units', 32))
        signals = configs.get('signals', None)
        use_curriculum_learning = bool(configs.get('use_curriculum_learning', False))
        
        input_dim = int(configs.get('input_dim', 1))
        output_dim = int(configs.get('output_dim', 1))
        l_num_nodes = configs.get('l_num_nodes')
        num_signals = len(signals)
        
        num_all_nodes = sum(l_num_nodes)
        
        # Input (batch_size, timesteps, num_all_nodes, input_dim)
        self._inputs = tf.placeholder(tf.float32, shape=(batch_size, lags, num_all_nodes, input_dim), name='inputs')
        # Labels: (batch_size, timesteps, num_sensor, input_dim), same format with input except the temporal dimension.
        self._labels = tf.placeholder(tf.float32, shape=(batch_size, horizon, num_all_nodes, output_dim), name='labels')
        
        cell = MGCell_Heter(num_units=num_units, matrixs=matrixs, l_num_nodes=l_num_nodes,
                            gc_step=2, filter_type=filter_type)
        proj_cell = MGCell_Heter(num_units=num_units, matrixs=matrixs, l_num_nodes=l_num_nodes,
                            gc_step=2, filter_type=filter_type, num_proj=1)
        
        GO_SYMBOL = tf.zeros(shape=(batch_size, num_all_nodes * output_dim))

        encoding_cells = [cell] * num_rnn_layers
        decoding_cells = [cell] * (num_rnn_layers - 1) + [proj_cell]
        encoding_cells = tf.contrib.rnn.MultiRNNCell(encoding_cells, state_is_tuple=True)
        decoding_cells = tf.contrib.rnn.MultiRNNCell(decoding_cells, state_is_tuple=True)

        global_step = tf.train.get_or_create_global_step()
        # Outputs: (batch_size, timesteps, num_nodes, output_dim)
        with tf.variable_scope('MG_SEQ'):
            inputs = tf.unstack(tf.reshape(self._inputs, (batch_size, lags, num_all_nodes * input_dim)), axis=1)
            labels = tf.unstack(
                tf.reshape(self._labels[..., :output_dim], (batch_size, horizon, num_all_nodes * output_dim)), axis=1)
            labels.insert(0, GO_SYMBOL)

            def _loop_function(prev, i):
                if is_training:
                    # Return either the model's prediction or the previous ground truth in training.
                    if use_curriculum_learning:
                        c = tf.random_uniform((), minval=0, maxval=1.)
                        threshold = self._compute_sampling_threshold(global_step, cl_decay_steps)
                        result = tf.cond(tf.less(c, threshold), lambda: labels[i], lambda: prev)
                    else:
                        result = labels[i]
                else:
                    # Return the prediction of the model in testing.
                    result = prev
                return result

            _, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs, dtype=tf.float32)
            outputs, final_state = legacy_seq2seq.rnn_decoder(labels, enc_state, decoding_cells,
                                                              loop_function=_loop_function)

        # Project the output to output_dim.
        outputs = tf.stack(outputs[:horizon], axis=1)
        self._outputs = tf.reshape(outputs, (batch_size, horizon, num_all_nodes, output_dim), name='outputs')
        self._merged = tf.summary.merge_all()

    @staticmethod
    def _compute_sampling_threshold(global_step, k):
        return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)
    @property
    def inputs(self):
        return self._inputs
    @property
    def labels(self):
        return self._labels
    @property
    def loss(self):
        return self._loss
    @property
    def mae(self):
        return self._mae
    @property
    def merged(self):
        return self._merged
    @property
    def outputs(self):
        return self._outputs

In [None]:
class MGSupervisor(object):
    def __init__(self, writer, **configs):
        self._configs = configs
        self._train_configs = configs.get('train')
        self._signals = configs.get('signals')
        # logging.
        self._log_dir = self._get_log_dir(configs)
        log_level = self._configs.get('log_level', 'INFO')
        self._logger = get_logger(self._log_dir, __name__, 'info.log', level=log_level)
        self._writer = writer
        
        # Data preparation
        self._matrixs, self._id_to_idxs = load_matrix(**configs)
        self._datas, self._scalers, self._l_num_nodes = load_signals(**configs)
        self._configs.update({'l_num_nodes': self._l_num_nodes})

        merged_summary = tf.summary.merge_all()
        
        train_batch_size = configs.get('train_batch_size')
        test_batch_size  = configs.get('test_batch_size')
        with tf.name_scope('Train'):
            with tf.variable_scope('MG', reuse=False):
                self._train_model = MGModel(is_training=True, matrixs=self._matrixs,
                                              batch_size=train_batch_size, **self._configs)
            with tf.variable_scope('MG', reuse=True):
                self._test_model = MGModel(is_training=False, matrixs=self._matrixs,
                                              batch_size=test_batch_size, **self._configs)

        # Learning rate.
        self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01),
                                   trainable=False)
        self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
        self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')

        # Configure optimizer
        optimizer_name = self._train_configs.get('optimizer', 'adam').lower()
        epsilon = float(self._train_configs.get('epsilon', 1e-3))
        optimizer = tf.train.AdamOptimizer(self._lr, epsilon=epsilon)
        if optimizer_name == 'sgd':
            optimizer = tf.train.GradientDescentOptimizer(self._lr, )
        elif optimizer_name == 'amsgrad':
            optimizer = AMSGrad(self._lr, epsilon=epsilon)

        # Calculate loss
        output_dim = configs.get('output_dim')
        preds = self._train_model.outputs
        labels = self._train_model.labels[..., :output_dim]

        null_val = 0.
        self._loss_fn = masked_mae_loss(null_val=null_val, scalers=self._scalers, multiple=self._l_num_nodes)
        self._train_loss = self._loss_fn(preds=preds, labels=labels)

        tvars = tf.trainable_variables()
        grads = tf.gradients(self._train_loss, tvars)
        max_grad_norm = configs['train'].get('max_grad_norm', 1.)
        grads, _ = tf.clip_by_global_norm(grads, max_grad_norm)
        global_step = tf.train.get_or_create_global_step()
        self._train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step, name='train_op')

        max_to_keep = self._train_configs.get('max_to_keep', 100)
        self._epoch = 0
        self._saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)

        # logging.
        total_trainable_parameter = get_total_trainable_parameter_size()
        self._logger.info('Total number of trainable parameters: {:d}'.format(total_trainable_parameter))
        for var in tf.global_variables():
            self._logger.debug('{}, {}'.format(var.name, var.get_shape()))
    def run_epoch_generator(self, sess, model, data_generator, return_output=False, training=False, writer=None, loader=None):
        losses = []
        maes = []
        outputs = []
        output_dim = self._configs.get('output_dim')
        preds = model.outputs
        labels = model.labels[..., :output_dim]
        loss = self._loss_fn(preds=preds, labels=labels)
        fetches = {
            'loss': loss,
            'mae': loss,
            'global_step': tf.train.get_or_create_global_step()
        }
        if training:
            fetches.update({
                'train_op': self._train_op
            })
            merged = model.merged
            if merged is not None:
                fetches.update({'merged': merged})

        if return_output:
            fetches.update({
                'outputs': model.outputs
            })

        for _, (x, y) in enumerate(data_generator):
            feed_dict = {
                model.inputs: x,
                model.labels: y,
            }

            vals = sess.run(fetches, feed_dict=feed_dict)

            losses.append(vals['loss'])
            maes.append(vals['mae'])
            
            if writer is not None and 'merged' in vals:
                writer.add_summary(vals['merged'], global_step=vals['global_step'])
            if return_output:
                outputs.append(vals['outputs'])

        results = {
            'loss': np.mean(losses),
            'mae': np.mean(maes)
        }
        if return_output:
            results['outputs'] = outputs
        return results

    def train(self, sess, **configs):
        configs.update(self._train_configs)
        return self._train(sess, **configs)
    def _train(self, sess, base_lr, epoch, steps, patience=10, epochs=40,
               min_learning_rate=2e-6, lr_decay_ratio=0.1, save_model=1,
               test_every_n_epochs=10, **train_configs):
        history = []
        min_val_loss = float('inf')
        wait = 0
        max_to_keep = train_configs.get('max_to_keep', 100)
        saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
        model_filename = train_configs.get('model_filename')
        if model_filename is not None:
            saver.restore(sess, model_filename)
            self._epoch = epoch + 1
        else:
            sess.run(tf.global_variables_initializer())
        self._logger.info('Start training ...')

        # Force epoch to be zero
        # self._epoch = 0
        while self._epoch <= epochs:
            # Learning rate schedule.
            new_lr = max(min_learning_rate, base_lr * (lr_decay_ratio ** np.sum(self._epoch >= np.array(steps))))
            self.set_lr(sess=sess, lr=new_lr)

            start_time = time.time()
            train_results = self.run_epoch_generator(sess, self._train_model,
                                                     self._datas[-1]['train_loader'].get_iterator(),
                                                     training=True,
                                                     writer=self._writer)

            train_loss, train_mae = train_results['loss'], train_results['mae']
            if train_loss > 1e5:
                self._logger.warning('Gradient explosion detected. Ending...')
                break

            global_step = sess.run(tf.train.get_or_create_global_step())
            # Compute validation error.
            val_results = self.run_epoch_generator(sess, self._test_model,
                                                   self._datas[-1]['val_loader'].get_iterator(),
                                                   training=False)
            val_loss, val_mae = val_results['loss'], val_results['mae']
            add_simple_summary(self._writer,
                                     ['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'],
                                     [train_loss, train_mae, val_loss, val_mae], global_step=global_step)
            end_time = time.time()
            message = 'Epoch [{}/{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format(
                self._epoch, epochs, global_step, train_mae, val_mae, new_lr, (end_time - start_time))
            self._logger.info(message)
            if self._epoch % test_every_n_epochs == test_every_n_epochs - 1:
                outputs = self.evaluate(sess)
                self.outputs = outputs
                mobile_pred_1 = outputs['predictions'][0][0].squeeze()
                mobile_grou_1 = outputs['groundtruth'][0][0].squeeze()
                temp_pred_1 = outputs['predictions'][1][0].squeeze()
                temp_grou_1 = outputs['groundtruth'][1][0].squeeze()
                plot_my_pred_ground(mobile_pred_1, mobile_grou_1)
                # plot_my_pred_ground(temp_pred_1, temp_grou_1, num_plots=3)
            if val_loss <= min_val_loss:
                wait = 0
                if save_model > 0:
                    model_filename = self.save(sess, val_loss)
                self._logger.info(
                    'Val loss decrease from %.4f to %.4f, saving to %s' % (min_val_loss, val_loss, model_filename))
                min_val_loss = val_loss
            else:
                wait += 1
                if wait > patience:
                    self._logger.warning('Early stopping at epoch: %d' % self._epoch)
                    break

            history.append(val_mae)
            self._epoch += 1

            sys.stdout.flush()
        return np.min(history)

    def evaluate(self, sess, **configs):
        global_step = sess.run(tf.train.get_or_create_global_step())
        test_results = self.run_epoch_generator(sess, self._test_model,
                                                self._datas[-1]['test_loader'].get_iterator(),
                                                return_output=True,
                                                training=False)

        # y_preds:  a list of (batch_size, horizon, num_nodes, output_dim)
        test_loss, y_preds = test_results['loss'], test_results['outputs']
        add_simple_summary(self._writer, ['loss/test_loss'], [test_loss], global_step=global_step)

        y_preds = np.concatenate(y_preds, axis=0)
        predictions_conc = []
        y_truths_conc = []
        
        start_idx = 0
        for i, signal in enumerate(self._signals):
            predictions = []
            y_truths = []
            
            self._logger.info("Graph: {}".format(signal))
            num_nodes = self._l_num_nodes[i]

            end_idx = start_idx + num_nodes
            for horizon_i in range(self._datas[-1]['y_test'].shape[1]):

                y_truth = self._datas[i]['y_test'][:, horizon_i, :, :]
                y_pred = y_preds[:y_truth.shape[0], horizon_i, start_idx:end_idx, :]

                if len(self._scalers) > 0:
                    Shape = y_truth.shape
                    y_truth = y_truth.squeeze()
                    y_truth = self._scalers[i].inverse_transform(y_truth)
                    y_truth = np.reshape(y_truth, Shape)
                    Shape = y_pred.shape
                    y_pred = y_pred.squeeze()
                    y_pred = self._scalers[i].inverse_transform(y_pred)
                    y_pred = np.reshape(y_pred, Shape)

                y_truths.append(y_truth)
                predictions.append(y_pred)
                
                mae = masked_mae_np(y_pred, y_truth, null_val=0)
                rmse = masked_rmse_np(y_pred, y_truth, null_val=0)
                self._logger.info(
                    "Horizon {:02d}, MAE: {:.2f}, RMSE: {:.2f}, MAPE: {:.4f}".format(
                        horizon_i + 1, mae, rmse, mape
                    )
                )
                add_simple_summary(self._writer,
                                         ['%s_%d' % (item, horizon_i + 1) for item in
                                          ['metric/rmse', 'metric/mape', 'metric/mae']],
                                         [rmse, mape, mae],
                                         global_step=global_step)
            start_idx = end_idx
            predictions_conc.append(predictions)
            y_truths_conc.append(y_truths)
        outputs = {
            'predictions': predictions_conc,
            'groundtruth': y_truths_conc
        }
        
        return outputs

    def load(self, sess, model_filename):
        print("resoring from saved model")
        self._saver.restore(sess, model_filename)
    def save(self, sess, val_loss):
        configs = dict(self._configs)
        global_step = np.asscalar(sess.run(tf.train.get_or_create_global_step()))
        prefix = os.path.join(self._log_dir, 'models-{:.4f}'.format(val_loss))
        configs['train']['epoch'] = self._epoch
        configs['train']['global_step'] = global_step
        configs['train']['log_dir'] = self._log_dir
        configs['train']['model_filename'] = self._saver.save(sess, prefix, global_step=global_step,
                                                             write_meta_graph=False)
        fn_configs = 'config_{}.yaml'.format(self._epoch)
        with open(os.path.join(self._log_dir, fn_configs), 'w') as f:
            yaml.dump(configs, f, default_flow_style=False)
        return configs['train']['model_filename']
    def get_lr(self, sess):
        return np.asscalar(sess.run(self._lr))
    def set_lr(self, sess, lr):
        sess.run(self._lr_update, feed_dict={
            self._new_lr: lr
        })
    @staticmethod
    def _get_log_dir(configs):
        log_dir = configs['train'].get('log_dir')
        if log_dir is None:
            batch_size = configs.get('train_batch_size')
            gc_step = configs.get('gc_step')
            learning_rate = configs['train'].get('base_lr')
            num_rnn_layers = configs.get('num_rnn_layers')
            num_rnn_units = configs.get('num_rnn_units')
            structure = '-'.join(
                ['%d' % num_rnn_units for _ in range(num_rnn_layers)])
            
            filter_type = configs.get('filter_type')
            filter_type_abbr = 'L'
            if filter_type == 'random_walk':
                filter_type_abbr = 'R'
            elif filter_type == 'dual_random_walk':
                filter_type_abbr = 'DR'
            run_id = 'MG_%s_%d_%s_lr_%g_bs_%d_%s/' % (
                filter_type_abbr, gc_step,
                structure, learning_rate, batch_size,
                time.strftime('%m%d%H%M%S'))
            
            lags = configs.get('lags', 3)
            horizon = configs.get('horizon', 3)
            scaler_type = configs.get('Standard', 'Standard')
            matrix_type = configs.get('matrix_type', 'Kernel')
            dp_output = configs.get('output_dir')
            dp_output = os.path.join(dp_output, '')
            dp_output_config = dp_output + \
                    os.path.join('config_{}_{}_{}_{}'.format(lags, horizon, scaler_type, matrix_type), 'models')
            log_dir = os.path.join(dp_output_config, run_id)
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        return log_dir

In [None]:
arguments = {
    "fp_configs": './config.yaml',
    "only_cpu" : True,
}
tf_config = tf.ConfigProto()
if arguments["only_cpu"] == False:
    tf_config = tf.ConfigProto(device_count={'GPU': 0})
    tf_config.gpu_options.allow_growth = True
configs = load_config(arguments['fp_configs'])

In [None]:
logging.shutdown()
reload(logging)
tf.reset_default_graph()
sess = tf.Session(config=tf_config)
writer = tf.summary.FileWriter("/tmp/MG_graph/2")
supervisor = MGSupervisor(writer=writer, **configs)
supervisor.train(sess=sess)
outputs = supervisor.evaluate(sess=sess)