In [None]:
import json
import math
import os
import random
import zlib

import networkx as nx
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch_geometric.transforms as T
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import MinMaxScaler
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GCNConv, GATConv, BatchNorm
from torch_geometric.utils import from_networkx, negative_sampling

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Seed the random number generators so that a Restart & Run All is repeatable.
# Python's `random` drives determine_copies and the annealer in optimize_deck;
# PyTorch drives weight initialisation, dropout and DataLoader shuffling.
# NumPy is seeded as well in case a library draws from its global generator.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Sentence-transformer used to embed each card's "effect" text.
text_model = SentenceTransformer('all-MiniLM-L6-v2')
# Must equal the embedding width produced by text_model; also used as the
# length of the all-zero vector given to cards without effect text.
EFFECT_DIM = 384  # Dimension of the effect embedding

In [5]:
def load_card_attributes(file="data/labeled_cards.json"):
    """
    Load card attributes from a JSON file and attach an effect-text embedding.

    The file is expected to contain a list of card objects, each with an "id".
    For every card the "effect" text is encoded with the module-level
    sentence transformer; cards whose effect is missing, null or blank get an
    all-zero vector of length EFFECT_DIM instead.

    Args:
        file: Path to the JSON file.

    Returns:
        dict mapping card id -> card dict, where each card dict has gained an
        "effect_embedding" entry (a plain Python list of floats).
    """
    # JSON is UTF-8 by specification; do not depend on the platform's default
    # encoding (which is a legacy code page on Windows).
    with open(file, "r", encoding="utf-8") as f:
        cards = json.load(f)
    card_dict = {}
    for card in cards:
        # `or ""` also covers an explicit JSON null, which .get() returns as None.
        effect_text = (card.get("effect") or "").strip()
        if effect_text:
            effect_embedding = text_model.encode(effect_text)
        else:
            effect_embedding = np.zeros(EFFECT_DIM)
        card["effect_embedding"] = effect_embedding.tolist()
        card_dict[card["id"]] = card
    return card_dict

def load_card_synergies(file="data/card_synergies.graphml"):
    """
    Read the graph of known card synergies from a GraphML file.

    Args:
        file: Path to the GraphML file.

    Returns:
        The NetworkX graph parsed by nx.read_graphml.
    """
    synergy_graph = nx.read_graphml(file)
    return synergy_graph

In [6]:
def normalize_features(features):
    """
    Min-max scale a 2-D feature table column by column.

    A fresh MinMaxScaler is fitted on `features` on every call, so the scaling
    is relative to the rows passed in and nothing is shared between calls.

    Returns:
        The array returned by MinMaxScaler.fit_transform.
    """
    return MinMaxScaler().fit_transform(features)

def _stable_bucket(text, buckets=10):
    """Map text to an index in [0, buckets) that is identical in every Python process."""
    return zlib.crc32(str(text).encode("utf-8")) % buckets


