In [1]:
# Importing necessary libraries
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import math
     


In [2]:
# Read the KO dataset from CSV: the first column becomes the index and is parsed as dates
data = pd.read_csv(
    'df_KO_.csv',
    index_col=0,
    parse_dates=True,
)

In [3]:
# Preview the loaded frame; the rendered output below elides the middle rows with "..."
data

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,Reported EPS,Price Change,RSI,K,EMA12,EMA26,MACD Line,Signal Line,MACD Histogram,ROC,OBV,P/E
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
1996-05-06,19.937500,20.218750,19.750000,20.218750,10.236874,7170400,0.14,0.281250,54.166667,70.000000,20.167723,20.122550,0.045173,0.045976,-0.000803,-0.154321,-10631600,144.419643
1996-05-07,20.218750,20.406250,20.156250,20.343750,10.300159,6702800,0.14,0.125000,56.578947,83.333333,20.194804,20.138935,0.055869,0.047955,0.007914,1.244168,-3928800,145.312500
1996-05-08,20.343750,20.687500,20.062500,20.687500,10.474213,8292800,0.14,0.343750,65.853659,100.000000,20.270603,20.179569,0.091034,0.056571,0.034463,1.846154,4364000,147.767857
1996-05-09,20.687500,20.937500,20.593750,20.687500,10.474213,4820400,0.14,0.000000,71.052632,78.947368,20.334741,20.217194,0.117547,0.068766,0.048781,1.846154,4364000,147.767857
1996-05-10,20.718750,20.968750,20.718750,20.968750,10.616608,4942800,0.14,0.281250,68.571429,100.000000,20.432281,20.272865,0.159416,0.086896,0.072520,3.389831,9306800,149.776786
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2022-12-23,63.500000,63.869999,63.200001,63.820000,62.855492,6463300,0.69,0.480000,53.333314,62.869238,63.351839,62.744877,0.606962,0.768770,-0.161808,-0.234487,3006553500,92.492753
2022-12-27,63.930000,64.290001,63.709999,64.209999,63.239597,7320700,0.69,0.389999,56.862744,79.324942,63.483864,62.853404,0.630459,0.741108,-0.110649,0.343800,3013874200,93.057970
2022-12-28,64.459999,64.650002,63.490002,63.570000,62.609272,7159400,0.69,-0.639999,50.243892,52.320693,63.497115,62.906485,0.590630,0.711012,-0.120382,-0.656356,3006714800,92.130434
2022-12-29,63.799999,64.150002,63.700001,63.950001,62.983532,7169300,0.69,0.380001,51.118206,68.354526,63.566790,62.983783,0.583007,0.685411,-0.102404,1.331010,3013884100,92.681161


In [4]:
# Append a target column 'y' that mirrors 'Close' (built as a new frame, not an in-place write)
data = data.assign(y=data['Close'])

In [5]:
# Features are every column except the target 'y'; the target is the 'y' column.
# Selecting by name rather than by position (previously columns 0-17 and 18) keeps
# this correct if the CSV ever gains, loses, or reorders columns.
x = data.drop(columns='y').values
y = data['y'].values

In [8]:
# Chronological 80/20 split: the leading 80% of rows train, the trailing 20% test
split = int(0.8 * data.shape[0])
train_x, test_x = x[:split], x[split:]
train_y, test_y = y[:split], y[split:]

# Report the shapes of both partitions
print(f'trainX: {train_x.shape} trainY: {train_y.shape}')
print(f'testX: {test_x.shape} testY: {test_y.shape}')

# Separate min-max scalers for features and target, each targeting the (0, 1) range
x_scaler = MinMaxScaler(feature_range=(0, 1))
y_scaler = MinMaxScaler(feature_range=(0, 1))

# Features: fit on the training rows only, then apply the same mapping to the test rows
train_x = x_scaler.fit_transform(train_x)
test_x = x_scaler.transform(test_x)

# Target: same procedure; the scaler needs a 2-D column, hence the reshape
train_y = y_scaler.fit_transform(train_y.reshape(-1, 1))
test_y = y_scaler.transform(test_y.reshape(-1, 1))

