In [1]:
import functools
import time
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.contrib import predictor
%matplotlib inline

In [None]:
class pro_data():
    """Prepare feature/label arrays for the RNN forecasting experiments.

    Two data sources are supported:

    * ``read_file`` - sliding windows cut from the CSV file ``file_name``
      (every column but the last is a feature, the last column is the label);
    * ``new_data``  - a synthetic series built by ``exp_data``
      (features ``[cos, sin]``, label ``cos * sin``).

    Both return ``(train_X, train_y, test_X, test_y, k, b)`` where features are
    laid out as ``[samples, features, time_steps]``, labels as
    ``[samples, pred_n]`` and ``k`` / ``b`` are the label scale / offset such
    that ``raw_label = normalized_label * k + b``.
    """

    _NORM_OPTIONS = ('norm', 'max_min', 'max_abs')

    def __init__(self,data_dict):
        """Copy the experiment settings out of ``data_dict``.

        Keys read: ``ts``, ``pred_n``, ``n_train``, ``n_test``, ``d_t``
        (synthetic data) and ``time_steps``, ``ratio``, ``file_name``
        (CSV data).
        """
        self.ts = data_dict['ts']
        self.pred_n = data_dict['pred_n']
        self.n_train = data_dict['n_train']
        self.n_test = data_dict['n_test']
        self.d_t = data_dict['d_t']
        self.time_steps = data_dict['time_steps']
        self.ratio = data_dict['ratio']
        self.file_name = data_dict['file_name']

        # Label scale/offset used by de_norm(). Identity until read_file() or
        # new_data() stores the values that belong to the loaded data.
        self.k = 1
        self.b = 0

    def detrend(self,order):
        """Fit a polynomial trend to the label column of the CSV file.

        Input:
            order: degree of the fitted polynomial

        Output:
            poly: coefficients returned by ``np.polyfit`` (highest power
                  first), fitted on the row index against the last column.
                  The data itself is not modified; use ``addtrend`` to
                  evaluate the trend at given indices.
        """
        frame = pd.read_csv(self.file_name)
        x = frame.index.values
        y = frame.iloc[:,-1].values

        return np.polyfit(x, y, deg = order)

    def addtrend(self,x_dex,poly):
        """Evaluate the trend polynomial ``poly`` at the indices ``x_dex``.

        Input:
            x_dex: np.array, indices of the predicted values
            poly: coefficients as returned by ``detrend``

        Output:
            np.array with the trend value for every index in ``x_dex``
        """
        return np.polyval(poly,x_dex)

    def norm_op(self,data,op,norm_opt):
        """Normalize ``data`` and return the normalized copy.

        Input:
            data: array to normalize; features are normalized per column
                  (axis 0), labels with a single global scale/offset
            op: 0 for features, 1 for labels
            norm_opt: 'norm'    -> (x - mean) / std
                      'max_min' -> (x - min) / (max - min), range [0, 1]
                      'max_abs' -> x / max(|x|),            range [-1, 1]

        Output:
            op == 0: new_data, same shape as ``data``
            op == 1: (new_data, k, b) with ``data == new_data * k + b``

        Raises:
            ValueError: unknown ``op`` or ``norm_opt``
        """
        if norm_opt not in self._NORM_OPTIONS:
            raise ValueError("norm_opt must be one of %s, got %r"
                             % (self._NORM_OPTIONS, norm_opt))

        # features: column-wise statistics
        if op == 0:
            if norm_opt == 'max_min':
                low = data.min(axis=0)
                return (data-low)/(data.max(axis=0)-low)
            if norm_opt == 'norm':
                return (data-data.mean(axis=0))/(np.std(data,axis=0))
            return data/np.max(np.abs(data),axis=0)

        # labels: one global offset b and scale k, returned for later recovery
        if op == 1:
            if norm_opt == 'norm':
                b, k = np.mean(data), np.std(data)
            elif norm_opt == 'max_min':
                b, k = np.min(data), (np.max(data)-np.min(data))
            else:
                b, k = 0,np.max(np.abs(data))

            return (data - b)/k,k,b

        raise ValueError("op must be 0 (features) or 1 (labels), got %r" % (op,))

    def read_file(self):
        """Load the CSV file and cut it into sliding windows.

        The file is expected to hold one time step per row, the features in
        all columns but the last and the label in the last column. Features
        and labels are 'max_abs' normalized, then every window of
        ``time_steps`` rows is paired with the ``pred_n`` labels that follow
        it. The label scale/offset are also stored on the instance so that
        ``de_norm`` can undo the normalization.

        Output:
            train_X, train_y, test_X, test_y, k, b

        Raises:
            ValueError: the file has too few rows to build a single window
        """
        # If the file is too large for memory, a generator combined with
        # tf.data.Dataset.from_generator would be the alternative.
        data = np.array(pd.read_csv(self.file_name))
        # If the first column holds dates, drop it instead:
        # data = np.array(pd.read_csv(self.file_name).iloc[:,1:])

        data_fea = self.norm_op(data[:,:-1],0,'max_abs')
        data_lab,k,b = self.norm_op(data[:,-1],1,'max_abs')
        self.k, self.b = k, b

        n_windows = data_fea.shape[0]- self.time_steps-self.pred_n+1
        if n_windows <= 0:
            raise ValueError("%r has %d rows, need at least %d for time_steps=%d and pred_n=%d"
                             % (self.file_name, data_fea.shape[0],
                                self.time_steps + self.pred_n,
                                self.time_steps, self.pred_n))

        features = [data_fea[i:i+ self.time_steps,:] for i in range(n_windows)]
        labels = [data_lab[i+ self.time_steps:i+self.time_steps+self.pred_n]
                  for i in range(n_windows)]

        # [samples, time_steps, features] -> [samples, features, time_steps]
        features= np.array(features,dtype=np.float32).transpose((0,2,1))
        labels = np.array(labels,dtype=np.float32)
        labels = labels.reshape(labels.shape[0],self.pred_n)

        # NOTE(review): the split index is derived from the number of CSV rows,
        # not from the number of windows, so the test share is somewhat smaller
        # than 1 - ratio. Kept as is to preserve the returned shapes.
        dex = round(data.shape[0]* self.ratio)

        return features[0:dex,:,:], labels[0:dex,:], features[dex:,:,:], labels[dex:,:],k,b

    def de_norm(self,data):
        """Undo the label normalization: ``data * k + b``.

        ``k`` and ``b`` are the values stored by the last ``read_file`` or
        ``new_data`` call (identity, k=1 and b=0, before either was called).
        """
        return data*self.k+self.b

    def exp_data(self,seq):
        """Build synthetic windows from the time values ``seq`` (experiments only).

        For every window of ``ts`` consecutive values:

            features = [cos(seq), sin(seq)]
            labels   = [cos(seq) * sin(seq)] for the ``pred_n`` values that follow

        Output:
            features: float32, shape [samples, 2, ts]
            labels:   float32, shape [samples, 1, pred_n]
        """
        features, labels = [], []
        for i in range(len(seq)-self.ts-self.pred_n+1):
            window = seq[i: i + self.ts]
            target = seq[i+self.ts:i+self.ts+self.pred_n]
            features.append([np.cos(window),np.sin(window)])
            labels.append([np.cos(target)*np.sin(target)])

        return np.array(features, dtype=np.float32), np.array(labels, dtype=np.float32)

    def new_data(self):
        """Generate the synthetic train/test sets.

        The training series spans ``[0, start]`` and the test series
        ``[start, end]``. Labels are not normalized, hence k = 1 and b = 0
        (also stored on the instance for ``de_norm``).

        Output:
            train_X, train_y, test_X, test_y, k, b
        """
        start = (self.n_train + self.ts) * self.d_t            # start value of the test series
        end = start + (self.n_test + self.ts) * self.d_t       # end value of the test series
        train_X, train_y = self.exp_data(np.linspace(0, start, self.n_train + self.ts, dtype=np.float32))
        test_X, test_y = self.exp_data(np.linspace(start, end, self.n_test + self.ts, dtype=np.float32))
        b,k = 0,1
        self.k, self.b = k, b

        # [samples, 1, pred_n] -> [samples, pred_n]
        test_y = test_y.reshape(test_y.shape[0],test_y.shape[2])
        train_y = train_y.reshape(train_y.shape[0],train_y.shape[2])
        return train_X,train_y,test_X,test_y,k,b

