MTG Judge RAG
---------------------------------------------------
This is a simple Python script for building an AI-powered MTG rules assistant
using Retrieval-Augmented Generation (RAG) with OpenAI + ChromaDB.

- Loads the Comprehensive Rules from a text file.
- Splits rules into chunks.
- Creates embeddings with OpenAI.
- Stores them in ChromaDB for fast search (not using FAISS due to py version conflict)
- Lets you ask questions, retrieves relevant rules, and asks the LLM to answer.

In [None]:
# -------- IMPORTS --------
import os
import re
import json
import chromadb

from openai import OpenAI

from dotenv import load_dotenv


In [None]:
# -------- CONFIG --------
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
EMBED_MODEL = "text-embedding-3-large"
CHAT_MODEL = "gpt-4o-mini"
CHROMA_DB_DIR = "../chroma_db"
os.makedirs(CHROMA_DB_DIR, exist_ok=True) # to create folder if it doesn't exist
RULES_FILE = "../data/comprehensive-rules.txt"
CARDS_FILE = "../data/clean-standard-cards.json"
CHUNK_SIZE = 700 # words approximation
TOP_K = 6

In [96]:
# -------- INITIALIZATION --------
client = OpenAI(api_key=OPENAI_API_KEY)
load_dotenv()

True

In [97]:
# -------- HELPER LOAD RULES --------
def load_rules(path):
    """Load the MTG comprehensive rules from a text file."""
    if not os.path.exists(path):
        print(f"Rules file not found at {path}")
        return []

    docs = []
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Rules usually like: 603.1. Some text
            match = re.match(r"^(\d{1,3}(?:\.\d+)+)\s+(.*)$", line)
            if match:
                rule_id, body = match.groups()
                docs.append({
                    "id": f"CR:{rule_id}",
                    "text": f"{rule_id} {body}",
                    "rule_id": rule_id,
                    "source": "Comprehensive Rules"
                })
    return docs

In [99]:
# -------- HELPER LOAD CARDS --------
def load_cards(path):
    """Load MTG card data from your JSON export."""
    if not os.path.exists(path):
        print(f"Card file not found at {path}")
        return []

    with open(path, "r", encoding="utf-8") as f:
        cards = json.load(f)

    docs = []
    for c in cards:
        # Skip cards without names or text
        if "name" not in c or not c.get("originalText"):
            continue

        # Construct a searchable text block for embedding
        text_parts = [
            f"Name: {c['name']}",
            f"Mana Cost: {c.get('manaCost', '')}",
            f"Types: {' '.join(c.get('types', []))}",
            f"Subtypes: {' '.join(c.get('subtypes', []))}",
            f"Abilities/Keywords: {', '.join(c.get('keywords', []))}",
            f"Text: {c['originalText']}"
        ]

        # Add rulings (big chunk but useful)
        rulings = c.get("rulings", [])
        if rulings:
            rulings_text = " | ".join(r["text"] for r in rulings if "text" in r)
            text_parts.append(f"Rulings: {rulings_text}")

        full_text = "\n".join(text_parts)

        docs.append({
            "id": f"CARD:{c['uuid']}",   # use UUID for uniqueness
            "text": full_text,
            "source": "Card Database",
            "card_name": c["name"],
            "manaCost": c.get("manaCost", ""),
            "types": ", ".join(c.get("types", [])),       # FIXED: stringify list
            "subtypes": ", ".join(c.get("subtypes", [])), # FIXED: stringify list
            "keywords": ", ".join(c.get("keywords", [])), # FIXED: stringify list
            "rarity": c.get("rarity", "")
        })

    print(f"Loaded {len(docs)} cards from {path}")
    return docs


In [100]:
# -------- HELPER CHUNK TEXT --------
def chunk_text(text, chunk_size=CHUNK_SIZE):
    """Split text into smaller chunks so embeddings don't get too big."""
    sentences = re.split(r'(?<=[.!?]) +', text)
    chunks = []
    current = []
    length = 0

    for s in sentences:
        tokens = len(s.split())
        if length + tokens > chunk_size:
            chunks.append(" ".join(current))
            current = [s]
            length = tokens
        else:
            current.append(s)
            length += tokens
    if current:
        chunks.append(" ".join(current))

    return chunks