def convert_to_pyg(graphs, card_data):
    """
    Convert a list of NetworkX graphs into PyTorch Geometric data objects.

    Each node (card) feature vector is laid out, in this order, as:
      - 5 numeric features: copies, cost, power, number of labels, counter
      - 6 one-hot colour flags (Red, Green, Blue, Purple, Black, Yellow)
      - 2 categorical proxies: a stable hash (mod 10) of the card type and of
        the space-joined traits
      - the effect text embedding (EFFECT_DIM values)
    The rows of each graph are min-max scaled independently of other graphs
    by normalize_features.

    The type/trait proxies use CRC32 rather than the built-in hash(): string
    hashes are salted per interpreter process, which would make these two
    features change between kernel restarts and so differ between the session
    that trained a model and the session that loads it. A model trained on
    features from the built-in hash() must be retrained.

    The input graphs are left untouched: each one is copied, edge attributes
    are removed from the copy, and the feature values are written to the
    copy's nodes.

    Args:
        graphs: Iterable of NetworkX graphs whose node ids are card ids. A
            node may carry a "copies" attribute (defaults to 1).
        card_data: Mapping from card id to card attribute dict; unknown ids
            fall back to default feature values.

    Returns:
        A list with one PyG data object per graph that has at least one node;
        graphs without nodes are skipped.
    """
    pyg_graphs = []
    colors = ["Red", "Green", "Blue", "Purple", "Black", "Yellow"]
    # Attribute names follow the same order as the feature vector built below.
    base_feature_list = (["copies", "cost", "power", "labels", "counter"]
                         + [f"color_{c}" for c in colors]
                         + ["type", "traits"])
    effect_feature_list = [f"effect_{i}" for i in range(EFFECT_DIM)]
    full_feature_list = base_feature_list + effect_feature_list

    for source_graph in graphs:
        # Work on a copy so the caller's graph keeps its edge attributes and
        # its raw "copies" values; repeated calls then give the same result.
        G = source_graph.copy()

        # Remove all edge attributes.
        for _, _, edge_attrs in G.edges(data=True):
            edge_attrs.clear()

        node_features = []
        for node in G.nodes():
            card = card_data.get(node, {})
            features = [
                float(G.nodes[node].get("copies", 1)),
                float(card.get("cost", 0)),
                float(card.get("power", 0)),
                float(len(card.get("labels", []))),
                float(card.get("counter", 0))
            ]
            features.extend([1.0 if c in card.get("color", []) else 0.0 for c in colors])
            # Bucket type and traits into 10 classes as a simple categorical proxy.
            features.append(float(_stable_bucket(card.get("type", "None"))))
            features.append(float(_stable_bucket(" ".join(card.get("traits", [])))))
            effect_emb = card.get("effect_embedding", [0] * EFFECT_DIM)
            features.extend(effect_emb)
            node_features.append(features)

        if not node_features:
            continue

        normalized_features = normalize_features(node_features)
        for i, node in enumerate(G.nodes()):
            for j, attr in enumerate(full_feature_list):
                G.nodes[node][attr] = normalized_features[i][j]
        # Do not specify group_edge_attrs so edge attributes are ignored.
        pyg_data = from_networkx(G, group_node_attrs=full_feature_list)
        pyg_graphs.append(pyg_data)
    return pyg_graphs

In [7]:
class SynergyGNN(nn.Module):
    """
    Three-layer graph network: GCNConv -> GATConv -> GCNConv.

    The first two layers are each followed by ReLU, BatchNorm and dropout
    (p=0.2); the last layer's output is returned as-is.

    Args:
        in_dim: Width of the input node features.
        hidden_dim: Width of the two hidden layers.
        out_dim: Width of the output; defaults to in_dim when None.
    """

    def __init__(self, in_dim, hidden_dim=64, out_dim=None):
        super().__init__()
        out_dim = in_dim if out_dim is None else out_dim
        self.conv1 = GCNConv(in_dim, hidden_dim)
        self.bn1 = BatchNorm(hidden_dim)
        self.conv2 = GATConv(hidden_dim, hidden_dim)
        self.bn2 = BatchNorm(hidden_dim)
        self.conv3 = GCNConv(hidden_dim, out_dim)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x, edge_index):
        """Run the three layers on node features x over the given edge_index."""
        hidden = self.dropout(self.bn1(self.conv1(x, edge_index).relu()))
        hidden = self.dropout(self.bn2(self.conv2(hidden, edge_index).relu()))
        return self.conv3(hidden, edge_index)

In [18]:
def train_synergy_gnn(deck_graphs, card_data, epochs=50, lr=0.001, batch_size=16):
    """
    Train the GNN on deck graphs (winning decks) to learn latent representations.

    The model is trained to reconstruct each node's own feature vector
    (output width equals input width) with a mean-squared-error loss.

    Args:
        deck_graphs: NetworkX deck graphs; converted with convert_to_pyg.
        card_data: Mapping from card id to card attribute dict.
        epochs: Number of passes over the data.
        lr: Adam learning rate.
        batch_size: Number of graphs per mini-batch.

    Returns:
        (model, num_features): the trained SynergyGNN, left in training mode,
        and the node feature width it was built for.

    Raises:
        ValueError: If no graph with at least one node is available.
    """
    pyg_data = convert_to_pyg(deck_graphs, card_data)
    if not pyg_data:
        # Fail with a clear message before building the loader or indexing
        # the first graph.
        raise ValueError("No non-empty deck graphs to train on.")
    loader = DataLoader(pyg_data, batch_size=batch_size, shuffle=True)
    num_features = pyg_data[0].num_features
    model = SynergyGNN(in_dim=num_features, out_dim=num_features)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    model.train()
    for epoch in range(epochs):
        # Sum of the per-batch losses, not their mean.
        total_loss = 0
        for data in loader:
            optimizer.zero_grad()
            target = data.x.float()
            # The output already has the shape of the input because
            # out_dim == in_dim, so no reshape is needed.
            pred = model(target, data.edge_index)
            loss = criterion(pred, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {total_loss:.4f}")
    return model, num_features


In [9]:
def save_model(model, input_dim, path="graphs/synergy_gnn.pth"):
    """
    Save the model weights together with the input feature width.

    The checkpoint is a dict with the keys 'model_state_dict' and 'input_dim'.
    The parent directory of `path` is created if it does not exist yet.

    Args:
        model: Module whose state_dict is stored.
        input_dim: Input feature width to record next to the weights.
        path: Destination file.
    """
    parent_dir = os.path.dirname(path)
    # A bare file name has no parent component, and os.makedirs("") would raise.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    torch.save({
        'model_state_dict': model.state_dict(),
        'input_dim': input_dim
    }, path)
    print(f"Model saved to {path}")

