# Buil Game Agent

## Import libraries

In [None]:
# !uv add pydantic_graph mermaid-python

In [2]:
from dotenv import load_dotenv
from config import Config
from pydantic import BaseModel, ConfigDict
from pydantic_ai.usage import Usage, UsageLimits
from pydantic_ai import ModelRetry
from player import Player
import numpy as np
from dataclasses import dataclass, field
from collections import deque
import logfire

from client import Client


logfire.configure()  
logfire.instrument_pydantic_ai()  

import nest_asyncio
nest_asyncio.apply()

pygame 2.6.1 (SDL 2.28.4, Python 3.12.10)
Hello from the pygame community. https://www.pygame.org/contribute.html
No Logfire project credentials found.
All data sent to Logfire must be associated with a project.



LogfireConfigError: You are not authenticated. Please run `logfire auth` to authenticate.

If you are running in production, you can set the `LOGFIRE_TOKEN` environment variable.
To create a write token, refer to https://logfire.pydantic.dev/docs/guides/advanced/creating_write_tokens/


In [3]:
load_dotenv('./.env',override=True)

True

## Common functions

In [6]:
from pydantic_ai import Agent, RunContext

def find_resource(grid:np.array, resource: str) -> list:
        """Find resource cells"""
    
        pair_of_row_cols = np.where(grid==resource)
        
        l = []
        if pair_of_row_cols:
            for r,c in zip(pair_of_row_cols[0], pair_of_row_cols[1]):
                l.append({'row': int(r), 'col': int(c)})    
        return l

# Move step (dr, dc, move_code)
MOVES = [
    (0, -1, 0),  # left
    (0, 1,  1),  # right
    (-1,0,  2),  # up
    (1, 0,  3),  # down
]

def is_valid(r, c, grid):
    h, w = grid.shape
    return 0 <= r < h and 0 <= c < w and grid[r, c] in ('g', '-1')
    
def shortest_path():

    return None

# Agents

## Orchestrator Agent

In [7]:
class Position(BaseModel):
    row: int
    col: int
    
class Task(BaseModel):
    task_description: str
    
class CalCulateWinConditionTask(Task):
    """
    step by step to calculate resource need, you can apply some calculate as +, -, * , / operator
    """
    explain: str
    wood_need: int
    cotton_need: int

class CollectResourceTask(Task):
    position: Position
    
class CollectFoodTask(CollectResourceTask):
    pass

class CollectWoodTask(CollectResourceTask):
    pass

class CollectCottonTask(CollectResourceTask):
    pass

class GoHomeTask(Task):
    pass

class DiscoverMapTask(Task):
    pass

class UnknownTask(Task):
    pass

