In [1]:
import asyncio
from dotenv import load_dotenv
from minference.lite.inference import InferenceOrchestrator, RequestLimits
from minference.lite.models import ChatThread, LLMConfig, CallableTool, LLMClient, ResponseFormat, SystemPrompt, StructuredTool
from typing import List
from pydantic import BaseModel
from minference.enregistry import EntityRegistry
from minference.caregistry import CallableRegistry

In [2]:

load_dotenv()
EntityRegistry()
CallableRegistry()

oai_request_limits = RequestLimits(max_requests_per_minute=500, max_tokens_per_minute=200000)
orchestrator = InferenceOrchestrator(oai_request_limits=oai_request_limits)


In [5]:
from typing import List, Dict, Tuple, Optional, Set
import heapq
import random

class GridMap:
    """
    A grid-based map that supports walls/floors, apples, a player state,
    and methods for observation and movement using Dijkstra.
    """

    # ANSI escape codes for yellow background and reset
    YELLOW_BG = "\033[43m"
    RESET = "\033[0m"

    def __init__(
        self, 
        width: int, 
        height: int, 
        vision_range: int = 3, 
        movement_range: int = 3, 
        walls: Optional[List[Tuple[int, int]]] = None,
        apples: Optional[List[Tuple[int, int]]] = None
    ) -> None:
        """
        Initialize the grid map.

        :param width: Width of the grid.
        :param height: Height of the grid.
        :param vision_range: How far the player can see (ignoring line of sight).
        :param movement_range: How far the player can move in one turn (in steps).
        :param walls: A list of (x, y) coordinates that are walls.
                      If None, no walls are placed.
        :param apples: A list of (x, y) coordinates that contain apples.
                       If None, no apples are placed.
        """
        self.width: int = width
        self.height: int = height
        self.vision_range: int = vision_range
        self.movement_range: int = movement_range

        # 2D array to store grid cells:
        #   0 = floor (walkable), 1 = wall (unwalkable)
        self.grid: List[List[int]] = [[0 for _ in range(width)] for _ in range(height)]
        
        # Set of apple locations for quick lookup and removal
        self.apples: Set[Tuple[int, int]] = set()
        
        # Add walls if provided
        if walls is not None:
            for (wx, wy) in walls:
                if 0 <= wx < width and 0 <= wy < height:
                    self.grid[wy][wx] = 1
        
        # Add apples if provided
        if apples is not None:
            for (ax, ay) in apples:
                if 0 <= ax < width and 0 <= ay < height:
                    # Ensure apples are placed on walkable cells
                    if self.grid[ay][ax] == 0:
                        self.apples.add((ax, ay))

        
        
        # Track how many apples the player has collected
        self.apples_collected: int = 0

        # Keep track of the total distance traveled
        self.distance_traveled: int = 0

        # Player's default position (top-left corner)
        self.place_player_in_random_floor_cell()

        # Store the last path taken (for highlighting)
        self.last_path: List[Tuple[int, int]] = []

        # Ensure the default position is walkable
        if self.grid[self.player_y][self.player_x] == 1:
            raise ValueError("Default player position is on a wall. Choose another position.")
        
        # Collect an apple if starting on one
        if (self.player_x, self.player_y) in self.apples:
            self.apples.remove((self.player_x, self.player_y))
            self.apples_collected += 1
    def place_player_in_random_floor_cell(self) -> None:
        """
        Pick a random floor cell (where self.grid[y][x] == 0) and place the player there.
        If there's an apple there, collect it automatically via set_player_position.
        """
        floor_cells = []
        for y in range(self.height):
            for x in range(self.width):
                if self.grid[y][x] == 0:  # 0 = floor
                    floor_cells.append((x, y))

        if not floor_cells:
            raise ValueError("No floor cells found in this map. Cannot place the player.")

        chosen_x, chosen_y = random.choice(floor_cells)
        self.set_player_position(chosen_x, chosen_y)
    def set_player_position(self, x: int, y: int) -> None:
        """
        Set the player's position if the cell is walkable. 
        If there's an apple there, collect it.
        """
        if not self.is_in_bounds(x, y):
            raise ValueError("Position out of bounds.")
        if self.grid[y][x] == 1:
            raise ValueError("Cannot place the player on a wall.")
        
        self.player_x = x
        self.player_y = y

        # If there's an apple at the new position, collect it
        if (x, y) in self.apples:
            self.apples.remove((x, y))
            self.apples_collected += 1

    def is_in_bounds(self, x: int, y: int) -> bool:
        """Check if (x, y) is within the grid bounds."""
        return 0 <= x < self.width and 0 <= y < self.height

    def is_walkable(self, x: int, y: int) -> bool:
        """Check if the cell is floor (walkable)."""
        return self.grid[y][x] == 0

    def get_observation(self) -> Dict[str, object]:
        """
        Return an observation of the cells around the player, 
        restricted by the vision range. This ignores line of sight.
        
        :return: A dict with:
            - "player_position": (player_x, player_y)
            - "apples_collected": number of apples collected so far
            - "distance_traveled": total amount of space the player has traveled
            - "vision_grid": A sub-grid (list of lists) of size 
                             (2*vision_range+1) x (2*vision_range+1), 
                             indicating walls/floors only.
            - "top_left": (top-left x, top-left y) of the sub-grid 
                          in the full map.
        """
        obs: Dict[str, object] = {}
        obs["player_position"] = (self.player_x, self.player_y)
        obs["apples_collected"] = self.apples_collected
        obs["distance_traveled"] = self.distance_traveled

        # Compute bounding box for vision
        min_x: int = max(0, self.player_x - self.vision_range)
        max_x: int = min(self.width - 1, self.player_x + self.vision_range)
        min_y: int = max(0, self.player_y - self.vision_range)
        max_y: int = min(self.height - 1, self.player_y + self.vision_range)

        # Extract the sub-grid of walls/floors
        vision_grid: List[List[int]] = []
        for row in range(min_y, max_y + 1):
            vision_grid.append(self.grid[row][min_x:max_x + 1])

        obs["vision_grid"] = vision_grid
        obs["top_left"] = (min_x, min_y)

        return obs

    def _run_dijkstra(
        self
    ) -> Tuple[List[List[float]], Dict[Tuple[int, int], Optional[Tuple[int, int]]]]:
        """
        Run Dijkstra from the player's position to compute:
            1) dist[y][x]: The minimum distance from the player to (x, y)
            2) predecessor[(x, y)]: The cell from which we arrived at (x, y)
        
        :return: (dist, predecessor)
                 where dist is a 2D list of floats
                 and predecessor is a dict { (x, y): (px, py) or None }
        """
        dist: List[List[float]] = [[float('inf')] * self.width for _ in range(self.height)]
        dist[self.player_y][self.player_x] = 0.0

        predecessor: Dict[Tuple[int, int], Optional[Tuple[int, int]]] = {}
        start: Tuple[int, int] = (self.player_x, self.player_y)
        predecessor[start] = None

        pq: List[Tuple[float, Tuple[int, int]]] = [(0.0, start)]
        directions: List[Tuple[int, int]] = [(1,0), (-1,0), (0,1), (0,-1)]

        while pq:
            current_dist, (cx, cy) = heapq.heappop(pq)
            
            if current_dist > dist[cy][cx]:
                continue

            # We only care up to movement_range
            if current_dist >= self.movement_range:
                continue

            for dx, dy in directions:
                nx, ny = cx + dx, cy + dy
                if self.is_in_bounds(nx, ny) and self.is_walkable(nx, ny):
                    new_dist = current_dist + 1
                    if new_dist < dist[ny][nx]:
                        dist[ny][nx] = new_dist
                        predecessor[(nx, ny)] = (cx, cy)
                        heapq.heappush(pq, (new_dist, (nx, ny)))

        return dist, predecessor

    def _reconstruct_path(
        self, 
        predecessor: Dict[Tuple[int, int], Optional[Tuple[int, int]]], 
        target: Tuple[int, int]
    ) -> Optional[List[Tuple[int, int]]]:
        """
        Reconstruct the path from the predecessor dictionary for the given target.
        
        :param predecessor: Dictionary mapping cell -> previous cell on path
        :param target: The cell we want to reconstruct the path to
        :return: A list of (x, y) coordinates from player's position to target,
                 or None if no path exists.
        """
        if target not in predecessor:
            return None

        path: List[Tuple[int, int]] = []
        current: Optional[Tuple[int, int]] = target
        while current is not None:
            path.append(current)
            current = predecessor[current]
        path.reverse()
        return path

    def get_actions(self) -> Dict[Tuple[int, int], List[Tuple[int, int]]]:
        """
        Use Dijkstra to compute the shortest paths from the player's position 
        to every walkable cell within the movement range. Return all 
        these paths as a dictionary:
        
            (x, y) -> [list of (x, y) from player_pos to (x, y)]

        :return: A dict mapping each reachable walkable cell (within 
                 movement_range steps) to its shortest path from the player.
        """
        dist, predecessor = self._run_dijkstra()

        paths: Dict[Tuple[int, int], List[Tuple[int, int]]] = {}
        for y in range(self.height):
            for x in range(self.width):
                if dist[y][x] != float('inf') and dist[y][x] <= self.movement_range:
                    path = self._reconstruct_path(predecessor, (x, y))
                    if path:
                        paths[(x, y)] = path
        return paths

    def next(self, short_path: List[Tuple[int, int]]) -> Dict[str, object]:
        """
        Advance the player's position along the provided short_path.
        The short_path is expected to be one of the values returned by get_actions().

        If any apple is encountered along this path, the player collects it.
        We also highlight the path on the next grid printout and
        increment distance_traveled.
        
        :param short_path: A list of (x, y) coordinates from the player's 
                           current position to the destination.
        :return: A new observation (from get_observation()) after moving.
        """
        if not short_path:
            # No movement, return current observation
            self.last_path = []
            return self.get_observation()
        
        # Collect apples along the way
        for (px, py) in short_path:
            if (px, py) in self.apples:
                self.apples.remove((px, py))
                self.apples_collected += 1

        # Distance traveled is path length - 1 (number of "moves")
        self.distance_traveled += max(0, len(short_path) - 1)
        
        # The last coordinate in short_path is the destination
        dest_x, dest_y = short_path[-1]
        self.player_x = dest_x
        self.player_y = dest_y

        # Store the path for highlighting in print_grid
        self.last_path = short_path

        return self.get_observation()

    def print_grid(self) -> None:
        """
        Print a formatted ASCII visualization of the entire grid,
        indicating walls (#), floors (.), apples (A), and the player's position (P).
        Also highlight the most recent path taken in yellow (self.last_path).
        Each cell is followed by a space for extra readability.
        """
        for y in range(self.height):
            row_str = ""
            for x in range(self.width):
                # Determine the character to display
                if x == self.player_x and y == self.player_y:
                    cell_char = "P"  # Player
                elif (x, y) in self.apples:
                    cell_char = "A"  # Apple
                elif self.grid[y][x] == 1:
                    cell_char = "#"  # Wall
                else:
                    cell_char = "."  # Floor

                # Highlight if this cell is part of the last path
                if (x, y) in self.last_path:
                    row_str += f"{self.YELLOW_BG}{cell_char}{self.RESET} "
                else:
                    row_str += f"{cell_char} "
            print(row_str)