def load_model(model_path="graphs/synergy_gnn.pth", hidden_dim=None, out_dim=None):
    """
    Rebuild a SynergyGNN from a checkpoint written by save_model.

    Args:
        model_path: Checkpoint file containing 'model_state_dict' and 'input_dim'.
        hidden_dim: Hidden width of the network. When None it is read from the
            stored weights, so a checkpoint loads whatever width it was
            trained with.
        out_dim: Output width of the network. When None it is read from the
            stored weights.

    Returns:
        (model, input_dim): the model in eval mode and the stored input width.
    """
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
    input_dim = checkpoint['input_dim']
    state_dict = checkpoint['model_state_dict']

    def _layer_width(bias_key, fallback):
        # Assumes the layer stores its bias under "<layer>.bias" with one entry
        # per output channel; if the key is absent, fall back to the historic
        # default so older call sites keep their behaviour.
        bias = state_dict.get(bias_key)
        return int(bias.shape[0]) if bias is not None else fallback

    if hidden_dim is None:
        hidden_dim = _layer_width('conv1.bias', 32)
    if out_dim is None:
        out_dim = _layer_width('conv3.bias', 16)

    model = SynergyGNN(in_dim=input_dim, hidden_dim=hidden_dim, out_dim=out_dim)
    model.load_state_dict(state_dict)
    model.eval()
    return model, input_dim

In [10]:
def compute_card_embeddings(model, card_data):
    """
    Compute an embedding for every card with the given model.

    All card ids become the nodes of a single graph without edges, which is
    converted with convert_to_pyg and passed through the model once.

    The forward pass runs in eval mode so that dropout is disabled and
    BatchNorm uses (and does not update) its running statistics; without this
    the embeddings would be random from call to call for a model that is
    still in training mode. The model's previous mode is restored afterwards.

    Args:
        model: Module called as model(x, edge_index).
        card_data: Mapping from card id to card attribute dict.

    Returns:
        dict mapping card id -> 1-D NumPy embedding; empty if card_data is empty.
    """
    G = nx.Graph()
    G.add_nodes_from(card_data.keys())
    pyg_graphs = convert_to_pyg([G], card_data)
    if not pyg_graphs:
        # convert_to_pyg skips graphs without nodes, so there is nothing to embed.
        return {}
    pyg_data = pyg_graphs[0]

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            embeddings = model(pyg_data.x.float(), pyg_data.edge_index).numpy()
    finally:
        model.train(was_training)

    return {cid: embeddings[i] for i, cid in enumerate(G.nodes())}

