In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
import time
import os

In [2]:
class LSTMModel(nn.Module):
    """LSTM followed by a linear head, usable as either a policy or a value network.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of policy outputs. Ignored when ``is_value_net`` is true.
        is_value_net: When true the head has a single output and no softmax is applied.
    """

    def __init__(self, input_dim, hidden_dim, output_dim = 0, is_value_net=False):
        super(LSTMModel, self).__init__()
        self.is_value_net = is_value_net
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim if not is_value_net else 1)

    def forward(self, x, hidden):
        """Run the LSTM over ``x`` and apply the head to the last time step only.

        Args:
            x: Input tensor laid out batch-first, i.e. (batch, seq, input_dim).
            hidden: ``(h, c)`` tuple, e.g. as produced by ``init_hidden``.

        Returns:
            ``(output, hidden)`` where ``output`` is (batch, 1) raw values for a value
            network, or (batch, output_dim) softmax probabilities for a policy network.
        """
        out, hidden = self.lstm(x, hidden)
        head_out = self.fc(out[:, -1, :])
        if self.is_value_net:
            return head_out, hidden
        # Policy network: normalise the head output into a probability distribution.
        return torch.softmax(head_out, dim=-1), hidden

    def init_hidden(self, batch_size):
        """Return a zeroed ``(h, c)`` state for ``batch_size`` sequences.

        The tensors are created with the dtype and device of the module's own
        parameters, so the state stays compatible after ``.to(device)`` or
        ``.double()`` has been applied to the model.
        """
        reference = next(self.parameters())
        shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        return (torch.zeros(shape, dtype=reference.dtype, device=reference.device),
                torch.zeros(shape, dtype=reference.dtype, device=reference.device))

In [3]:
# One-letter suit codes; a card string starts with one of these.
SUITES = list('HDCS')

# One-character rank codes. A card's position within its suit (its index here)
# is what card_to_val uses as the base strength of the card.
RANKS = list('6789JQKA')

# All 32 suit+rank combinations, grouped by suit in SUITES order.
CARDS = [suit + rank for suit in SUITES for rank in RANKS]

def sym_to_cat(symbol, d):
    """Return a 0/1 list with one flag per entry of ``d``.

    A flag is 1 when ``entry in symbol`` holds. ``symbol`` may be a string
    (substring test) or a list (membership test).
    """
    flags = []
    for entry in d:
        flags.append(1 if entry in symbol else 0)
    return flags

def input_arr(alpha, c_hand, hand, desk_my, desk_enemy):
    """Build the flat 0/1 feature vector fed to the networks.

    The segments are concatenated in this order: ``alpha`` over SUITES,
    ``c_hand`` over SUITES, then ``hand``, ``desk_my`` and ``desk_enemy``
    each over CARDS.

    Returns:
        A 1-D numpy array of length ``2 * len(SUITES) + 3 * len(CARDS)``.
    """
    encoded = sym_to_cat(alpha, SUITES)
    encoded = encoded + sym_to_cat(c_hand, SUITES)
    for cards in (hand, desk_my, desk_enemy):
        encoded = encoded + sym_to_cat(cards, CARDS)
    return np.array(encoded)

