In [2]:
import random
from coup.agent import Player

In [3]:
# The five character roles. The deal in the next cell draws from three copies
# of each role, i.e. a 15-card deck.
roles = ['duke', 'assassin', 'captain', 'ambassador', 'contessa']

# Table setup: number of players, cards dealt to each hand, and cards that
# stay in the centre after the deal.
n_players = 5
n_cards_per_hand = 2
n_center_cards = 3

In [4]:
# Deal every hand plus the centre cards from a deck holding three copies of
# each role. `counts` is sized from `roles` so the two cannot drift apart:
# random.sample raises ValueError when their lengths differ.
cards = random.sample(
    roles,
    n_players * n_cards_per_hand + n_center_cards,
    counts=[3] * len(roles),
)

## Start with a super simple model

In [5]:
import random
from dataclasses import dataclass, fields
from typing import List, Dict, Set, Optional, Tuple
from enum import Enum
import inspect

@dataclass
class Character:
    """
    The five character roles, used as a namespace of string constants.

    Each field name is the upper-case identifier and its default value is the
    display string. `Arena.__init__` reads the field names (through
    `__dataclass_fields__`) to build the deck, so dealt hands hold the
    upper-case names (e.g. "DUKE"), not these display strings.
    """
    DUKE: str = "Duke"
    ASSASSIN: str = "Assassin"
    CAPTAIN: str = "Captain"
    AMBASSADOR: str = "Ambassador"
    CONTESSA: str = "Contessa"

@dataclass
class Action:
    """
    Names of the actions a player can choose on their turn.

    Used as a namespace of string constants (`Action.TAX == "Tax"`). The
    trailing comment on an action names the character that action is tied to;
    actions without one are not tied to a character.
    """
    INCOME: str = "Income"
    FOREIGN_AID: str = "Foreign Aid"
    COUP: str = "Coup"
    TAX: str = "Tax"  # Duke
    ASSASSINATE: str = "Assassinate"  # Assassin
    STEAL: str = "Steal"  # Captain
    EXCHANGE: str = "Exchange"  # Ambassador

# Deal sizes read by Arena.__init__: cards per player hand and cards left in
# the centre. These repeat the values set in the setup cell near the top.
n_cards_per_hand = 2
n_center_cards = 3

