Skip to content
This repository has been archived by the owner on Aug 7, 2024. It is now read-only.

Commit

Permalink
feat: add scan_nearby, MoveTowardsAction and A* pathfinding (#1481)
Browse files Browse the repository at this point in the history
* add scan_nearby and MoveTowardsAction (and A* pathfinding)

* minor refactoring of functions

* simplify while loop

* add test_scan_nearby

* update the full backpack message

* tidy up

* just use cell.interactable to pass the path

* fix test

* Merge branch 'development' into scan_move

* fix the artefact and pickup types

* fix the test
  • Loading branch information
dionizh authored Mar 17, 2021
1 parent 4853c58 commit 1e37fb3
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 13 deletions.
31 changes: 31 additions & 0 deletions aimmo-game-worker/simulation/action.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import ABCMeta, abstractmethod

from .direction import Direction


class Action:
__metaclass__ = ABCMeta
Expand Down Expand Up @@ -30,6 +32,35 @@ def serialise(self):
}


class MoveTowardsAction(Action):
def __init__(self, artefact):
self.direction = None

if not artefact:
print("MoveTowardsAction got an invalid parameter. Is it empty?")
return

if len(artefact.get("path", [])) < 2:
return # not a valid path

# the first cell in the path is the starting cell
avatar_location = artefact["path"][0].location
next_location = artefact["path"][1].location

# calculate direction
x = next_location.x - avatar_location.x
y = next_location.y - avatar_location.y
self.direction = Direction(x, y)

def serialise(self):
if not self.direction:
return {}
return {
"action_type": "move",
"options": {"direction": self.direction.serialise()},
}


class AttackAction(Action):
def __init__(self, direction):
self.direction = direction
Expand Down
130 changes: 130 additions & 0 deletions aimmo-game-worker/simulation/pathfinding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Inspired by https://medium.com/@nicholas.w.swift/easy-a-star-pathfinding-7e6689c7f7b2
# and https://gist.github.com/ryancollingwood/32446307e976a11a1185a5394d6657bc

import heapq

from .location import Location
from .direction import ALL_DIRECTIONS


class Node:
"""
A node class for A* Pathfinding
"""

def __init__(self, parent=None, cell=None):
self.parent = parent
self.cell = cell

self.g = 0 # g is the distance between the current node and the start node
self.h = 0 # h is the heuristic - estimated distance from the current node to the end node
self.f = 0 # f is the total cost of the node (g + h)

@property
def location(self):
return self.cell.location

def __eq__(self, other):
return self.location == other.location

def __repr__(self):
return f"{self.location} - g: {self.g} h: {self.h} f: {self.f}"

# defining less than for purposes of heap queue
def __lt__(self, other):
return self.f < other.f

# defining greater than for purposes of heap queue
def __gt__(self, other):
return self.f > other.f


def _get_adjacent_cells(current_node, world_map):
adj = []
# we move only in 4 directions
for direction in ALL_DIRECTIONS:
# make sure the cell is within the grid
try:
tx = current_node.location.x + direction.x
ty = current_node.location.y + direction.y
cell = world_map.get_cell(Location(tx, ty))
except KeyError:
cell = None
# Make sure walkable cell
if cell and cell.habitable:
adj.append(cell)
return adj


def _constructed_path(current_node):
# follow backwards from current node all the way to the start
path = []
current = current_node
while current is not None:
path.append(current.cell)
current = current.parent
return path[::-1] # Return reversed path


def astar(world_map, start_cell, end_cell):
"""
Returns a list of Cell as a path from the given start to the given end in the given world_map.
The best path navigates the obstacles (but not other avatars, as they're assumed to be moving).
For explanation of the A* pathfinding, see the medium post at the top.
"""

# Create start and end node
start_node = Node(None, start_cell)
end_node = Node(None, end_cell)

# Initialize both open and closed list
open_list = []
closed_list = []

# Heapify the open_list and Add the start node
heapq.heapify(open_list)
heapq.heappush(open_list, start_node)

# Loop until you find the goal or exhaust the nodes
while len(open_list) > 0:

# look for the lowest F cost square on the open list for current node (returned by the heapq)
current_node = heapq.heappop(open_list)
closed_list.append(current_node)