trainX: (5368, 18) trainY: (5368,)
testX: (1343, 18) testY: (1343,)


In [9]:

# Definition of sliding_window function
def sliding_window(x, y, window):
    """Slice aligned feature/target arrays into overlapping look-back windows.

    For every index ``i`` in ``range(window, x.shape[0])`` three items are built:

    - the ``window`` feature rows preceding ``i`` (``x[i - window:i, :]``),
    - the target at ``i`` (``y[i]``),
    - the ``window + 1`` targets ending at ``i`` (``y[i - window:i + 1]``),
      i.e. the target history plus the value to predict, used for the GAN.

    Args:
        x: 2-D NumPy array of features, one row per time step.
        y: NumPy array of targets aligned row-for-row with ``x``.
        window: Number of past time steps per window; must be at least 1.

    Returns:
        Tuple ``(x_, y_, y_gan)`` of float32 tensors holding, respectively, the
        stacked feature windows, the stacked targets, and the stacked target
        sequences. Each has ``x.shape[0] - window`` entries along dimension 0.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f'window must be >= 1, got {window}')

    # Every index that has a full `window` of history before it
    targets_at = range(window, x.shape[0])

    x_windows = [x[i - window: i, :] for i in targets_at]
    y_targets = [y[i] for i in targets_at]
    y_sequences = [y[i - window: i + 1] for i in targets_at]

    # Stack into arrays first, then convert to float32 tensors
    x_ = torch.from_numpy(np.array(x_windows)).float()
    y_ = torch.from_numpy(np.array(y_targets)).float()
    y_gan = torch.from_numpy(np.array(y_sequences)).float()
    return x_, y_, y_gan
     


In [13]:
# Build 3-step look-back windows for the training and testing partitions
train_x_slide, train_y_slide, train_y_gan = sliding_window(x=train_x, y=train_y, window=3)
test_x_slide, test_y_slide, test_y_gan = sliding_window(x=test_x, y=test_y, window=3)

# Report the resulting tensor shapes
print(f'train_x: {train_x_slide.shape} train_y: {train_y_slide.shape} train_y_gan: {train_y_gan.shape}')
print(f'test_x: {test_x_slide.shape} test_y: {test_y_slide.shape} test_y_gan: {test_y_gan.shape}')


train_x: torch.Size([5365, 3, 18]) train_y: torch.Size([5365, 1]) train_y_gan: torch.Size([5365, 4, 1])
test_x: torch.Size([1340, 3, 18]) test_y: torch.Size([1340, 1]) test_y_gan: torch.Size([1340, 4, 1])


In [14]:

# Definition of Generator class
class Generator(nn.Module):
    """GRU-based generator that maps a window of features to one predicted value.

    Architecture: three stacked single-layer GRUs (1024 -> 512 -> 256 hidden
    units), each followed by dropout (p=0.2), then three Linear layers
    (256 -> 128 -> 64 -> 1) applied to the last time step of the final GRU
    output. No activation is applied between the Linear layers.

    Args:
        input_size: Number of features per time step.
    """

    def __init__(self, input_size):
        super().__init__()
        self.gru_1 = nn.GRU(input_size, 1024, batch_first = True)
        self.gru_2 = nn.GRU(1024, 512, batch_first = True)
        self.gru_3 = nn.GRU(512, 256, batch_first = True)
        self.linear_1 = nn.Linear(256, 128)
        self.linear_2 = nn.Linear(128, 64)
        self.linear_3 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.2)

    # Defining the forward pass
    def forward(self, x):
        """Run the forward pass.

        Args:
            x: Tensor of shape (batch, seq_len, input_size); the GRUs are
                created with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, 1).
        """
        # No explicit initial hidden state is passed: nn.GRU defaults h_0 to
        # zeros on the same device/dtype as its input. The previous version
        # allocated the zeros on "cuda" whenever CUDA was available, which
        # raised a device-mismatch error when the model itself lived on the CPU.
        out_1, _ = self.gru_1(x)
        out_1 = self.dropout(out_1)
        out_2, _ = self.gru_2(out_1)
        out_2 = self.dropout(out_2)
        out_3, _ = self.gru_3(out_2)
        out_3 = self.dropout(out_3)
        # Only the last time step feeds the fully connected head
        out_4 = self.linear_1(out_3[:, -1, :])
        out_5 = self.linear_2(out_4)
        out = self.linear_3(out_5)
        return out

# Definition of Discriminator class
class Discriminator(nn.Module):
    """Convolutional discriminator that scores a sequence as real or generated.

    Three Conv1d layers (4 -> 32 -> 64 -> 128 channels, kernel 3, 'same'
    padding), each followed by LeakyReLU(0.01), feed three Linear layers
    (128 -> 220 -> 220 -> 1). The first Linear output goes through LeakyReLU,
    the second through ReLU, and the last through Sigmoid, so the result lies
    between 0 and 1.

    The input must have 4 channels and a length of 1, i.e. shape (batch, 4, 1):
    the flattening step reshapes the conv output to (batch, 128) and fails for
    any other length.

    Note: ``batch1`` and ``batch2`` are registered as sub-modules but are not
    applied in ``forward``.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv1d(4, 32, kernel_size=3, stride=1, padding='same')
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding='same')
        self.conv3 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding='same')
        # Fully connected head (BatchNorm layers are created but unused in forward)
        self.linear1 = nn.Linear(128, 220)
        self.batch1 = nn.BatchNorm1d(220)
        self.linear2 = nn.Linear(220, 220)
        self.batch2 = nn.BatchNorm1d(220)
        self.linear3 = nn.Linear(220, 1)
        # Activations
        self.leaky = nn.LeakyReLU(0.01)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one score in (0, 1) per sample for input of shape (batch, 4, 1)."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = self.leaky(conv(features))

        # Drop the length-1 trailing axis: (batch, 128, 1) -> (batch, 128)
        n_samples, n_channels = features.shape[0], features.shape[1]
        flat = features.reshape(n_samples, n_channels)

        hidden = self.leaky(self.linear1(flat))
        hidden = self.relu(self.linear2(hidden))
        return self.sigmoid(self.linear3(hidden))

