In [40]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# 데이터 생성 및 데이터셋 클래스 (이전 코드와 동일)
def generate_stock_data(n_samples, seq_length, n_features=1):
    data = np.zeros((n_samples, seq_length, n_features))
    for i in range(n_samples):
        price = 100 * np.ones(seq_length + 1)
        returns = np.random.normal(0, 0.01, seq_length)
        price[1:] = price[0] * np.exp(np.cumsum(returns))
        log_returns = np.diff(np.log(price))
        data[i, :, 0] = log_returns
    return data

class StockDataset(Dataset):
    def __init__(self, data, input_window, target_window):
        self.data = torch.FloatTensor(data)
        self.input_window = input_window
        self.target_window = target_window

    def __len__(self):
        return len(self.data) - self.input_window - self.target_window + 1

    def __getitem__(self, idx):
        x = self.data[idx:idx+self.input_window]
        y = self.data[idx+self.input_window:idx+self.input_window+self.target_window]
        return x, y

# Generator 네트워크
class Generator(nn.Module):
    def __init__(self, condition_dim, noise_dim, hidden_dim, output_dim):
        super(Generator, self).__init__()
        self.condition_fc = nn.Linear(condition_dim, hidden_dim)
        self.noise_fc = nn.Linear(noise_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim * 2, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, condition, noise):
        batch_size = condition.size(0)
        seq_len = condition.size(1)

        # Flatten and process condition
        flat_condition = condition.view(batch_size, -1)
        # Remove the extra dimension from flat_condition
        flat_condition = flat_condition[:, :condition_dim]
        condition_hidden = self.condition_fc(flat_condition)
        condition_hidden = condition_hidden.unsqueeze(1).repeat(1, seq_len, 1)

        # Process noise
        noise_hidden = self.noise_fc(noise)
        noise_hidden = noise_hidden.unsqueeze(1).repeat(1, seq_len, 1)

        # Combine condition and noise
        combined = torch.cat([condition_hidden, noise_hidden], dim=-1)

        lstm_out, _ = self.lstm(combined)
        return self.sigmoid(self.fc(lstm_out[:, -1, :])) # Apply sigmoid to the output

# Discriminator 네트워크
class Discriminator(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(Discriminator, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        return torch.sigmoid(self.fc(lstm_out[:, -1, :]))

# Fin-GAN 손실 함수
def fin_gan_loss(pred, target, pnl_weight=1.0, mse_weight=1.0, sr_weight=1.0):
    # Reshape target to match pred and scale to [0, 1]
    target = target.squeeze()[:, -1].unsqueeze(1)
    target = (target - target.min()) / (target.max() - target.min()) # Scale target to [0, 1]

    bce_loss = nn.BCELoss()(pred, target)
    pnl_loss = -torch.mean(torch.tanh(100 * pred) * target)
    mse_loss = nn.MSELoss()(pred, target)

    mean_pnl = torch.mean(torch.tanh(100 * pred) * target)
    std_pnl = torch.std(torch.tanh(100 * pred) * target)
    sr_loss = -mean_pnl / (std_pnl + 1e-6)

    return bce_loss + pnl_weight * pnl_loss + mse_weight * mse_loss + sr_weight * sr_loss

# 하이퍼파라미터 설정
n_samples = 1000
seq_length = 60
input_window = 50
target_window = 1
hidden_dim = 64
noise_dim = 10
num_epochs = 100
batch_size = 32
lr = 0.0001

# 데이터 생성 및 준비
data = generate_stock_data(n_samples, seq_length)
dataset = StockDataset(data, input_window, target_window)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 모델 초기화
condition_dim = input_window * 1  # 50 * 1
generator = Generator(condition_dim, noise_dim, hidden_dim, target_window)
discriminator = Discriminator(input_dim=input_window, hidden_dim=hidden_dim)

# 최적화기 초기화
g_optimizer = optim.Adam(generator.parameters(), lr=lr)
d_optimizer = optim.Adam(discriminator.parameters(), lr=lr)

# 훈련 루프
for epoch in range(num_epochs):
    for condition, real_data in dataloader:
        batch_size = condition.size(0)

        # Discriminator 훈련
        noise = torch.randn(batch_size, noise_dim)
        fake_data = generator(condition, noise)

        # Reshape condition to 3D before passing to discriminator, taking only the input_window timesteps
        real_output = discriminator(condition[:, :, :input_window].view(batch_size, input_window, -1))
        fake_output = discriminator(condition[:, :, :input_window].view(batch_size, input_window, -1))

        d_loss = nn.BCELoss()(real_output, torch.ones_like(real_output)) + \
                nn.BCELoss()(fake_output, torch.zeros_like(fake_output))

        d_optimizer.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        # Generator 훈련
        noise = torch.randn(batch_size, noise_dim)
        fake_data = generator(condition, noise)
        # Reshape condition to 3D before passing to discriminator, taking only the input_window timesteps
        fake_output = discriminator(condition[:, :, :input_window].view(batch_size, input_window, -1))

        g_loss = fin_gan_loss(fake_data, real_data)

        g_optimizer.zero_grad()
        g_loss.backward()
        g_optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], D Loss: {d_loss.item():.4f}, G Loss: {g_loss.item():.4f}")

# 모델 평가 (간단한 예시)
generator.eval()
with torch.no_grad():
    test_condition = dataset[0][0].unsqueeze(0)  # 첫 번째 샘플의 입력 데이터
    test_noise = torch.randn(1, noise_dim)
    predicted = generator(test_condition, test_noise)
    print("예측값:", predicted.item())
    print("실제값:", dataset[0][1][:, -1].item()) # Extract the last element of the target for the given timestep

Epoch [10/100], D Loss: 1.3863, G Loss: -1.3789
Epoch [20/100], D Loss: 1.3863, G Loss: -1.0762
Epoch [30/100], D Loss: 1.3863, G Loss: -1.2458
Epoch [40/100], D Loss: 1.3863, G Loss: -2.1581
Epoch [50/100], D Loss: 1.3863, G Loss: -0.8275
Epoch [60/100], D Loss: 1.3863, G Loss: -2.1868
Epoch [70/100], D Loss: 1.3863, G Loss: -1.9265
Epoch [80/100], D Loss: 1.3863, G Loss: -1.8041
Epoch [90/100], D Loss: 1.3863, G Loss: -1.7253
Epoch [100/100], D Loss: 1.3863, G Loss: -1.4139
예측값: 0.5022321343421936
실제값: -0.006826336961239576