In [11]:
def compute_pairwise_synergy(emb_dict, synergy_graph, alpha=2.0, beta=1.0):
    """
    Score every pair of cards, including each card paired with itself.

    score(a, b) = alpha * cosine_similarity(emb[a], emb[b])
                  + (beta if {a, b} is an edge of synergy_graph else 0.0)

    Args:
        emb_dict: Mapping from card id to embedding vector.
        synergy_graph: Graph whose edges mark known synergies.
        alpha: Weight of the embedding similarity.
        beta: Bonus added for a known synergy.

    Returns:
        dict keyed by (card_a, card_b) holding the score; both orderings of
        every pair are stored with the same value.
    """
    ids = list(emb_dict.keys())
    similarity = cosine_similarity(np.array([emb_dict[cid] for cid in ids]))

    # Known synergies as order-independent (sorted) id pairs.
    known_pairs = {tuple(sorted([u, v])) for u, v, _ in synergy_graph.edges(data=True)}

    scores = {}
    for row, first in enumerate(ids):
        for col in range(row, len(ids)):
            second = ids[col]
            similarity_term = alpha * similarity[row, col]
            is_known = tuple(sorted([first, second])) in known_pairs
            bonus = beta if is_known else 0.0
            scores[(first, second)] = similarity_term + bonus
            scores[(second, first)] = similarity_term + bonus
    return scores

In [12]:
def determine_copies(card):
    """
    Determine the maximum allowed copies for a card.

    Rules, checked in this order:
      - Leader cards: 1 copy.
      - Key cards (labeled "Main"): 4 copies.
      - Event cards: 1 copy.
      - Every other card, whatever its cost: 2 or 3 copies, drawn at random
        on each call.

    Args:
        card: Card attribute dict; must contain "type", may contain "labels".

    Returns:
        int number of copies.
    """
    card_type = card["type"]
    if card_type == "Leader":
        return 1
    if "Main" in card.get("labels", []):
        return 4
    if card_type == "Event":
        return 1
    # Expensive (cost >= 5) and ordinary cards share the same 2-3 range, so
    # no separate cost branch is needed.
    return random.choice([2, 3])

In [13]:
def evaluate_deck(deck, card_data, synergy_dict, w1=1.0, w2=0.5, w3=0.4, w4=0.6):
    """
    Score a deck (mapping card_id -> number of copies).

    objective = w1 * (sum of synergy over every unordered pair of physical
                      cards, copies of the same card included)
              + sum over cards of copies * (w2 * cost efficiency
                                            + w3 * power / 10000
                                            + w4 * consistency)

    Cost efficiency is 1 / (cost + 1); consistency is 1.0 for cards labeled
    "Searcher" and 0.5 otherwise. Pairs missing from synergy_dict count as 0.
    """
    def individual_score(card):
        efficiency = 1 / (card.get("cost", 0) + 1)
        strength = card.get("power", 0) / 10000
        reliability = 1.0 if "Searcher" in card.get("labels", []) else 0.5
        return w2 * efficiency + w3 * strength + w4 * reliability

    quality_total = 0
    expanded = []  # one entry per physical card in the deck
    for cid, copies in deck.items():
        quality_total += copies * individual_score(card_data[cid])
        expanded.extend([cid] * copies)

    pair_total = 0
    for position, first in enumerate(expanded):
        for second in expanded[position + 1:]:
            pair_total += synergy_dict.get((first, second), 0)

    return w1 * pair_total + quality_total

