# Actividad 1
# Black Jack Utilizando RL DQN
## Dario Castro 719910
## Ingenieria Financiera

In [2]:
import random
import numpy as np
def draw():
    cards = [2,3,4,5,6,7,8,9,10,10,10,10,11]
    return random.choice(cards)

def deal():
    hand = [draw(), draw()]
    return sum(hand), hand
def Naive_blackjack():
    ##starting hand
    aces = 0
    total = random.randint(2,11)
    if total == 11:
        aces = 1
    drawn = draw()
    if drawn == 11:
        aces += 1
    while total < 17:
        total += drawn
        if total > 21 and aces > 0:
            total -= 10
            aces -= 1
        drawn = draw()
        if drawn == 11:
            aces += 1
    if total > 21:
        return 'Bust'
    return total 


def simulate_blackjack(n):
    results = {}
    for i in range(n):
        play = (Naive_blackjack())
        if play in results:
            results[play] += 1
        else:
            results[play] = 1
    return (results)

simulate_blackjack(100000)

{18: 14649, 'Bust': 30504, 19: 13964, 21: 12193, 20: 13293, 17: 15397}

In [3]:
import gymnasium as gym
from typing import Optional, Tuple, Union
from gymnasium import logger, spaces

import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
class BlackJackEnv(gym.Env):
    def __init__(self):
        super(BlackJackEnv, self).__init__()
        self.action_space = gym.spaces.Discrete(2)  # Hit or Stand
        self.observation_space = gym.spaces.Box(low=np.array([0, 0]), high=np.array([21, 4]), dtype=np.float32)
        self.reset()

    def rewardfnc(self, score):
        return {21: 1.0, 20: 0.878, 19: 0.746, 18: 0.606, 17: 0.458}.get(score, 0.303)

    def step(self, action):
        if action == 0:  # Stand
            return np.array([self.score, self.aces], dtype=np.float32), self.rewardfnc(self.score), True, {}
        
        card = draw()
        self.score += card
        if card == 11:
            self.aces += 1

        if self.score > 21 and self.aces > 0:
            self.score -= 10
            self.aces -= 1

        done = self.score > 21
        reward = -1 if done else 0
        return np.array([self.score, self.aces], dtype=np.float32), reward, done, {}

    def reset(self):
        self.score, self.hand = deal()
        self.aces = sum(1 for card in self.hand if card == 11)
        return np.array([self.score, self.aces], dtype=np.float32)

env = BlackJackEnv()

In [5]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))


class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [6]:
class DQN(nn.Module):
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(n_observations, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, n_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
    
    
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY = 5000
LR = 1e-4
TAU = 0.005

# DQN Setup
n_actions = env.action_space.n
n_observations = 2
policy_net = DQN(n_observations, n_actions)
target_net = DQN(n_observations, n_actions)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.Adam(policy_net.parameters(), lr=LR)
memory = ReplayMemory(10000)
steps_done = 0

# Epsilon-Greedy Action Selection
def select_action(state):
    global steps_done
    eps_threshold = EPS_END + (EPS_START - EPS_END) * np.exp(-steps_done / EPS_DECAY)
    steps_done += 1
    if random.random() > eps_threshold:
        with torch.no_grad():
            return policy_net(state).argmax(dim=1).view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], dtype=torch.long)
    
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return

    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    state_action_values = policy_net(state_batch).gather(1, action_batch)

    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values

    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1)
    optimizer.step()
    
num_episodes = 10000
total_rewards = []

for episode in range(num_episodes):
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    episode_reward = 0

    for t in count():
        action = select_action(state)
        next_state, reward, done, _ = env.step(action.item())

        reward = torch.tensor([reward], dtype=torch.float32)
        next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0) if not done else None
        memory.push(state, action, next_state, reward)
        state = next_state

        optimize_model()
        
        episode_reward += reward.item()
        if done:
            total_rewards.append(episode_reward)
            break

    if episode % 100 == 0:
        print(f"Episode {episode}, Total Reward: {episode_reward}")

Episode 0, Total Reward: 0.30300000309944153
Episode 100, Total Reward: -1.0
Episode 200, Total Reward: 0.30300000309944153
Episode 300, Total Reward: 0.30300000309944153
Episode 400, Total Reward: 0.30300000309944153
Episode 500, Total Reward: 0.7459999918937683
Episode 600, Total Reward: 0.4580000042915344
Episode 700, Total Reward: -1.0
Episode 800, Total Reward: 0.7459999918937683
Episode 900, Total Reward: 0.30300000309944153
Episode 1000, Total Reward: 0.30300000309944153
Episode 1100, Total Reward: 0.30300000309944153
Episode 1200, Total Reward: 0.30300000309944153
Episode 1300, Total Reward: 0.878000020980835
Episode 1400, Total Reward: 0.30300000309944153
Episode 1500, Total Reward: 0.878000020980835
Episode 1600, Total Reward: -1.0
Episode 1700, Total Reward: 0.30300000309944153
Episode 1800, Total Reward: 0.4580000042915344
Episode 1900, Total Reward: 0.7459999918937683
Episode 2000, Total Reward: 0.30300000309944153
Episode 2100, Total Reward: 0.30300000309944153
Episode 22

In [7]:
# Convertir total_rewards a numpy array
total_rewards = np.array(total_rewards, dtype=np.float32)

# Contar victorias (recompensa > 0)
wins = np.sum(total_rewards > 0)
total_episodes = len(total_rewards)

# Calcular porcentaje de victorias
win_percentage = (wins / total_episodes) * 100

print(f"Total de episodios: {total_episodes}")
print(f"Episodios ganados: {wins}")
print(f"Porcentaje de victorias: {win_percentage:.2f}%")



Total de episodios: 10000
Episodios ganados: 8984
Porcentaje de victorias: 89.84%