In [None]:
# Setting up CUDA for GPU acceleration, if available
use_cuda = True
device = torch.device("cuda" if (torch.cuda.is_available() and use_cuda) else "cpu")

# Fix the torch RNG so weight initialisation and dropout are repeatable across runs
torch.manual_seed(42)

# Hyperparameters for training
batch_size = 128
learning_rate = 0.00016
num_epochs = 165

# Creating DataLoader for batching training data.
# shuffle=False keeps the batches in chronological order.
trainDataloader = DataLoader(TensorDataset(train_x_slide, train_y_gan), batch_size = batch_size, shuffle = False)

# Initializing the Generator and Discriminator models
modelG = Generator(18).to(device) # 18 input features per time step
modelD = Discriminator().to(device)

# Setting the loss function and optimizers
criterion = nn.BCELoss()
optimizerG = torch.optim.Adam(modelG.parameters(), lr = learning_rate, betas = (0.0, 0.9))
optimizerD = torch.optim.Adam(modelD.parameters(), lr = learning_rate, betas = (0.0, 0.9))

# Arrays to keep track of losses over epochs
histG = np.zeros(num_epochs)
histD = np.zeros(num_epochs)
count = 0  # not read anywhere in this cell; kept in case other cells reference it

# Training loop
for epoch in range(num_epochs):
    loss_G = []
    loss_D = []
    # Loop variables are deliberately NOT named x / y: those names hold the raw
    # feature/target arrays used by the split cell and must not be overwritten.
    for (batch_x, batch_y_gan) in trainDataloader:
        batch_x = batch_x.to(device)
        batch_y_gan = batch_y_gan.to(device)

        # Generator predicts the next value; appending it to the real history
        # (every step of the real sequence except the last) gives the fake sequence
        pred_next = modelG(batch_x)
        fake_data = torch.cat([batch_y_gan[:, :-1, :], pred_next.reshape(-1, 1, 1)], dim = 1)

        # ---- Discriminator step ----
        # Discriminator's loss on real data
        dis_real_output = modelD(batch_y_gan)
        real_labels = torch.ones_like(dis_real_output)
        lossD_real = criterion(dis_real_output, real_labels)

        # Discriminator's loss on fake data. detach() stops this backward pass
        # from running through the Generator, so retain_graph is not needed.
        dis_fake_output = modelD(fake_data.detach())
        fake_labels = torch.zeros_like(dis_real_output)
        lossD_fake = criterion(dis_fake_output, fake_labels)

        # Total Discriminator's loss
        lossD = (lossD_real + lossD_fake)

        # Zero gradients, backpropagation, and optimization for Discriminator
        modelD.zero_grad()
        lossD.backward()
        optimizerD.step()
        loss_D.append(lossD.item())

        # ---- Generator step ----
        # Score the (non-detached) fake sequence with the updated Discriminator;
        # the Generator is rewarded when the fake is classified as real
        output_fake = modelD(fake_data)
        lossG = criterion(output_fake, real_labels)

        # Zero gradients, backpropagation, and optimization for Generator
        modelG.zero_grad()
        lossG.backward()
        optimizerG.step()
        loss_G.append(lossG.item())

    # Storing the summed per-batch losses for this epoch
    histG[epoch] = sum(loss_G)
    histD[epoch] = sum(loss_D)

    # Print losses for this epoch
    print(f'[{epoch+1}/{num_epochs}] LossD: {sum(loss_D)} LossG:{sum(loss_G)}')
     