In [101]:
# -------- HELPER BUILD INDEX --------
def build_index():
    """Create ChromaDB collection from rules + card data."""
    client = OpenAI(api_key=OPENAI_API_KEY)

    print("Loading rules...")
    rules = load_rules(RULES_FILE)

    print("Loading cards...")
    cards = load_cards(CARDS_FILE)  # add this

    all_docs = rules + cards  # merge datasets

    texts, metas, ids = [], [], []

    for d in all_docs:
        chunks = chunk_text(d["text"])
        for i, ch in enumerate(chunks):
            texts.append(ch)
            metas.append(d)
            ids.append(f"{d['id']}_{i}")

    if not texts:
        raise ValueError("No valid chunks found to embed.")

    print(f"Total chunks: {len(texts)}")

    # Create embeddings
    embeddings = client.embeddings.create(model=EMBED_MODEL, input=texts)
    vecs = [d.embedding for d in embeddings.data]

    # Initialize Chroma client
    chroma_client = chromadb.PersistentClient(path=CHROMA_DB_DIR)

    # Drop old collection (clean rebuild)
    try:
        chroma_client.delete_collection("mtg_data")
    except:
        pass

    collection = chroma_client.get_or_create_collection(name="mtg_data")

    # Add to Chroma
    collection.add(
        ids=ids,
        embeddings=vecs,
        documents=texts,
        metadatas=metas
    )

    print("Index built and saved with ChromaDB!")




In [102]:
# -------- HELPER SEARCH INDEX --------
def search_index(query, top_k=TOP_K):
    """Search ChromaDB for relevant rule chunks."""
    query = query.strip()
    if not query:
        raise ValueError("Empty query provided.")

    # client = OpenAI()
    emb = client.embeddings.create(model=EMBED_MODEL, input=[query])
    vec = emb.data[0].embedding

    chroma_client = chromadb.PersistentClient(path=CHROMA_DB_DIR)
    collection = chroma_client.get_or_create_collection(name="mtg_rules")

    results = collection.query(query_embeddings=[vec], n_results=top_k)

    docs = []
    for i, doc in enumerate(results["documents"][0]):
        docs.append({
            "text": doc,
            "meta": results["metadatas"][0][i]
        })
    return docs