In [44]:
def generate_deck_greedy(model, card_data, deck_size=50, leader=None):
    """
    Greedily assemble a deck of exactly deck_size cards around a leader.

    Cards are ranked by a composite score: 3.0 times the sum of the card's
    model embedding, plus cost efficiency, scaled power and a consistency
    term, minus 10 for cards without effect text. Walking the ranking from
    the top, every card that is not banned, is not a Leader and shares at
    least one colour with the leader is added with as many copies as
    determine_copies allows (capped by the remaining space). Leader traits
    are not used as a filter here.

    Args:
        model: Model passed to compute_card_embeddings.
        card_data: Mapping from card id to card attribute dict.
        deck_size: Number of non-leader cards to select.
        leader: Card id of the leader, or None to take the best-ranked
            Leader card that is not banned.

    Returns:
        (leader, deck) where deck maps card id -> number of copies.

    Raises:
        ValueError: If the given leader is unknown or not a Leader card, if
            no leader can be found, or if the deck cannot be filled.
    """
    banned_cards = {"ST10-001", "OP03-098", "OP05-041", "ST06-015", "OP06-116", "OP02-024", "OP02-052"}
    emb_dict = compute_card_embeddings(model, card_data)
    effect_weight = 3.0  # weight of the embedding-based term in the ranking

    def card_score(cid):
        info = card_data[cid]
        efficiency = 1 / (info.get("cost", 0) + 1)
        strength = info.get("power", 0) / 10000
        reliability = 1.0 if "Searcher" in info.get("labels", []) else 0.5
        learned = effect_weight * emb_dict[cid].sum()
        # Cards without effect text are pushed far down the ranking.
        if info.get("effect", "").strip():
            no_effect_penalty = 0.0
        else:
            no_effect_penalty = -10.0
        return learned + efficiency + strength + reliability + no_effect_penalty

    ranked_cards = sorted(card_data.keys(), key=card_score, reverse=True)

    # Resolve the leader: validate the given one or take the best-ranked legal one.
    if leader is None:
        leader = next(
            (cid for cid in ranked_cards
             if card_data[cid]["type"] == "Leader" and cid not in banned_cards),
            None,
        )
    elif leader not in card_data or card_data[leader]["type"] != "Leader":
        raise ValueError("Invalid Leader specified.")
    if leader is None:
        raise ValueError("No valid Leader found.")

    leader_color = set(card_data[leader]["color"])

    deck = {}
    total_cards = 0
    for cid in ranked_cards:
        if total_cards >= deck_size:
            break
        excluded = cid == leader or cid in banned_cards or card_data[cid]["type"] == "Leader"
        if excluded:
            continue
        # Keep only cards that share a colour with the leader.
        if not (leader_color & set(card_data[cid]["color"])):
            continue
        copies = min(determine_copies(card_data[cid]), deck_size - total_cards)
        deck[cid] = copies
        total_cards += copies

    if total_cards != deck_size:
        raise ValueError(f"Greedy deck underfilled: only {total_cards} cards selected.")
    return leader, deck

In [15]:
def optimize_deck(leader, deck, card_data, synergy_dict, deck_size=50,
                  iterations=2000, initial_temp=1.0, cooling_rate=0.995):
    """
    Refine the deck by swapping cards via simulated annealing,
    maximizing the objective computed by evaluate_deck.

    Each iteration removes one copy of a random deck card and adds one copy
    of a random eligible card. Eligible cards are not the leader, not banned,
    not Leader cards, and share at least one colour AND at least one trait
    with the leader. A swap that would exceed the copy limit returned by
    determine_copies is skipped; skipped iterations do not cool the
    temperature. Improvements are always accepted; worse decks are accepted
    with probability exp(delta / temp).

    If there is no eligible card (or the deck is empty) no swap is possible
    and the initial deck is returned unchanged.

    Args:
        leader: Card id of the deck's leader.
        deck: Mapping card id -> number of copies; not modified.
        card_data: Mapping from card id to card attribute dict.
        synergy_dict: Pairwise synergy scores used by evaluate_deck.
        deck_size: Required total number of cards in `deck`.
        iterations: Number of swap attempts.
        initial_temp: Starting temperature.
        cooling_rate: Factor applied to the temperature after each evaluated swap.

    Returns:
        The best deck found (mapping card id -> number of copies).

    Raises:
        ValueError: If `deck` does not contain exactly deck_size cards.
    """
    banned_cards = {"ST10-001", "OP03-098", "OP05-041", "ST06-015", "OP06-116", "OP02-024", "OP02-052"}

    current_deck = deck.copy()
    best_deck = deck.copy()
    current_obj = evaluate_deck(current_deck, card_data, synergy_dict)
    best_obj = current_obj
    temp = initial_temp

    leader_color = set(card_data[leader]["color"])
    leader_traits = set(card_data[leader].get("traits", []))
    eligible = [cid for cid in card_data if cid != leader and
                cid not in banned_cards and
                card_data[cid]["type"] != "Leader" and
                leader_color.intersection(set(card_data[cid]["color"])) and
                leader_traits.intersection(set(card_data[cid].get("traits", [])))]

    if sum(current_deck.values()) != deck_size:
        raise ValueError("Initial deck size constraint violated.")

    if not eligible or not current_deck:
        # random.choice raises on an empty sequence; with nothing to swap in
        # (or out) the initial deck is already the best reachable deck.
        print("No eligible swap candidates; returning the initial deck unchanged.")
        iterations = 0

    for _ in range(iterations):
        remove_candidate = random.choice(list(current_deck.keys()))
        if current_deck[remove_candidate] <= 0:
            continue
        candidate = random.choice(eligible)
        new_deck = current_deck.copy()
        new_deck[remove_candidate] -= 1
        if new_deck[remove_candidate] == 0:
            del new_deck[remove_candidate]
        # Respect maximum copies for the candidate.
        max_copies_candidate = determine_copies(card_data[candidate])
        if new_deck.get(candidate, 0) + 1 > max_copies_candidate:
            continue
        new_deck[candidate] = new_deck.get(candidate, 0) + 1
        new_obj = evaluate_deck(new_deck, card_data, synergy_dict)
        delta = new_obj - current_obj
        # With a non-positive temperature only strict improvements are
        # accepted; this also avoids dividing by zero.
        if delta > 0 or (temp > 0 and random.random() < math.exp(delta / temp)):
            current_deck = new_deck
            current_obj = new_obj
            if current_obj > best_obj:
                best_deck = current_deck.copy()
                best_obj = current_obj
        temp *= cooling_rate
    print(f"Optimization complete. Best objective: {best_obj:.4f}")
    return best_deck

