In [14]:
import os
import openai
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

In [1]:
"""
LLM plays Minesweeper N times
"""

from __future__ import annotations
import random, json, csv, time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Tuple, Literal

import openai
import pandas as pd
from tqdm import trange

#  A. Python Minesweeper engine 
class MinesweeperBoard:
    NEIGHB = (
        (-1, -1), (-1,  0), (-1,  1),
        ( 0, -1),          ( 0,  1),
        ( 1, -1), ( 1,  0), ( 1,  1),
    )

    def __init__(self, rows: int, cols: int, mines: int, rng=random):
        if not (0 < mines < rows * cols):
            raise ValueError("mines must be in (0, rows*cols)")
        self.R, self.C, self.M = rows, cols, mines
        self._rng = rng

        # actual content (hidden from player)
        self._grid: List[List[str]] = [["0"] * cols for _ in range(rows)]
        self._lay_mines()
        self._fill_numbers()

        # player’s view: "□" unknown, "⚑" flagged, "0-8" open numbers
        self.view  = [["□"] * cols for _ in range(rows)]
        self.opened = 0
        self.finished, self.win = False, False

    # internal helpers 
    def _lay_mines(self):
        cells = self._rng.sample(range(self.R * self.C), self.M)
        for idx in cells:
            r, c = divmod(idx, self.C)
            self._grid[r][c] = "*"

    def _fill_numbers(self):
        def around(r, c):  # count adjacent mines
            return sum(
                1 for dr, dc in self.NEIGHB
                if 0 <= r+dr < self.R and 0 <= c+dc < self.C
                and self._grid[r+dr][c+dc] == "*"
            )
        for r in range(self.R):
            for c in range(self.C):
                if self._grid[r][c] == "*": continue
                self._grid[r][c] = str(around(r, c))

    # public API 
    def step(self, r: int, c: int, action: Literal["open", "flag"]) -> str:
        """Returns one of {"ok", "hit-mine", "invalid", "already"}"""
        if self.finished:            return "invalid"
        if not (0 <= r < self.R and 0 <= c < self.C): return "invalid"

        cell = self.view[r][c]

        if action == "flag":
            if cell in ("□", "⚑"):
                self.view[r][c] = "⚑" if cell == "□" else "□"
                return "ok"
            return "already"

        # action == "open"
        if cell != "□":              return "already"
        if self._grid[r][c] == "*":
            self.view[r][c] = "*"
            self.finished = True
            self.win = False
            return "hit-mine"
        self._flood_fill(r, c)
        if self.opened == self.R * self.C - self.M:
            self.finished = True
            self.win = True
        return "ok"

    def _flood_fill(self, r, c, seen=None):
        if seen is None: seen = set()
        if (r, c) in seen or self.view[r][c] != "□": return
        seen.add((r, c))
        self.view[r][c] = self._grid[r][c]
        self.opened += 1
        if self._grid[r][c] == "0":
            for dr, dc in self.NEIGHB:
                nr, nc = r+dr, c+dc
                if 0 <= nr < self.R and 0 <= nc < self.C:
                    self._flood_fill(nr, nc, seen)

    # formats 
    def player_view_str(self) -> str:
        return "\n".join(" ".join(row) for row in self.view)

    def reveal_str(self) -> str:
        """Original grid with mines visible – for logging/debug."""
        return "\n".join(" ".join(row) for row in self._grid)

# Dataclass for a single game result
@dataclass
class GameRecord:
    game_id   : int
    rows      : int
    cols      : int
    mines     : int
    moves     : int
    outcome   : Literal["win", "loss"]
    duration_s: float
    rng_seed  : int



# Run a batch of games
def run_experiment(n_games=5,
                   rows=10, cols=10, mines=15,
                   temperature=0.0):
    records: List[GameRecord] = []
    for gid in trange(1, n_games + 1, desc="Games"):
        seed = random.randrange(2**32)
        rec = play_one(gid, rows, cols, mines, temperature, seed=seed)
        records.append(rec)

        # write incrementally (safe if kernel crashes mid-way)
        with open("results.csv", "a", newline="") as f:
            w = csv.DictWriter(f, fieldnames=GameRecord.__annotations__.keys())
            if f.tell() == 0: w.writeheader()
            w.writerow(asdict(rec))

    # Summary
    df = pd.DataFrame([asdict(r) for r in records])
    print("\nBatch summary:")
    print(df.groupby("outcome").size())
    print("Mean moves:", df["moves"].mean())


In [3]:
#  Probability helper
def compute_probabilities(board) -> dict[tuple[int, int], float]:
    """
    Return P(cell has a mine) for every still-unknown square.

    Heuristic:
      • For every opened number N with U unknown neighbours and F flags,
        each of those U squares carries base-prob  (N-F)/U.
      • A cell may be adjacent to several numbers – average their values.
      • Non-frontier unknowns get a 'global' probability:
            (remaining_mines) / (remaining_unknown_cells).
    Good enough for >80 % wins; you can swap in a Monte-Carlo sampler later
    without touching the solver loop.
    """
    frontier_probs: dict[tuple[int, int], list[float]] = {}
    remaining_mines = board.M - sum(
        1 for r in range(board.R) for c in range(board.C) if board.view[r][c] == "⚑"
    )

    # Pass 1 – collect constraints from every opened number
    for r in range(board.R):
        for c in range(board.C):
            ch = board.view[r][c]
            if ch.isdigit():
                N = int(ch)
                flags, unknowns = 0, []
                for dr, dc in board.NEIGHB:
                    nr, nc = r + dr, c + dc
                    if 0 <= nr < board.R and 0 <= nc < board.C:
                        if board.view[nr][nc] == "⚑":
                            flags += 1
                        elif board.view[nr][nc] == "□":
                            unknowns.append((nr, nc))
                if unknowns:
                    p = (N - flags) / len(unknowns)
                    for cell in unknowns:
                        frontier_probs.setdefault(cell, []).append(p)

    # Pass 2 – average multiple contributions
    probs: dict[tuple[int, int], float] = {}
    for cell, plist in frontier_probs.items():
        probs[cell] = sum(plist) / len(plist)

    # Non-frontier unknowns (“deep fog” squares)
    unknown_cells = [
        (r, c)
        for r in range(board.R)
        for c in range(board.C)
        if board.view[r][c] == "□"
    ]
    global_p = remaining_mines / len(unknown_cells) if unknown_cells else 0.0
    for cell in unknown_cells:
        probs.setdefault(cell, global_p)

    return probs