In [103]:
# -------- HELPER GENERATE SUBQUERIES --------
def generate_subqueries(query, n=10):
    """Chain of Thought decomposition function. Use the LLM to break a user query into smaller sub-questions."""
    #client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = f"""
    Break down the following Magic: The Gathering rules question into {n} smaller, 
    more specific sub-questions that cover timing, abilities, rules interactions, 
    and possible edge cases. Return them as a numbered list.

    Original Question: {query}
    """
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        temperature=0.2,
        messages=[
            {"role": "system", "content": "You are an expert MTG judge assistant."},
            {"role": "user", "content": prompt}
        ]
    )
    text = resp.choices[0].message.content
    subqueries = [line.strip("0123456789. ") for line in text.splitlines() if line.strip()]
    return subqueries

In [151]:
# -------- HELPER JSON PARSE --------
def safe_json_parse(text):
    """Function to safely parse from string to json, even if wrapped in markdown fences. Converts response from gpt to json"""
    fixed = text.strip()

    # Strip markdown fences if present
    if fixed.startswith("```"):
        # Remove the first line (``` or ```json)
        fixed = "\n".join(fixed.split("\n")[1:])
        # Remove trailing fence
        if fixed.strip().endswith("```"):
            fixed = "\n".join(fixed.strip().split("\n")[:-1])

    # Try JSON decode
    try:
        return json.loads(fixed)
    except json.JSONDecodeError:
        return {"error": "Failed to parse JSON", "raw": text}


In [None]:
# ---------- ANSWER WITH SUBQUERIES ----------
def answer_with_subqueries(query, max_context_chunks=20, max_subqueries=20):
    """Break question into subqueries, search index for each, and generate final structured ruling."""

    # Step 1: Generate subqueries
    subqueries = generate_subqueries(query, n=max_subqueries)

    # Step 2: Collect retrieval results
    all_results = []
    for sq in subqueries:
        results = search_index(sq, top_k=8)
        for r in results:
            all_results.append({
                "subquery": sq,
                "source": r["meta"].get("source", ""),
                "text": r["text"]
            })

    # Prune context if too large (keep only top N chunks by length relevance)
    if len(all_results) > max_context_chunks:
        all_results = all_results[:max_context_chunks]

    context = "\n\n".join(
        f"Subquery: {r['subquery']}\n- Source: {r['source']}\n- Text: {r['text']}"
        for r in all_results
    )

    # Response format instructions
    response_format = """
    Provide a structured JSON with the following fields:

    - "question": rephrased user question,
    - "single_word_answer": "Yes", "No" or "Unclear",
    - "short_answer": short paragraph summary,
    - "full_explanation": detailed reasoning with rules and card interactions,
    - "sources": cite only the rules/cards you actually used, not everything retrieved.
    """

    # System + user prompt for judge #1
    system_prompt = f"""
    You are an expert Magic: The Gathering judge assistant.

    Sources available (rules + card texts):
    {context}

    Answer format:
    {response_format}
    """

    user_prompt = f"""
    The user's question is:
    {query}
    """

    #* Initial judge calling
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        temperature=0,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        response_format={"type": "json_object"} 
    )

    judge1_answer = resp.choices[0].message.content

    # ---------- SECONDARY JUDGE ----------
    judge2_system_prompt = f"""
    You are an expert MTG high judge reviewing another judge’s ruling.

    You will be given:
    - User's question
    - Judge’s ruling
    - The context they used

    If you agree: reply only with "Accepted".
    If you disagree: reply with "Denied, [reason + extra context suggestions]".
    """

    judge_prompt = f"""
    User Question:
    {query}

    Context Used:
    {context}

    Judge's Ruling:
    {judge1_answer}
    """

    #* Secondary judge calling
    resp2 = client.chat.completions.create(
        model=CHAT_MODEL,
        temperature=0,
        messages=[
            {"role": "system", "content": judge2_system_prompt},
            {"role": "user", "content": judge_prompt}
        ]
    )

    judge2_response = resp2.choices[0].message.content.strip()

    # ---------- ACCEPTED CASE ----------
    if judge2_response.startswith("Accepted"):
        return safe_json_parse(judge1_answer)

    # ---------- DENIED CASE ----------
    # Generate refined subqueries based on judge2 feedback
    new_subqueries = generate_subqueries(judge2_response, n=max_subqueries)
    print("2nd judge conflict")

    refined_results = []
    for sq in new_subqueries:
        results = search_index(sq, top_k=5)
        for r in results:
            refined_results.append({
                "subquery": sq,
                "source": r["meta"].get("source", ""),
                "text": r["text"]
            })

    # Keep within limits
    if len(refined_results) > max_context_chunks:
        refined_results = refined_results[:max_context_chunks]

    refined_context = "\n\n".join(
        f"Subquery: {r['subquery']}\n- Source: {r['source']}\n- Text: {r['text']}"
        for r in refined_results
    )

    new_prompt = f"""
    A higher judge denied your ruling for lack of context. Use the new context and improve your ruling.

    Higher judge feedback:
    {judge2_response}

    New context:
    {refined_context}

    Use the same JSON format as before.
    """

    #* Loop back to initial judge
    resp3 = client.chat.completions.create(
        model=CHAT_MODEL,
        temperature=0,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": judge1_answer},
            {"role": "user", "content": new_prompt}
        ],
        response_format={"type": "json_object"} 
    )

    return safe_json_parse(resp3.choices[0].message.content)


In [None]:
        # return {
        #     "single_word_answer": "Denied",
        #     "question": query,
        #     "full_explanation": f"""
        #         We’re sorry, our virtual judges were unable to reach an agreement on a final response to your question.

        #         Reason: The original judge’s ruling was: {judge2_response}

        #         Please try asking your question again, this time with clearer wording to help us provide a more definitive answer.
        #     """
        # }

