In [1]:
import json
import math
import random
from collections import defaultdict
from itertools import count

import numpy as np
import gymnasium as gym
from gymnasium.spaces import Discrete
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt

import games

## DQN

In [2]:
# Build the environment registered under the id 'games/Blackjack'.
# NOTE(review): presumably the `games` import above registers this id with
# Gymnasium — confirm.
env = gym.make('games/Blackjack')

In [None]:
class UpperConfidenceBounds:
    """Action selector that adds an upper-confidence exploration bonus to Q-values.

    Selection counts are keyed by action only, so they are shared across all
    states passed to `select`.
    """

    def __init__(self):
        # Total number of selections made so far.
        self.total = 0
        # Number of times each action has been chosen, keyed by action.
        self.times_selected = {}

    def reset(self):
        """Discard all selection counts."""
        self.total = 0
        self.times_selected = {}

    def select(self, state, actions, qfunction):
        """Pick an action for `state` and record the choice.

        Args:
            state: state forwarded unchanged to `qfunction.get_q_value`.
            actions: object exposing integer attributes `start` and `n`
                (assumed to follow the Gymnasium `Discrete` convention:
                `n` values beginning at `start`).
            qfunction: object exposing `get_q_value(state, action)`.

        Returns:
            The chosen action (an int taken from the action range).
        """
        # A Discrete space contains `n` values starting at `start`, so the
        # range must end at start + n; ending at `n` drops the trailing
        # actions whenever start != 0.
        candidates = range(actions.start, actions.start + actions.n)

        # First execute each action one time so every count is non-zero
        # before it is used as a divisor below.
        for action in candidates:
            if action not in self.times_selected:
                self.times_selected[action] = 1
                self.total += 1
                return action

        max_actions = []
        max_value = float('-inf')
        for action in candidates:
            # Q-value plus the exploration bonus sqrt(2 * ln(total) / count).
            value = qfunction.get_q_value(state, action) + math.sqrt(
                (2 * math.log(self.total)) / self.times_selected[action]
            )
            if value > max_value:
                max_actions = [action]
                max_value = value
            elif value == max_value:
                max_actions.append(action)

        # For multiple actions with highest value, choose one randomly
        result = random.choice(max_actions)
        self.times_selected[result] += 1
        self.total += 1
        return result

In [None]:
class DeepQFunction(nn.Module):
    """Fully connected network with two hidden ReLU layers that outputs one value per action.

    Args:
        state_space: number of input features (passed to the first `nn.Linear`).
        action_space: number of outputs (passed to the last `nn.Linear`).
        hidden_dim: width of both hidden layers.
        alpha: accepted but not used anywhere in this class.
            NOTE(review): presumably the learning rate for an optimizer that
            has not been wired in yet — confirm.
    """

    def __init__(self, state_space, action_space, hidden_dim=32, alpha=1e-4):
        super().__init__()
        self.layer1 = nn.Linear(state_space, hidden_dim)
        self.layer2 = nn.Linear(hidden_dim, hidden_dim)
        self.layer3 = nn.Linear(hidden_dim, action_space)

    def forward(self, x):
        """Return the network output for input tensor `x`."""
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

    def get_q_values(self, states, actions):
        """Return the network output for each (state, action) pair, without gradients.

        Args:
            states: batch of states convertible to a 2-D float32 tensor
                (one row per state).
            actions: one integer action index per row of `states`.

        Returns:
            A list of Python floats, one per pair.
        """
        states_tensor = torch.as_tensor(states, dtype=torch.float32)
        # torch.gather requires an int64 index tensor; a float32 index raises
        # a RuntimeError.
        actions_tensor = torch.as_tensor(actions, dtype=torch.long)
        with torch.no_grad():
            q_values = self.forward(states_tensor).gather(1, actions_tensor.unsqueeze(1))
        return q_values.squeeze(1).tolist()
    
    

