# DQN Technical Analysis Trading Bot

# In [None]:
import gym
import numpy as np
import tensorflow as tf
from gym import spaces

# In [None]:


class TradingEnv(gym.Env):
    """Single-asset, single-position trading environment over a price series.

    Each observation is the ``window_size + 1`` most recent prices; the last
    element of the window is the price at which the next action executes, so
    the agent never observes prices it could not yet know.

    Actions: 0 = sell, 1 = hold, 2 = buy. At most one share is held at a time.
    The reward of a step is the profit realised by that step (non-zero only
    when a held share is sold).

    Args:
        data: Indexable sequence of prices (supports ``len``, integer
            indexing and slicing).
        window_size: Number of past prices shown in addition to the current
            one.

    Raises:
        ValueError: If ``data`` is too short to yield at least one step.
    """

    def __init__(self, data, window_size):
        if len(data) < window_size + 2:
            raise ValueError(
                "data must contain at least window_size + 2 prices, got "
                "{} for window_size={}".format(len(data), window_size)
            )
        self.action_space = spaces.Discrete(3)  # 0: sell, 1: hold, 2: buy
        # Observations are raw (unnormalised) prices, so there is no finite
        # upper bound to declare.
        self.observation_space = spaces.Box(
            low=0, high=np.inf, shape=(window_size + 1,), dtype=np.float32
        )
        self.data = data
        self.window_size = window_size
        self.reset()

    def reset(self):
        """Start a new episode and return the first observation."""
        self.current_step = 0
        self.profit = 0
        self.bought_price = 0
        self.shares = 0
        return self._next_observation()

    def _next_observation(self):
        """Return the current price window as a float32 vector.

        The result always has shape ``(window_size + 1,)`` while the episode
        is running, matching ``observation_space``. Position state (shares,
        entry price) is deliberately not included: the declared observation
        shape only has room for the price window.
        """
        end = self.current_step + self.window_size + 1
        return np.asarray(self.data[self.current_step:end], dtype=np.float32)

    def step(self, action):
        """Apply ``action`` at the latest observed price and advance one step.

        Returns:
            Tuple ``(observation, reward, done, info)`` where ``reward`` is
            the change in realised profit caused by this step and ``info`` is
            an empty dict.
        """
        assert self.action_space.contains(action), "invalid action: {!r}".format(action)
        # Trade at the newest price in the window, not the oldest one.
        price = self.data[self.current_step + self.window_size]
        prev_profit = self.profit

        if action == 0 and self.shares > 0:  # sell the held share
            self.profit += price - self.bought_price
            self.shares = 0
            self.bought_price = 0
        elif action == 2 and self.shares == 0:  # buy when flat
            # The cost is accounted for when the position is closed, so
            # profit is left untouched here.
            self.shares = 1
            self.bought_price = price
        # action == 1 (hold), or a sell/buy that is not possible: no-op.

        self.current_step += 1
        # The episode ends once the window has reached the last price.
        done = self.current_step + self.window_size >= len(self.data) - 1
        reward = self.profit - prev_profit
        return self._next_observation(), reward, done, {}


# In [None]:
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

def build_model(input_shape, output_shape):
    """Create and compile the fully connected Q-value network.

    Args:
        input_shape: Shape tuple of a single input sample, forwarded to the
            first Dense layer.
        output_shape: Number of units in the linear output layer.

    Returns:
        A compiled Sequential model with two 128-unit ReLU hidden layers and
        a linear output layer, using MSE loss and Adam with its defaults.
    """
    layers = [
        Dense(128, input_shape=input_shape, activation='relu'),
        Dense(128, activation='relu'),
        Dense(output_shape, activation='linear'),
    ]
    network = Sequential(layers)
    network.compile(loss='mse', optimizer=Adam())
    return network

import numpy as np
from collections import deque
import random

class DQNAgent:
    """Deep Q-learning agent with experience replay and a target network.

    Args:
        state_size: Input shape handed to ``build_model`` (a tuple such as
            ``(n_features,)``).
        action_size: Number of discrete actions; also the width of the
            network's Q-value output.
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # replay buffer; oldest entries drop out
        self.gamma = 0.95  # discount factor
        self.epsilon = 1.0  # exploration rate, decayed by decrease_epsilon()
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        # NOTE: build_model() compiles with Adam() defaults, so this value is
        # stored but not currently applied to the optimizer.
        self.learning_rate = 0.001
        self.model = build_model(self.state_size, self.action_size)
        self.target_model = build_model(self.state_size, self.action_size)
        # Start both networks from identical weights.
        self.target_train()

    def act(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value for ``state``.
        """
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        """Train the online network on a random minibatch of transitions.

        Does nothing until at least ``batch_size`` transitions are stored.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Bootstrap from the target network's best next-state value.
                next_q = self.target_model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_q)
            # Only the taken action's Q-value is moved toward the target;
            # the other outputs keep their current predictions.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

    def target_train(self):
        """Copy the online network's weights into the target network."""
        weights = self.model.get_weights()
        self.target_model.set_weights(weights)

    def decrease_epsilon(self):
        """Decay the exploration rate by one step while above ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load online-network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save online-network weights to the file ``name``."""
        self.model.save_weights(name)

if __name__ == "__main__":
    # Toy price series used to smoke-test the agent/environment loop.
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    window_size = 3
    agent = DQNAgent((window_size + 1, ), 3)
    env = TradingEnv(data, window_size)

    batch_size = 32
    episodes = 1000
    # Upper bound on steps per episode: one step per price that follows the
    # first full window, excluding the final price.
    max_steps = len(data) - window_size - 1

    for e in range(episodes):
        # The network expects a batch dimension: (1, n_features).
        state = np.reshape(env.reset(), [1, agent.state_size[0]])
        for _ in range(max_steps):
            action = agent.act(state)
            next_state, reward, done, _info = env.step(action)
            next_state = np.reshape(next_state, [1, agent.state_size[0]])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            agent.replay(batch_size)
            if done:
                break
        # Sync the target network and report once per episode, whether the
        # episode ended via `done` or by exhausting the step budget.
        agent.target_train()
        print("episode: {}/{}, score: {}".format(e, episodes, env.profit))
        agent.decrease_epsilon()