In [4]:
class OomiEnvironment:
    """Four-seat trick-taking card environment (earlier draft).

    NOTE(review): a second ``class OomiEnvironment`` is defined in the next cell. When the
    cells are run in order, that later definition rebinds the name and this one is no longer
    used. ``step`` here returns three values, whereas ``Player.play`` unpacks four, so this
    version cannot be driven by ``Player.play`` as written.
    """

    def __init__(self, policy_net, value_net):
        """Set up table state and create four players that share the given networks."""
        self.players = []
        self.desk = [0] * 4  # Desk cards played by each player in a round
        self.p_rewards = [0] * 4  # Rewards for each player
        self.c_player = None  # Current player
        self.cards = CARDS.copy()  # Deck of cards
        self.trump = np.random.choice(SUITES)  # Randomly chosen trump suit
        self.hand = 'z'  # Current leading suit in play
        # 'z' is a placeholder that matches no suit letter, i.e. "no lead suit yet".

        # Initialize players
        # Seats alternate teams: i % 2 is the team, i is the seat id.
        for i in range(4):
            self.players.append(Player(self, policy_net, value_net, i % 2, i))

    def round_reset(self):
        """Clear the lead suit, the desk and the per-seat rewards."""
        self.hand = 'z'
        self.desk = [0] * 4
        self.p_rewards = [0] * 4

    def reset(self):
        """Deal new hands, clear table state and player hidden states; return the empty desk."""
        self.deal_cards()
        self.desk = [0] * 4
        self.p_rewards = [0] * 4
        self.c_player = None
        self.hand = 'z'
        for player in self.players:
            player.reset_hidden()
        return self.desk

    def deal_cards(self):
        """Shuffle the deck in place and give each seat a contiguous slice of 8 cards."""
        random.shuffle(self.cards)
        for i in range(4):
            self.players[i].actions = self.cards[i * 8:(i + 1) * 8]

    def next_player(self):
        """Return the seat to act, advancing only if the current seat has already played.

        Starts at seat 0 when no current player is set; wraps from seat 3 back to seat 0.
        """
        if self.c_player is None:
            self.c_player = 0

        # Only move on once the current seat has a card on the desk.
        if self.desk[self.c_player] != 0:
            if self.c_player == 3:
                self.c_player = 0
            else:
                self.c_player += 1
        return self.c_player

    def card_to_val(self, card, alpha):
        """Score a card: rank index (0-7), +16 if it contains ``alpha``, else +8 if it contains the lead suit."""
        v = CARDS.index(card) % 8
        if alpha in card:
            v += 16
        elif self.hand in card:
            v += 8
        return v

    def desk_winner(self):
        """Return ``(is_unique, max_indices)`` for the highest-valued card(s) on the desk."""
        # NOTE(review): debugging output emitted every time a winner is computed.
        print(self.desk)
        values = [self.card_to_val(card, self.trump) for card in self.desk]
        max_value = max(values)
        max_indices = [i for i, value in enumerate(values) if value == max_value]

        # Check if the max value is unique
        is_unique = len(max_indices) == 1

        return is_unique, max_indices

    def step(self, action, player_id):
        """Try to play ``action`` (a card string) for seat ``player_id``.

        Returns:
            ``(desk, reward, trick_done)``. A rejected move returns the live desk, -10 and
            False without changing the desk. An accepted move returns a copy of the desk taken
            before the card was placed, reward 0, and True only when this card completed
            the trick.
        """
        # Ensure the action is valid
        if action not in self.players[player_id].actions:
            return self.desk, -10, False

        # Penalize if the player did not follow the suit
        # The first card on an empty desk sets the lead suit (first character of the card).
        if self.desk == [0, 0, 0, 0]:
            self.hand = action[0]
        else:
            if self.hand != action[0]:
                # Off-suit play is rejected only if the seat still holds a lead-suit card.
                if any(self.hand in a for a in self.players[player_id].actions):
                    return self.desk, -10, False

        pre_state = self.desk.copy()
        self.desk[player_id] = action
        self.players[player_id].actions.remove(action)

        # All four seats have played: settle the trick.
        if 0 not in self.desk:
            is_win, winner = self.desk_winner()
            if is_win:
                winning_player = winner[0]
                # The winner gets +1 personally, then both seats of the winning team get +1.
                self.p_rewards[winning_player] += 1
                if winning_player % 2 == 0:
                    self.p_rewards[0] += 1
                    self.p_rewards[2] += 1
                else:
                    self.p_rewards[1] += 1
                    self.p_rewards[3] += 1
                self.c_player = winning_player
            else:
                # Tie on the top value: every tied seat is penalised and c_player is left as is.
                for i in winner:
                    self.p_rewards[i] -= 5

            # Clear the table for the next trick.
            self.hand = 'z'
            self.desk = [0] * 4
            return pre_state, 0, True

        return pre_state, 0, False