In [16]:
# Card attributes (with effect embeddings) and the graph of known synergies.
card_data = load_card_attributes("data/labeled_cards.json")
synergy_graph = load_card_synergies("graphs/card_synergies.graphml")

# Load deck graphs (sample winning decks) from a folder (GraphML files).
deck_folder = "graphs/deck-lists"
deck_graphs = []
# os.listdir returns entries in arbitrary order; sorting keeps the order of
# the training graphs the same on every run and every machine.
for file in sorted(os.listdir(deck_folder)):
    if file.endswith(".graphml"):
        G = nx.read_graphml(os.path.join(deck_folder, file))
        deck_graphs.append(G)

print(f"Loaded {len(deck_graphs)} deck graphs.")

Loaded 1732 deck graphs.


In [24]:
# Train the GNN on the loaded deck graphs. The call returns the trained model
# and the node feature width, which save_model stores next to the weights.
# The printed loss is the sum of the per-batch losses of that epoch.
print("Training GNN...")
model, input_dim = train_synergy_gnn(deck_graphs, card_data, epochs=200, batch_size=80, lr=0.001)

Training GNN...




Epoch 0, Loss: 11.4976
Epoch 10, Loss: 2.0486
Epoch 20, Loss: 1.8138
Epoch 30, Loss: 1.7139
Epoch 40, Loss: 1.6251
Epoch 50, Loss: 1.5264
Epoch 60, Loss: 1.4451
Epoch 70, Loss: 1.3888
Epoch 80, Loss: 1.3445
Epoch 90, Loss: 1.3127
Epoch 100, Loss: 1.2802
Epoch 110, Loss: 1.2660
Epoch 120, Loss: 1.2378
Epoch 130, Loss: 1.2175
Epoch 140, Loss: 1.1947
Epoch 150, Loss: 1.1892
Epoch 160, Loss: 1.1757
Epoch 170, Loss: 1.1625
Epoch 180, Loss: 1.1517
Epoch 190, Loss: 1.1441


In [25]:
# Persist the trained weights and input width to the default checkpoint path.
save_model(model, input_dim)

Model saved to graphs/synergy_gnn.pth


In [26]:
# Embed every card in card_data with the trained model (card id -> vector).
print("Computing card embeddings...")
emb_dict = compute_card_embeddings(model, card_data)

Computing card embeddings...


In [27]:
# Score all card pairs: alpha weights the embedding cosine similarity,
# beta is the bonus for pairs connected in synergy_graph.
print("Computing pairwise synergy scores...")
synergy_dict = compute_pairwise_synergy(emb_dict, synergy_graph, alpha=2.0, beta=1.0)

Computing pairwise synergy scores...


In [28]:
# Build a first deck greedily (the leader is chosen automatically) and score it.
print("Generating initial deck...")
leader, init_deck = generate_deck_greedy(model, card_data, deck_size=50)
init_obj = evaluate_deck(init_deck, card_data, synergy_dict)
print(f"Initial deck objective: {init_obj:.4f}")

# One "<count>x<card id>" line per entry, leader first.
initial_deck_list = [f"1x{leader}"] + [f"{count}x{card}" for card, count in init_deck.items()]

print("Initial Deck List:")
print("\n".join(initial_deck_list))