In [None]:
class QLearning:
    """Temporal-difference learner whose target uses the maximum Q-value of the next state.

    Collaborators are duck-typed:
        env: exposes `reset(seed=...)`, `step(action)` and `action_space`;
            observations are mappings with 'agent' and 'target' entries.
        bandit: exposes `select(state, actions, qfunction)`.
        qfunction: exposes `get_q_value`, `get_max_q` and `update`.
        gamma: discount factor.
    """

    def __init__(self, env, bandit, qfunction, gamma=0.9):
        self.env = env
        self.bandit = bandit
        self.qfunction = qfunction
        self.gamma = gamma

    @staticmethod
    def _encode(observation):
        """Flatten an observation into a hashable tuple (agent entries, then target entries)."""
        return tuple(np.concatenate([observation['agent'], observation['target']]))

    def execute(self, episodes=2000, seed=1):
        """Run `episodes` episodes, updating the Q-function after every step.

        Args:
            episodes: number of episodes to run.
            seed: value passed to `env.reset(seed=...)` at the start of every
                episode. NOTE(review): with a Gymnasium env, reseeding with
                the same integer each episode restarts the same random
                stream; pass None if episodes should differ — confirm intent.

        Returns:
            A list with the discounted return of each episode.
        """
        rewards = []
        for _ in range(episodes):
            observation, info = self.env.reset(seed=seed)
            state = self._encode(observation)
            actions = self.env.action_space
            action = self.bandit.select(state, actions, self.qfunction)
            episode_reward = 0.0
            for step in count():
                (next_observation, reward, terminated, truncated, info) = self.env.step(action)
                next_state = self._encode(next_observation)
                # The bandit is still queried on the final step so its
                # selection counts evolve exactly as before.
                next_action = self.bandit.select(next_state, actions, self.qfunction)
                delta = self.get_delta(
                    state, action, reward, next_state, next_action,
                    terminated=terminated,
                )
                self.qfunction.update(state, action, delta)
                state = next_state
                action = next_action
                episode_reward += reward * (self.gamma ** step)
                if terminated or truncated:
                    break
            rewards.append(episode_reward)
        return rewards

    def get_delta(self, state, action, reward, next_state, next_action, terminated=False):
        """Return the TD error for one transition.

        When `terminated` is true the next state contributes nothing to the
        target, so the error is `reward - Q(state, action)`. A truncated
        (but not terminated) transition still bootstraps.
        """
        q_value = self.qfunction.get_q_value(state, action)
        if terminated:
            # No future return after a terminal transition; do not rely on
            # the Q-function happening to return 0 there.
            next_state_value = 0.0
        else:
            next_state_value = self.state_value(next_state, next_action)
        delta = reward + self.gamma * next_state_value - q_value
        return delta

    def state_value(self, state, action):
        """Return the maximum Q-value of `state`; `action` is ignored."""
        actions = self.env.action_space
        max_q_value = self.qfunction.get_max_q(state, actions)
        return max_q_value
    

class SARSA:
    """Temporal-difference learner whose target uses the Q-value of the next selected action.

    Collaborators are duck-typed:
        env: exposes `reset(seed=...)`, `step(action)` and `action_space`;
            observations are mappings with 'agent' and 'target' entries.
        bandit: exposes `select(state, actions, qfunction)`.
        qfunction: exposes `get_q_value` and `update`.
        gamma: discount factor.
    """

    def __init__(self, env, bandit, qfunction, gamma=0.9):
        self.env = env
        self.bandit = bandit
        self.qfunction = qfunction
        self.gamma = gamma

    @staticmethod
    def _encode(observation):
        """Flatten an observation into a hashable tuple (agent entries, then target entries)."""
        return tuple(np.concatenate([observation['agent'], observation['target']]))

    def execute(self, episodes=2000, seed=1):
        """Run `episodes` episodes, updating the Q-function after every step.

        Args:
            episodes: number of episodes to run.
            seed: value passed to `env.reset(seed=...)` at the start of every
                episode. NOTE(review): with a Gymnasium env, reseeding with
                the same integer each episode restarts the same random
                stream; pass None if episodes should differ — confirm intent.

        Returns:
            A list with the discounted return of each episode.
        """
        rewards = []
        for _ in range(episodes):
            observation, info = self.env.reset(seed=seed)
            state = self._encode(observation)
            actions = self.env.action_space
            action = self.bandit.select(state, actions, self.qfunction)
            episode_reward = 0.0
            for step in count():
                (next_observation, reward, terminated, truncated, info) = self.env.step(action)
                next_state = self._encode(next_observation)
                # The bandit is still queried on the final step so its
                # selection counts evolve exactly as before.
                next_action = self.bandit.select(next_state, actions, self.qfunction)
                delta = self.get_delta(
                    state, action, reward, next_state, next_action,
                    terminated=terminated,
                )
                self.qfunction.update(state, action, delta)
                state = next_state
                action = next_action
                episode_reward += reward * (self.gamma ** step)
                if terminated or truncated:
                    break
            rewards.append(episode_reward)
        return rewards

    def get_delta(self, state, action, reward, next_state, next_action, terminated=False):
        """Return the TD error for one transition.

        When `terminated` is true the next state contributes nothing to the
        target, so the error is `reward - Q(state, action)`. A truncated
        (but not terminated) transition still bootstraps.
        """
        q_value = self.qfunction.get_q_value(state, action)
        if terminated:
            # No future return after a terminal transition; do not rely on
            # the Q-function happening to return 0 there.
            next_state_value = 0.0
        else:
            next_state_value = self.state_value(next_state, next_action)
        delta = reward + self.gamma * next_state_value - q_value
        return delta

    def state_value(self, state, action):
        """Return the Q-value of taking `action` in `state`."""
        return self.qfunction.get_q_value(state, action)


def get_ema(rewards, smoothing_factor=0.9):
    """Return the exponential moving average of `rewards` as a list.

    The first output equals the first reward; every later output is
    `previous * smoothing_factor + reward * (1 - smoothing_factor)`.
    An empty input yields an empty list.
    """
    averaged = []
    fresh_weight = 1 - smoothing_factor
    for current in rewards:
        if averaged:
            # Blend the running average with the newest reward.
            current = averaged[-1] * smoothing_factor + current * fresh_weight
        averaged.append(current)
    return averaged