## Simple RNN
https://medium.com/@thisislong/building-a-recurrent-neural-network-from-scratch-ba9b27a42856
<br>https://www.analyticsvidhya.com/blog/2019/01/fundamentals-deep-learning-recurrent-neural-networks-scratch-python/
<br>input data : 조업편차분석
<br>input data size : (53, 7596)
<br>output data feature : 'A10' column

In [1]:
# import libraries
import numpy as np
import matplotlib.pyplot as plt # only for graph visualization only

In [2]:
# file upload and parsing
# separating input and output data
file_dir = '../00_cnn/조업편차분석.txt'

with open(file_dir, 'r') as f:
    input_data = [x.strip().split(' ')[1:] for x in f.readlines()]
    f.close()

input_data = np.array(input_data[1:])
y = input_data[:, 9].astype('float')
x = np.delete(input_data, np.s_[9:10], axis = 1).astype('float')

np.shape(x), np.shape(y)

((7596, 53), (7596,))

### Define necessary functions
- train-test split
- normalization
- activation functions (derivatives)
- loss functions
- initialize parameters
- forward pass
- BPTT (backpropagation through time)
- weight update

In [5]:
def Standard(X):
    X_scaled = ( X - X.mean(axis=0) ) / X.std(axis=0)
    return X_scaled, X.mean(axis=0), X.std(axis=0)

def de_Standard(X_scaled, mean, std):
    X = ( X_scaled * std ) + mean
    return X

In [6]:
# sigmoid function
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [7]:
# train-test split
def train_test_split(x, y, ratio=0.7):
    split_idx = int(len(x)*ratio)
    trainX = x[:split_idx]
    trainy = y[:split_idx]

    testX = x[split_idx:]
    testy = y[split_idx:]

    return trainX, testX, trainy, testy

In [8]:
# data preprocessing
x_scaled, X_mean, X_std = Standard(x)
y_scaled, y_mean, y_std = Standard(y)
trainX, testX, trainy, testy = train_test_split(x_scaled, y_scaled)
print(f'train set of input data:\t{trainX.shape}\ntest set of input data:\t\t{testX.shape}')
trainy.shape, testy.shape

train set of input data:	(5317, 53)
test set of input data:		(2279, 53)


((5317,), (2279,))

In [9]:
#normalization
def Standard(X):
    X_scaled = ( X - X.mean(axis=0) ) / X.std(axis=0)
    return X_scaled, X.mean(axis=0), X.std(axis=0)

def de_Standard(X_scaled, mean, std):
    X = ( X_scaled * std ) + mean
    return X

In [10]:
#activation functions + derivatives
def sigmoid(x, derivative=False):
    if derivative == True:
        return x*(1-x)
    else:
        return 1 / (1 + np.exp(-x))
    
def tanh(x, derivative=False):
    if derivative:
        return 1-np.tanh(x)**2
    else:
        return np.tanh(x)


In [11]:
#loss functions
# MSE loss function
def MSE(y_pred, y_true):
    total_error = np.sum((y_pred - y_true)**2)
    return total_error / len(y_true)

# MAE loss function
def MAE(y_pred, y_true):
    total_error = np.sum(np.abs(y_pred - y_true))
    return total_error / len(y_true)

#accuracy calculation
def r_squared(y_pred, y_true):
    ssr = np.sum((y_pred-y_true)**2)
    sst = np.sum((y_true.mean()-y_true)**2)
    return 1- (ssr/sst)

In [12]:
learning_rate = 0.0001    
nepoch = 25               
T = 53                   # length of sequence
hidden_dim = 64         
output_dim = 1

bptt_truncate = 5
min_clip_value = -10
max_clip_value = 10

In [13]:
U = np.random.uniform(0, 1, (hidden_dim, T))
W = np.random.uniform(0, 1, (hidden_dim, hidden_dim))
V = np.random.uniform(0, 1, (output_dim, hidden_dim))

In [15]:
for epoch in range(nepoch):
    # check loss on train
    loss = 0.0
    
    # do a forward pass to get prediction
    for i in range(trainy.shape[0]):
        x, y = trainX[i], trainy[i]                    # get input, output values of each record
        prev_s = np.zeros((hidden_dim, 1))   # here, prev-s is the value of the previous activation of hidden layer; which is initialized as all zeroes
        for t in range(T):
            new_input = np.zeros(x.shape)    # we then do a forward pass for every timestep in the sequence
            new_input[t] = x[t]              # for this, we define a single input for that timestep
            mulu = np.dot(U, new_input)
            mulw = np.dot(W, prev_s)
            add = mulw + mulu
            s = sigmoid(add)
            mulv = np.dot(V, s)
            prev_s = s

    # calculate error 
        loss_per_record = (y - mulv)**2 / 2
        loss += loss_per_record
    loss = loss / float(trainy.shape[0])
    val_loss = 0.0
    for i in range(testy.shape[0]):
        x, y = testX[i], testy[i]
        prev_s = np.zeros((hidden_dim, 1))
        for t in range(T):
            new_input = np.zeros(x.shape)
            new_input[t] = x[t]
            mulu = np.dot(U, new_input)
            mulw = np.dot(W, prev_s)
            add = mulw + mulu
            s = sigmoid(add)
            mulv = np.dot(V, s)
            prev_s = s

        loss_per_record = (y - mulv)**2 / 2
        val_loss += loss_per_record
    val_loss = val_loss / float(testy.shape[0])

    print('Epoch: ', epoch + 1, ', Loss: ', loss, ', Val Loss: ', val_loss)

Epoch:  1 , Loss:  [[656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161 656.94478161
  656.94478161 656.94478161 656.94478161 656.94478161]] , Val Loss:  [[616.93859548 616.93859548 616.93859548 616.93859548 616.93859548
  616.93859548 616.93859548 616.93859548 

KeyboardInterrupt: 