In [5]:
# Game environment driven by Player.play and the training loop.
class OomiEnvironment:
    """Four-seat trick-taking card environment with two teams (seats 0/2 vs seats 1/3).

    State:
        players: the four ``Player`` objects, seat ``i`` on team ``i % 2``.
        desk: card played by each seat in the current trick (0 = not played yet).
        p_rewards: per-seat reward accumulated since the last ``prep_rewards``.
        team_score: tricks won by each team since the last ``reset``.
        hand: lead suit of the current trick, or ``'z'`` when no card has been led.
        trump: trump suit, drawn once when the environment is constructed.
    """

    # Reward returned by ``step`` for a move that is rejected.
    ILLEGAL_MOVE_REWARD = -10

    def __init__(self, policy_net, value_net):
        """Create the table state and four players sharing the given networks."""
        self.players = []
        self.desk = [0] * 4
        self.p_rewards = [0] * 4
        self.c_player = None
        self.cards = CARDS.copy()
        # NOTE(review): the trump suit is drawn here only and is not re-drawn by reset();
        # confirm that a fixed trump for the environment's lifetime is intended.
        self.trump = np.random.choice(SUITES)
        self.hand = 'z'
        self.team_score = [0] * 2

        for i in range(4):
            self.players.append(Player(self, policy_net, value_net, i % 2, i))

    def reset_round(self):
        """Clear the lead suit, the desk and the per-seat rewards for the next trick."""
        self.hand = 'z'
        self.desk = [0] * 4
        self.p_rewards = [0] * 4

    def reset(self):
        """Start a new game: deal hands and clear all per-game state.

        ``team_score`` is cleared here as well; otherwise trick counts from earlier
        games would leak into the next game's score.

        Returns:
            The (empty) desk list.
        """
        self.deal_cards()
        self.reset_round()
        self.team_score = [0] * 2
        self.c_player = None
        for player in self.players:
            player.reset_hidden()

        return self.desk

    def desk_winner(self):
        """Return the seat index holding the highest-valued card on the desk.

        If several cards share the highest value, the lowest seat index wins.
        """
        values = [self.card_to_val(card, self.trump) for card in self.desk]
        return values.index(max(values))

    def prep_rewards(self, rewards):
        """Add each seat's pending ``p_rewards`` to every entry of that seat's reward list.

        Args:
            rewards: one list of numbers per seat.

        Returns:
            New per-seat lists with the pending reward added; ``p_rewards`` is then zeroed.
        """
        ret = [[value + bonus for value in rewards[seat]]
               for seat, bonus in enumerate(self.p_rewards)]

        self.p_rewards = [0] * 4
        return ret

    def next_player(self):
        """Advance ``c_player`` cyclically over seats 0..3 (starting at 0) and return it."""
        if self.c_player is None or self.c_player == 3:
            self.c_player = 0
        else:
            self.c_player += 1

        return self.c_player

    def deal_cards(self):
        """Shuffle the deck in place and give each seat a contiguous slice of 8 cards."""
        random.shuffle(self.cards)
        for i in range(4):
            self.players[i].actions = self.cards[i * 8:(i + 1) * 8]

    def card_to_val(self, card, alpha):
        """Score a card: rank index (0-7), +16 if it contains ``alpha``, else +8 if it contains the lead suit."""
        v = CARDS.index(card) % 8

        if alpha in card:
            v += 16
        elif self.hand in card:
            v += 8

        return v

    def step(self, action, player_id):
        """Try to play card ``action`` for seat ``player_id``.

        A move is rejected (desk unchanged, ``ILLEGAL_MOVE_REWARD`` returned) when the card
        is not in the seat's hand, or when it is off-suit while the seat still holds a card
        of the lead suit.

        Returns:
            ``(desk, reward, trick_done, game_done)``. ``trick_done`` is True when this card
            completed the trick; ``game_done`` is True when, in addition, the acting seat
            has no cards left. The desk is not cleared here; call ``reset_round`` for that.
        """
        hand_cards = self.players[player_id].actions
        if action not in hand_cards:
            return self.desk, self.ILLEGAL_MOVE_REWARD, False, False

        if self.desk == [0, 0, 0, 0]:
            # First card of the trick sets the lead suit (first character of the card).
            self.hand = action[0]
        elif self.hand != action[0] and any(self.hand in card for card in hand_cards):
            return self.desk, self.ILLEGAL_MOVE_REWARD, False, False

        self.desk[player_id] = action
        hand_cards.remove(action)

        if 0 in self.desk:
            # Trick still in progress.
            return self.desk, 0, False, False

        # Trick complete: the winner gets +1 personally, then both seats of the
        # winning team get +1 and the team's trick count goes up.
        winner = self.desk_winner()
        team = winner % 2
        self.p_rewards[winner] += 1
        self.p_rewards[team] += 1
        self.p_rewards[team + 2] += 1
        self.team_score[team] += 1

        return self.desk, 0, True, hand_cards == []

