In [1]:
import numpy as np
import talib
import pandas as pd

# Read the raw AAPL price history from disk
aapl_df = pd.read_csv("aapl_data.csv")

# If the file carries a Date column, parse it and put the rows in chronological order
if 'Date' in aapl_df.columns:
    aapl_df['Date'] = pd.to_datetime(aapl_df['Date'])
    aapl_df = aapl_df.sort_values(by='Date', ascending=True)

### 1. Candlestick Pattern Recognition ###
# Every pattern function below is called with the same four price series
ohlc = [aapl_df[name] for name in ('Open', 'High', 'Low', 'Close')]

patterns = {
    "Doji": talib.CDLDOJI,
    "Engulfing": talib.CDLENGULFING,
    "Hammer": talib.CDLHAMMER,
    "Morning Star": talib.CDLMORNINGSTAR,
    "Evening Star": talib.CDLEVENINGSTAR,
}

# One output column per pattern, named after the dictionary key
for label, detect in patterns.items():
    aapl_df[label] = detect(*ohlc)

### 2. Technical Indicators ###
close = aapl_df['Close']

# Output column -> (TA-Lib moving-average function, look-back period).
# Insertion order is the column order written to the CSV.
moving_averages = {
    'SMA_10': (talib.SMA, 10),
    'SMA_50': (talib.SMA, 50),
    'EMA_10': (talib.EMA, 10),
    'EMA_50': (talib.EMA, 50),
}
for column, (average, period) in moving_averages.items():
    aapl_df[column] = average(close, timeperiod=period)

aapl_df['RSI_14'] = talib.RSI(close, timeperiod=14)

# talib.MACD returns three series: the MACD line, its signal line and the histogram
macd_line, macd_signal, macd_hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
aapl_df['MACD'] = macd_line
aapl_df['MACD_Signal'] = macd_signal
aapl_df['MACD_Hist'] = macd_hist

# Sections 3 and 4 below are currently disabled and kept for reference.

# ### 3. Normalization ###
# price_columns = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'SMA_10', 'SMA_50', 'EMA_10', 'EMA_50', 'RSI_14', 'MACD', 'MACD_Signal', 'MACD_Hist']
# aapl_df[price_columns] = (aapl_df[price_columns] - aapl_df[price_columns].min()) / (aapl_df[price_columns].max() - aapl_df[price_columns].min())

# ### 4. Windowed Representation ###
# window_size = 3
# feature_columns = ['Close', 'Volume', 'SMA_10', 'SMA_50', 'RSI_14', 'MACD', 'MACD_Signal', 'MACD_Hist']

# for col in feature_columns:
#     for i in range(1, window_size + 1):
#         aapl_df[f"{col}_lag{i}"] = aapl_df[col].shift(i)

# aapl_df = aapl_df.dropna().reset_index(drop=True)

# Write the enriched frame next to the input, without the index column
aapl_df.to_csv("processed_aapl_data.csv", index=False)

print("Feature engineering completed! Processed data saved as 'processed_aapl_data.csv'.")

Feature engineering completed! Processed data saved as 'processed_aapl_data.csv'.


In [1]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces

class StockTradingEnv(gym.Env):
    """Single-asset trading environment that replays the rows of a price DataFrame.

    At every step the agent chooses one of three discrete actions:

    * ``1`` - buy: spend the whole cash balance on shares at the current close
      (trading fee included), provided more than 0.001 of cash is available.
    * ``2`` - sell: liquidate every held share at the current close (trading
      fee deducted), provided more than 0.001 shares are held.
    * any other value (``0`` in the declared action space) - hold.

    The reward is the ratio mean/std of the per-step portfolio returns
    collected since the last ``reset()``.

    NOTE: ``reset()`` returns only the observation and ``step()`` returns the
    4-tuple ``(observation, reward, done, info)`` rather than Gymnasium's
    ``(obs, info)`` / 5-tuple convention. These shapes are kept unchanged for
    backward compatibility with code written against them.
    """

    def __init__(self, df, initial_balance=10000, trading_fee=0.001, window_size=10):
        """Create the environment.

        Args:
            df: DataFrame holding at least every column in ``feature_columns``
                (which includes ``'Close'``, used as the trade price).
            initial_balance: Cash available at the start of each episode.
            trading_fee: Proportional fee applied to every buy and sell.
            window_size: Stored on the instance; not read by this class.
        """
        super().__init__()

        # Work on a private copy so the caller's frame is never modified.
        self.df = df.copy()
        self.initial_balance = initial_balance
        self.trading_fee = trading_fee
        self.window_size = window_size
        self.current_step = 0
        self.done = False

        # Columns that make up one observation, in this order.
        self.feature_columns = [
            'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Doji',
            'Engulfing', 'Hammer', 'Morning Star', 'Evening Star', 'SMA_10',
            'SMA_50', 'EMA_10', 'EMA_50', 'RSI_14', 'MACD', 'MACD_Signal',
            'MACD_Hist'
        ]
        self.state_size = len(self.feature_columns)

        # Three discrete actions: 0 = hold, 1 = buy, 2 = sell.
        self.action_space = spaces.Discrete(3)
        # This class does no scaling of its own, so nothing confines the
        # observations to [0, 1]; declare the space unbounded instead.
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_size,), dtype=np.float32
        )

        # Portfolio state variables
        self.balance = initial_balance
        self.shares_held = 0
        self.portfolio_value = initial_balance
        self.next_portfolio_value = initial_balance
        self.returns = []
        self.actions = []
        # Reward produced by the most recent step(); read by render().
        self.last_reward = 0

    def reset(self):
        """Restore the start-of-episode state and return the first observation."""
        self.current_step = 0
        self.done = False
        self.balance = self.initial_balance
        self.shares_held = 0
        self.portfolio_value = self.initial_balance
        self.next_portfolio_value = self.initial_balance
        self.returns = []
        # Cleared together with `returns` so both per-step logs describe the same episode.
        self.actions = []
        self.last_reward = 0

        return self._next_observation()

    def _next_observation(self):
        """Return the feature columns of the current row as a float32 vector."""
        row = self.df.iloc[self.current_step]
        return np.array(row[self.feature_columns], dtype=np.float32)

    def step(self, action):
        """Apply ``action`` at the current close and advance one row.

        Args:
            action: ``1`` to buy, ``2`` to sell; any other value holds.

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is the
            feature vector of the row stepped to, ``done`` becomes True once
            the last row is reached, and ``info`` is an empty dict.

        Raises:
            IndexError: if called when no next row exists (for example after
                ``done`` has been reported).
        """
        close_prices = self.df['Close']
        current_price = close_prices.iloc[self.current_step]
        next_price = close_prices.iloc[self.current_step + 1]

        if action == 1:  # Buy
            if self.balance > 0.001:
                # Size the order so that price plus fee uses up the whole balance.
                shares_to_buy = self.balance / (current_price * (1 + self.trading_fee))
                # Add to the position instead of overwriting it, so shares
                # already held can never be silently discarded.
                self.shares_held += shares_to_buy
                self.balance -= shares_to_buy * current_price * (1 + self.trading_fee)
        elif action == 2:  # Sell
            if self.shares_held > 0.001:
                self.balance += self.shares_held * current_price * (1 - self.trading_fee)
                self.shares_held = 0

        # Value the post-trade portfolio at the current and at the next close.
        self.portfolio_value = self.balance + (self.shares_held * current_price)
        self.next_portfolio_value = self.balance + (self.shares_held * next_price)

        # One-step return; an empty portfolio has no defined return, record 0.
        if self.portfolio_value != 0:
            step_return = (self.next_portfolio_value - self.portfolio_value) / self.portfolio_value
        else:
            step_return = 0.0
        self.returns.append(step_return)
        self.actions.append(action)

        # Sharpe-style reward over this episode's returns (needs at least two samples).
        if len(self.returns) > 1:
            mean_return = np.mean(self.returns)
            std_return = np.std(self.returns)
            if not std_return > 0:
                # Zero (or NaN) spread: fall back to 1 so the ratio stays defined.
                std_return = 1
            sharpe_ratio = mean_return / std_return
        else:
            sharpe_ratio = 0

        reward = sharpe_ratio
        self.last_reward = reward

        self.portfolio_value = self.next_portfolio_value

        # Move to the next step; the final row has no successor to trade against.
        self.current_step += 1
        if self.current_step >= len(self.df) - 1:
            self.done = True

        return self._next_observation(), reward, self.done, {}

    def render(self):
        """Print the current step, cash, position, portfolio value and latest reward."""
        print(f'Step: {self.current_step}, Balance: {self.balance:.2f}, Shares Held: {self.shares_held}, Portfolio Value: {self.portfolio_value}, Sharpe Ratio: {self.last_reward}')


ModuleNotFoundError: No module named 'gymnasium'

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