Generating initial deck...
Initial deck objective: 1959.9707
Initial Deck List:
1xOP04-040
2xOP05-047
2xOP04-053
2xOP04-100
2xOP02-063
2xST12-010
2xST13-006
3xEB01-024
3xOP07-107
2xOP02-059
2xOP06-114
3xOP07-054
3xOP01-068
3xOP01-078
2xOP06-052
3xST09-003
3xOP05-118
3xOP03-106
4xOP08-103
3xST07-012
1xOP04-101


In [29]:
# Refine the greedy deck with simulated annealing and list the result.
print("Optimizing deck using simulated annealing...")
optimized_deck = optimize_deck(
    leader, init_deck, card_data, synergy_dict,
    deck_size=50, iterations=2000,
)

# One "<count>x<card id>" line per entry, leader first.
deck_list = [f"1x{leader}"] + [f"{count}x{cid}" for cid, count in optimized_deck.items()]

print("Optimized Deck List:")
print("\n".join(deck_list))

Optimizing deck using simulated annealing...
Optimization complete. Best objective: 2541.5220
Optimized Deck List:
1xOP04-040
2xST12-010
3xOP07-107
2xOP07-054
2xOP06-052
3xST09-003
3xOP05-118
2xST07-012
1xOP04-101
4xOP04-055
4xOP04-052
3xST09-004
3xOP04-047
3xOP04-045
3xOP04-048
3xOP04-049
3xOP05-049
3xEB01-024
3xOP05-043


In [42]:
# Helper functions for the batch evaluation: save a deck's synergy graph and deck list,
# and run generate -> optimize -> save for a single deck.

def build_and_save_synergy_graph(deck, synergy_dict, output_path):
    """
    Write the deck's pairwise synergies to a GraphML file.

    The graph has one node per distinct card. Every pair of physical cards in
    the deck gets an edge whose "synergy" attribute is the pair's score from
    synergy_dict (0 if the pair is missing), so a card with two or more
    copies also receives a self-loop.

    Args:
        deck: Mapping card id -> number of copies.
        synergy_dict: Mapping (card_a, card_b) -> synergy score.
        output_path: Destination GraphML file.
    """
    # One entry per physical card, in deck order.
    expanded = []
    for cid, count in deck.items():
        expanded.extend([cid] * count)

    graph = nx.Graph()
    graph.add_nodes_from(expanded)
    for position, cid_a in enumerate(expanded):
        for cid_b in expanded[position + 1:]:
            graph.add_edge(cid_a, cid_b, synergy=synergy_dict.get((cid_a, cid_b), 0))
    nx.write_graphml(graph, output_path)

def save_deck_list(leader_cid, deck, output_path):
    """
    Write the deck as text, one "<count>x<card id>" line per entry.

    The leader is written first with a count of 1, followed by the deck
    entries in the deck's iteration order. A confirmation is printed once
    the file has been written.
    """
    lines = [f"1x{leader_cid}\n"]
    lines.extend(f"{count}x{cid}\n" for cid, count in deck.items())
    with open(output_path, "w") as f:
        f.writelines(lines)

    print(f"Deck list saved to {output_path}")