class Arena:
    """
    Small Coup table: deals the cards, seats the players and applies actions.

    Attributes:
        cards: every card name that was dealt (all hands, then the centre cards).
        players: dict mapping player_id -> Player.
        history: list created empty; nothing in this class appends to it yet.
        player_order: player ids in the order they were seated.
        deck: the centre cards left over once the hands have been dealt.
    """

    def __init__(self, n_players=3):
        """
        Create `n_players` players and deal `n_cards_per_hand` cards to each.

        Card names are the upper-case field names of `Character` (e.g.
        "DUKE"), drawn from a deck with three copies of every role.

        Args:
            n_players: number of players to seat.

        Raises:
            ValueError: from `random.sample` if more cards are requested than
                the deck contains.
        """
        roles = list(Character.__dataclass_fields__.keys())
        n_hand_cards = n_players * n_cards_per_hand
        self.cards = random.sample(
            roles, n_hand_cards + n_center_cards, counts=[3] * len(roles)
        )
        self.players = {}
        self.history = []
        self.player_order = []

        for i in range(n_players):
            # Re-draw on the (unlikely) id collision so that an already seated
            # player is never silently overwritten.
            player_id = random.randint(0, 10000000)
            while player_id in self.players:
                player_id = random.randint(0, 10000000)

            start = i * n_cards_per_hand
            hand = self.cards[start:start + n_cards_per_hand]
            self.players[player_id] = Player(hand, player_id=player_id)
            self.player_order.append(player_id)

        # Everything after the hands is the centre deck. Slicing from the front
        # (rather than with a negative index) stays correct when
        # n_center_cards is 0.
        self.deck = self.cards[n_hand_cards:]

    def _holds(self, player_id: int, role: str) -> bool:
        """
        Return True when the player's hand contains `role`, ignoring case.

        Hands are dealt as upper-case field names ("CAPTAIN") while the
        `Character` constants are display strings ("Captain"), so a plain
        membership test between the two can never succeed.
        """
        wanted = role.upper()
        return any(str(card).upper() == wanted for card in self.players[player_id].hand)

    def take_turn(self, player_id: int) -> None:
        """
        Pick a random valid action for `player_id` and collect the reactions.

        Every other player is asked for a counter through
        `Player.challenge_action`; one of them is then picked at random and the
        acting player answers through `Player.respond_to_challenge`.

        NOTE(review): the chosen action, challenger and response are not yet
        handed to `resolve_actions`, so a turn currently changes no game state.
        """
        # Get all available actions for a given player_id
        valid_actions = self.get_valid_actions(player_id)

        # TODO: Make better choice
        action = random.choice(valid_actions)

        # Ask every opponent how they react to the action
        counters = {
            other_id: self.players[other_id].challenge_action(action)
            for other_id in self.players
            if other_id != player_id
        }

        # Challenge pick
        if counters:
            challenger_id = random.choice(list(counters.keys()))
            challenge_response = self.players[player_id].respond_to_challenge(
                counters[challenger_id]
            )
        else:
            challenger_id = None
            challenge_response = None

    def get_valid_actions(self, player_id: int) -> List[str]:
        """
        Get the list of actions `player_id` may take given their coins.

        Returns:
            `[Action.COUP]` alone when the player holds 10 or more coins,
            otherwise Income and Steal, plus Coup (7+ coins) and Assassinate
            (3+ coins) when affordable, followed by Foreign Aid, Tax and
            Exchange.
        """
        player = self.players[player_id]

        if player.coins >= 10:
            return [Action.COUP]  # Must coup at 10+ coins

        valid_actions = [Action.INCOME, Action.STEAL]  # Never cost any coins

        if player.coins >= 7:
            valid_actions.append(Action.COUP)

        if player.coins >= 3:
            valid_actions.append(Action.ASSASSINATE)

        valid_actions.extend([Action.FOREIGN_AID, Action.TAX, Action.EXCHANGE])
        return valid_actions

    def resolve_actions(
        self,
        player_id,
        action,
        challenge: Optional[Tuple] = None,
        challenge_id: Optional[int] = None,
        challenge_response: Optional[Tuple] = None,
        target_id: Optional[int] = None,
    ) -> None:
        """
        Apply the outcome of `action` taken by `player_id`.

        Three cases are handled:

        * challenge issued, no response: the acting player loses an influence
          if they do not hold the character their action claims.
        * challenge issued and answered: the player `challenge_id` loses an
          influence if they do not hold a card that backs their block.
        * no challenge: the action's effect is applied.

        NOTE(review): the penalty for a challenger whose challenge fails is
        not implemented yet.

        Args:
            player_id: id of the acting player.
            action: one of the `Action` constants.
            challenge: truthy when the action was challenged or blocked.
            challenge_id: id of the player who challenged or blocked.
            challenge_response: the acting player's answer, if any.
            target_id: id of the player targeted by Steal, Coup or Assassinate.

        Raises:
            ValueError: if an unchallenged Steal, Coup or Assassinate is
                resolved without a `target_id`.
        """
        actor = self.players[player_id]

        # If a challenge is issued and no response is made, check the actor's claim
        if challenge and challenge_response is None:
            claimed_role = {
                Action.STEAL: Character.CAPTAIN,
                Action.ASSASSINATE: Character.ASSASSIN,
                Action.EXCHANGE: Character.AMBASSADOR,
                Action.TAX: Character.DUKE,
            }.get(action)
            if claimed_role is not None and not self._holds(player_id, claimed_role):
                actor.remove_influence()

        # If a challenge is made and a response is issued, check the blocker's claim
        elif challenge and challenge_response:
            if action == Action.FOREIGN_AID:
                # Foreign aid is blocked by claiming the Duke
                if not self._holds(challenge_id, Character.DUKE):
                    self.players[challenge_id].remove_influence()
            elif action == Action.ASSASSINATE:
                # If someone attempts to assassinate another player, that
                # player can either claim to have the contessa, or challenge
                # that the player actually has the assassin.
                # TODO: not implemented yet
                pass
            elif action == Action.STEAL:
                # Either the Captain or the Ambassador blocks a steal, so the
                # blocker is only caught out when they hold neither card.
                has_blocker = (
                    self._holds(challenge_id, Character.CAPTAIN)
                    or self._holds(challenge_id, Character.AMBASSADOR)
                )
                if not has_blocker:
                    self.players[challenge_id].remove_influence()

        # Unchallenged: apply the action's effect
        else:
            if action == Action.INCOME:
                actor.coins += 1
            elif action == Action.FOREIGN_AID:
                actor.coins += 2
            elif action == Action.TAX:
                actor.coins += 3
            elif action in (Action.STEAL, Action.COUP, Action.ASSASSINATE):
                if target_id is None:
                    raise ValueError(f"{action} requires a target_id")
                target = self.players[target_id]
                if action == Action.STEAL:
                    # A steal takes up to two coins, limited by what the target has
                    stolen = min(2, target.coins)
                    actor.coins += stolen
                    target.coins -= stolen
                elif action == Action.COUP:
                    target.remove_influence()
                    actor.coins -= 7
                else:
                    target.remove_influence()
                    actor.coins -= 3
            else:
                # Exchange (and any unrecognised action) swaps a card with the deck
                actor.swap_card(self.deck)