In [6]:
class Player:
    """One seat at the table: holds a hand of cards and samples moves from the policy network.

    Attributes:
        env: the environment the player acts in.
        actions: card strings currently in the player's hand (assigned by the environment).
        policy_hidden / value_hidden: recurrent states obtained from the networks' ``init_hidden(1)``.
        team: team index passed by the environment.
        id: seat index passed by the environment.
    """

    def __init__(self, env, policy_net, value_net, team, id):
        self.policy_net = policy_net
        self.value_net = value_net
        self.env = env
        self.actions = []
        self.pre_rewards = []
        self.policy_hidden = self.policy_net.init_hidden(1)
        self.value_hidden = self.value_net.init_hidden(1)
        self.team = team
        self.id = id

    def reset_hidden(self):
        """Replace both recurrent states with fresh ones from ``init_hidden(1)``."""
        self.policy_hidden = self.policy_net.init_hidden(1)
        self.value_hidden = self.value_net.init_hidden(1)

    def play(self):
        """Encode the current table, sample a card index from the policy and submit it.

        Returns:
            ``(state, action, reward, r_done, done)`` where ``state`` is the encoded
            observation with two leading singleton dimensions, ``action`` is the sampled
            index into CARDS, and the remaining values come from ``env.step``.

        Raises:
            RuntimeError: re-raised unchanged if sampling from the policy output fails.
        """
        desk = self.env.desk
        # Split the desk by seat parity: same parity as this player's team vs the other.
        own_side = [card for seat, card in enumerate(desk) if seat % 2 == self.team]
        other_side = [card for seat, card in enumerate(desk) if seat % 2 != self.team]
        features = input_arr(self.env.trump, self.env.hand, self.actions, own_side, other_side)
        state = torch.tensor(features, dtype=torch.float32).unsqueeze(0).unsqueeze(0)

        # Acting only needs the probabilities, not gradients; without no_grad the carried
        # hidden state would keep the autograd graph of every earlier move alive.
        with torch.no_grad():
            action_probs, self.policy_hidden = self.policy_net(state, self.policy_hidden)

        try:
            action = torch.multinomial(action_probs, 1).item()
        except RuntimeError:
            # Dump the inputs that produced an unsampleable distribution, then
            # re-raise the original error so its message and traceback are kept.
            print(f'action probs - {action_probs}')
            print(f'state - {state}')
            print(f'policy_hidden - {self.policy_hidden}')
            raise

        _, reward, r_done, done = self.env.step(CARDS[action], self.id)

        return state, action, reward, r_done, done