In [None]:
class get_model():
    """LSTM regression model for ``tf.estimator`` plus its input, serving,
    prediction and plotting helpers (TensorFlow 1.x API, uses ``tf.contrib``).
    """

    def __init__(self,para):
        """Copy the hyper-parameters out of ``para``.

        Keys read: ``hidden_size``, ``num_layers``, ``batch_size``, ``epoch``,
        ``learning_rate``, ``shuffle_size``, ``optimize``, ``train_step``,
        ``save_path``, ``pred_n``.
        """
        self.hidden_size = para['hidden_size']
        self.num_layers = para['num_layers']
        self.batch_size = para['batch_size']
        self.epoch = para['epoch']
        self.learning_rate = para['learning_rate']
        self.shuffle_size = para['shuffle_size']
        self.optimize = para['optimize']
        self.train_step = para['train_step']
        self.save_path = para['save_path']
        self.pred_n = para['pred_n']

    def input_data(self,features,labels,mode):
        """Input function body: wrap in-memory arrays in a ``tf.data.Dataset``.

        ``Dataset.from_tensor_slices`` is used, which suits data that fits in
        memory; for larger data a generator or TFRecord files would be the
        alternative. References:
            https://tensorflow.google.cn/guide/performance/datasets
            https://www.tensorflow.org/api_docs/python/tf/data/Dataset

        Input:
            features, labels: arrays sliced along their first axis
            mode: 'train'            -> shuffle, repeat ``epoch`` times, batch
                  'eval' / 'predict' -> single pass, batch, no shuffling
                  any other value leaves the dataset unshuffled and unbatched

        Output:
            (feature_tensor, label_tensor) for the next batch
        """
        ds = tf.data.Dataset.from_tensor_slices({"feature": features, "label": labels})
        if mode == 'train':
            ds = ds.shuffle(self.shuffle_size).repeat(self.epoch).batch(self.batch_size)
        elif mode == 'eval' or mode == 'predict':
            ds = ds.repeat(1).batch(self.batch_size)

        rst = ds.make_one_shot_iterator().get_next()
        return rst['feature'], rst['label']

    #-----------------------------------------------------------------------------------------------------------------
    def model_fn(self,  features,  labels,  mode):
        """``model_fn`` for ``tf.estimator.Estimator``.

        Builds ``num_layers`` stacked LSTM cells of ``hidden_size`` units,
        takes the output of the last step along axis 1 and maps it through a
        linear fully connected layer to ``pred_n`` values.

        Output (``tf.estimator.EstimatorSpec``):
            PREDICT: predictions / export output under the key 'values'
            EVAL:    MSE loss plus the 'mae' metric
            TRAIN:   MSE loss plus the training op

        Raises:
            ValueError: ``mode`` is none of the three estimator modes
        """
        if isinstance(features, dict):  # For serving
            features = features['feature']

        with tf.name_scope("RNN"):
            cell = tf.nn.rnn_cell.MultiRNNCell([tf.nn.rnn_cell.LSTMCell(self.hidden_size)
                                            for _ in range(self.num_layers)])
            # NOTE(review): dynamic_rnn unrolls over axis 1 of `features`; the
            # data classes in this notebook put the feature axis there and the
            # time axis last - confirm this is the intended layout.
            outputs, _ = tf.nn.dynamic_rnn(cell, features, dtype=tf.float32)
            output = outputs[:, -1, :]

            # final fully connected layer, no activation (regression output)
            predictions = tf.contrib.layers.fully_connected(output, self.pred_n, activation_fn=None)

        if mode == tf.estimator.ModeKeys.PREDICT:

            predict_output = {'values': predictions}
            export_outputs = {'predictions': tf.estimator.export.PredictOutput(predict_output)}

            return tf.estimator.EstimatorSpec(
                mode = mode,
                predictions = predict_output,
                export_outputs=export_outputs)

        # mean squared error; could be made configurable
        with tf.name_scope("Loss"):
            loss = tf.losses.mean_squared_error(labels=labels, predictions=predictions)

        if mode == tf.estimator.ModeKeys.EVAL:
            # Evaluation needs no optimizer; report MAE next to the loss.
            with tf.name_scope("Metrics"):
                metrics = {"mae": tf.metrics.mean_absolute_error(labels, predictions)}

            return tf.estimator.EstimatorSpec(mode=mode,
                                                loss=loss,
                                                eval_metric_ops=metrics)

        if mode == tf.estimator.ModeKeys.TRAIN:
            with tf.name_scope("Train"):
                train_op = tf.contrib.layers.optimize_loss(loss, tf.train.get_global_step(),
                                                       optimizer=self.optimize,
                                                       learning_rate=self.learning_rate)
                tf.summary.scalar('Loss', loss)

            return tf.estimator.EstimatorSpec(mode=mode,
                                                loss=loss,
                                                train_op=train_op)

        raise ValueError("unsupported estimator mode: %r" % (mode,))

    #----------------------------------------------------------------------------------
    def create_estimator(self):
        """Return an Estimator around ``model_fn`` that checkpoints to ``save_path``."""
        training_config = tf.estimator.RunConfig(model_dir=self.save_path,tf_random_seed=1234)
        return tf.estimator.Estimator(model_fn=self.model_fn, config=training_config)

    def serving_input_receiver_fn(self,xshape):
        """Serving input receiver for exported models.

        Input:
            xshape: pair giving the per-sample feature shape; the placeholder
                    gets the shape [None, xshape[0], xshape[1]]

        Output:
            ServingInputReceiver whose single input is named 'serve_input' and
            is handed to ``model_fn`` unchanged.
        """
        number = tf.placeholder(dtype=tf.float32, shape=[None,xshape[0],xshape[1]], name='serve_input')
        receiver_tensors = {'serve_input': number}
        features =  number
        return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

    def pred_model(self,test_X,test_y,md,k,b):
        """Predict with the estimator ``md`` and de-normalize the results.

        Input:
            test_X: features of the test/predict data
            test_y: labels of the test/predict data
            md: estimator
            k: label scale (slope)
            b: label offset (intercept)

        Output:
            rst: raw predictions, one row per sample
            rst_denorm: predictions mapped back with ``rst * k + b``
            test_denorm: labels mapped back with ``test_y * k + b``
        """
        results = md.predict(lambda:self.input_data(test_X,test_y,'eval'))
        # 'values' is the key model_fn publishes its predictions under
        rst = np.array([result["values"] for result in results])

        test_denorm = test_y*k+b
        rst_denorm = rst*k+b

        return rst,rst_denorm,test_denorm

    def _plot_series(self,actual,predicted):
        """Draw actual (red) against predicted (green) values in one figure."""
        plt.plot(actual, label="Actual Values", color='red')
        plt.plot(predicted, label="Predicted Values", color='green', )

        plt.title('Result')
        plt.xlabel('Index')
        plt.ylabel('Amplitude')
        plt.legend(loc='best')
        plt.show()

    def plot_rst(self,test_y,rst):
        """Plot all actual values against all predicted values."""
        self._plot_series(test_y,rst)

    def plot_rst2(self,test_y,rst):
        """Plot only the first entry of the actual and of the predicted values."""
        self._plot_series(test_y[0],rst[0])