In [13]:
# Build a default three-player table and keep the player ids handy for the
# exploratory cells below.
arena = Arena()
player_ids = list(arena.players)

In [14]:
# Give the first player three extra coins so the `coins >= 3` check in
# get_valid_actions passes, then list their options.
# Note: this mutates the arena in place, so re-running the cell adds three more
# coins each time.
arena.players[player_ids[0]].coins += 3
arena.get_valid_actions(player_ids[0])

['Income', 'Steal', 'Assassinate', 'Foreign Aid', 'Tax', 'Exchange']

In [15]:
# Pick a random card from the first player's hand and remove it in place,
# printing the hand before and after. Re-running the cell removes another card
# (and fails once the hand is empty).
z = random.choice(arena.players[player_ids[0]].hand)
print (arena.players[player_ids[0]].hand)
arena.players[player_ids[0]].hand.remove(z)
print (arena.players[player_ids[0]].hand)

['ASSASSIN', 'AMBASSADOR']
['ASSASSIN']


# LSTM

In [64]:
import torch
import torch.nn as nn
import numpy as np
import tqdm
from torch.utils.data import Dataset, DataLoader

class SimpleDataset(Dataset):
    """
    Toy dataset of additive sequences.

    Each sample starts from two random numbers in [0, 1); every following
    number is the sum of the previous two. The model input is the sequence
    without its last element and the target is that last element.

    Attributes:
        sequences: float tensor of shape (num_samples, seq_length - 1).
        targets: float tensor of shape (num_samples,).
    """

    def __init__(self, seq_length=10, num_samples=1000):
        inputs = []
        labels = []

        for _ in range(num_samples):
            # Two random seeds, then extend until the sequence is seq_length long
            values = [np.random.rand(), np.random.rand()]
            while len(values) < seq_length:
                values.append(values[-1] + values[-2])

            inputs.append(values[:-1])
            labels.append(values[-1])

        self.sequences = torch.tensor(inputs, dtype=torch.float32)
        self.targets = torch.tensor(labels, dtype=torch.float32)

    def __len__(self):
        """Number of samples in the dataset."""
        return self.sequences.shape[0]

    def __getitem__(self, idx):
        """Return the (input sequence, target) pair at position `idx`."""
        return self.sequences[idx], self.targets[idx]