# Define preprocessing function
def preprocess_state(state, window_size=20):
    """Standardize the input, window it along the first axis and flatten it.

    The input is standardized with a single mean and standard deviation taken
    over all of its elements (before windowing). The last ``window_size``
    entries along axis 0 are then kept; if fewer are available, zeros are
    appended after them. A NaN anywhere in ``state`` makes the mean NaN and
    therefore every standardized value NaN.

    Args:
        state: Array-like of numbers. Either 1-D, or N-D with time on axis 0
            (e.g. ``(timesteps, features)``).
        window_size: Number of axis-0 entries in the output window.

    Returns:
        A float32 tensor of shape ``(1, window_size * prod(state.shape[1:]))``;
        for 1-D input this is ``(1, window_size)``.
    """
    values = np.asarray(state, dtype=np.float32)
    values = (values - np.mean(values)) / (np.std(values) + 1e-5)  # Standardization

    window = values[-window_size:]
    missing = window_size - window.shape[0]
    if missing > 0:
        # The pad must match the trailing dimensions of the window; a flat 1-D
        # pad cannot be concatenated with a 2-D window.
        padding = np.zeros((missing,) + window.shape[1:], dtype=window.dtype)
        window = np.concatenate([window, padding], axis=0)

    # Flatten to a single row: one batch entry holding the whole window.
    return torch.tensor(window.reshape(1, -1), dtype=torch.float32)

# Define the Deep Q-Network (DQN)
class DQN(nn.Module):
    """Three-layer fully connected Q-network over a flattened feature window.

    Args:
        input_dim: Number of features per time step.
        output_dim: Number of actions; one Q-value is produced per action.
        window_size: Number of time steps flattened into one input row.
            Defaults to 20, the window length this network was written for.

    The first layer therefore expects rows of ``input_dim * window_size``
    values.
    """

    def __init__(self, input_dim, output_dim, window_size=20):
        super().__init__()
        # Layers are created in fc1 -> fc2 -> fc3 order and keep these
        # attribute names, so saved state dicts stay loadable.
        self.fc1 = nn.Linear(input_dim * window_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)

    def forward(self, x):
        """Map a batch of flattened windows to Q-values of shape (batch, output_dim)."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # Q-values for each action

# Define the DQN Agent
class DQNAgent:
    """DQN agent with a replay buffer and a separate target network.

    Args:
        state_size: Number of features per time step.
        action_size: Number of discrete actions.
        gamma: Discount factor applied to the bootstrapped next-state value.
        lr: Learning rate of the Adam optimizer.
        batch_size: Number of transitions sampled per training step.
        memory_size: Maximum number of transitions kept in the replay buffer.

    Attributes:
        state_size: Width of one flattened input row
            (features per step x 20-step window).
        model: Online Q-network that is trained.
        target_model: Copy of ``model`` used to compute bootstrap targets;
            refreshed only by ``update_target_model()``.
    """

    def __init__(self, state_size, action_size, gamma=0.90, lr=0.001, batch_size=32, memory_size=10000):
        self.state_size = state_size * 20  # Flattened width for the 20-day window
        self.action_size = action_size
        self.gamma = gamma  # Discount factor
        self.lr = lr
        self.batch_size = batch_size
        self.memory = deque(maxlen=memory_size)

        # DQN multiplies its input_dim by the 20-step window itself, so it is
        # given the per-step feature count. Passing self.state_size would apply
        # the window factor twice and make the first layer 20x too wide.
        self.model = DQN(state_size, action_size)
        self.target_model = DQN(state_size, action_size)
        self.target_model.load_state_dict(self.model.state_dict())  # Sync target model
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

    def select_action(self, state, epsilon=0.1):
        """Pick an action with an epsilon-greedy policy.

        Args:
            state: Input tensor for the online network, expected to be a
                single row of shape ``(1, self.state_size)``.
            epsilon: Probability of returning a uniformly random action.

        Returns:
            The chosen action index as a Python int.
        """
        if random.random() < epsilon:
            return random.randint(0, self.action_size - 1)  # Random action
        with torch.no_grad():
            return torch.argmax(self.model(state)).item()  # Best action from Q-network

    def store_experience(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer (oldest entries drop out when full)."""
        self.memory.append((state, action, reward, next_state, done))

    def train(self):
        """Run one optimization step on a random batch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. Stored states are concatenated with ``torch.cat``, so
        they are expected to be tensors of shape ``(1, self.state_size)``.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.cat(states)
        actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
        next_states = torch.cat(next_states)
        dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

        # Q-value the online network assigns to the action actually taken.
        q_values = self.model(states).gather(1, actions)

        # Bootstrapped target: r + gamma * max_a' Q_target(s', a'), with the
        # bootstrap term zeroed for terminal transitions. No gradient is
        # needed through the target network.
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0].unsqueeze(1)
            target_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        loss = self.criterion(q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_model(self):
        """Sync the target model with the main model."""
        self.target_model.load_state_dict(self.model.state_dict())
