## Initial setup

Before running the notebook:
- Update the `.env` file with your team's `OPENAI_API_KEY`.
- Install uv: `pip install uv`.
- Run `uv sync`.
- Run `uv run phoenix serve` (only if Arize Phoenix is enabled); this takes a few minutes.
- Select the kernel for this notebook.

## Raven Starter Bot

This notebook contains a **Raven Starter Bot** with:

1. **Message parsing** ( `parse_message`, `parse_outgoing_message`)
2. **Role-specific random responders** (Villager, Raven, Detective, Doctor)
3. **A single async loop** `connect_parse_respond_forever()`

You can:
- Run the bot and let it play matches.
- Inspect each game later by `gameId` using the log utilities.
- Stop the bot.




## Imports and basic configuration

In [1]:
import asyncio
import json
import os
import random
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from collections import defaultdict
import itertools
from datetime import datetime
from pathlib import Path

import websockets

from openai import AsyncOpenAI
from pydantic import BaseModel
from uuid import uuid4
from dotenv import load_dotenv


load_dotenv()

# --- WebSocket configuration ---
WS_URL = os.getenv("WS_URL", "ws://localhost:2025")
CONNECT_TIMEOUT = 10 # seconds to wait when opening
RECV_TIMEOUT = 0.1
KEEP_ALIVE = True
ENABLE_PHOENIX = True


def ts() -> str:
    """Return a human-readable timestamp for logging."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
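
The two timeouts above serve different purposes: `CONNECT_TIMEOUT` bounds how long opening the connection may take, while the much shorter `RECV_TIMEOUT` presumably lets a receive loop wake up regularly instead of blocking forever. A minimal sketch of that polling pattern follows. It uses an `asyncio.Queue` as a stand-in for the WebSocket; the `poll_messages` and `demo` helpers are illustrative assumptions, not the notebook's own loop.

```python
import asyncio

RECV_TIMEOUT = 0.1  # same value as in the configuration cell


async def poll_messages(inbox: asyncio.Queue, max_idle_polls: int = 3) -> list:
    """Collect messages until the inbox stays quiet for `max_idle_polls` polls."""
    received, idle = [], 0
    while idle < max_idle_polls:
        try:
            # Wait briefly for the next message instead of blocking forever.
            msg = await asyncio.wait_for(inbox.get(), timeout=RECV_TIMEOUT)
        except asyncio.TimeoutError:
            idle += 1  # nothing arrived; a real loop could do housekeeping here
            continue
        idle = 0
        received.append(msg)
    return received


async def demo() -> list:
    # Hypothetical messages standing in for server traffic.
    inbox: asyncio.Queue = asyncio.Queue()
    for raw in ('{"type": "game-start"}', '{"type": "ack"}'):
        inbox.put_nowait(raw)
    return await poll_messages(inbox)
```

In a notebook cell this can be run with `await demo()`; in a plain script, use `asyncio.run(demo())`.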


## Initialize Phoenix

In [2]:
if ENABLE_PHOENIX:

    PHOENIX_PROJECT_NAME = "raven-bot" + '-' + ts()

    try:
        from openinference.instrumentation.openai import OpenAIInstrumentor
        from phoenix.otel import register

        tracer_provider = register(project_name=PHOENIX_PROJECT_NAME, protocol="http/protobuf")
        OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)

    except ImportError:
        print("Phoenix OpenTelemetry instrumentation is not installed. please run 'uv sync'")

OpenTelemetry Tracing Details
|  Phoenix Project: raven-bot-2025-11-28 20:25:58
|  Span Processor: SimpleSpanProcessor
|  Collector Endpoint: http://localhost:6006/v1/traces
|  Transport: HTTP + protobuf
|  Transport Headers: {}
|  
|  Using a default SpanProcessor. `add_span_processor` will overwrite this default.
|  
|  
|  `register` has set this TracerProvider as the global OpenTelemetry default.
|  To disable this behavior, call `register` with `set_global_tracer_provider=False`.



## Hello LLM

Run the cell below to make sure your `OPENAI_API_KEY` is working.

In [3]:
no_of_tips = 5
max_characters_per_tip = 140

if not os.environ.get("OPENAI_API_KEY"):
    print("⚠️ OPENAI_API_KEY was not found in the environment.")

# Single async OpenAI client
client = AsyncOpenAI()

MODEL_NAME = "gpt-5-nano"
REASONING_EFFORT = "minimal"   # "minimal" / "low" / "medium" / "high"
VERBOSITY = "low"             # keep answers short/direct

try:
    user_prompt = (
        # Trailing spaces keep the concatenated sentences from running together.
        "I'm playing a game of Mafia (Raven) in a 24-hour AI hackathon. "
        f"Give me {no_of_tips} tips to help me win the hackathon. "
        f"Keep each tip less than {max_characters_per_tip} characters."
    )
    # user_prompt = "Say Hi"
    resp = await client.responses.parse(
        model=MODEL_NAME,
        input=[ { "role": "user", "content": user_prompt,}],
        reasoning={"effort": REASONING_EFFORT},
        text={"verbosity": VERBOSITY}
    )
    print(resp.output_text)

except Exception as e:
    print(f"Error during OpenAI API call: {e}")
    raise

- Map every player’s role clues quickly; note actions, votes, and alibis to spot patterns.
- Prioritize trust but verify: test suspects with targeted questions and inconsistent responses.
- Use risk assessment: address high-impact roles first (killers, blockers) to slow mafia.
- Coordinate with teammates: share information succinctly; avoid exposing your own plan.
- Stay calm under pressure: concise decisions, predictable actions prevent panic and misreads.


# Helper functions

## Message model: `ParsedMessage`

In [4]:

@dataclass
class ParsedMessage:
    raw: str                                 # original JSON string
    type: Optional[str] = None               # message type

    # Core identifiers
    match_id: Optional[str] = None
    game_id: Optional[str] = None
    your_id: Optional[str] = None

    # Game and phase context
    day: Optional[int] = None
    phase: Optional[str] = None              # "morning" or "night"
    timeout: Optional[float] = None          # generic timeout (seconds)
    first_vote_timeout: Optional[float] = None
    time_remaining: Optional[float] = None   # used in *_player-comment, raven-comment

    # GAME START info
    your_role: Optional[str] = None
    raven_count: Optional[int] = None
    detective_count: Optional[int] = None
    doctor_count: Optional[int] = None
    villager_count: Optional[int] = None

    # Common interaction fields
    otp: Optional[str] = None
    comment: Optional[str] = None

    # Player lists / status
    all_players: List[Dict[str, Any]] = field(default_factory=list)
    villagers_alive: List[str] = field(default_factory=list)
    players_alive: List[str] = field(default_factory=list)
    discussions: List[Dict[str, Any]] = field(default_factory=list)

    # Voting information
    votes: List[str] = field(default_factory=list)   # unified votes field
    done_voting: Optional[bool] = None
    player_lynched: Optional[str] = None

    # Role-identification info (Detective / shared)
    identified_raven: List[str] = field(default_factory=list)
    identified_villager: List[str] = field(default_factory=list)

    # Comment metadata
    player_id: Optional[str] = None
    llm_model_used: Optional[str] = None

    # Ack / investigation results
    request_status: Optional[str] = None             # for generic ack
    investigated: List[str] = field(default_factory=list)  # for ack-night-investigation
    is_raven: List[bool] = field(default_factory=list)     # parallel to investigated

    # Game result
    result: Optional[str] = None
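
The list-valued fields use `field(default_factory=list)` so that every message gets its own empty list; `dataclass` rejects a bare `= []` default precisely because it would be shared between instances. A small sketch with a hypothetical `Snapshot` class (not part of the notebook) shows the effect:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Snapshot:  # hypothetical stand-in for ParsedMessage
    raw: str
    players_alive: List[str] = field(default_factory=list)


first = Snapshot(raw="{}")
second = Snapshot(raw="{}")
first.players_alive.append("p1")

# Each instance owns its list, so mutating one does not leak into the other.
print(first.players_alive, second.players_alive)
```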


## 🔧 Function: `parse_message` to parse incoming messages

In [5]:
def parse_message(msg: str) -> Optional[ParsedMessage]:
    """Parse a raw JSON string from the server into a ParsedMessage object.

    This is aligned with the latest singham protocol.md and the observed
    JSONL logs. Legacy fields like a top-level ``vote`` on incoming
    messages are no longer supported here.
    """
    try:
        data = json.loads(msg)
    except Exception as e:
        print("❌ Invalid JSON:", msg)
        print("Error:", e)
        return None

    p = ParsedMessage(raw=msg)

    # Common fields present on most messages
    p.type = data.get("type")
    p.match_id = data.get("matchId")
    p.game_id = data.get("gameId")
    p.your_id = data.get("yourId")
    p.otp = data.get("otp")
    p.day = data.get("day")
    p.phase = data.get("phase")

    # Timers
    if "timeout" in data:
        p.timeout = data.get("timeout")
    if "firstVoteTimeout" in data:
        p.first_vote_timeout = data.get("firstVoteTimeout")
    if "timeRemaining" in data:
        p.time_remaining = data.get("timeRemaining")

    # 1) GAME START
    if p.type == "game-start":
        p.raven_count = data.get("ravenCount")
        p.detective_count = data.get("detectiveCount")
        p.doctor_count = data.get("doctorCount")
        p.villager_count = data.get("villagerCount")
        p.your_role = data.get("yourRole")
        return p

    # 2) PLAYER STATUS
    if p.type == "player-status":
        # allPlayers: [{ id, isAlive?, lynchedBy, lynchedDay }, ...]
        p.all_players = data.get("allPlayers", [])
        return p

    # 3) PHASE RESULT
    if p.type == "phase-result":
        # playerLynched may be empty (no one lynched)
        lynched = data.get("playerLynched")
        p.player_lynched = lynched or None
        return p

    # 4) GENERIC ACK
    if p.type == "ack":
        p.request_status = data.get("requestStatus")
        return p

    # 5) NIGHT DISCUSSION (Ravens)
    if p.type == "night-discussion":
        p.villagers_alive = data.get("villagersAlive", [])
        return p

    # 6) RAVEN COMMENT (Server → Ravens)
    if p.type == "raven-comment":
        p.discussions = data.get("discussions", [])
        if p.discussions:
            last = p.discussions[-1]
            p.player_id = last.get("playerId")
            p.comment = last.get("comment")
            p.votes = list(last.get("votes", []))
        return p

    # 7) MORNING DISCUSSION (Villagers)
    if p.type == "morning-discussion":
        p.players_alive = data.get("playersAlive", [])
        return p

    # 8) MORNING PLAYER COMMENT (Server → Villagers)
    if p.type == "morning-player-comment":
        p.discussions = data.get("discussions", [])
        if p.discussions:
            last = p.discussions[-1]
            p.player_id = last.get("playerId")
            p.comment = last.get("comment")
            p.votes = list(last.get("votes", []))
        return p

    # 9) NIGHT INVESTIGATION (Detective)
    if p.type == "night-investigation":
        p.players_alive = data.get("playersAlive", [])
        p.identified_raven = data.get("identifiedRavens", []) or []
        p.identified_villager = data.get("identifiedVillagers", []) or []
        return p

    # 10) NIGHT PROTECTION (Doctor)
    if p.type == "night-protection":
        p.players_alive = data.get("playersAlive", [])
        return p

    # 11) ACK for detective investigation
    if p.type == "ack-night-investigation":
        p.investigated = data.get("investigated", []) or []
        p.is_raven = data.get("isRaven", []) or []
        return p

    # 12) GAME RESULT (if/when used)
    if p.type == "game-result":
        p.result = data.get("result")  # "won", "lost", "draw", etc.
        return p

    # Fallback: unknown type; still return ParsedMessage for debugging
    return p
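
Most branches above copy camelCase JSON keys onto snake_case attributes. As an illustration of that mapping only, the same idea can be written as a lookup table. The `extract_fields` helper and `FIELD_MAP` below are hypothetical and work on plain dicts; they are not the notebook's `parse_message`. The sample payload is invented but uses the key names read above.

```python
import json
from typing import Any, Dict

# JSON key -> attribute name, per message type (a subset of the branches above).
FIELD_MAP: Dict[str, Dict[str, str]] = {
    "game-start": {
        "yourRole": "your_role",
        "ravenCount": "raven_count",
        "detectiveCount": "detective_count",
        "doctorCount": "doctor_count",
        "villagerCount": "villager_count",
    },
    "ack-night-investigation": {
        "investigated": "investigated",
        "isRaven": "is_raven",
    },
}


def extract_fields(raw: str) -> Dict[str, Any]:
    """Return the type-specific fields of one message as a snake_case dict."""
    data = json.loads(raw)
    mapping = FIELD_MAP.get(data.get("type"), {})
    out = {"type": data.get("type"), "game_id": data.get("gameId")}
    for json_key, attr in mapping.items():
        out[attr] = data.get(json_key)
    return out


# Invented sample message for illustration.
sample = json.dumps({
    "type": "game-start", "gameId": "g1", "yourRole": "Detective",
    "ravenCount": 2, "detectiveCount": 1, "doctorCount": 1, "villagerCount": 4,
})
parsed = extract_fields(sample)
print(parsed["your_role"], parsed["raven_count"])
```

A table like this keeps the key names in one place, at the cost of the per-type special cases (such as reading the last discussion entry) that the explicit branches handle.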


## 🔧 Function: `parse_outgoing_message`

In [6]:
def parse_outgoing_message(raw: str) -> ParsedMessage:
    """Parse the bot → server JSON into a ParsedMessage for logging.

    Outgoing messages now always use the ``votes`` list (no legacy ``vote`` key).
    """
    try:
        data = json.loads(raw)
    except Exception:
        # log as raw, but still return ParsedMessage
        return ParsedMessage(raw=raw)

    p = ParsedMessage(raw=raw)
    p.type = data.get("type")
    p.game_id = data.get("gameId")
    p.your_id = data.get("yourId")
    p.otp = data.get("otp")
    p.comment = data.get("comment")

    # Voting-related fields (used by all roles)
    p.votes = data.get("votes", []) or []
    p.done_voting = data.get("doneVoting")
    p.llm_model_used = data.get("llmModelUsed")

    return p

## 🔧 Function: `print_parsed_message`

In [7]:
def print_parsed_message(p: Optional[ParsedMessage]) -> None:
    """Pretty-print the key fields of a ParsedMessage for debugging."""
    if p is None:
        print("❌ Nothing to print (parsed is None)")
        return

    print("\n================= 🧩 PARSED MESSAGE =================")
    print(f"Type           : {p.type}")
    print(f"Match ID       : {p.match_id}")
    print(f"Game ID        : {p.game_id}")
    print(f"Your ID        : {p.your_id}")
    if p.day is not None:
        print(f"Day            : {p.day}")
    if p.phase is not None:
        print(f"Phase          : {p.phase}")
    if p.timeout is not None:
        print(f"Timeout (s)    : {p.timeout}")
    if p.first_vote_timeout is not None:
        print(f"First Vote TO  : {p.first_vote_timeout}")
    if p.time_remaining is not None:
        print(f"Time Remaining : {p.time_remaining}")
    if p.otp:
        print(f"OTP            : {p.otp}")

    # GAME START
    if p.type == "game-start":
        print(f"Your Role      : {p.your_role}")
        print(f"Ravens         : {p.raven_count}")
        print(f"Detectives     : {p.detective_count}")
        print(f"Doctors        : {p.doctor_count}")
        print(f"Villagers      : {p.villager_count}")

    # PLAYER STATUS
    elif p.type == "player-status":
        print("All Players    :")
        for pl in p.all_players:
            print(f"  - {pl}")

    # PHASE RESULT
    elif p.type == "phase-result":
        print(f"Player Lynched : {p.player_lynched}")

    # ACK
    elif p.type == "ack":
        print(f"Request Status : {p.request_status}")

    # NIGHT DISCUSSION
    elif p.type == "night-discussion":
        print(f"Villagers Alive: {p.villagers_alive}")

    # RAVEN COMMENT
    elif p.type == "raven-comment":
        print("Discussions    :")
        for d in p.discussions:
            print(f"  - {d.get('playerId')}: {d.get('comment')} (votes={d.get('votes')})")
        if p.votes:
            print(f"Last Votes     : {p.votes}")

    # MORNING DISCUSSION
    elif p.type == "morning-discussion":
        print(f"Players Alive  : {p.players_alive}")

    # MORNING PLAYER COMMENT
    elif p.type == "morning-player-comment":
        print("Discussions    :")
        for d in p.discussions:
            print(f"  - {d.get('playerId')}: {d.get('comment')} (votes={d.get('votes')})")
        if p.votes:
            print(f"Last Votes     : {p.votes}")

    # NIGHT INVESTIGATION
    elif p.type == "night-investigation":
        print(f"Players Alive  : {p.players_alive}")
        print(f"Identified Ravens   : {p.identified_raven}")
        print(f"Identified Villagers: {p.identified_villager}")

    # NIGHT PROTECTION
    elif p.type == "night-protection":
        print(f"Players Alive  : {p.players_alive}")

    # ACK NIGHT INVESTIGATION
    elif p.type == "ack-night-investigation":
        print(f"Investigated   : {p.investigated}")
        print(f"Is Raven       : {p.is_raven}")

    # GAME RESULT
    elif p.type == "game-result":
        print(f"Game Result    : {p.result}")

    else:
        print("⚠️ No specific printer for this type; showing raw dict:")
        try:
            print(json.loads(p.raw))
        except Exception:
            print(p.raw)

    print("=====================================================")



In [8]:
def parsed_message_to_obj(p: Optional[ParsedMessage]) -> dict:
    """Return a dictionary representation of a ParsedMessage."""
    if p is None:
        return {"error": "parsed is None"}

    obj = {
        "type": p.type,
        "match_id": p.match_id,
        "game_id": p.game_id,
        "your_id": p.your_id,
        "day": p.day,
        "phase": p.phase,
        "timeout": p.timeout,
        "first_vote_timeout": p.first_vote_timeout,
        "time_remaining": p.time_remaining,
        "otp": p.otp,
    }

    # Add type-specific fields
    if p.type == "game-start":
        obj.update({
            "your_role": p.your_role,
            "raven_count": p.raven_count,
            "detective_count": p.detective_count,
            "doctor_count": p.doctor_count,
            "villager_count": p.villager_count,
        })

    elif p.type == "player-status":
        obj["all_players"] = list(p.all_players)

    elif p.type == "phase-result":
        obj["player_lynched"] = p.player_lynched

    elif p.type == "ack":
        obj["request_status"] = p.request_status

    elif p.type == "night-discussion":
        obj["villagers_alive"] = p.villagers_alive

    elif p.type == "raven-comment":
        obj["discussions"] = p.discussions
        obj["votes"] = p.votes

    elif p.type == "morning-discussion":
        obj["players_alive"] = p.players_alive

    elif p.type == "morning-player-comment":
        obj["discussions"] = p.discussions
        obj["votes"] = p.votes

    elif p.type == "night-investigation":
        obj.update({
            "players_alive": p.players_alive,
            "identified_raven": p.identified_raven,
            "identified_villager": p.identified_villager,
        })

    elif p.type == "night-protection":
        obj["players_alive"] = p.players_alive

    elif p.type == "ack-night-investigation":
        obj.update({
            "investigated": p.investigated,
            "is_raven": p.is_raven,
        })

    elif p.type == "game-result":
        obj["result"] = p.result

    else:
        # fallback raw data
        try:
            obj["raw"] = json.loads(p.raw)
        except Exception:
            obj["raw"] = p.raw

    return obj
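
`parsed_message_to_obj` picks fields by hand for each message type. A generic alternative, sketched here on a hypothetical three-field `Mini` dataclass rather than the real `ParsedMessage`, is to convert with `dataclasses.asdict` and drop unset values. This trades the explicit per-type control above for less code.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Mini:  # hypothetical, much smaller than ParsedMessage
    type: Optional[str] = None
    day: Optional[int] = None
    votes: List[str] = field(default_factory=list)


def to_compact_dict(obj: Any) -> Dict[str, Any]:
    """Convert a dataclass instance to a dict without None or empty-list values."""
    return {k: v for k, v in asdict(obj).items() if v is not None and v != []}


compact = to_compact_dict(Mini(type="ack", day=0))
print(compact)
```

Note that `day=0` survives the filter because only `None` and empty lists are dropped, not every falsy value.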


# Role-specific responders (Villager, Raven, Detective, Doctor)

###  All Roles: `build_vote_from_morning_discussion`

In [9]:
from prompts import GAME_PROMPT
from llm import make_llm_call

In [10]:
prt = GAME_PROMPT
res = await make_llm_call(prt)

Making an LLM call
prompt='\nYou are playing an 8-player hidden-role elimination game.\n\nRoles in setup:\n4 Villagers (no power)\n1 Doctor (night protect one)\n1 Detective (night investigate one)\n2 Ravens (know each other, night kill one)\n\nPublic Knowledge Each Morning:\nAlive list, who died last night. Roles remain hidden except Ravens know partners; Detective accumulates confirmed roles from investigations.\n\nYour Private Role: {role}\n\nCore Objectives:\nVillager/Doctor/Detective team: Eliminate both Ravens before being outnumbered.\nRaven team: Eliminate all non-Ravens (Doctor & Detective count as villagers).\n\nStrategic Guidance By Role:\nVillager:\n- Establish baselines; push for specific reasoning over vibes.\n- Vote: Prefer consolidating late to avoid random scatter.\n- Detect lies by tracking claimed night actions vs outcomes.\n\nDoctor:\n- Protect likely kill targets (outspoken, analytic players).\n- Once a reliable Detective emerges (with consistent results), cha

In [11]:
print(res)

{}


In [12]:
def clean_history(events):
    """
    Take a list of ParsedMessage-like dicts and produce a compact, 
    human-friendly history. All types are handled except bare 'ack'.
    """
    cleaned = []

    for ev in events:
        t = ev.get("type")

        # Skip meaningless events
        if t == "ack":
            # low-level network acks; not useful for history
            continue

        # Common metadata we may want on almost every entry
        base = {
            "match_id": ev.get("match_id"),
            "game_id": ev.get("game_id"),
            "day": ev.get("day"),
            "phase": ev.get("phase"),
        }

        # 1) GAME START
        if t == "game-start":
            cleaned.append({
                **base,
                "type": "game-start",
                "role": ev.get("your_role"),
                "counts": {
                    "raven": ev.get("raven_count"),
                    "detective": ev.get("detective_count"),
                    "doctor": ev.get("doctor_count"),
                    "villager": ev.get("villager_count"),
                },
            })
            continue

        # 2) PLAYER STATUS SNAPSHOT
        if t == "player-status":
            alive = []
            dead = []
            for p in ev.get("all_players", []):
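                # allPlayers entries are raw JSON from the server:
                # { id, isAlive?, lynchedBy, lynchedDay }
                # The "?" most likely marks the field as optional, so read
                # "isAlive" first and fall back to the literal "isAlive?" key.
                if p.get("isAlive", p.get("isAlive?")):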
                    alive.append(p["id"])
                else:
                    dead.append({
                        "player": p["id"],
                        "by": p.get("lynchedBy"),
                        "day": p.get("lynchedDay"),
                    })

            cleaned.append({
                **base,
                "type": "player-status",
                "alive": alive,
                "dead": dead,
            })
            continue

        # 3) PHASE RESULT (lynches / no-lynch)
        if t == "phase-result":
            lynched = ev.get("player_lynched")
            cleaned.append({
                **base,
                "type": "lynch-result",
                "player": lynched or None,   # None = no one lynched
            })
            continue

        # 4) NIGHT DISCUSSION (Ravens: who's alive to target)
        if t == "night-discussion":
            cleaned.append({
                **base,
                "type": "night-discussion",
                "villagers_alive": ev.get("villagers_alive", []),
            })
            continue

        # 5) RAVEN COMMENT THREAD (server → ravens)
        if t == "raven-comment":
            conversations = []
            for d in ev.get("discussions", []):
                conversations.append({
                    "player": d.get("playerId"),
                    "comment": d.get("comment"),
                    "votes": d.get("votes", []),
                })

            cleaned.append({
                **base,
                "type": "raven-discussion",
                "conversations": conversations,
                # final_votes is usually the "current aggregate" from server
                "final_votes": ev.get("votes", []),
            })
            continue

        # 6) MORNING DISCUSSION (villagers: who's alive)
        if t == "morning-discussion":
            cleaned.append({
                **base,
                "type": "morning-discussion",
                "players_alive": ev.get("players_alive", []),
            })
            continue

        # 7) MORNING PLAYER COMMENT THREAD (server → villagers)
        if t == "morning-player-comment":
            conversations = []
            for d in ev.get("discussions", []):
                conversations.append({
                    "player": d.get("playerId"),
                    "comment": d.get("comment"),
                    "votes": d.get("votes", []),
                })

            cleaned.append({
                **base,
                "type": "discussion",
                "conversations": conversations,
                "final_votes": ev.get("votes", []),
            })
            continue

        # 8) NIGHT INVESTIGATION (detective)
        if t == "night-investigation":
            cleaned.append({
                **base,
                "type": "night-investigation",
                "players_alive": ev.get("players_alive", []),
                "identified_ravens": ev.get("identified_raven", []),
                "identified_villagers": ev.get("identified_villager", []),
            })
            continue

        # 9) NIGHT PROTECTION (doctor)
        if t == "night-protection":
            cleaned.append({
                **base,
                # keep your older 'night' label but add phase details
                "type": "night",
                "phase_role": "doctor",
                "players_alive": ev.get("players_alive", []),
            })
            continue

        # 10) ACK FOR DETECTIVE INVESTIGATION
        if t == "ack-night-investigation":
            cleaned.append({
                **base,
                "type": "investigation-result",
                # server-side fields mirrored by parse_message
                "investigated": ev.get("investigated", []),
                "is_raven": ev.get("is_raven", []),
            })
            continue

        # 11) GAME RESULT
        if t == "game-result":
            cleaned.append({
                **base,
                "type": "game-result",
                "result": ev.get("result"),  # "won", "lost", "draw", etc.
            })
            continue

        # 12) NIGHT TIMER / OTHER UNEXPECTED TYPES: just keep a debug entry
        cleaned.append({
            **base,
            "type": t or "unknown",
            "raw": ev,
        })

    return cleaned
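
The `player-status` branch is the only one that reshapes its payload: it splits `all_players` into an `alive` ID list and a `dead` list with lynch details. Below is a standalone sketch of that split. It assumes the liveness flag arrives under the key `isAlive`, which is an assumption about the protocol; the `split_players` helper and the sample entries are invented for illustration.

```python
from typing import Any, Dict, List, Tuple


def split_players(
    all_players: List[Dict[str, Any]],
) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Split raw player entries into alive IDs and dead-player records."""
    alive, dead = [], []
    for p in all_players:
        if p.get("isAlive"):  # assumed key name
            alive.append(p["id"])
        else:
            dead.append({
                "player": p["id"],
                "by": p.get("lynchedBy"),
                "day": p.get("lynchedDay"),
            })
    return alive, dead


# Invented sample data.
players = [
    {"id": "p1", "isAlive": True},
    {"id": "p2", "isAlive": False, "lynchedBy": "villagers", "lynchedDay": 1},
]
alive, dead = split_players(players)
print(alive)
print(dead)
```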


In [13]:
def format_history_for_llm(events):
    """
    Takes cleaned history (output of clean_history) and returns
    a nicely formatted multiline string for LLM prompts.
    Now also includes detective investigation info:
      - night-investigation (players alive, known ravens/villagers)
      - investigation-result (per-night results, updated known sets)
    """
    lines = []
    current_day = None

    # Track cumulative knowledge from investigations
    known_ravens = set()
    known_villagers = set()
    last_investigation_night_day = None  # to label investigation results

    for ev in events:
        t = ev.get("type")

        # Game setup (no day)
        if t == "game-start":
            lines.append("GAME SETUP")
            lines.append(f"- Your role: {ev.get('role')}")
            counts = ev.get("counts", {})
            role_parts = []
            for role_key, label in [
                ("raven", "Ravens"),
                ("detective", "Detectives"),
                ("doctor", "Doctors"),
                ("villager", "Villagers"),
            ]:
                if counts.get(role_key) is not None:
                    role_parts.append(f"{label}: {counts[role_key]}")
            if role_parts:
                lines.append("- Role counts: " + ", ".join(role_parts))
            lines.append("")  # blank line
            continue

        # Everything else usually has a day
        day = ev.get("day")
        if day is not None and t != "game-start":
            if day != current_day:
                lines.append("")
                lines.append(f"=== Day {day} ===")
                current_day = day

        # Player status
        if t == "player-status":
            phase = ev.get("phase") or "unknown"
            lines.append(f"[{phase.capitalize()} status]")
            alive = ", ".join(ev.get("alive", [])) or "None"
            lines.append(f"- Alive: {alive}")
            dead_list = ev.get("dead", [])
            if dead_list:
                dead_strs = [
                    f"{d['player']} (by {d['by']} on day {d['day']})"
                    for d in dead_list
                ]
                lines.append("- Dead: " + ", ".join(dead_strs))
            else:
                lines.append("- Dead: None")
            lines.append("")
            continue

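        # "night" entries (clean_history emits these for night-protection,
        # with the alive list stored under "players_alive")
        if t == "night":
            alive = ", ".join(ev.get("players_alive") or ev.get("alive") or []) or "None"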
            lines.append(f"[Night {day}]")
            lines.append(f"- Alive during night: {alive}")
            lines.append("")
            continue

        # Night investigation snapshot
        if t == "night-investigation":
            last_investigation_night_day = day

            # Update known sets from the event (they're cumulative snapshots)
            for r in ev.get("identified_ravens") or []:
                known_ravens.add(r)
            for v in ev.get("identified_villagers") or []:
                known_villagers.add(v)

            lines.append(f"[Night {day} investigation]")
            alive = ", ".join(ev.get("players_alive", [])) or "None"
            lines.append(f"- Players alive: {alive}")

            if known_ravens:
                lines.append(
                    "- Known Ravens so far: " + ", ".join(sorted(known_ravens))
                )
            if known_villagers:
                lines.append(
                    "- Known Villagers so far: " + ", ".join(sorted(known_villagers))
                )

            lines.append("")
            continue

        # Investigation result(s) from the detective
        if t == "investigation-result":
            investigated = ev.get("investigated") or []
            is_raven_flags = ev.get("is_raven") or []

            # Pair them safely
            pairs = list(zip(investigated, is_raven_flags))

            # Update cumulative knowledge
            for player, is_raven in pairs:
                if is_raven:
                    known_ravens.add(player)
                else:
                    known_villagers.add(player)

            # Label by last night day if known
            if last_investigation_night_day is not None:
                lines.append(
                    f"[Investigation result (Night {last_investigation_night_day})]"
                )
            else:
                lines.append("[Investigation result]")

            for player, is_raven in pairs:
                if is_raven:
                    lines.append(f"- You investigated {player}: RAVEN")
                else:
                    lines.append(
                        f"- You investigated {player}: NOT Raven (confirmed Villager)"
                    )

            # Show updated sets
            if known_ravens:
                lines.append(
                    "- Updated known Ravens: " + ", ".join(sorted(known_ravens))
                )
            if known_villagers:
                lines.append(
                    "- Updated known Villagers: " + ", ".join(sorted(known_villagers))
                )

            lines.append("")
            continue

        # Discussion: preserve ALL comments & votes
        if t == "discussion":
            # Skip totally empty shells
            if not ev.get("conversations") and not ev.get("final_votes"):
                continue

            lines.append(f"[Day {day} discussion]")
            for c in ev.get("conversations", []):
                votes = c.get("votes") or []
                vote_text = ", ".join(votes) if votes else "no vote"
                # Guard against a null comment as well as a missing one.
                comment = (c.get("comment") or "").replace("\n", " ")
                lines.append(
                    f"- {c['player']} said: \"{comment}\" (votes: {vote_text})"
                )

            final_votes = ev.get("final_votes") or []
            if final_votes:
                lines.append(
                    "Final votes this round: " + ", ".join(final_votes)
                )
            lines.append("")
            continue

        # Elimination / lynch result
        if t == "lynch-result":
            lines.append("[Resolution]")
            lines.append(f"- Player eliminated: {ev.get('player')}")
            lines.append("")
            continue

        # You can add more handlers here if new event types appear later.

    # Join everything into a single string
    return "\n".join(lines).strip()
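
The investigation handling relies on `investigated` and `is_raven` being parallel lists. `zip` pairs them and stops at the shorter list, so a length mismatch silently drops the unpaired tail rather than raising. A minimal sketch of the cumulative bookkeeping, using a hypothetical `update_knowledge` helper and invented player IDs:

```python
from typing import Iterable, Set, Tuple


def update_knowledge(
    investigated: Iterable[str],
    is_raven: Iterable[bool],
    known_ravens: Set[str],
    known_villagers: Set[str],
) -> Tuple[Set[str], Set[str]]:
    """Add each investigated player to the matching cumulative set."""
    for player, flag in zip(investigated, is_raven):
        (known_ravens if flag else known_villagers).add(player)
    return known_ravens, known_villagers


# "p7" has no matching flag, so zip leaves it out of both sets.
ravens, villagers = update_knowledge(["p3", "p5", "p7"], [True, False], set(), set())
print(sorted(ravens), sorted(villagers))
```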


In [14]:
MORNING_MSG_COUNT: Dict[str, int] = defaultdict(int)

async def build_vote_from_morning_discussion(parsed: ParsedMessage, game_events, done_voting: bool) -> Dict[str, Any]:
    """
    Villager: build a vote from a morning-discussion message.
    Now uses OpenAI to choose a target + comment, with a simple random fallback.
    """
    game_id = parsed.game_id
    your_id = parsed.your_id
    otp = parsed.otp

    alive = parsed.players_alive or []
    possible_targets = [p for p in alive if p != your_id]
    
    vote_res_fmt = {
        "comment" : "<Your short comment here *Not* more than 140 Chars>",
        "votes" : "ID of the player who needs to be eliminated so that chances for you to win is maximum"
    }
    
    print("Getting Game Events in Doc step")
    print(game_events)
    
    
    print(f"Getting respective History for GameID {game_id}")
    history = game_events.get(game_id, [])
    
    print("History fetched successfully")
    print(f"{history=}")
        
    print("Cleaning History")
    cleaned_history = clean_history(history)
    
    print("Formatting History")
    frmt_history = format_history_for_llm(cleaned_history)
    print(f"{frmt_history=}")
    
    
    print("Getting your role")
    role = game_events.get(game_id+"_role", "")
    print(f"{role=}")
    
    guess = await make_llm_call(prompt=GAME_PROMPT.format(role=str(role + your_id), log_history=f"{frmt_history} + \n \n Possible Targets : {possible_targets}", response_format=vote_res_fmt)) 
    
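    print(f"{guess=}")

    try:
        guess_json = json.loads(guess)
        print(f"{guess_json=}")
        vote_target = guess_json["votes"]
        comment = guess_json["comment"]
    except (TypeError, ValueError, KeyError) as e:
        # Random fallback when the LLM reply is missing, is not valid JSON,
        # or lacks the expected keys.
        print(f"LLM reply unusable ({e}); falling back to a random vote.")
        vote_target = random.choice(possible_targets) if possible_targets else None
        comment = f"I think {vote_target} may be suspicious. Casting my vote."

    votes = [vote_target] if vote_target is not None else []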

    msg: Dict[str, Any] = {
        "gameId": game_id,
        "yourId": your_id,
        "type": "vote",
        "otp": otp,
        "comment": comment,
        "votes": votes,
    }

    if done_voting:
        msg["doneVoting"] = True

    return msg


### Raven: `build_raven_vote_from_night_discussion`

In [15]:
async def build_raven_vote_from_night_discussion(parsed: ParsedMessage, game_events) -> Dict[str, Any]:
    """
    Raven: build a vote message from a night-discussion message.

    Uses LLM to pick villagers to target (votes list).
    If LLM fails or gives nonsense, fallback = all villagers, sorted.
    """
    game_id = parsed.game_id
    your_id = parsed.your_id
    otp = parsed.otp
    
    villagers = sorted(parsed.villagers_alive or [])
    
    if not villagers:
        votes: List[str] = []
        comment = "As Raven, I have no villagers to target :-)"
    else:
        # Fallback, kept if the LLM call fails or returns no valid target.
        votes = villagers[:]
        comment = (
            "As Raven, I am voting to eliminate the following villagers: "
            + ", ".join(votes)
        )

        raven_res_fmt = {
            "comment": "<Your short comment here *Not* more than 140 Chars>",
            "votes": ["IDs of the players who need to be killed for the Raven to win, in priority order."],
        }

        print("Getting Game Events in Raven step")
        print(game_events)

        print(f"Getting respective History for GameID {game_id}")
        history = game_events.get(game_id, [])

        print("History fetched successfully")
        print(f"{history=}")

        print("Cleaning History")
        cleaned_history = clean_history(history)

        print("Formatting History")
        frmt_history = format_history_for_llm(cleaned_history)
        print(f"{frmt_history=}")

        print("Getting your role")
        role = game_events.get(game_id + "_role", "")
        print(f"{role=}")

        try:
            guess = await make_llm_call(prompt=GAME_PROMPT.format(role=str(role + your_id), log_history=f"{frmt_history}", response_format=raven_res_fmt))
            print(f"{guess=}")
            guess_json = json.loads(guess)
            print(f"{guess_json=}")

            # Keep only IDs that belong to villagers who are still alive.
            llm_votes = [v for v in guess_json.get("votes", []) if v in villagers]
            if llm_votes:
                votes = llm_votes
                comment = guess_json.get("comment", comment)
        except Exception as e:
            print(f"Raven LLM vote failed, using fallback: {e}")

    return {
        "gameId": game_id,
        "yourId": your_id,
        "type": "vote",
        "otp": otp,
        "comment": comment,
        "votes": votes,
    }


### Detective: `build_detective_vote_from_night_investigation`

In [16]:
async def build_detective_vote_from_night_investigation(parsed: ParsedMessage, game_events) -> Dict[str, Any]:
    """Detective: build a vote from a night-investigation message."""
    game_id = parsed.game_id
    your_id = parsed.your_id
    otp = parsed.otp

    alive = parsed.players_alive or []
    safe_players = set(parsed.identified_villager or [])
    
    
    det_res_fmt = {
        "comment" : "<Your short comment here, *Not* more than 140 Chars>",
        "person_to_investigate" : "<ID of the Player to be investigated>"
    }
    
    print("Getting Game Events in Detective step")
    print(game_events)
    
    
    print(f"Getting respective History for GameID {game_id}")
    history = game_events.get(game_id, [])
    
    print("History fetched successfully")
    print(f"{history=}")
        
    print("Cleaning History")
    cleaned_history = clean_history(history)
    print(f"{cleaned_history=}")
    
    print("Formatting History")
    frmt_history = format_history_for_llm(cleaned_history)
    print(f"{frmt_history=}")
    
    
    print("Getting your role")
    role = game_events.get(game_id+"_role", "")
    print(f"{role=}")
    
    guess = await make_llm_call(prompt=GAME_PROMPT.format(role=str(role + your_id), log_history=frmt_history, response_format=det_res_fmt))
    
    print(f"{guess=}")
    guess_json = json.loads(guess)
    print(f"{guess_json=}")
    

    # possible_targets = [p for p in alive if p not in safe_players]
    # if not possible_targets:
    #     possible_targets = alive[:]

    
    # vote_target = random.choice(possible_targets) if possible_targets else None
    # comment = f"As Detective, I want to investigate {vote_target}. Casting my vote on them."
    
    vote_target = guess_json["person_to_investigate"]
    comment = guess_json["comment"]

    votes = [vote_target] if vote_target is not None else []

    return {
        "gameId": game_id,
        "yourId": your_id,
        "type": "vote",
        "otp": otp,
        "comment": comment,
        "votes": votes,
    }


### Doctor: `build_doctor_vote_from_night_protection`

In [17]:
async def build_doctor_vote_from_night_protection(parsed: ParsedMessage, game_events) -> Dict[str, Any]:
    """Doctor: build a protection vote from a night-protection message."""
    
    # Strategy note: the Doctor should protect themselves at first, until a clear suspect emerges.
    # The Doctor can also track how many villagers were eliminated by the Raven versus by voting.
    
    
    game_id = parsed.game_id
    your_id = parsed.your_id
    otp = parsed.otp

    alive = parsed.players_alive or []
    
    doc_res_fmt = {
        "comment" : "<Your short comment here, *Not* more than 140 Chars>",
        "protect" : "<ID of the Player to be protected>"
    }
    
    print("Getting Game Events in Doc step")
    print(game_events)
    
    
    print(f"Getting respective History for GameID {game_id}")
    history = game_events.get(game_id, [])
    
    print("History fetched successfully")
    print(f"{history=}")
        
    print("Cleaning History")
    cleaned_history = clean_history(history)
    print(f"{cleaned_history=}")
    
    print("Formatting History")
    frmt_history = format_history_for_llm(cleaned_history)
    print(f"{frmt_history=}")
    
    
    print("Getting your role")
    role = game_events.get(game_id+"_role", "")
    print(f"{role=}")
    
    guess = await make_llm_call(prompt=GAME_PROMPT.format(role=str(role + your_id), log_history=frmt_history, response_format=doc_res_fmt))
    
    print(f"{guess=}")
    guess_json = json.loads(guess)
    print(f"{guess_json=}")
    
    protect_target = guess_json["protect"]
    comment = guess_json["comment"]

    votes = [protect_target] if protect_target is not None else []

    return {
        "gameId": game_id,
        "yourId": your_id,
        "type": "vote",
        "otp": otp,
        "comment": comment,
        "votes": votes,
    }
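

All four builders parse the model's reply with `json.loads` and then use the returned IDs directly. The sketch below shows one more defensive way to do that step, assuming the reply is a JSON object serialized as a string. `pick_valid_target` and its parameters are hypothetical names for illustration, not functions from this notebook.

```python
import json
import random
from typing import List, Optional


def pick_valid_target(raw_reply: str, key: str, allowed: List[str],
                      rng: Optional[random.Random] = None) -> Optional[str]:
    """Return the ID under `key` if it is an allowed target, else a random allowed one."""
    rng = rng or random.Random()
    try:
        data = json.loads(raw_reply)
        candidate = data.get(key) if isinstance(data, dict) else None
    except (json.JSONDecodeError, TypeError):
        candidate = None  # the reply was not valid JSON

    if candidate in allowed:
        return candidate
    # Fallback: random choice among allowed targets, or None if there are none.
    return rng.choice(allowed) if allowed else None


alive = ["p1", "p2", "p3"]
good = pick_valid_target('{"protect": "p2", "comment": "hi"}', "protect", alive)
bad_id = pick_valid_target('{"protect": "p9"}', "protect", alive)
not_json = pick_valid_target("sorry, I cannot decide", "protect", alive)
empty = pick_valid_target("not json", "protect", [])
```

A helper like this could be shared by the Villager, Detective, and Doctor builders, passing `"votes"`, `"person_to_investigate"`, or `"protect"` as `key`.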


## Logging – in-memory + JSONL file

In [18]:
# In-memory game logs: game_id -> list of events
GAME_LOGS: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

# Global event counter (across the whole run)
EVENT_COUNTER = itertools.count(1)

# Log file setup
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

LOG_FILE_PATH = os.path.join(
    LOG_DIR,
    f"raven_events_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
)

print(f"Logging events to: {LOG_FILE_PATH}")


def record_event(*, direction: str, raw: str, parsed: Optional[ParsedMessage] = None) -> None:
    """Record a single event both in memory and in the JSONL log file."""
    seq = next(EVENT_COUNTER)
    timestamp = ts()

    game_id = getattr(parsed, "game_id", None) if parsed is not None else None
    match_id = getattr(parsed, "match_id", None) if parsed is not None else None
    msg_type = getattr(parsed, "type", None) if parsed is not None else None

    if not game_id:
        game_id = "__no_game_id__"

    event = {
        "seq": seq,
        "ts": timestamp,
        "direction": direction,  # "IN" or "OUT"
        "type": msg_type,
        "matchId": match_id,
        "gameId": game_id,
        "raw": raw,
    }

    # 1) In-memory
    GAME_LOGS[game_id].append(event)

    # 2) On disk (append JSONL)
    with open(LOG_FILE_PATH, "a", encoding="utf-8") as f:
        f.write(json.dumps(event) + "\n")


def print_game_log_from_memory(game_id: str) -> None:
    """Print all events for a gameId from in-memory logs."""
    events = GAME_LOGS.get(game_id, [])
    if not events:
        print(f"No events logged for gameId={game_id}")
        return

    print("\n" + "=" * 80)
    print(f"🎮 GAME LOG (in-memory) for gameId={game_id}")
    print("=" * 80)

    for ev in events:
        direction = "Server → Bot" if ev["direction"] == "IN" else "Bot → Server"
        print(
            f"\n#{ev['seq']} [{ev['ts']}] {direction} "
            f"(type={ev['type']}, matchId={ev['matchId']})"
        )
        print("-" * 80)
        try:
            obj = json.loads(ev["raw"])
            print(json.dumps(obj, indent=2))
        except Exception:
            print(ev["raw"])

    print("\n" + "=" * 80 + "\n")

Logging events to: logs\raven_events_20251128_202625.jsonl
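
Because each line of the log file is one JSON object, a past game can also be reloaded from disk after the kernel restarts. The following is a minimal sketch under that assumption. `load_events_from_file` is a hypothetical helper, not part of the notebook, and the demo writes a small temporary file with made-up events instead of reading a real log.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def load_events_from_file(path: str, game_id: str) -> List[Dict[str, Any]]:
    """Return all events for one gameId from a JSONL log, ordered by seq."""
    events: List[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            event = json.loads(line)
            if event.get("gameId") == game_id:
                events.append(event)
    return sorted(events, key=lambda ev: ev["seq"])


# Demo with made-up events that mirror the fields written by record_event.
sample = [
    {"seq": 2, "direction": "OUT", "gameId": "g1", "raw": "{}"},
    {"seq": 1, "direction": "IN", "gameId": "g1", "raw": "{}"},
    {"seq": 3, "direction": "IN", "gameId": "g2", "raw": "{}"},
]
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.jsonl")
    with open(demo_path, "w", encoding="utf-8") as f:
        for ev in sample:
            f.write(json.dumps(ev) + "\n")
    loaded = load_events_from_file(demo_path, "g1")

print([ev["seq"] for ev in loaded])
```

With a real run, the same function could be called with `LOG_FILE_PATH` and a `gameId` taken from the server messages.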


# Main async loop – connect, parse, respond, log

### How the non-blocking LLM logic works

This section is the **heart of the bot's concurrency model**. The goal is:

> Keep listening to the server all the time, without waiting for the LLM.

We do this in two pieces:

1. **`connect_parse_respond_forever` (main loop)**  
   - Opens the WebSocket connection to the game server.  
   - Repeatedly does `ws.recv()` to get the next message.  
   - Parses and logs the message.  
   - For messages that need an LLM decision, it calls:

     ```python
     asyncio.create_task(handle_message_and_respond(parsed, ws, send_lock, game_events=game_events))
     ```

   - This line *starts* an async task and immediately returns, so the loop
     can go back to waiting for the next message.

2. **`handle_message_and_respond` (background task)**  
   - Runs **in the background**, one task per message that needs the LLM.  
   - Decides what to do based on `parsed.type` (Villager / Raven / Detective / Doctor).  
   - Calls the appropriate async helper which internally talks to OpenAI.  
   - Builds the outgoing JSON payload, logs it, and sends it via `ws.send(...)`.  
   - Uses a shared `send_lock` so that only one task writes to the WebSocket at a time.

Because of this design:

- The bot can receive and handle **multiple games or phases in parallel**.
- A slow LLM call does **not** block the WebSocket; other messages keep flowing.
- The pattern is simple and beginner-friendly: one main loop + one background handler.
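
One caveat with fire-and-forget tasks: the event loop keeps only weak references to tasks, so the Python documentation recommends saving a reference to each task until it finishes. The sketch below is not the notebook's code. It shows the same main-loop plus background-handler pattern with a task set and a shared lock. `FakeSocket`, `slow_handler`, and `run_demo` are hypothetical stand-ins for the WebSocket, the LLM handler, and the main loop.

```python
import asyncio
from typing import List, Set


class FakeSocket:
    """Hypothetical stand-in for the WebSocket: records what was sent."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    async def send(self, payload: str) -> None:
        await asyncio.sleep(0)  # yield control, like a real network write
        self.sent.append(payload)


async def slow_handler(msg: str, ws: FakeSocket, send_lock: asyncio.Lock) -> None:
    """Simulate a slow LLM call, then send the reply under the lock."""
    await asyncio.sleep(0.05)  # placeholder for the LLM round trip
    async with send_lock:      # only one task writes at a time
        await ws.send(f"reply:{msg}")


async def run_demo(messages: List[str]) -> List[str]:
    ws = FakeSocket()
    send_lock = asyncio.Lock()
    background_tasks: Set[asyncio.Task] = set()

    for msg in messages:
        task = asyncio.create_task(slow_handler(msg, ws, send_lock))
        background_tasks.add(task)                        # keep a strong reference
        task.add_done_callback(background_tasks.discard)  # drop it when finished

    # The real loop would keep calling ws.recv() here; the demo just waits.
    await asyncio.gather(*background_tasks)
    return ws.sent


sent = asyncio.run(run_demo(["a", "b", "c"]))
```

In a notebook cell, use `sent = await run_demo(["a", "b", "c"])` instead of `asyncio.run(...)`, because the kernel already runs an event loop.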


## Handler `handle_message_and_respond`

In [19]:
async def handle_message_and_respond(parsed: ParsedMessage, ws, send_lock: asyncio.Lock, game_events) -> None:
    """
    Handle a single parsed message **in a background task**.

    This function is where we decide whether we need to call the LLM and send
    a response back to the server. It is meant to be launched with
    `asyncio.create_task(...)`, so the main WebSocket receive loop does not
    wait for the LLM call to finish.

    Parameters
    ----------
    parsed : ParsedMessage
        The already-parsed server message.
    ws : websockets.WebSocketClientProtocol
        The live WebSocket connection back to the game server.
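    send_lock : asyncio.Lock
        A lock so that only one task calls `ws.send(...)` at a time.
    game_events : dict
        Per-game history and roles collected by the main loop, keyed by
        `game_id` (events) and `game_id + "_role"` (role).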
    """
    try:
        if parsed is None:
            return

        outgoing: Optional[Dict[str, Any]] = None

        # --- Decide what to do based on message type ---
        if parsed.type == "morning-discussion":
            # All Players: build a vote for morning discussion 
            outgoing = await build_vote_from_morning_discussion(parsed, game_events, done_voting=True)

        elif parsed.type == "night-discussion":
            # Raven: build votes for night elimination
            outgoing = await build_raven_vote_from_night_discussion(parsed, game_events)

        elif parsed.type == "night-investigation":
            # Detective: choose a target to investigate
            outgoing = await build_detective_vote_from_night_investigation(parsed, game_events)

        elif parsed.type == "night-protection":
            # Doctor: choose someone to protect
            outgoing = await build_doctor_vote_from_night_protection(parsed, game_events)

        else:
            # For all other message types (acks, results, etc.) we just log.
            print(f"[{ts()}] ℹ️ No action needed/handled for type={parsed.type!r}")
            return

        if not outgoing:
            # Nothing to send (e.g., no valid targets)
            print(f"[{ts()}] ℹ️ No outgoing message built for type={parsed.type!r}")
            return

        # --- Serialize and log OUT message ---
        raw_out = json.dumps(outgoing)
        out_parsed = parse_outgoing_message(raw_out)
        record_event(direction="OUT", raw=raw_out, parsed=out_parsed)

        # Only one task should call ws.send at a time, so use a lock.
        async with send_lock:
            await ws.send(raw_out)

        print(f"[Bot → sent]\n{json.dumps(outgoing, indent=2)}")

    except Exception as e:
        # Make sure background task failures are visible
        print(f"[{ts()}] ⚠️ Error in handle_message_and_respond: {e}")

## Main Loop `connect_parse_respond_forever`

In [20]:
async def connect_parse_respond_forever():
    """
    Main loop: keep the WebSocket connection open, keep *listening* for messages,
    and spin off background tasks to handle any LLM / decision work.

    The key idea:
    - This loop ONLY waits on `ws.recv()` and other cheap operations.
    - For any message that needs an LLM call, we do:

          asyncio.create_task(handle_message_and_respond(parsed, ws, send_lock, game_events=game_events))

      so the loop can immediately go back to listening for the next message.
    """
    print(f"[{ts()}] 🔌 Connecting to {WS_URL} ...")
    try:
        # Dictionary to store events by gameID
        game_events = {}

        def add_event(game_id, event):
            """Add an event to the list for a given game_id."""
            if game_id not in game_events:
                game_events[game_id] = []
            game_events[game_id].append(event)
            
        def add_role(game_id, role):
            if role is not None:
                game_events[game_id + "_role"] = role
    
            
        async with websockets.connect(WS_URL, open_timeout=CONNECT_TIMEOUT) as ws:
            print(f"[{ts()}] ✅ Connection established.")

            # One lock shared by all background tasks that want to send on this websocket
            send_lock = asyncio.Lock()

            while True:
                try:
                    # 1) Wait for the next message from the server
                    msg = await asyncio.wait_for(ws.recv(), timeout=RECV_TIMEOUT)

                    # 2) Log and parse incoming message
                    print(f"\n[Server → raw] {msg}")
                    parsed = parse_message(msg)
                    print_parsed_message(parsed)

                    # Record the IN event (even if parsing failed, we capture raw)
                    record_event(direction="IN", raw=msg, parsed=parsed)

                    if parsed is None:
                        # Invalid/unknown message, nothing more to do
                        continue

                    # Store the event and role only after parsing has succeeded;
                    # accessing parsed.game_id on None would raise and end the loop.
                    add_event(parsed.game_id, parsed_message_to_obj(parsed))
                    # print("*"*50)
                    # print(game_events)
                    # print("*"*50)

                    add_role(parsed.game_id, parsed.your_role)

                    # 3) Fire-and-forget background task to handle LLM + response.
                    #    This is what makes the LLM call *non-blocking* for the main loop.
                    asyncio.create_task(handle_message_and_respond(parsed, ws, send_lock, game_events=game_events))

                except asyncio.TimeoutError:
                    if KEEP_ALIVE:
                        # No messages recently, but keep the connection open.
                        continue
                    print(f"[{ts()}] ⏹️ No messages within {RECV_TIMEOUT}s; closing connection.")
                    break
                except websockets.exceptions.ConnectionClosedOK:
                    print(f"[{ts()}] 🔒 Connection closed by server (OK).")
                    break
                except websockets.exceptions.ConnectionClosedError as e:
                    print(f"[{ts()}] ❌ Connection closed with error: {e}")
                    break
                except Exception as e:
                    print(f"[{ts()}] ⚠️ Unexpected error while listening: {e}")
                    break

    except Exception as e:
        print(f"[{ts()}] ❌ Connection failed: {e}")

# Run and Debug

## Run the bot

In [None]:
# If bot_task is already running, cancel the old task first.
try:
    bot_task.cancel()
except Exception:
    pass

# Start a (new) bot task
bot_task = asyncio.create_task(connect_parse_respond_forever())



[Server → raw] {"gameId":"CDFA2CC2-4824-4B76-BDAC-000BB68E94BB","gameNumber":"Game1","type":"morning-player-comment","day":3,"phase":"morning","timeRemaining":23.9707913398743,"otp":"BC9CC39D-730C-46B2-82CF-650888A154FA","discussions":[]}

Type           : morning-player-comment
Match ID       : None
Game ID        : CDFA2CC2-4824-4B76-BDAC-000BB68E94BB
Your ID        : None
Day            : 3
Phase          : morning
Time Remaining : 23.9707913398743
OTP            : BC9CC39D-730C-46B2-82CF-650888A154FA
Discussions    :
[2025-11-28 20:30:07] ℹ️ No action needed/handled for type='morning-player-comment'

[Server → raw] {"gameId":"CDFA2CC2-4824-4B76-BDAC-000BB68E94BB","gameNumber":"Game1","type":"morning-player-comment","day":3,"phase":"morning","timeRemaining":3.97492980957031,"otp":"BC9CC39D-730C-46B2-82CF-650888A154FA","discussions":[]}

Type           : morning-player-comment
Match ID       : None
Game ID        : CDFA2CC2-4824-4B76-BDAC-000BB68E94BB
Your ID        : None
D

## Print game logs

In [22]:
# game_ids = list(GAME_LOGS.keys())
# game_ids

In [23]:
# print_game_log_from_memory(game_ids[0])

## Run the cell below to stop the task

In [24]:
#bot_task.cancel()