[1/165] LossD: 58.228445291519165 LossG:29.326889872550964


In [None]:
# Plotting the Generator and Discriminator loss recorded for each epoch
plt.figure(figsize = (12, 6))
plt.plot(histG, color = 'blue', label = 'Generator Loss')
plt.plot(histD, color = 'black', label = 'Discriminator Loss')
plt.title('GAN Loss')
# histG / histD hold one entry per training epoch, so the x axis is epochs, not days
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc = 'upper right')
     


In [None]:
# Setting the Generator to evaluation mode (disables dropout)
modelG.eval()

# Getting the predictions for training and testing data.
# no_grad() skips building the autograd graph, which is not needed for inference.
with torch.no_grad():
    pred_y_train = modelG(train_x_slide.to(device))
    pred_y_test = modelG(test_x_slide.to(device))

# Inverting the scaling for true and predicted values for comparison.
# Tensors are converted to NumPy explicitly before being handed to scikit-learn.
y_train_true = y_scaler.inverse_transform(train_y_slide.numpy())
y_train_pred = y_scaler.inverse_transform(pred_y_train.cpu().numpy())

y_test_true = y_scaler.inverse_transform(test_y_slide.numpy())
y_test_pred = y_scaler.inverse_transform(pred_y_test.cpu().numpy())

# Plotting the actual vs predicted prices for the training dataset
plt.figure(figsize=(12, 8))
plt.plot(y_train_true, color = 'red', label = 'Actual Price')
plt.plot(y_train_pred, color = 'blue', label = 'Predicted Price')
plt.title('GAN prediction training dataset')
plt.ylabel('USD')  # NOTE(review): KO price data is assumed to be quoted in USD - confirm
plt.xlabel('Days')
plt.legend(loc = 'upper right')

# Calculating and printing the Root Mean Squared Error for the training dataset
MSE = mean_squared_error(y_train_true, y_train_pred)
RMSE = math.sqrt(MSE)
print(f'Training dataset RMSE:{RMSE}')
     


In [None]:
# Plotting the actual vs predicted prices for the testing dataset
plt.figure(figsize=(12, 8))
plt.plot(y_test_true, color = 'red', label = 'Actual Price')
plt.plot(y_test_pred, color = 'blue', label = 'Predicted Price')
plt.title('GAN prediction testing dataset')
plt.ylabel('USD')  # NOTE(review): KO price data is assumed to be quoted in USD - confirm
plt.xlabel('Days')
plt.legend(loc = 'upper right')

# Calculating and printing the Root Mean Squared Error for the testing dataset
MSE = mean_squared_error(y_test_true, y_test_pred)
RMSE = math.sqrt(MSE)
print(f'Test dataset RMSE:{RMSE}')