<a href="https://colab.research.google.com/github/manny-uncharted/pytorch-projects-learnings/blob/main/Reinforcement_Learning_(Trading_bot_sample).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
import numpy as np
import pandas as pd
import yfinance as yf
from collections import defaultdict




class TradingAgent:
    """Environment-like wrapper around a close-price history.

    It turns a table with a ``'close'`` column into rolling-window states
    (``get_state``) and scores transitions between them (``reward``).
    """

    def __init__(self, symbol, window_size, episode_length, capital, num_shares, max_position, data):
        """Store the configuration and the starting portfolio.

        Args:
            symbol: Ticker symbol this agent is associated with.
            window_size: Number of rows used to build one state.
            episode_length: Number of time steps in one episode.
            capital: Starting capital; restored by ``reset``.
            num_shares: Starting share count; restored by ``reset``.
            max_position: Random actions are drawn from
                ``[-max_position, max_position]``.
            data: Table indexable with ``data['close']`` whose result
                supports ``.iloc`` slicing, ``.mean()`` and ``.std()``
                (e.g. a pandas DataFrame).
        """
        self.symbol = symbol
        self.window_size = window_size
        self.episode_length = episode_length
        self.max_position = max_position
        self.capital = capital
        self.num_shares = num_shares
        # Remember the starting portfolio so reset() restores what the caller
        # configured rather than fixed constants.
        self._initial_capital = capital
        self._initial_num_shares = num_shares
        self.data = data  # price history used by get_state()
        self.stock_price_history = []
        self.state = None

    def reset(self):
        """Restore the portfolio passed to the constructor and clear episode state."""
        self.capital = self._initial_capital
        self.num_shares = self._initial_num_shares
        self.stock_price_history = []
        self.state = None

    def act(self, state, epsilon):
        """Pick an action for ``state``.

        With probability ``epsilon`` a random integer in
        ``[-max_position, max_position]`` is returned. Otherwise the result is
        ``np.argmax(state)``, i.e. the index of the largest component of the
        state itself.

        NOTE(review): this class holds no Q-values, so the greedy branch is
        not value-based; ``QLearningAgent.act`` is the Q-table driven policy.
        """
        if np.random.uniform(0, 1) < epsilon:
            return np.random.randint(-self.max_position, self.max_position + 1)
        return np.argmax(state)

    def reward(self, state, action, next_state):
        """Score the transition ``state -> next_state`` under ``action``.

        Returns:
            ``1`` when ``next_state`` is None; ``0`` when only ``state`` is
            None (no previous price to compare with); otherwise the change in
            the first state component multiplied by the second component,
            with the sign flipped when ``action <= 0``.
        """
        if next_state is None:
            return 1
        if state is None:
            # First step after the warm-up window: there is no earlier state,
            # so indexing it would raise TypeError.
            return 0
        if action > 0:
            return (next_state[0] - state[0]) * state[1]
        return (state[0] - next_state[0]) * state[1]

    def get_state(self, t):
        """Return the state built from the ``window_size`` rows before ``t``.

        Returns:
            None while ``t < window_size``; otherwise a tuple
            ``(last close, mean close, std of close)`` over rows
            ``[t - window_size, t)``.
        """
        if t < self.window_size:
            return None
        # Select the column before aggregating so other columns (e.g. a
        # 'date' column) never take part in mean()/std().
        closes = self.data['close'].iloc[t - self.window_size:t]
        return (float(closes.iloc[-1]), float(closes.mean()), float(closes.std()))


    # def train(self, Q, alpha=0.2, gamma=0.9, epsilon=0.1, episodes=50):
    #     """
    #     This method trains the agent over a number of episodes.
    #     """
    #     for episode in range(episodes):
    #         state = self.get_state(0)
    #         total_reward = 0
    #         for t in range(1, self.episode_length):
    #             action = self.act(state, epsilon)
    #             next_state = self.get_state(t)
    #             reward = self.reward(state, action, next_state)
    #             total_reward += reward
    #             next_action = self.act(next_state, epsilon)
    #             Q[state][action] = Q[state][action] + alpha * (reward + gamma * Q[next_state][next_action] - Q[state][action])
    #             state = next_state
    #         print('Total reward for episode {}: {}'.format(episode, total_reward))