In [None]:
# Hyper-parameters consumed by get_model (plus the export directory used by
# the training cell); the names describe what each value controls.
para_dict = dict(
    hidden_size=30,             # units per LSTM cell
    num_layers=1,               # number of stacked LSTM cells
    batch_size=32,
    epoch=5,                    # passes over the training data
    learning_rate=0.06,
    shuffle_size=10000,         # shuffle buffer size
    optimize='Adam',            # optimizer name
    train_step=3000,
    save_path='./model',        # checkpoint directory
    pred_n=1,                   # values predicted per sample
    save_model='./serve_model', # SavedModel export directory
)

# Settings consumed by pro_data: the first group drives the synthetic series,
# the second group the CSV reader.
data_dict = dict(
    ts=200,                     # window length of the synthetic series
    pred_n=para_dict['pred_n'],
    n_train=10000,
    n_test=1000,
    d_t=0.1,
    time_steps=30,              # window length for CSV data
    ratio=0.8,                  # train share for CSV data
    file_name='/Users/kappa0517/Code/RNN_predict/data0416.csv',
)

In [None]:
# Train on the synthetic series, evaluate on its continuation and export a
# SavedModel for serving.
train_X,train_y,test_X,test_y,k,b = pro_data(data_dict).new_data()
s_p = (train_X.shape[1],train_X.shape[2])   # per-sample feature shape for the serving placeholder

