In [1]:
# change dir
import os

# Remember the directory the kernel started in. Without this anchor, every
# re-run of the cell would climb one more level up (chdir("../") is relative
# to the *current* directory), silently breaking the relative paths below.
if "_KERNEL_START_DIR" not in globals():
    _KERNEL_START_DIR = os.getcwd()

# Always land in the parent of the start directory, however often the cell runs.
os.chdir(os.path.join(_KERNEL_START_DIR, os.pardir))
os.getcwd()

'c:\\Users\\HP\\Desktop\\GenHack\\Genhack_competition'

In [2]:
# imports
import numpy as np
import pandas as pd

import plotly.graph_objects as go
import plotly.express as px

import torch
from torch import nn

In [3]:
# load real data
# labels for the stock series: stock_0 ... stock_3
STOCK_NAMES = ["stock_{}".format(idx) for idx in range(4)]

# The CSV has no header row: labels come from STOCK_NAMES and the first
# column of the file is used as the DataFrame index.
stocks_df = pd.read_csv(
    "Data/train.csv", index_col=0, header=None, names=STOCK_NAMES
)

In [4]:
# get data as tensor
# Keep every row except the last two and only the first two columns, then
# convert the resulting array to a torch tensor in a single step.
# NOTE(review): dropping 2 rows presumably makes the row count a multiple of
# the batch size used later -- confirm.
real_data = torch.Tensor(stocks_df.values[:-2, :2])

In [5]:
# sanity check: (number of rows kept, number of columns kept)
real_data.shape

torch.Size([744, 2])

In [6]:
# general setups and params

# for reproducibility: fix the torch and the numpy random streams
np.random.seed(123456789)
torch.manual_seed(123456789)

# params data: rows are observations, columns are tickers
nb_obs = real_data.shape[0]
nb_tickers = real_data.shape[1]

# to train nn
lr = 1e-3
num_epochs = 300
batch_size = 62  # 31 * 2
loss_function = nn.BCELoss()

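``1`` is the label of real samples and ``0`` is the label of generated (fake) samples, as used in the training loop below.

The objective is to:
- Generator: maximize the discriminator's error on generated samples (i.e. get them classified as real)
- Discriminator: minimize its classification error when separating real from generated samples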

In [7]:
def gen_noise(nb_rows: int = 410, nb_cols: int = nb_tickers):
    """Draw standard-normal noise and return it as a torch tensor.

    Args:
        nb_rows: number of noise vectors (rows) to draw.
        nb_cols: size of each noise vector (columns).

    Returns:
        A tensor of shape ``(nb_rows, nb_cols)`` sampled from N(0, 1) with
        numpy's global random state.
    """
    latent_shape = (nb_rows, nb_cols)
    gaussian_draws = np.random.normal(loc=0.0, scale=1.0, size=latent_shape)
    return torch.Tensor(gaussian_draws)


# # check noise
# gen_data = gen_noise(1000, 4)

# # distribution of stocks
# import plotly.figure_factory as ff


# # Group data together
# hist_data = [gen_data[:, i] for i in range(4)]
# group_labels = [0, 1, 2, 3]

# # create distplot
# fig = ff.create_distplot(hist_data, group_labels)

# fig.update_layout(
#     title="Distribution of stocks",
#     xaxis={
#         "title": "values"
#     },
#     yaxis={
#         "title": "fraction",
#     }
# )
# fig.show()


In [8]:
# prepare data
# Each dataset item is a (sample, label) pair. The all-zero labels are only
# placeholders: the training loop discards them and builds its own targets.
train_labels = torch.zeros(nb_obs)
train_set = torch.utils.data.TensorDataset(real_data, train_labels)

# arrange as shuffled mini-batches
train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=batch_size, shuffle=True
)

In [9]:
# define discriminator
class Discriminator(nn.Module):
    """Fully connected binary classifier over 2-feature rows.

    Three hidden blocks (Linear -> ReLU -> Dropout(0.3)) of widths 256, 128
    and 64 are followed by a single sigmoid unit, so the output is one value
    in (0, 1) per input row.
    """

    def __init__(self):
        super().__init__()
        widths = (2, 256, 128, 64)

        # Layers are created in forward order so the parameter initialisation
        # draws from the torch RNG in the same sequence as a literal listing.
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.extend([nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.3)])
        layers.extend([nn.Linear(widths[-1], 1), nn.Sigmoid()])

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the sigmoid score for every row of ``x``."""
        return self.model(x)

In [10]:
# define generator
class Generator(nn.Module):
    """Small fully connected network mapping 2-d noise rows to 2-d samples.

    Two ReLU hidden layers (16 and 32 units) feed a 2-unit output layer with
    a Softplus activation, so every generated value is strictly positive.
    """

    def __init__(self):
        super().__init__()
        hidden = [nn.Linear(2, 16), nn.ReLU(), nn.Linear(16, 32), nn.ReLU()]
        head = [nn.Linear(32, 2), nn.Softplus()]
        self.model = nn.Sequential(*hidden, *head)

    def forward(self, x):
        """Return one generated row for every noise row of ``x``."""
        return self.model(x)

In [11]:
# define models
discriminator = Discriminator()
generator = Generator()

optimizer_discriminator = torch.optim.Adam(discriminator.parameters(), lr=lr)
optimizer_generator = torch.optim.Adam(generator.parameters(), lr=lr)

In [12]:
import tqdm

# store loss
# per-batch losses of the epoch currently running (reset at every epoch start)
arr_loss_dis = []
arr_loss_gen = []

# mean loss of each finished epoch (one entry per epoch), used for plotting
arr_mean_dis = []
arr_mean_gen = []

for epoch in tqdm.tqdm(range(num_epochs)):
    # re init arr
    arr_loss_dis = []
    arr_loss_gen = []

    # loop over batches; the placeholder labels stored in the loader are ignored
    for real_samples, _ in train_loader:
        # Size everything from the batch actually received: the last batch is
        # smaller than batch_size whenever nb_obs is not a multiple of it, and
        # BCELoss requires predictions and targets of identical shape.
        cur_batch_size = real_samples.size(0)

        # Targets: 1 for real samples, 0 for generated samples
        real_samples_labels = torch.ones((cur_batch_size, 1))
        generated_samples_labels = torch.zeros((cur_batch_size, 1))

        # --- Discriminator step ---
        latent_space_samples = gen_noise(cur_batch_size, nb_tickers)
        # detach(): this step only updates the discriminator, so there is no
        # need to back-propagate through the generator.
        generated_samples = generator(latent_space_samples).detach()

        all_samples = torch.cat((real_samples, generated_samples))
        all_samples_labels = torch.cat(
            (real_samples_labels, generated_samples_labels)
        )

        discriminator.zero_grad()
        output_discriminator = discriminator(all_samples)
        loss_discriminator = loss_function(
            output_discriminator, all_samples_labels
        )
        loss_discriminator.backward()
        optimizer_discriminator.step()

        arr_loss_dis.append(loss_discriminator.item())

        # --- Generator step ---
        # Fresh noise; the generator is scored against the "real" target so
        # that it is rewarded when the discriminator labels its output as real.
        latent_space_samples = gen_noise(cur_batch_size, nb_tickers)

        generator.zero_grad()
        generated_samples = generator(latent_space_samples)
        output_discriminator_generated = discriminator(generated_samples)
        loss_generator = loss_function(
            output_discriminator_generated, real_samples_labels
        )
        loss_generator.backward()
        optimizer_generator.step()

        arr_loss_gen.append(loss_generator.item())

    # save mean of epoch
    arr_mean_dis.append(np.mean(arr_loss_dis))
    arr_mean_gen.append(np.mean(arr_loss_gen))

100%|██████████| 300/300 [00:43<00:00,  6.97it/s]


In [13]:

# Mean loss per epoch for both networks, one trace each.
fig = go.Figure(
    data=[
        go.Scatter(y=arr_mean_dis, name="dis"),
        go.Scatter(y=arr_mean_gen, name="gen"),
    ]
)

# A figure should stand on its own: give it a title and axis labels.
# (Add 'type': "log" to either axis dict to switch that axis to a log scale.)
fig.update_layout(
    title="Mean training loss per epoch",
    xaxis={
        "title": "epoch"
    },
    yaxis={
        "title": "mean BCE loss"
    },
)

fig.show()

In [14]:
# Line plot of the "stock_0" column against the DataFrame index, with a title
# so the figure can be understood without reading the code.
fig = px.line(
    stocks_df,
    y="stock_0",
    title="Training data: stock_0",
)

fig.show()

In [20]:
# generate data for test
# Draw one noise row per real observation, then map the noise through the
# trained generator to obtain a synthetic set of the same size as real_data.
noise = gen_noise(nb_obs, nb_tickers)
generated_dt = generator(noise)

In [21]:

# First generated column as plain Python floats. detach() drops the autograd
# graph and tolist() converts the whole column at once instead of calling
# .item() on every element.
first_generated_series = generated_dt[:, 0].detach().tolist()

fig = px.line(
    y=first_generated_series,
    title="Generated samples: first column",
)

# fixed y-range so the small generated values are readable
fig.update_layout(
    yaxis_range=[0, 0.1]
)
fig.show()

In [22]:
# inspect the generator output (one row per noise vector drawn above)
generated_dt

tensor([[0.0199, 0.0190],
        [0.0373, 0.0407],
        [0.0140, 0.0147],
        ...,
        [0.0219, 0.0233],
        [0.0177, 0.0177],
        [0.0112, 0.0105]], grad_fn=<SoftplusBackward>)

In [23]:
def tensor_to_arr(ten):
    """Convert a 2-D torch tensor into a float64 NumPy array of the same shape.

    The conversion is done in one vectorized step instead of calling
    ``.item()`` on every element. The tensor is detached first, so tensors
    that still carry an autograd graph (e.g. raw generator output) are
    accepted as well.

    Args:
        ten: tensor of shape ``(n, p)``.

    Returns:
        ``numpy.ndarray`` of shape ``(n, p)`` and dtype ``float64``. It is a
        copy and does not share memory with ``ten``.

    Raises:
        ValueError: if ``ten`` is not 2-dimensional.
    """
    if ten.dim() != 2:
        raise ValueError(
            f"expected a 2-D tensor, got shape {tuple(ten.shape)}"
        )

    # detach(): drop the autograd graph; cpu(): numpy() needs host memory;
    # astype(float): promote to float64 and return an independent copy.
    return ten.detach().cpu().numpy().astype(float)

In [31]:
from py_scripts.marginals import Anderson_Darling
from py_scripts.dependance_absolute_kendall_error import Absolute_Kendall_error

# Absolute Kendall error between a constant two-column array (first column all
# 2s, second column all 1s) and the real data.
real_arr = tensor_to_arr(real_data)

# Take the row count from the real data instead of hard-coding 744, so both
# arrays stay the same length if the training set changes.
n_rows = real_arr.shape[0]
constant_arr = np.array([[2] * n_rows, [1] * n_rows]).T

Absolute_Kendall_error(
    constant_arr,
    real_arr,
)

0.32238708230220625

In [34]:
# Anderson-Darling metric: generated samples are passed first, real samples
# second, both converted to NumPy arrays beforehand.
generated_arr = tensor_to_arr(generated_dt)
real_arr = tensor_to_arr(real_data)

Anderson_Darling(generated_arr, real_arr)

-164.08421054252244