In [6]:
# ---------------------------
# Example usage:
# ---------------------------
# Define some walls and apples
walls_example = [(2, 1), (2, 2), (2, 3)]
apples_example = [(1, 0), (4, 4), (5, 1)]

# Create a GridMap of size 6x6
gm = GridMap(
    width=6, 
    height=6, 
    vision_range=2, 
    movement_range=3, 
    walls=walls_example,
    apples=apples_example
)

# Place the player in a non-wall cell
gm.set_player_position(0, 0)

print("Initial Grid (P=Player, #=Wall, .=Floor, A=Apple):")
gm.print_grid()

# Get observation
obs = gm.get_observation()
print("\nInitial Observation:")
print("Player Position:", obs["player_position"])
print("Apples Collected:", obs["apples_collected"])
print("Distance Traveled:", obs["distance_traveled"])
print("Vision Grid:")
for row in obs["vision_grid"]:
    print(row)

# Get possible actions (shortest paths)
actions_dict = gm.get_actions()
print("\nPossible Actions (destination -> path):")
for k, v in actions_dict.items():
    print(k, "->", v)

# Demonstrate choosing an action that might collect an apple.
chosen_path = None
for cell, path in actions_dict.items():
    # If the path includes any apple location, pick it
    if any(pos in gm.apples for pos in path):
        chosen_path = path
        break