class QLearningAgent:
    """Tabular epsilon-greedy agent over integer position actions.

    Actions are the integers in ``[-max_position, max_position]``. Every
    state owns a row of ``2 * max_position + 1`` Q-values, and the value for
    ``action`` lives at index ``action + max_position`` of that row.
    """

    def __init__(self, alpha, gamma, epsilon, max_position):
        """Create the agent.

        Args:
            alpha: Learning rate applied to each TD error.
            gamma: Discount factor for the bootstrapped next-state value.
            epsilon: Exploration probability used by ``get_action``/``learn``.
            max_position: Largest absolute action value.
        """
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.max_position = max_position
        # actions[i] is the action whose Q-value is stored at row index i.
        self.actions = list(range(-max_position, max_position + 1))
        num_actions = len(self.actions)
        self.Q = defaultdict(lambda: [0.0] * num_actions)
        # update_q_table() historically referred to `q_table`; keep both
        # names pointing at the same table.
        self.q_table = self.Q

        # Pre-create rows for the (position, signal) keys, using the same row
        # width as every other state.
        for i in range(-self.max_position, self.max_position + 1):
            for j in range(-1, 2):
                self.Q[(i, j)] = [0.0] * num_actions

    @staticmethod
    def _state_key(state):
        """Return a hashable dictionary key for ``state``."""
        if state is None:
            return None
        if hasattr(state, "tolist"):
            # e.g. numpy arrays, which are not hashable themselves
            state = state.tolist()
        try:
            return tuple(state)
        except TypeError:
            # Non-iterable (scalar) states are used as the key directly.
            return state

    def _slot(self, action):
        """Map an action value to its index within a Q row.

        Raises:
            ValueError: if ``action`` is outside
                ``[-max_position, max_position]``; a negative list index
                would otherwise silently address another action's value.
        """
        action = int(action)
        if not -self.max_position <= action <= self.max_position:
            raise ValueError(
                "action {} outside [-{}, {}]".format(
                    action, self.max_position, self.max_position
                )
            )
        return action + self.max_position

    def _greedy_action(self, state):
        """Return the action with the highest Q-value, ties broken at random."""
        q_values = self.Q[self._state_key(state)]
        best_q = max(q_values)
        best_actions = [a for a, q in zip(self.actions, q_values) if q == best_q]
        if len(best_actions) == 1:
            return best_actions[0]
        return int(np.random.choice(best_actions))

    def act(self, state, epsilon):
        """Epsilon-greedy action for ``state`` using the given ``epsilon``.

        Both branches return an action value in
        ``[-max_position, max_position]``.
        """
        if np.random.uniform(0, 1) < epsilon:
            return int(np.random.randint(-self.max_position, self.max_position + 1))
        return self._greedy_action(state)

    def get_action(self, state):
        """Epsilon-greedy action for ``state`` using the agent's own ``epsilon``."""
        return self.act(state, self.epsilon)

    def reward(self, state, action, next_state):
        """Score the transition ``state -> next_state`` under ``action``.

        Returns ``0`` when either state is None; otherwise the change in the
        first state component multiplied by the second component, with the
        sign flipped when ``action <= 0``.
        """
        if state is None or next_state is None:
            return 0
        if action > 0:
            return (next_state[0] - state[0]) * state[1]
        return (state[0] - next_state[0]) * state[1]

    def learn(self, state, action, reward, next_state, done):
        """Apply one TD update to ``Q[state][action]``.

        The target bootstraps from an action drawn for ``next_state`` with the
        current epsilon-greedy policy. When ``done`` is true there is no
        successor, so the target is the reward alone.
        """
        key = self._state_key(state)
        slot = self._slot(action)
        if done:
            td_target = reward
        else:
            next_action = self.act(next_state, self.epsilon)
            next_q = self.Q[self._state_key(next_state)][self._slot(next_action)]
            td_target = reward + self.gamma * next_q
        td_error = td_target - self.Q[key][slot]
        self.Q[key][slot] += self.alpha * td_error

    def update_q_table(self, state, action, reward, next_state):
        """Apply one update that bootstraps from the best next-state value.

        ``state`` and ``next_state`` may be tuples, lists or arrays exposing
        ``tolist()``.
        """
        key = self._state_key(state)
        next_key = self._state_key(next_state)
        slot = self._slot(action)
        current_q = self.Q[key][slot]
        max_q = max(self.Q[next_key])
        self.Q[key][slot] = current_q + self.alpha * (reward + self.gamma * max_q - current_q)

    def decay_epsilon(self, factor):
        """Multiply the exploration rate by ``factor``."""
        self.epsilon *= factor