In [7]:
class ReplayBuffer:
    """Bounded FIFO store of ``(states, actions, rewards)`` records with random sampling."""

    def __init__(self, capacity):
        # A deque with a maximum length discards its oldest entry once full.
        self.buffer = deque([], capacity)

    def push(self, states, actions, rewards):
        """Append one record to the buffer."""
        record = (states, actions, rewards)
        self.buffer.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` records drawn at random without replacement."""
        picked = random.sample(self.buffer, batch_size)
        return picked

    def __len__(self):
        """Number of records currently stored."""
        stored = len(self.buffer)
        return stored

In [8]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [9]:
# TensorBoard writer used by the training cell to log the policy and value losses.
# NOTE(review): constructed without arguments, so the log location is SummaryWriter's
# default; the final `%tensorboard --logdir runs` cell assumes it is under ./runs - confirm.
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter()

2024-07-06 16:25:29.633709: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-06 16:25:29.633887: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-06 16:25:29.818743: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [10]:
from math import e
import time

time.sleep(3)

# Define paths to save and load the models
policy_model_path = 'policy_model.pth'
value_model_path = 'value_model.pth'
backup_directory = '/content/drive/My Drive/Colab Notebooks/Oomi Backups' # Replace with your path


# Model dimensions. input_dim matches input_arr: 4 + 4 + 32 + 32 + 32 = 104 features;
# output_dim is one policy output per card in CARDS.
input_dim = 104
hidden_dim = 128
output_dim = 32

batch_size = 32

num_players = 4          # seats at the table
tricks_per_episode = 8   # each seat is dealt 8 cards
log_every = 1000         # print losses every this many episodes


# Initialize Replay Buffer
replay_buffer = ReplayBuffer(capacity=1000)  # Adjust capacity as needed

# Load existing models if available, otherwise create new ones.
# SECURITY NOTE: these checkpoints are whole pickled modules, so loading them executes
# arbitrary pickle code - only load files you created yourself. weights_only=False is
# passed explicitly because recent PyTorch releases refuse full-module pickles by default.
if os.path.exists(policy_model_path):
    policy_net = torch.load(policy_model_path, weights_only=False)
else:
    policy_net = LSTMModel(input_dim, hidden_dim, output_dim)

if os.path.exists(value_model_path):
    value_net = torch.load(value_model_path, weights_only=False)
else:
    value_net = LSTMModel(input_dim, hidden_dim, output_dim, is_value_net=True)


env = OomiEnvironment(policy_net, value_net)
policy_optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
value_optimizer = optim.Adam(value_net.parameters(), lr=1e-3)

# Training loop
num_episodes = 100
gamma = 0.99  # NOTE(review): not used below - returns are not discounted; confirm intent.

# Defined up front so the logging at the end of an episode never hits an undefined name.
policy_loss_mean = None
value_loss_mean = None

for episode in range(num_episodes):
    # env.reset() deals new hands and resets every player's hidden state.
    env.reset()
    # Start each game with a clean trick count so the end-of-episode bonus below
    # reflects this game only.
    env.team_score = [0] * 2

    states = [[] for _ in range(num_players)]
    actions = [[] for _ in range(num_players)]
    rewards = [[] for _ in range(num_players)]

    for _ in range(tricks_per_episode):
        r_rewards = [[] for _ in range(num_players)]
        for player_id in range(num_players):
            state, action, reward, _, _ = env.players[player_id].play()
            states[player_id].append(state)
            actions[player_id].append(action)
            r_rewards[player_id].append(reward)

        # Fold the trick outcome (env.p_rewards) into each seat's reward for this trick.
        r_rewards = env.prep_rewards(r_rewards)
        for i in range(num_players):
            rewards[i].extend(r_rewards[i])
        env.reset_round()

    # End-of-episode bonus: every step of a seat additionally receives its team's trick count.
    for i in range(num_players):
        env.p_rewards[i] = env.team_score[i % 2]

    rewards = env.prep_rewards(rewards)

    # One buffer entry per seat: (steps, 1, input_dim) states, (steps,) actions, (steps,) rewards.
    # Each seat stores its OWN reward sequence so it lines up with that seat's states/actions.
    for i in range(num_players):
        replay_buffer.push(torch.cat(states[i], dim=0),
                           torch.tensor(actions[i], dtype=torch.long),
                           torch.tensor(rewards[i], dtype=torch.float32))

    # Sample from the replay buffer and update the networks
    if len(replay_buffer) > batch_size:
        policy_losses = []
        value_losses = []

        for batch_states, batch_actions, batch_returns in replay_buffer.sample(batch_size):
            sample_size = batch_states.size(0)

            # Policy Network Forward Pass
            policy_hidden = policy_net.init_hidden(sample_size)
            action_probs, _ = policy_net(batch_states, policy_hidden)
            selected_action_probs = action_probs[torch.arange(sample_size), batch_actions.long()]

            # Value Network Forward Pass
            value_hidden = value_net.init_hidden(sample_size)
            state_values, _ = value_net(batch_states, value_hidden)
            state_values = state_values.squeeze(-1)

            advantages = batch_returns - state_values

            # Compute policy loss and value loss. The advantage is detached so the policy
            # loss does not push gradients into the value network.
            policy_loss = -torch.mean(torch.log(selected_action_probs + 1e-8) * advantages.detach())
            value_loss = torch.mean((state_values - batch_returns) ** 2)

            policy_losses.append(policy_loss.item())
            value_losses.append(value_loss.item())

            total_loss = policy_loss + value_loss

            policy_optimizer.zero_grad()
            value_optimizer.zero_grad()

            total_loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 0.5)
            torch.nn.utils.clip_grad_norm_(value_net.parameters(), 0.5)

            policy_optimizer.step()
            value_optimizer.step()

        policy_loss_mean = sum(policy_losses) / len(policy_losses)
        value_loss_mean = sum(value_losses) / len(value_losses)

        writer.add_scalar("Loss/policy", policy_loss_mean, episode)
        writer.add_scalar("Loss/value", value_loss_mean, episode)


    # Save the models after each episode
    torch.save(policy_net, policy_model_path)
    torch.save(value_net, value_model_path)
    os.makedirs(backup_directory, exist_ok=True)
    torch.save(policy_net, os.path.join(backup_directory, 'policy_model.pth'))
    torch.save(value_net, os.path.join(backup_directory, 'value_model.pth'))

    # Losses only exist once the buffer has been large enough for at least one update.
    if episode % log_every == 0 and policy_loss_mean is not None:
        print(f'Episode {episode}, Policy Loss: {policy_loss_mean}, Value Loss: {value_loss_mean}')


writer.flush()
writer.close()

In [11]:
%tensorboard --logdir runs