if chosen_path is None and actions_dict:
    # If no path goes through an apple, pick the last reachable cell
    chosen_path = list(actions_dict.values())[-1]

if chosen_path:
    print("\nChosen path:", chosen_path)
    new_obs = gm.next(chosen_path)
    
    print("\nAfter moving, new grid (yellow = path taken):")
    gm.print_grid()
    
    print("\nNew Observation:")
    print("Player Position:", new_obs["player_position"])
    print("Apples Collected:", new_obs["apples_collected"])
    print("Distance Traveled:", new_obs["distance_traveled"])
    print("Vision Grid:")
    for row in new_obs["vision_grid"]:
        print(row)
else:
    print("\nNo valid path found or no actions available.")


Initial Grid (P=Player, #=Wall, .=Floor, A=Apple):
P A . . . . 
. . # . . A 
. . # . . . 
. . # . . . 
. . . . A . 
. . . . . . 

Initial Observation:
Player Position: (0, 0)
Apples Collected: 0
Distance Traveled: 0
Vision Grid:
[0, 0, 0]
[0, 0, 1]
[0, 0, 1]

Possible Actions (destination -> path):
(0, 0) -> [(0, 0)]
(1, 0) -> [(0, 0), (1, 0)]
(2, 0) -> [(0, 0), (1, 0), (2, 0)]
(3, 0) -> [(0, 0), (1, 0), (2, 0), (3, 0)]
(0, 1) -> [(0, 0), (0, 1)]
(1, 1) -> [(0, 0), (0, 1), (1, 1)]
(0, 2) -> [(0, 0), (0, 1), (0, 2)]
(1, 2) -> [(0, 0), (0, 1), (0, 2), (1, 2)]
(0, 3) -> [(0, 0), (0, 1), (0, 2), (0, 3)]

Chosen path: [(0, 0), (1, 0)]

After moving, new grid (yellow = path taken):
[43m.[0m [43mP[0m . . . . 
. . # . . A 
. . # . . . 
. . # . . . 
. . . . A . 
. . . . . . 

New Observation:
Player Position: (1, 0)
Apples Collected: 1
Distance Traveled: 1
Vision Grid:
[0, 0, 0, 0]
[0, 0, 1, 0]
[0, 0, 1, 0]


In [7]:
import random
from typing import List, Tuple

class ProceduralMapGenerator:
    """
    Generates a 2D grid map layout (walls + apples) via a
    rooms-and-corridors style algorithm.
    """

    def __init__(
        self,
        width: int,
        height: int,
        num_rooms: int,
        num_apples: int,
        min_room_size: int = 3,
        max_room_size: int = 6,
        seed: int = 0
    ) -> None:
        """
        :param width: Width of the map.
        :param height: Height of the map.
        :param num_rooms: Number of rooms to create.
        :param num_apples: Number of apples to place in the dungeon.
        :param min_room_size: Minimum size (width or height) of a room.
        :param max_room_size: Maximum size (width or height) of a room.
        :param seed: Random seed for reproducibility.
        """
        self.width = width
        self.height = height
        self.num_rooms = num_rooms
        self.num_apples = num_apples
        self.min_room_size = min_room_size
        self.max_room_size = max_room_size
        self.random = random.Random(seed)

        # This 2D array will store walls/floors:
        #   1 = wall, 0 = floor
        self.grid = [[1 for _ in range(self.width)] for _ in range(self.height)]

        # Keep track of the rooms as lists of (x, y) floor positions
        self.rooms: List[Tuple[int, int, int, int]] = []
        # Each room is stored as: (room_x, room_y, room_w, room_h)
        # meaning top-left corner is (room_x, room_y) 
        # and the room is room_w wide and room_h tall.

    def generate(self) -> Tuple[List[Tuple[int, int]], List[Tuple[int, int]]]:
        """
        Generates the dungeon layout and returns:
          - A list of walls (x, y)
          - A list of apples (x, y)
        """
        self._create_rooms()
        self._connect_rooms()
        walls = self._extract_walls()
        apples = self._place_apples()
        return walls, apples

    def _create_rooms(self) -> None:
        """
        Randomly carve out a number of rectangular rooms inside the grid.
        """
        created_rooms = 0
        
        attempts = 0
        max_attempts = 5 * self.num_rooms  # Limit how many times we try to place rooms

        while created_rooms < self.num_rooms and attempts < max_attempts:
            attempts += 1

            # Random room size
            w = self.random.randint(self.min_room_size, self.max_room_size)
            h = self.random.randint(self.min_room_size, self.max_room_size)

            # Random top-left position for the room
            x = self.random.randint(1, self.width - w - 1)  # 1 to leave boundary walls
            y = self.random.randint(1, self.height - h - 1)

            # Check if this room overlaps significantly with existing rooms
            if not self._room_overlaps(x, y, w, h):
                self._carve_room(x, y, w, h)
                self.rooms.append((x, y, w, h))
                created_rooms += 1

    def _room_overlaps(self, rx: int, ry: int, rw: int, rh: int) -> bool:
        """
        Check if the proposed room overlaps with any existing room 
        (with a small margin to keep them separate or partially separate).
        """
        # We can allow slight adjacency, so let's define a small 'border' 
        # so rooms don't overlap by 1 cell. 
        # Feel free to tweak how strict you want this overlap check to be.
        border = 1

        for (x, y, w, h) in self.rooms:
            # Check if horizontally or vertically they separate
            if (rx + rw + border <= x) or (rx >= x + w + border):
                continue  # no overlap in X dimension
            if (ry + rh + border <= y) or (ry >= y + h + border):
                continue  # no overlap in Y dimension

            return True  # Overlaps
        return False

    def _carve_room(self, x: int, y: int, w: int, h: int) -> None:
        """
        Carve out a rectangular room in the grid by setting those cells to 0 (floor).
        """
        for row in range(y, y + h):
            for col in range(x, x + w):
                self.grid[row][col] = 0

    def _connect_rooms(self) -> None:
        """
        Create corridors between rooms to ensure they are connected.
        A simple approach:
          - Sort rooms by center.
          - Carve corridors from each room to the next in sorted order.
        """
        if len(self.rooms) <= 1:
            return  # No need to connect if there's only one or no rooms

        # Sort rooms by their center's x coordinate
        self.rooms.sort(key=lambda r: r[0] + r[2] // 2)  # sort by center x
        # Could also sort by center y or a combination of x and y

        for i in range(len(self.rooms) - 1):
            # current room
            (x1, y1, w1, h1) = self.rooms[i]
            # next room
            (x2, y2, w2, h2) = self.rooms[i + 1]

            # find center of current room
            cx1 = x1 + w1 // 2
            cy1 = y1 + h1 // 2
            # find center of next room
            cx2 = x2 + w2 // 2
            cy2 = y2 + h2 // 2

            # carve corridor from (cx1, cy1) -> (cx2, cy2)
            # A simple approach: carve horizontal corridor, then vertical 
            # (or vice versa)
            if cx1 < cx2:
                for x in range(cx1, cx2 + 1):
                    self.grid[cy1][x] = 0
            else:
                for x in range(cx2, cx1 + 1):
                    self.grid[cy1][x] = 0

            if cy1 < cy2:
                for y in range(cy1, cy2 + 1):
                    self.grid[y][cx2] = 0
            else:
                for y in range(cy2, cy1 + 1):
                    self.grid[y][cx2] = 0

    def _extract_walls(self) -> List[Tuple[int, int]]:
        """
        Parse self.grid and return a list of (x, y) that are walls.
        """
        walls = []
        for y in range(self.height):
            for x in range(self.width):
                if self.grid[y][x] == 1:
                    walls.append((x, y))
        return walls

    def _place_apples(self) -> List[Tuple[int, int]]:
        """
        Randomly place apples on floor cells.
        Return the list of apple positions.
        """
        floor_cells = []
        for y in range(self.height):
            for x in range(self.width):
                if self.grid[y][x] == 0:  # floor
                    floor_cells.append((x, y))

        # Shuffle floor cells so we pick random positions
        self.random.shuffle(floor_cells)

        # Choose up to num_apples distinct positions (or fewer if not enough floor cells)
        chosen_apples = floor_cells[:min(len(floor_cells), self.num_apples)]
        return chosen_apples


In [20]:


# ---------------------------
# Example usage with GridMap:
# ---------------------------
from pprint import pprint
# Example: create a generator for a 30x20 dungeon, with 5 rooms, 10 apples
generator = ProceduralMapGenerator(
    width=30,
    height=20,
    num_rooms=5,
    num_apples=10,
    seed=42  # remove or change seed for random variability
)
walls, apples = generator.generate()

procedural_grid = GridMap(width=30, height=20, vision_range=10, movement_range=5, walls=walls, apples=apples)
max_steps = 10
print("Initial Grid (P=Player, #=Wall, .=Floor, A=Apple):")
init_state = procedural_grid.get_observation()

print(init_state["player_position"], init_state["apples_collected"], init_state["distance_traveled"])

procedural_grid.print_grid()

for i in range(max_steps):
    actions = procedural_grid.get_actions()
    random_action = random.choice(list(actions.keys()))
    next_state = procedural_grid.next(actions[random_action])

    print("After moving, new grid (yellow = path taken):")
    print(next_state["player_position"], next_state["apples_collected"], next_state["distance_traveled"])
    procedural_grid.print_grid()



Initial Grid (P=Player, #=Wall, .=Floor, A=Apple):
(20, 15) 0 0
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # 
# # # . A . # # # # # # # # # # # # A . . . # # # # # # # # 
# # # . . A . . # # # # # # # # # # . . . A . . . . # # # # 
# # # . . . # . # # # # # # # # # # . A . . # # . . . # # # 
# # # # # # # . # # # # # # # # # # # # . # # # . . . # # # 
# # # # # # # . # # # # # # # # # # # # . # # # . . . # # # 
# # # # # A . . . # # # # # # # # # # # . # # # # # # # # # 
# # # # # . A . . # # # # # # # # # # # . # # # # # # # # # 
# # # # # . . . . . . . . . . . . . A . . A # # # # # # # # 
# # # # # . . . . # #