# Found the goal!
if current_node == end_node:
return _constructed_path(current_node)

# Generate children
children = []
for acell in _get_adjacent_cells(current_node, world_map):
new_node = Node(current_node, acell)
children.append(new_node)

for child in children:
if child in closed_list:
continue

# calculate the f, g, and h values
child.g = current_node.g + 1
child.h = ((child.location.x - end_node.location.x) ** 2) + (
(child.location.y - end_node.location.y) ** 2
)
child.f = child.g + child.h

# check if it is already in the open list, and if this path to that square is better,
# using G cost as the measure (lower G is better)
open_nodes = [
open_node
for open_node in open_list
if (child.location == open_node.location and child.g > open_node.g)
]
if len(open_nodes) > 0:
continue

# add the child to the open list
heapq.heappush(open_list, child)

print("Couldn't get a path to destination")
return None
54 changes: 51 additions & 3 deletions aimmo-game-worker/simulation/world_map.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from collections import defaultdict
from .avatar_state import create_avatar_state
from .location import Location
from typing import Dict, List
from .pathfinding import astar

# how many nearby artefacts to return
SCAN_LIMIT = 3
SCAN_RADIUS = 12
ARTEFACT_TYPES = ["chest", "key", "yellow_orb"]
PICKUP_TYPES = ["damage_boost", "invulnerability", "health"] + ARTEFACT_TYPES


class Cell(object):
Expand All @@ -25,7 +33,10 @@ def habitable(self):
return not (self.avatar or self.obstacle)

def has_artefact(self):
return self.interactable is not None and self.interactable["type"] == "artefact"
return (
self.interactable is not None
and self.interactable["type"] in ARTEFACT_TYPES
)