class WinCondition(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    wood_need: int
    cotton_need: int
    player: Player
    
@dataclass
class OrchestratorAgent:
    
    # Define constants for actions
    KILL_OTHERS_TASK = "kill other players"
    COLLECT_FOOD_TASK = "collect food"
    COLLECT_WOOD_TASK = "collect wood"
    COLLECT_COTTON_TASK = "collect cotton"
    GOTO_CELL_TASK = "Go to cell to collect resource"
    DISCOVER_MAP_TASK = "discover map"
    CALCULATE_WIN_CONDITION = "calculate condition to win the game"
    
    GO_HOME_TASK = "go home"
    UNKNOWN_TASK = "unknown"
    config = Config()
    
    agent = Agent(
            f'openai:{config.LLM_MODEL}',
            output_type= CalCulateWinConditionTask | CollectFoodTask | CollectWoodTask | CollectCottonTask | GoHomeTask | Task | DiscoverMapTask | UnknownTask,
            system_prompt=(
                "You are a helpful ai assistant\n"
                "Suggest the task to win the game"
                
            ),
            result_retries=2,
        )

    
    @agent.system_prompt
    def system_prompt(ctx: RunContext[WinCondition|Player]) -> str:
        wood_need = max(0, ctx.deps.wood_need - ctx.deps.player.store.count('w'))
        cotton_need = max(0, ctx.deps.cotton_need - ctx.deps.player.store.count('c'))

        return f"""Identify the task should be to do
You are an agent to play a 2D game. The map is a matrix 2D, cell values can be:
  - 'g': cell is ground and you can step into it
  - '-1': invisibe cell, you can can step into it 
  - 'f', 'w', 'c': food, wood, cotton cell. You can collect it.

The **IMPORTANCE TASK IS COLLECT ONLY NEEDED RESOURCE** as wood, cotton to win the game. 



Base on the current map, your current resource or notify you suggest which task should to take.
The task is choosen from these options: `{OrchestratorAgent.KILL_OTHERS_TASK}`, `{OrchestratorAgent.COLLECT_FOOD_TASK}`, `{OrchestratorAgent.COLLECT_WOOD_TASK}`, `{OrchestratorAgent.COLLECT_COTTON_TASK}`, `{OrchestratorAgent.DISCOVER_MAP_TASK}`, `{OrchestratorAgent.GOTO_CELL_TASK}`, `{OrchestratorAgent.CALCULATE_WIN_CONDITION}`, `{OrchestratorAgent.GO_HOME_TASK}` and `{OrchestratorAgent.UNKNOWN_TASK}`

- If task is "calculate condition to win the game" try to number wood, cotton need to collect.

- If you do not see any resource you should suggest the Discorver Task 

- If task is goto cell you should extract position of cell and return it

Below helpfull info help you decine to select task:
 - Resource need to collect:
     - wood needed to collect: {wood_need}\n"
     - cotton needed to collect: {cotton_need}\n"

 - Your info : {ctx.deps.player}"
            "Action will be invalid for all invalid messages"
"""
    
    @agent.output_validator
    def validate_output(ctx: RunContext[None], result: Task) -> Task:
        print(f'Validate task: {result}')
        if isinstance(result, UnknownTask):
            raise ModelRetry(
                f"Try to determine task or suggest discover map task"
            )
        
        return result

    

## Test OrchestratorAgent

### Task Analysis Assessment

In [27]:
c = Client("localhost", 54712)

In [28]:
c.set_player_name('Hi')

'Hi'

### Win condition

In [29]:
o = OrchestratorAgent()
mission_text =  """To complete this game, you need to collect 2 units of wood and 2 units of fabric. Every 3 units of cotton can be converted into 1 unit of fabric."""
o.agent.run_sync(mission_text, 
        deps = WinCondition(
            wood_need=0,
            cotton_need=0,
            player= c.player))

Validate task: task_description='Calculate resources needed to win the game' explain='To win, I need to collect 2 units of wood and 2 units of fabric. Since every 3 units of cotton can be converted into 1 unit of fabric, I need to collect 6 units of cotton to make the 2 units of fabric.' wood_need=2 cotton_need=6


AgentRunResult(output=CalCulateWinConditionTask(task_description='Calculate resources needed to win the game', explain='To win, I need to collect 2 units of wood and 2 units of fabric. Since every 3 units of cotton can be converted into 1 unit of fabric, I need to collect 6 units of cotton to make the 2 units of fabric.', wood_need=2, cotton_need=6))

### Resource Need

In [30]:
c.allow_collect_items()
c.player.store = ['c','c','w','w','w'] 
o = OrchestratorAgent()
mission_text =  """Do no suggest condition win task. What task i should to do next?"""
o.agent.run_sync(mission_text, 
        deps = WinCondition(
            wood_need=5,
            cotton_need=4,
            player= c.player))

Validate task: task_description='Discover the map to search for resources.'


AgentRunResult(output=DiscoverMapTask(task_description='Discover the map to search for resources.'))

### Event

In [32]:
c.allow_collect_items()
c.player.store = ['c','w'] * 1
o = OrchestratorAgent()
mission_text =  """Collect food"""
o.agent.run_sync(mission_text, 
        deps = WinCondition(
            wood_need=4,
            cotton_need=0,
            player= c.player))

Validate task: task_description='Discover the map to find resources.'


AgentRunResult(output=DiscoverMapTask(task_description='Discover the map to find resources.'))

In [33]:
c.allow_collect_items()
c.player.store = ['c','w'] * 1
o = OrchestratorAgent()
mission_text =  """Which resource I should collect?"""
o.agent.run_sync(mission_text, 
        deps = WinCondition(
            wood_need=4,
            cotton_need=5,
            player= c.player))

Validate task: task_description='Calculate the resources needed to win' explain='We need to collect the necessary resources to win the game.' wood_need=3 cotton_need=4


AgentRunResult(output=CalCulateWinConditionTask(task_description='Calculate the resources needed to win', explain='We need to collect the necessary resources to win the game.', wood_need=3, cotton_need=4))

In [31]:
c.allow_collect_items()
c.player.store = ['c','w'] * 1
o = OrchestratorAgent()
mission_text =  """Go to the cell at (10, 18) to obtain one unit of fabric."""
o.agent.run_sync(mission_text, 
        deps = WinCondition(
            wood_need=4,
            cotton_need=0,
            player= c.player))

Validate task: task_description='Go to the cell at (10, 18) to obtain one unit of fabric.'


AgentRunResult(output=GoHomeTask(task_description='Go to the cell at (10, 18) to obtain one unit of fabric.'))

## Player Agent

In [17]:
class FindContext(BaseModel):
    player: Player
    model_config = ConfigDict(arbitrary_types_allowed=True)

class Position(BaseModel):
    row: int
    col: int
    
class ResourcePositions(BaseModel):
    positions: list[Position]
    explain: str

class FoodPositions(BaseModel):
    positions: list[Position]
    explain: str

class WoodPositions(BaseModel):
    positions: list[Position]
    explain: str

class CottonPositions(BaseModel):
    positions: list[Position]
    explain: str

class MapExplorer(BaseModel):
    positions: list[Position]
    explain: str

     
class Path(BaseModel):
    directions: list[int]
    explain: str

In [36]:
@dataclass
class PlayGameAgent:
    prompting_for_finding_resource = """

You are tasked with locating resources on the map defined in World Representation.
Return a list of all cells that contain a resource food, wood, or cotton where each is denoted by:

‘f’ for food

‘w’ for wood

‘c’ for cotton

'-1' for discover map

if No wood resources found you should return MapExlorer
    
"""
    config = Config()
    agent = Agent(
            f'openai:{config.LLM_MODEL}',
            output_type= FoodPositions | WoodPositions | CottonPositions | MapExplorer|  Path,
            system_prompt= prompting_for_finding_resource,
            deps_type=FindContext
        ) 

    @agent.tool
    def map_expolorer(ctx: RunContext[FindContext]) -> list:
        """Find -1 cells"""

        row = ctx.deps.player.row
        col = ctx.deps.player.col
        
        return find_resource(ctx.deps.player.grid[row-1: row+1,col-1:col+1],  -1)
        
    @agent.tool
    def find_food(ctx: RunContext[FindContext]) -> list:
        """Find food cells"""
        
        return find_resource(ctx.deps.player.grid, 'f')
        
    
    @agent.tool
    def find_wood(ctx: RunContext[FindContext]) -> list:
        """Find wood cells"""
        
        return find_resource(ctx.deps.player.grid, 'w')
    
    @agent.tool
    def find_cotton(ctx: RunContext[FindContext]) -> list:
        """Find wood cells"""
        
        return find_resource(ctx.deps.player.grid, 'c')
        
    @agent.tool
    def find_shortest_path(ctx: RunContext[FindContext], resource_positions: list[Position]) -> list:
        """Find shortest path"""
    
        grid = ctx.deps.player.grid
        
        positions = resource_positions
            
        grid_tmp = np.copy(ctx.deps.player.grid)
    
        
        if len(positions)==0:
            return None # None mean agent don't see resource
            
        # peusdo code
        sorttest = [1,1,1,1]
        
        return sorttest
                    

## Test PlayGameAgent

In [37]:
play_game_agent = PlayGameAgent()

In [38]:
player = c.get_player()
play_game_agent.agent.run_sync(
    'find wood',
    deps=FindContext(
        player= player
    )
)

AgentRunResult(output=MapExplorer(positions=[], explain='No wood resources found, showing the map explorer.'))

In [39]:
player = c.get_player()
play_game_agent.agent.run_sync(
    'Find shortest path to collect wood only',
    deps=FindContext(
        player= player
    )
)

AgentRunResult(output=MapExplorer(positions=[], explain='No wood resources found.'))

# Game Client

In [40]:
from dataclasses import dataclass
from rich.prompt import Prompt
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from __future__ import annotations
import time
from mermaid import Mermaid

from client import Client

config: Config = Config()

class GameClient(Client):
    def __init__(self):
        
        super().__init__()

        self.task: str = ''
        self.messages: list[str] = []

        self.usage = Usage()

        self.orchestrator_agent = OrchestratorAgent()

        self.play_game_agent = PlayGameAgent()
        
        
    def print_usage(self):
        
        print("Prompt tokens:", self.usage.request_tokens)
        print("Response tokens:", self.usage.response_tokens)
        print("Total tokens:", self.usage.total_tokens)

    def find_shortest_path_to(self, row, col):
        player = self.get_player()
        path = shortest_path((self.player.row, self.player.col),
                     (row, col),
                     self.player.grid)
        return path
    
    def go_step_by_step(self, path: list[int]):
        for direction in path:
            self.move(direction)
            time.sleep(1)
        
    def go_home(self):
        player = self.get_player()
        home_path = self.find_shortest_path_to(self.player.home_row, self.player.home_col)
        self.go_step_by_step(home_path)

        

# Build Graph

## Define Graph State

In [41]:
@dataclass
class GameState:  
    client: GameClient
    name: str
    food_need: int 
    fabric_need: int

## Define nodes

### Create Game Node

In [42]:
@dataclass
class CreateGame(BaseNode[GameState]):  
    async def run(self, ctx: GraphRunContext[GameState]) -> TakeMission:
        return TakeMission()

### Take on the mission

In [43]:
@dataclass
class TakeMission(BaseNode[GameState]):
    async def run(self, ctx: GraphRunContext[GameState]) -> WaitingStartGame:
        
        # Store mission to context

        client = ctx.state.client 
        
        player = client.get_player()
        while not player.message:
            time.sleep(1)
            player = client.get_player()

        client.messages.append(player.message)

        result = client.orchestrator_agent.agent.run_sync(
            "Parse this message to get the resource need: " + player.message,
            deps = WinCondition(
                wood_need=0,
                cotton_need=0,
                player= player
            )
        )

        print(result.output)

        ctx.state.wood_need = result.output.wood_need
        ctx.state.cotton_need = result.output.cotton_need

        print(f'{ctx.state=}')

        return WaitingStartGame()
        
        

### Waiting Start Game

In [44]:
@dataclass
class WaitingStartGame(BaseNode[GameState]):
    async def run(self, ctx: GraphRunContext[GameState]) -> IdentifyTask | End:
        
        msg = 'Waiting start game'
        client = ctx.state.client 
        while True:
            player = client.get_player()
            msg += '.'
            print(msg)
            if player.status.value == 'playing':
                break
            time.sleep(1)

        print('Game started. New Identity task to do')
                
        return IdentifyTask()

### Identify the tasks to complete

In [45]:
@dataclass
class IdentifyTask(BaseNode[GameState]):
    async def run(self, ctx: GraphRunContext[GameState]) -> ExecuteTask | End:

        client = ctx.state.client 
        player = client.get_player()

        # Check new message 
        message = player.message

        if message not in client.messages:
            client.messages.append(player.message)
            result = client.orchestrator_agent.agent.run_sync(
                "Base on this message suggest what task name I sould to do: Message" + player.message,
                deps = WinCondition(
                    wood_need= ctx.state.wood_need,
                    cotton_need=ctx.state.cotton_need,
                    player= player
                )
            )
        else:
            result = client.orchestrator_agent.agent.run_sync(
                "What is my next task. Do not suggest CalCulateWinConditionTask",
                deps = WinCondition(
                    wood_need= ctx.state.wood_need,
                    cotton_need=ctx.state.cotton_need,
                    player= player
                )
            )

        print(result)

        return ExecuteTask(result.output)
        # return End(None)

### Execute Task

In [46]:
@dataclass
class ExecuteTask(BaseNode[GameState]):
    task: Task
    async def run(self, ctx: GraphRunContext[GameState]) -> IdentifyTask:
        if isinstance(self.task, CollectWoodTask):
            
            client = ctx.state.client 

            player = client.get_player()

            client.allow_collect_items()

            result = client.play_game_agent.agent.run_sync(
                 f'Find shortest path go to get wood.',
                usage=ctx.state.client.usage,
                deps=FindContext(player=player)
            )
            
        
            if isinstance(result.output, Path) and  len(result.output.directions) > 0:
                ctx.state.client.go_step_by_step(result.output.directions)
                ctx.state.client.go_home()
                return IdentifyTask()
        
        # To Do:

        print('Just for test: processing task. It takes about 10 seconds....')
        time.sleep(10)
        print('Identify next task')

        return IdentifyTask()


# Test

In [47]:
game_graph = Graph(
    nodes=[CreateGame, TakeMission, WaitingStartGame, IdentifyTask, ExecuteTask]
)


In [48]:
Mermaid(game_graph.mermaid_code(start_node=CreateGame))

In [54]:
async def create_client_game(game_client):
    state = GameState(
        client= game_client, 
        name= name,
        food_need=0, 
        fabric_need=0
    )  
    await game_graph.run(CreateGame(), state=state) 

import asyncio
import nest_asyncio
nest_asyncio.apply()

agent_id = 2
name = f'Agent {agent_id:02d}'

game_client = GameClient()
game_client.set_player_name("Huhu")

asyncio.run(create_client_game(game_client))

OSError: [WinError 10049] The requested address is not valid in its context