def train(trading_agent, q_agent, num_episodes):
    """Train ``q_agent`` on the transitions produced by ``trading_agent``.

    For each episode the environment is reset and stepped through
    ``t = 1 .. episode_length - 1``. Actions come from ``q_agent.act`` and
    each transition is handed to ``q_agent.learn``, so the Q-table both
    drives the behaviour and receives the updates.

    Args:
        trading_agent: Object providing ``reset()``, ``get_state(t)``,
            ``reward(state, action, next_state)`` and ``episode_length``.
        q_agent: Object providing ``Q``, ``epsilon``, ``act(state, epsilon)``
            and ``learn(state, action, reward, next_state, done)``.
        num_episodes: Number of passes over the episode.

    Returns:
        The agent's Q-table (``q_agent.Q``).
    """
    Q = q_agent.Q
    last_step = trading_agent.episode_length - 1
    for episode in range(num_episodes):
        trading_agent.reset()
        state = trading_agent.get_state(0)
        total_reward = 0
        for t in range(1, trading_agent.episode_length):
            next_state = trading_agent.get_state(t)
            # get_state() yields None until a full window is available; a
            # transition with a missing endpoint carries no information and
            # previously crashed the reward computation.
            if state is not None and next_state is not None:
                action = q_agent.act(state, q_agent.epsilon)
                reward = trading_agent.reward(state, action, next_state)
                total_reward += reward
                q_agent.learn(state, action, reward, next_state, t == last_step)
            state = next_state
        print('Total reward for episode {}: {}'.format(episode, total_reward))
    return Q


def test(trading_agent, q_agent):
    """Roll out the learned policy once and report the outcome.

    Two environment styles are supported:

    * Step-style: if ``trading_agent`` has a ``step`` method, actions from
      ``q_agent.get_action`` are fed to ``step(action)`` until it reports
      ``done``, and ``trading_agent.balance`` is returned.
    * Index-style: otherwise the environment is walked with
      ``get_state(t)`` for ``t = 1 .. episode_length - 1``, acting greedily
      (``epsilon = 0``) through ``q_agent.act``, and the sum of
      ``trading_agent.reward(...)`` over the transitions is returned.
      Transitions where either state is None are skipped.

    Returns:
        ``trading_agent.balance`` for step-style environments, otherwise the
        cumulative reward of the greedy rollout.
    """
    trading_agent.reset()

    if hasattr(trading_agent, "step"):
        state = trading_agent.get_state()
        done = False
        while not done:
            action = q_agent.get_action(state)
            next_state, reward, done = trading_agent.step(action)
            state = next_state
        return trading_agent.balance

    # Index-based environments (get_state(t) / reward(...)) have no step() or
    # balance, so evaluate by replaying the episode greedily.
    total_reward = 0
    state = trading_agent.get_state(0)
    for t in range(1, trading_agent.episode_length):
        next_state = trading_agent.get_state(t)
        if state is not None and next_state is not None:
            action = q_agent.act(state, 0)
            total_reward += trading_agent.reward(state, action, next_state)
        state = next_state
    return total_reward


# The name matches pytest's default "test*" collection prefix; mark it so it
# is never collected as a test case when imported into a test module.
test.__test__ = False


In [None]:
import yfinance as yf
import pandas as pd

def download_data(symbol, start_date, end_date):
    """
    Fetch the closing prices of ``symbol`` between ``start_date`` and
    ``end_date`` from Yahoo Finance.

    The result is a frame with a 0-based integer index and two columns:
    'close' (the closing price) and 'date' (the original index values).
    """
    raw = yf.download(symbol, start=start_date, end=end_date)

    # Isolate the closing prices in their own frame under a lowercase name.
    closes = pd.DataFrame(raw['Close'])
    closes.columns = ['close']

    # Carry the original index over as a regular column, then renumber rows.
    closes = closes.assign(date=closes.index)
    return closes.reset_index(drop=True)

In [None]:
# Daily AAPL closing prices used as the training history.
data = download_data(symbol='AAPL', start_date='2020-01-01', end_date='2022-04-06')


In [None]:
# Environment over the downloaded prices: 10-row windows, 100-step episodes.
trading_agent = TradingAgent(
    symbol='AAPL',
    window_size=10,
    episode_length=100,
    capital=1000,
    num_shares=10,
    max_position=5,
    data=data,
)
# Learner: alpha is the learning rate, gamma the discount, epsilon the exploration rate.
q_agent = QLearningAgent(alpha=0.2, gamma=0.9, epsilon=0.1, max_position=100)
# train() returns the learned Q-table.
trades = train(trading_agent, q_agent, num_episodes=50)