class LSTMPredictor(nn.Module):
    """
    Stacked LSTM followed by a linear layer that predicts one value per sequence.

    Args:
        input_size: number of features per time step.
        hidden_size: width of each LSTM layer.
        num_layers: number of stacked LSTM layers.
        dropout: dropout between LSTM layers; ignored when there is only one
            layer, since nn.LSTM applies dropout only between layers.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, dropout=0.2):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )

        # Fully connected layer mapping the last hidden state to the prediction
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """
        Predict the next value for each sequence in the batch.

        Args:
            x: tensor of shape (batch, seq_len) holding one scalar per time
                step, or (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch,). The batch dimension is kept even for a
            single sequence so the output always lines up with the targets.
        """
        # Scalar sequences get a trailing feature dimension of size 1
        if x.dim() == 2:
            x = x.unsqueeze(-1)

        # nn.LSTM starts from zero hidden and cell states when none are given,
        # created on the input's device and dtype.
        lstm_out, _ = self.lstm(x)

        # Get output from last time step
        output = self.fc(lstm_out[:, -1, :])

        # Drop only the feature dimension; a bare squeeze() would also remove
        # the batch dimension when the batch holds a single sequence.
        return output.squeeze(-1)

def train_model(model, train_loader, num_epochs=10, learning_rate=0.001):
    """
    Train `model` with Adam on mean-squared error.

    The model is moved to the GPU when one is available, otherwise it stays on
    the CPU; each batch is moved to the same device.

    Args:
        model: module mapping a batch of sequences to one prediction each.
        train_loader: iterable yielding (sequences, targets) batches.
        num_epochs: number of passes over `train_loader`.
        learning_rate: Adam learning rate.

    Returns:
        List with the average batch loss of every epoch (NaN for an epoch in
        which the loader produced no batches).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    epoch_losses = []
    for epoch in tqdm.tqdm(range(num_epochs)):
        model.train()
        total_loss = 0.0
        num_batches = 0

        for batch_sequences, batch_targets in train_loader:
            batch_sequences = batch_sequences.to(device)
            batch_targets = batch_targets.to(device)

            # Forward pass
            outputs = model(batch_sequences)
            loss = criterion(outputs, batch_targets)

            # Backward pass and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Count batches as they are consumed so an empty loader cannot cause a
        # division by zero.
        avg_loss = total_loss / num_batches if num_batches else float('nan')
        epoch_losses.append(avg_loss)

    return epoch_losses

def main():
    """
    Train an LSTMPredictor on a fresh SimpleDataset and print one prediction.

    Returns:
        Tuple `(dataset, model)` with the generated dataset and trained model.
    """
    # Create dataset and dataloader
    dataset = SimpleDataset()
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Initialize model
    model = LSTMPredictor()

    # Train model
    train_model(model, train_loader, num_epochs=200)

    # Example prediction
    model.eval()
    # train_model moves the model to the GPU when one is available, so the
    # sample has to follow it; otherwise the forward pass fails with a
    # device-mismatch error.
    device = next(model.parameters()).device
    with torch.no_grad():
        sample_seq = dataset.sequences[0].unsqueeze(0).to(device)
        prediction = model(sample_seq)
        actual = dataset.targets[0]
        print(f'Prediction: {prediction.item():.4f}')
        print(f'Actual: {actual.item():.4f}')

    return dataset, model

In [65]:
# main() returns a (dataset, model) tuple, so despite its name `model` holds
# the pair here; it is unpacked in a later cell.
model = main()

100%|█████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:31<00:00,  6.34it/s]

Prediction: 26.8539
Actual: 26.5534





In [67]:
# Unpack the (dataset, model) pair returned by main().
dataset, _model = model

In [68]:
# Repeat the example prediction outside of main(). The sample is moved to the
# model's device (training may have placed the model on the GPU) and autograd
# is switched off because this is inference only.
with torch.no_grad():
    sample_seq = dataset.sequences[0].unsqueeze(0).to(next(_model.parameters()).device)
    prediction = _model(sample_seq)
actual = dataset.targets[0]