In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
import gym
import torch
from torch import nn
import matplotlib.pyplot as plt

# 데이터 수집 및 전처리
def load_data(ticker):
    data = yf.download(ticker, start="2020-01-01", end="2023-01-01")
    data['SMA'] = data['Close'].rolling(window=20).mean()
    data['Return'] = data['Close'].pct_change()
    data.dropna(inplace=True)
    return data

data = load_data("AAPL")

# 환경 정의
class StockTradingEnv(gym.Env):
    def __init__(self, data):
        super(StockTradingEnv, self).__init__()
        self.data = data.reset_index()
        self.current_step = 0
        self.action_space = gym.spaces.Discrete(3)  # 매수, 매도, 보유
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32)

    def reset(self):
        self.current_step = 0
        self.balance = 1000  # 초기 잔고
        self.stock_owned = 0  # 보유 주식 수
        return self._next_observation()

    def _next_observation(self):
        obs = self.data.iloc[self.current_step][['Open', 'High', 'Low', 'Close', 'Volume', 'Return']].values
        return obs.astype(np.float32)  # 데이터 타입 변환

    def step(self, action):
        current_price = self.data.iloc[self.current_step]['Close']
        reward = 0

        if action == 0:  # 매수
            if self.balance >= current_price:
                self.stock_owned += 1
                self.balance -= current_price
        elif action == 1:  # 매도
            if self.stock_owned > 0:
                self.stock_owned -= 1
                self.balance += current_price

        self.current_step += 1
        done = self.current_step >= len(self.data) - 1
        next_state = self._next_observation()

        # 보상: 잔고 변화에 따른 보상
        reward = self.balance + self.stock_owned * current_price - 1000  # 초기 잔고 1000

        return next_state, reward, done, {}

# 트랜스포머 모델 정의
class TransformerModel(nn.Module):
    def __init__(self, input_dim, model_dim, n_heads, n_layers):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, model_dim)
        self.transformer = nn.Transformer(d_model=model_dim, nhead=n_heads, num_encoder_layers=n_layers)
        self.fc = nn.Linear(model_dim, 3)  # 행동의 확률 분포

    def forward(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)  # 배치 차원 추가
        x = self.transformer(x)
        return torch.softmax(self.fc(x[-1]), dim=-1)  # 마지막 출력을 사용

# 엑터-크리틱 모델 정의
class Critic(nn.Module):
    def __init__(self, input_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 1)  # 상태의 가치 추정

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

# PPO 에이전트 구현
class PPOAgent:
    def __init__(self, actor, critic, clip_param=0.2, gamma=0.99, lr=3e-4):
        self.actor = actor
        self.critic = critic
        self.clip_param = clip_param
        self.gamma = gamma
        self.optimizer = torch.optim.Adam(list(actor.parameters()) + list(critic.parameters()), lr=lr)

    def select_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state_tensor)
        action = torch.multinomial(action_probs, num_samples=1)
        return action.item()

    def update_policy(self, states, actions, rewards, next_states):
        rewards = torch.FloatTensor(rewards)
        states_tensor = torch.FloatTensor(states)
        actions_tensor = torch.LongTensor(actions)

        values = self.critic(states_tensor).squeeze()
        next_values = self.critic(torch.FloatTensor(next_states)).squeeze()

        td_targets = rewards + self.gamma * next_values
        advantages = td_targets - values

        action_probs = self.actor(states_tensor)
        action_log_probs = torch.log(action_probs.gather(1, actions_tensor.unsqueeze(1)).squeeze())
        ratio = action_log_probs - torch.log(action_probs.gather(1, actions_tensor.unsqueeze(1)).squeeze())

        surrogate1 = ratio * advantages.detach()
        surrogate2 = torch.clamp(ratio, 1 - self.clip_param, 1 + self.clip_param) * advantages.detach()
        actor_loss = -torch.min(surrogate1, surrogate2).mean()

        critic_loss = nn.functional.mse_loss(values, td_targets.detach())

        loss = actor_loss + critic_loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

# 모델 초기화
env = StockTradingEnv(data)
actor = TransformerModel(input_dim=6, model_dim=64, n_heads=4, n_layers=2)
critic = Critic(input_dim=6)
agent = PPOAgent(actor, critic)

# 학습 루프
num_episodes = 1000
rewards_list = []

for episode in range(num_episodes):
    state = env.reset()
    done = False
    states, actions, rewards = [], [], []
    
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        
        state = next_state

    agent.update_policy(states, actions, rewards, [next_state] * len(states))
    rewards_list.append(sum(rewards))

# 결과 플롯
plt.figure(figsize=(12, 6))
plt.plot(rewards_list)
plt.title('Total Rewards over Episodes')
plt.xlabel('Episodes')
plt.ylabel('Total Rewards')
plt.grid()
plt.show()

# 최종 잔고 및 손실률 계산
final_balance = env.balance + env.stock_owned * env.data.iloc[env.current_step]['Close']
initial_investment = 1000
loss_percentage = ((initial_investment - final_balance) / initial_investment) * 100

print(f"최종 잔고: {final_balance:.2f}")
print(f"손실률: {loss_percentage:.2f}%")


[*********************100%%**********************]  1 of 1 completed


TypeError: forward() missing 1 required positional argument: 'tgt'