# One model object is enough: it only carries hyper-parameters.
model = get_model(para_dict)

# Checkpoints go to the configured directory so that the prediction cell,
# which restores from para_dict['save_path'], always finds this model.
Path(para_dict['save_path']).mkdir(parents=True, exist_ok=True)
estimator = tf.estimator.Estimator(model.model_fn, para_dict['save_path'])

# No max_steps: training stops when the input function runs out of data
# (input_data repeats the training set `epoch` times).
train_spec = tf.estimator.TrainSpec(
    input_fn=lambda :model.input_data(train_X,train_y,'train'))

eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda :model.input_data(test_X,test_y,'eval'),
    exporters=[tf.estimator.LatestExporter(
        name="eval",
        serving_input_receiver_fn=lambda :model.serving_input_receiver_fn(s_p),
        exports_to_keep=1,
        as_text=True)],
    steps=None)     # evaluate on the whole test set


tf.estimator.train_and_evaluate(
    estimator=estimator,
    train_spec=train_spec,
    eval_spec=eval_spec)

estimator.export_saved_model(para_dict['save_model'],
                             lambda :model.serving_input_receiver_fn(s_p))

In [None]:
#  Prediction: restore the estimator from the checkpoint directory and predict
#  the whole test set in one pass; `s` holds these reference predictions.
Path(para_dict['save_path']).mkdir(exist_ok=True)
estimator = tf.estimator.Estimator(get_model(para_dict).model_fn, para_dict['save_path'])
results =estimator.predict(input_fn=lambda :get_model(para_dict).input_data(test_X,test_y,'predict'))
rst = [result["values"] for result in results]
s = np.array(rst)
# Serving
#-----------------  provide the serving function
#-----------------  when the path is a directory, Path.iterdir() yields the paths of all files and sub-directories inside it
# Pick the export directory that sorts last (presumably the most recent export,
# assuming timestamp-named sub-directories - TODO confirm); paths containing
# 'temp' are skipped.
subdirs = [x for x in Path(para_dict['save_model']).iterdir() if x.is_dir() and 'temp' not in str(x)]
latest = str(sorted(subdirs)[-1])
predict_fn = predictor.from_saved_model(latest)

a = test_X
#a = np.random.random((30,s_p[0],s_p[1]))
# `rst` is reused: from here on it collects the predictions of the served model.
rst = []
tic = time.time()
# Query the served model one sample at a time (batch of size 1 per call).
for i in range(a.shape[0]):
    
    pred = predict_fn({'serve_input': [a[i,:,:]]})['values']
    rst.append(pred[0])
    
rst = np.array(rst)
rst2 = rst*k+b   # served predictions mapped back to the original label scale
toc = time.time()

# Elapsed seconds for the per-sample serving loop (including the rescaling above).
print(toc-tic)
#print(rst)
# Difference between estimator predictions and served predictions.
print(s-rst)