In [105]:
# -------- BUILDING INDEX --------
build_index()  # only first time

Loading rules...
Loading cards...
Loaded 90 cards from ./clean-standard-cards.json
Total chunks: 91
Index built and saved with ChromaDB!


# Single testing

In [150]:
single_question = "Can there be infinite or multiple cleanup steps triggered by effects like Kozilek plus discard effects?"
response = answer_with_subqueries(single_question)
print(response)

{
    "question": "Can there be infinite or multiple cleanup steps triggered by effects like Kozilek plus discard effects?",
    "single_word_answer": "No",
    "short_answer": "There cannot be infinite or multiple cleanup steps in Magic: The Gathering. The cleanup step is a single step in the turn structure, and while effects may trigger during it, they do not create additional cleanup steps.",
    "full_explanation": "In Magic: The Gathering, the cleanup step is a defined part of the turn structure where players discard down to their maximum hand size and any end-of-turn effects are resolved. According to the Comprehensive Rules, there is only one cleanup step per turn. While effects like Kozilek, Butcher of Truth may trigger discard effects, they do not create additional cleanup steps. If a player has to discard and has no cards in hand, they simply do not discard anything, and the cleanup step proceeds to the next phase of the turn. Therefore, even if multiple effects are triggered

# Multiple testing

In [None]:
with open("../data/easy-questions.json", "r", encoding="utf-8") as f:
    easy_questions = json.load(f)
with open("../data/hard-questions.json", "r", encoding="utf-8") as f:
    hard_questions = json.load(f)
with open("../data/own-questions.json", "r", encoding="utf-8") as f:
    own_questions = json.load(f)
# with open("../data/wrong-questions.json", "r", encoding="utf-8") as f:
#     wrong_questions = json.load(f)

all_questions = easy_questions + hard_questions + own_questions
# all_questions = wrong_questions

correct_answers = []
judge_ruling_conflict_questions = []
unclear_questions = []
incorrect_questions = []

for question in all_questions:
    response = answer_with_subqueries(question["text"])

    gold = question["answer"].strip().lower()
    pred = response["single_word_answer"].strip().lower()

    if pred == gold:
        correct_answers.append({"question": question, "response": response})
    elif pred == "denied":
        judge_ruling_conflict_questions.append({"question": question, "response": response})
    elif pred == "unclear":
        unclear_questions.append({"question": question, "response": response})
    else:
        incorrect_questions.append({"question": question, "response": response})

total_question_length = len(all_questions)

print(f"correct answers: {len(correct_answers)}/{total_question_length}")

print(f"judge ruling conflict: {len(judge_ruling_conflict_questions)}/{total_question_length}")
# for question in judge_ruling_conflict_questions:
#     print(question)

print(f"unclear questions: {len(unclear_questions)}/{total_question_length}")
# for question in unclear_questions:
#     print(question)

print(f"incorrect answers: {total_question_length - len(correct_answers) - len(judge_ruling_conflict_questions) - len(unclear_questions)}/{total_question_length}")
# for question in incorrect_questions:
#     print(question)


correct answers: 42/45
judge ruling conflict: 0/45
unclear questions: 0/45
incorrect answers: 3/45


In [None]:
for question in incorrect_questions:
    print("original question:", question["question"]["text"])
    print("correct answer: ", question["question"]["answer"])
    print("reworded question: ", question["response_dict"]["question"])
    print("single_word_answer: ", question["response_dict"]["single_word_answer"])
    print("short_answer: ", question["response_dict"]["short_answer"])
    print("full_explanation: ", question["response_dict"]["full_explanation"])
    print("sources: ", question["response_dict"]["sources"])
    print("-------------------------")