def process_deck(leader_cid, index, model, card_data, synergy_dict, stats_file,
                 deck_graph_dir, deck_list_dir, deck_size=50):
    """
    Generate, optimize and persist one deck for a leader.

    Steps: build a greedy deck, score it, optimize it with simulated
    annealing, score it again, append a row to the stats file, then write
    the synergy graph and deck list of both decks.

    The leader returned by generate_deck_greedy is used for every later step,
    so passing leader_cid=None (automatic leader choice) works as well; when
    a leader is given, the returned leader is that same id.

    Args:
        leader_cid: Leader card id, or None to let the generator choose.
        index: Deck number, used in file names and the stats row.
        model: Model passed to generate_deck_greedy.
        card_data: Mapping from card id to card attribute dict.
        synergy_dict: Pairwise synergy scores.
        stats_file: CSV file to append "leader,index,score_init,score_opt" to.
        deck_graph_dir: Directory receiving initial/ and optimized/ GraphML files.
        deck_list_dir: Directory receiving initial/ and optimized/ text files.
        deck_size: Number of non-leader cards per deck.
    """
    # Generate and evaluate initial deck
    ldr_str, deck = generate_deck_greedy(model, card_data, deck_size=deck_size, leader=leader_cid)
    score_init = evaluate_deck(deck, card_data, synergy_dict)

    # Optimize and evaluate optimized deck
    deck_opt = optimize_deck(ldr_str, deck, card_data, synergy_dict, deck_size=deck_size)
    score_opt = evaluate_deck(deck_opt, card_data, synergy_dict)

    # Store results
    with open(stats_file, "a") as f:
        f.write(f"{ldr_str},{index},{score_init:.4f},{score_opt:.4f}\n")

    graph_path_init = os.path.join(deck_graph_dir, "initial", f"deck_{index}.graphml")
    list_path_init = os.path.join(deck_list_dir, "initial", f"deck_{index}.txt")
    graph_path_opt = os.path.join(deck_graph_dir, "optimized", f"deck_{index}.graphml")
    list_path_opt = os.path.join(deck_list_dir, "optimized", f"deck_{index}.txt")

    # Make sure the target folders exist so the function also works when the
    # caller has not created them beforehand.
    for path in (graph_path_init, list_path_init, graph_path_opt, list_path_opt):
        os.makedirs(os.path.dirname(path), exist_ok=True)

    # Save synergy graphs and deck lists
    build_and_save_synergy_graph(deck, synergy_dict, graph_path_init)
    save_deck_list(ldr_str, deck, list_path_init)
    build_and_save_synergy_graph(deck_opt, synergy_dict, graph_path_opt)
    save_deck_list(ldr_str, deck_opt, list_path_opt)

    print(f"[Leader {ldr_str}] Deck {index} generated (score={score_init:.4f}), optimized (score={score_opt:.4f}).")

In [45]:
# Batch evaluation: for every Leader card, generate and optimize a number of
# decks, appending one row per deck to a freshly created stats CSV.
output_dir = "evaluation"
os.makedirs(output_dir, exist_ok=True)
stats_file = os.path.join(output_dir, "stats.csv")
possible_leaders = [cid for cid in card_data if card_data[cid]["type"] == "Leader"]

# Number of decks generated per leader. This loop is long-running; lower the
# value for a quick check.
DECKS_PER_LEADER = 1000

# Start a new stats file with the header row (any previous file is overwritten).
with open(stats_file, "w") as f:
    f.write("leader,deck_index,score_init,score_opt\n")

for leader_cid in possible_leaders:
    deck_graph_dir = os.path.join(output_dir, "decks", leader_cid, "graphs")
    deck_list_dir = os.path.join(output_dir, "decks", leader_cid, "lists")

    # os.makedirs creates missing parents, so creating the leaf folders is enough.
    for base_dir in (deck_graph_dir, deck_list_dir):
        for stage in ("initial", "optimized"):
            os.makedirs(os.path.join(base_dir, stage), exist_ok=True)

    for i in range(DECKS_PER_LEADER):
        try:
            process_deck(
                leader_cid, i+1, model, card_data, synergy_dict, stats_file,
                deck_graph_dir, deck_list_dir, deck_size=50
            )
        except Exception as e:
            # Best effort: report the failed deck and continue with the next one.
            print(f"[Leader {leader_cid}] Deck {i+1} failed: {e}")

Optimization complete. Best objective: 3155.0032
Deck list saved to evaluation\decks\OP01-003\lists\initial\deck_1.txt
Deck list saved to evaluation\decks\OP01-003\lists\optimized\deck_1.txt
[Leader OP01-003] Deck 1 generated (score=1372.0535), optimized (score=3155.0032).
Optimization complete. Best objective: 3137.8003
Deck list saved to evaluation\decks\OP01-003\lists\initial\deck_2.txt
Deck list saved to evaluation\decks\OP01-003\lists\optimized\deck_2.txt
[Leader OP01-003] Deck 2 generated (score=1223.6913), optimized (score=3137.8003).
Optimization complete. Best objective: 3181.3621
Deck list saved to evaluation\decks\OP01-003\lists\initial\deck_3.txt
Deck list saved to evaluation\decks\OP01-003\lists\optimized\deck_3.txt
[Leader OP01-003] Deck 3 generated (score=1521.7756), optimized (score=3181.3621).
Optimization complete. Best objective: 3127.7297
Deck list saved to evaluation\decks\OP01-003\lists\initial\deck_4.txt
Deck list saved to evaluation\decks\OP01-003\lists\optimize

KeyboardInterrupt: 