def __repr__(self):
return "Cell({} a={} i={})".format(
Expand Down Expand Up @@ -92,11 +103,10 @@ def interactable_cells(self):
return [cell for cell in self.all_cells() if cell.interactable]

def pickup_cells(self):
pickup_types = ("damage_boost", "invulnerability", "health", "artefact")
return [
cell
for cell in self.interactable_cells()
if cell.interactable["type"] in pickup_types
if cell.interactable["type"] in PICKUP_TYPES
]

def score_cells(self):
Expand Down Expand Up @@ -126,5 +136,43 @@ def can_move_to(self, target_location):
return False
return getattr(cell, "habitable", False) and not getattr(cell, "avatar", False)

def _scan_artefacts(self, start_location, radius):
# get artefacts from starting location within the radius
artefacts = []
for x in range(start_location.x - radius, start_location.x + radius + 1):
for y in range(start_location.y - radius, start_location.y + radius + 1):
try:
cell = self.get_cell(Location(x, y))
except KeyError:
continue
if cell.has_artefact():
artefacts.append(cell)
return artefacts

def scan_nearby(self, avatar_location, radius=SCAN_RADIUS) -> List[dict]:
"""
From the given location point search the given radius for artefacts.
Returns list of nearest artefacts (artefact/interactable represented as dict).
"""
artefact_cells = self._scan_artefacts(avatar_location, radius)

# get the best path to each artefact
nearby = defaultdict(list)
for art_cell in artefact_cells:
path = astar(self, self.cells.get(avatar_location), art_cell)
if path:
nearby[len(path)].append((art_cell, path))

# sort them by distance (the length of path) and take the nearest first
nearest = []
for distance in sorted(nearby.keys()):
for art_cell, path in nearby[distance]:
art_cell.interactable["path"] = path
nearest.append(art_cell.interactable)
if len(nearest) > SCAN_LIMIT:
break

return nearest[:SCAN_LIMIT]

def __repr__(self):
return repr(self.cells)
13 changes: 11 additions & 2 deletions aimmo-game-worker/tests/tests_simulation/test_world_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest import TestCase

from simulation.location import Location
from simulation.world_map import WorldMap, WorldMapCreator
from simulation.world_map import WorldMap, WorldMapCreator, ARTEFACT_TYPES


class TestWorldMap(TestCase):
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_interactable_cells(self):

def test_artefact_cell(self):
cells = self._generate_cells()
cells[0]["interactable"] = {"type": "artefact"}
cells[0]["interactable"] = {"type": ARTEFACT_TYPES[0]}
map = WorldMapCreator.generate_world_map_from_cells_data(cells)
self.assertEqual(map.get_cell(Location(-1, -1)).has_artefact(), True)

Expand Down Expand Up @@ -136,3 +136,12 @@ def test_cannot_move_to_inhabited_cell(self):
cells[1]["avatar"] = self.AVATAR
map = WorldMapCreator.generate_world_map_from_cells_data(cells)
self.assertFalse(map.can_move_to(Location(-1, 0)))

def test_scan_nearby(self):
cells = self._generate_cells(5, 5)
cells[0]["avatar"] = self.AVATAR
cells[2]["obstacle"] = {"location": {"x": 0, "y": 0}}
cells[4]["interactable"] = {"type": ARTEFACT_TYPES[-1]}
map = WorldMapCreator.generate_world_map_from_cells_data(cells)
artefacts = map.scan_nearby(Location(-1, 0))
self.assertEqual(len(artefacts), 1)
2 changes: 1 addition & 1 deletion aimmo-game/simulation/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def _reject(self):
self.avatar.add_event(FailedPickupEvent())
self.avatar.clear_action()
self.avatar.logs.append(
"Uh oh! Your backpack is full! 🎒 Please drop something."
"Uh oh! Your avatar was unable to pick up the artefact. Your backpack is full! 🎒 "
)


Expand Down
14 changes: 7 additions & 7 deletions game_frontend/src/pyodide/webWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import { expose } from 'threads/worker'
import ComputedTurnResult from './computedTurnResult'

function getAvatarStateFromGameState (gameState: any, playerAvatarID: number): object {
function getAvatarStateFromGameState(gameState: any, playerAvatarID: number): object {
return gameState.players.find(player => player.id === playerAvatarID)
}

async function initializePyodide () {
async function initializePyodide() {
self.languagePluginUrl = 'https://pyodide-cdn2.iodide.io/v0.15.0/full/'
importScripts('https://pyodide-cdn2.iodide.io/v0.15.0/full/pyodide.js')
await languagePluginLoader
Expand All @@ -20,7 +20,7 @@ micropip.install("${self.location.origin}/static/worker/aimmo_avatar_api-0.0.0-p
await pyodide.runPythonAsync(`
from simulation import direction
from simulation import location
from simulation.action import MoveAction, PickupAction, WaitAction
from simulation.action import MoveAction, PickupAction, WaitAction, MoveTowardsAction
from simulation.world_map import WorldMapCreator
from simulation.avatar_state import create_avatar_state
from io import StringIO
Expand All @@ -46,7 +46,7 @@ def capture_output(stdout=None, stderr=None):
`)
}

async function computeNextAction (gameState, playerAvatarID): Promise<ComputedTurnResult> {
async function computeNextAction(gameState, playerAvatarID): Promise<ComputedTurnResult> {
const avatarState = getAvatarStateFromGameState(gameState, playerAvatarID)
try {
return await pyodide.runPythonAsync(`
Expand All @@ -72,7 +72,7 @@ logs = stdout.getvalue() + stderr.getvalue()
}
}

export function simplifyErrorMessageInLog (log: string): string {
export function simplifyErrorMessageInLog(log: string): string {
const regexToFindNextTurnErrors = /.*line (\d+), in next_turn\n((?:.|\n)*)/
const matches = log.match(regexToFindNextTurnErrors)
if (matches?.length >= 2) {
Expand All @@ -85,7 +85,7 @@ export function simplifyErrorMessageInLog (log: string): string {
.join('\n')
}

export async function updateAvatarCode (
export async function updateAvatarCode(
userCode: string,
gameState: any,
playerAvatarID: number = 0
Expand Down Expand Up @@ -115,7 +115,7 @@ export async function updateAvatarCode (
}
}

async function setAvatarCodeToWaitActionOnError () {
async function setAvatarCodeToWaitActionOnError() {
await pyodide.runPythonAsync(
`def next_turn(world_map, avatar_state):
return WaitAction()`
Expand Down

0 comments on commit 1e37fb3

Please sign in to comment.