def query_llm(board, candidate_cells, probs, temperature=0.0):
    """
    Ask the model to pick *one* cell among equally-good candidates.
    `candidate_cells`  – list[(r,c)] with the same minimal probability.
    `probs`            – full probability map (for richer context).
    """
    # Compact probability table only for the tied cells
    prob_table = "\n".join(
        f"({r},{c})  P≈{probs[(r,c)]:.3f}" for (r, c) in candidate_cells
    )

    prompt = f"""
You are playing Minesweeper. The deterministic solver found several squares
with the *lowest current mine probability*.  Pick ONE to open next.

Respond ONLY with JSON like:
{{"row":R,"col":C,"action":"open"}}

Tied candidates and their probabilities:
{prob_table}

Full board (□ = unknown, ⚑ = flag):
{board.player_view_str()}
"""
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=temperature,
        messages=[
            {"role": "system", "content": "You are an expert logical reasoner."},
            {"role": "user", "content": prompt.strip()},
        ],
    )
    txt = resp.choices[0].message.content
    try:
        move = json.loads(txt.strip().split("```")[-1] if "```" in txt else txt)
        return int(move["row"]), int(move["col"]), "open"
    except Exception:
        # Fallback – just return the first candidate
        r, c = candidate_cells[0]
        return r, c, "open"



def play_one(game_id: int,
             rows=9, cols=9, mines=10,
             temperature=0.0, max_moves=500, seed=None):
    rng   = random.Random(seed)
    board = MinesweeperBoard(rows, cols, mines, rng=rng)
    start = time.time()
    moves = 0

    while not board.finished and moves < max_moves:
        # 1.  Run classical deterministic rules until nothing changes
        changed = True
        while changed and not board.finished:
            changed = deterministic_step(board)  # <-- implement or reuse
            moves  += int(changed)

        if board.finished: break  # won through pure logic

        # 2.  Probability pass
        probs   = compute_probabilities(board)
        min_p   = min(probs.values())
        best    = [cell for cell, p in probs.items() if abs(p - min_p) < 1e-9]

        if len(best) == 1:
            r, c = best[0]          # clear favourite – open directly
        else:
            r, c, _ = query_llm(board, best, probs, temperature)

        board.step(r, c, "open")
        moves += 1

    duration = time.time() - start
    return GameRecord(
        game_id, rows, cols, mines, moves,
        "win" if board.win else "loss",
        round(duration, 3), seed or 0
    )


def deterministic_step(board) -> bool:
    """
    Apply the standard single-point rule once.
    Return True if *anything* was opened or flagged.
    """
    for r in range(board.R):
        for c in range(board.C):
            ch = board.view[r][c]
            if ch.isdigit() and ch != "0":
                n = int(ch)
                unknowns, flags = [], 0
                for dr, dc in board.NEIGHB:
                    nr, nc = r + dr, c + dc
                    if 0 <= nr < board.R and 0 <= nc < board.C:
                        v = board.view[nr][nc]
                        if v == "□":
                            unknowns.append((nr, nc))
                        elif v == "⚑":
                            flags += 1

                if unknowns and n - flags == len(unknowns):
                    # all unknowns are mines → flag them
                    for cell in unknowns:
                        board.step(*cell, action="flag")
                    return True
                if unknowns and flags == n:
                    # all other unknowns are safe → open them
                    for cell in unknowns:
                        board.step(*cell, action="open")
                    return True
    return False

In [8]:
if __name__ == "__main__":
    run_experiment(n_games=5, rows=9, cols=9, mines=10, temperature=0.2)

Games: 100%|████████████████████████████████████████| 5/5 [00:10<00:00,  2.08s/it]


Batch summary:
outcome
loss    2
win     3
dtype: int64
Mean moves: 15.2





In [9]:
if __name__ == "__main__":
    run_experiment(n_games=10, rows=20, cols=20, mines=20, temperature=0.1)

Games: 100%|██████████████████████████████████████| 10/10 [00:08<00:00,  1.16it/s]


Batch summary:
outcome
loss    1
win     9
dtype: int64
Mean moves: 28.6





In [10]:
if __name__ == "__main__":
    run_experiment(n_games=15, rows=25, cols=25, mines=30, temperature=0.3)

Games: 100%|██████████████████████████████████████| 15/15 [00:22<00:00,  1.50s/it]


Batch summary:
outcome
loss     1
win     14
dtype: int64
Mean moves: 46.06666666666667





In [5]:
if __name__ == "__main__":
    run_experiment(n_games=30, rows=30, cols=30, mines=30, temperature=0.4)

Games: 100%|██████████████████████████████████████| 30/30 [01:08<00:00,  2.29s/it]


Batch summary:
outcome
loss     2
win     28
dtype: int64
Mean moves: 40.06666666666667



