diff --git a/src/assurancetourix/strategix/strategix/action_objects/__init__.py b/src/assurancetourix/strategix/strategix/action_objects/__init__.py
new file mode 100644
index 00000000..27f80c3d
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/action_objects/__init__.py
@@ -0,0 +1,5 @@
+from .action import Action
+from .gobelet import Gobelet
+from .manche_air import MancheAir
+from .phare import Phare
+from .ecueil import Ecueil
diff --git a/src/assurancetourix/strategix/strategix/action_objects/action.py b/src/assurancetourix/strategix/strategix/action_objects/action.py
new file mode 100644
index 00000000..87d3b629
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/action_objects/action.py
@@ -0,0 +1,32 @@
+"""Base Action class for all types of action"""
+
+
+class Action:
+ def __init__(self, position=(0, 0), **kwargs):
+ self.position = position
+ self.rotation = kwargs.get("rotation", 0)
+ self.tags = kwargs.get("tags", {})
+
+ def get_initial_position(self, robot=None):
+ """Function called when the robot needs to get the position of its objective"""
+ return self.position
+
+ def get_initial_orientation(self, robot=None):
+ """Function called when the robot needs to get the orientation of its objective"""
+ return self.rotation
+
+ def preempt_action(self, robot=None, action_list=None):
+ """Function called when the action is preempted"""
+ pass
+
+ def finish_action(self, robot=None):
+ """Function called when the action is finished"""
+ pass
+
+ def release_action(self, robot=None):
+ """Function called when the action is released"""
+ pass
+
+ def start_actuator(self, robot=None):
+ """Function called when the robot reaches its objective and starts the actuator"""
+ pass
diff --git a/src/assurancetourix/strategix/strategix/action_objects/ecueil.py b/src/assurancetourix/strategix/strategix/action_objects/ecueil.py
new file mode 100644
index 00000000..19dd4d76
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/action_objects/ecueil.py
@@ -0,0 +1,25 @@
+from .action import Action
+
+
+class Ecueil(Action):
+ def __init__(self, position, **kwargs):
+ super().__init__(position, **kwargs)
+ self.gob_list = kwargs.get("gob_list", [])
+
+ def get_initial_position(self, robot):
+ return self.position
+
+ # def preempt_action(self):
+ # for gobelet_id in self.gob_list:
+ # gobelet = actions.get(gobelet_id)
+ # gobelet.preempt_action()
+
+ # def release_action(self):
+ # for gobelet_id in self.gob_list:
+ # gobelet = actions.get(gobelet_id)
+ # gobelet.release_action()
+
+ # def finish_action(self):
+ # for gobelet_id in self.gob_list:
+ # gobelet = actions.get(gobelet_id)
+ # gobelet.finish_action()
diff --git a/src/assurancetourix/strategix/strategix/action_objects/gobelet.py b/src/assurancetourix/strategix/strategix/action_objects/gobelet.py
new file mode 100644
index 00000000..67deed15
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/action_objects/gobelet.py
@@ -0,0 +1,70 @@
+from .action import Action
+import numpy as np
+from PyKDL import Vector, Rotation, Frame
+
+
+class Gobelet(Action):
+ def __init__(self, position, color, **kwargs):
+ super().__init__(position, **kwargs)
+ self.color = color
+ self.pump_id = None
+
+ def get_initial_orientation(self, robot):
+ theta = 180 - np.rad2deg(
+ np.arctan(
+ (self.position[1] - robot.position[1])
+ / (self.position[0] - robot.position[0])
+ )
+ )
+ return theta
+
+ def get_initial_position(self, robot):
+ if robot.simulation:
+ robot_to_gob = (0.04, 0.08)
+ else:
+ robot_to_gob = robot.actuators.PUMPS.get(self.pump_id).get("pos")
+ vector_robot_to_gob = Vector(robot_to_gob[0], robot_to_gob[1], 0)
+ # Find the angle between the robot's and the gobelet's position (theta)
+ theta = 180 - np.rad2deg(
+ np.arctan(
+ (self.position[1] - robot.position[1])
+ / (self.position[0] - robot.position[0])
+ )
+ )
+ # Frame between gobelet and center of robot
+ frame_robot_to_gob = Frame(Rotation.RotZ(theta), vector_robot_to_gob)
+ # Frame between gobelet and map
+ frame_map_to_gob = Frame(
+ Rotation.Identity(), Vector(self.position[0], self.position[1], 0)
+ )
+ # Frame between robot and map
+ frame_map_to_robot = frame_map_to_gob * frame_robot_to_gob.Inverse()
+ return (frame_map_to_robot.p.x(), frame_map_to_robot.p.y())
+
+ def preempt_action(self, robot, action_list):
+ if robot.simulation:
+ return
+ # Find the first available pump
+ for pump_id, pump_dict in robot.actuators.PUMPS.items():
+ if pump_dict.get("status") is None:
+ self.pump_id = pump_id
+ # Find the id of this Gobelet
+ for action_id, action_dict in action_list:
+ if action_dict == self:
+ pump_dict["status"] = action_id
+ robot.get_logger().info(
+ f"Pump {pump_id} preempted {action_id}."
+ )
+ return
+
+ def release_action(self, robot):
+ if robot.simulation:
+ return
+ robot.actuators.PUMPS.get(self.pump_id).pop("status")
+ self.pump_id = None
+
+ def finish_action(self, robot):
+ if robot.simulation:
+ return
+ robot.actuators.PUMPS.get(self.pump_id).pop("status")
+ self.pump_id = None
diff --git a/src/assurancetourix/strategix/strategix/action_objects/manche_air.py b/src/assurancetourix/strategix/strategix/action_objects/manche_air.py
new file mode 100644
index 00000000..0b6d7ab6
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/action_objects/manche_air.py
@@ -0,0 +1,26 @@
+from .action import Action
+
+
+class MancheAir(Action):
+ def __init__(self, position, **kwargs):
+ super().__init__(position, **kwargs)
+ self.rotation = -90
+ self.step = 0
+
+ def get_initial_position(self, robot):
+ if self.step == 0:
+ offset = -0.1 if self.tags["ONLY_SIDE"] == "blue" else 0.1
+ else:
+ offset = 0.1 if self.tags["ONLY_SIDE"] == "blue" else -0.1
+ position = (self.position[0] + offset, robot.width.value / 2 + 0.05)
+ return position
+
+ def start_actuator(self, robot):
+ if self.step == 0:
+ # Open robot arm - robot.openArm()
+ # Force new objective to be next position
+ pass
+ else:
+ # Close robot arm - robot.closeArm()
+ pass
+ self.step += 1
diff --git a/src/assurancetourix/strategix/strategix/action_objects/phare.py b/src/assurancetourix/strategix/strategix/action_objects/phare.py
new file mode 100644
index 00000000..82838f32
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/action_objects/phare.py
@@ -0,0 +1,17 @@
+from .action import Action
+
+
+class Phare(Action):
+ def __init__(self, position, **kwargs):
+ super().__init__(position, **kwargs)
+ self.rotation = 90
+
+ def get_initial_position(self, robot):
+ return (self.position[0], self.position[1] - robot.length.value / 2 - 0.1)
+
+ def start_actuator(self, robot):
+ response = robot.synchronous_call(
+ robot.trigger_deploy_pharaon_client,
+ robot.trigger_deploy_pharaon_request,
+ )
+ return response.success
diff --git a/src/assurancetourix/strategix/strategix/actions.py b/src/assurancetourix/strategix/strategix/actions.py
index 18498b5f..76ce38e2 100644
--- a/src/assurancetourix/strategix/strategix/actions.py
+++ b/src/assurancetourix/strategix/strategix/actions.py
@@ -1,92 +1,102 @@
#!/usr/bin/env python3
+from strategix.action_objects import Action, Phare, Ecueil, MancheAir, Gobelet
actions = {
- "STUPID_1": {},
- "STUPID_2": {},
- "STUPID_3": {},
- "MANCHE1": {"ONLY_SIDE": "blue"},
- "MANCHE2": {"ONLY_SIDE": "blue"},
- "MANCHE3": {"ONLY_SIDE": "yellow"},
- "MANCHE4": {"ONLY_SIDE": "yellow"},
- "PAVILLON": {"STATUS": "PREEMPTED"},
- "ECUEIL_1": {
- "ONLY_ROBOT": "obelix",
- "GOBS": ["GOB35", "GOB36", "GOB37", "GOB38", "GOB39"],
- },
- "ECUEIL_2": {
- "ONLY_ROBOT": "obelix",
- "GOBS": ["GOB40", "GOB41", "GOB42", "GOB43", "GOB44"],
- },
- "ECUEIL_BLEU": {
- "ONLY_ROBOT": "obelix",
- "ONLY_SIDE": "blue",
- "GOBS": ["GOB25", "GOB26", "GOB27", "GOB28", "GOB29"],
- },
- "ECUEIL_JAUNE": {
- "ONLY_ROBOT": "obelix",
- "ONLY_SIDE": "yellow",
- "GOBS": ["GOB30", "GOB31", "GOB32", "GOB33", "GOB34"],
- },
- "PHARE_BLEU": {"ONLY_SIDE": "blue"},
- "PHARE_JAUNE": {"ONLY_SIDE": "yellow"},
- "CHENAL_BLEU_VERT_1": {"ONLY_SIDE": "blue", "STATUS": "PREEMPTED"},
- # "CHENAL_BLEU_VERT_2": {"ONLY_SIDE": "blue"},
- "CHENAL_BLEU_ROUGE_1": {"ONLY_SIDE": "blue", "STATUS": "PREEMPTED"},
- # "CHENAL_BLEU_ROUGE_2": {"ONLY_SIDE": "blue"},
- "CHENAL_JAUNE_VERT_1": {"ONLY_SIDE": "yellow", "STATUS": "PREEMPTED"},
- # "CHENAL_JAUNE_VERT_2": {"ONLY_SIDE": "yellow"},
- "CHENAL_JAUNE_ROUGE_1": {"ONLY_SIDE": "yellow", "STATUS": "PREEMPTED"},
- # "CHENAL_JAUNE_ROUGE_2": {"ONLY_SIDE": "yellow"},
- # Red Cups
- "GOB2": {"COLOR": "RED"},
- "GOB3": {"COLOR": "RED"},
- "GOB5": {"COLOR": "RED"},
- "GOB7": {"COLOR": "RED"},
- "GOB9": {"COLOR": "RED"},
- "GOB11": {"COLOR": "RED"},
- "GOB13": {"COLOR": "RED"},
- "GOB15": {"COLOR": "RED"},
- "GOB17": {"COLOR": "RED"},
- "GOB19": {"COLOR": "RED"},
- "GOB22": {"COLOR": "RED"},
- "GOB23": {"COLOR": "RED"},
- # Cups in ECUEIL_BLEU:
- "GOB25": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
- "GOB27": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
- "GOB29": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
- # Cups in ECUEIL_JAUNE:
- "GOB31": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
- "GOB33": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
- # Cups in ECUEIL_1 & ECUEIL_2 following the Scenario 1:
- "GOB36": {"COLOR": "RED", "IN_ECUEIL": True},
- "GOB39": {"COLOR": "RED", "IN_ECUEIL": True},
- "GOB41": {"COLOR": "RED", "IN_ECUEIL": True},
- "GOB42": {"COLOR": "RED", "IN_ECUEIL": True},
- "GOB44": {"COLOR": "RED", "IN_ECUEIL": True},
- # Green Cups
- "GOB1": {"COLOR": "GREEN"},
- "GOB4": {"COLOR": "GREEN"},
- "GOB6": {"COLOR": "GREEN"},
- "GOB8": {"COLOR": "GREEN"},
- "GOB10": {"COLOR": "GREEN"},
- "GOB12": {"COLOR": "GREEN"},
- "GOB14": {"COLOR": "GREEN"},
- "GOB16": {"COLOR": "GREEN"},
- "GOB18": {"COLOR": "GREEN"},
- "GOB20": {"COLOR": "GREEN"},
- "GOB21": {"COLOR": "GREEN"},
- "GOB24": {"COLOR": "GREEN"},
- # Cups in ECUEIL_BLEU:
- "GOB26": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
- "GOB28": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
- # Cups in ECUEIL_JAUNE:
- "GOB30": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
- "GOB32": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
- "GOB34": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
- # Cups in ECUEIL_1 & ECUEIL_2 following the Scenario 1:
- "GOB35": {"COLOR": "GREEN", "IN_ECUEIL": True},
- "GOB37": {"COLOR": "GREEN", "IN_ECUEIL": True},
- "GOB38": {"COLOR": "GREEN", "IN_ECUEIL": True},
- "GOB40": {"COLOR": "GREEN", "IN_ECUEIL": True},
- "GOB43": {"COLOR": "GREEN", "IN_ECUEIL": True},
+ "PAVILLON": Action(tags={"STATUS": "PREEMPT"}),
+ "PHARE_BLEU": Phare(position=(0.26, 2), tags={"ONLY_SIDE": "blue"}),
+ "PHARE_JAUNE": Phare(position=(2.775, 2), tags={"ONLY_SIDE": "yellow"}),
+ "MANCHE1": MancheAir(position=(0.23, 0), tags={"ONLY_SIDE": "blue"}),
+ "MANCHE2": MancheAir(position=(0.635, 0), tags={"ONLY_SIDE": "blue"}),
+ "MANCHE3": MancheAir(position=(2.365, 0), tags={"ONLY_SIDE": "yellow"}),
+ "MANCHE4": MancheAir(position=(2.77, 0), tags={"ONLY_SIDE": "yellow"}),
+ "ECUEIL_1": Ecueil(
+ position=(0.85, 2),
+ rotation=90,
+ gob_list=["GOB35", "GOB36", "GOB37", "GOB38", "GOB39"],
+ tags={"ONLY_ROBOT": "obelix"},
+ ),
+ "ECUEIL_2": Ecueil(
+ position=(2.15, 2),
+ rotation=90,
+ gob_list=["GOB40", "GOB41", "GOB42", "GOB43", "GOB44"],
+ tags={"ONLY_ROBOT": "obelix"},
+ ),
+ "ECUEIL_BLEU": Ecueil(
+ position=(0, 0.4),
+ rotation=180,
+ gob_list=["GOB25", "GOB26", "GOB27", "GOB28", "GOB29"],
+ tags={"ONLY_ROBOT": "obelix", "ONLY_SIDE": "blue"},
+ ),
+ "ECUEIL_JAUNE": Ecueil(
+ position=(3, 0.4),
+ rotation=0,
+ gob_list=["GOB30", "GOB31", "GOB32", "GOB33", "GOB34"],
+ tags={"ONLY_ROBOT": "obelix", "ONLY_SIDE": "yellow"},
+ ),
+ "GOB1": Gobelet(position=(0.3, 0.8), color="GREEN"),
+ "GOB2": Gobelet(position=(0.3, 1.6), color="RED"),
+ "GOB3": Gobelet(position=(0.45, 0.92), color="RED"),
+ "GOB4": Gobelet(position=(0.45, 1.49), color="GREEN"),
+ "GOB5": Gobelet(position=(0.67, 1.9), color="RED"),
+ "GOB6": Gobelet(position=(0.95, 1.6), color="GREEN"),
+ "GOB7": Gobelet(position=(1.005, 0.045), color="RED"),
+ "GOB8": Gobelet(position=(1.065, 0.35), color="GREEN"),
+ "GOB9": Gobelet(position=(1.1, 1.2), color="RED"),
+ "GOB10": Gobelet(position=(1.27, 0.8), color="GREEN"),
+ "GOB11": Gobelet(position=(1.335, 0.35), color="RED"),
+ "GOB12": Gobelet(position=(1.395, 0.045), color="GREEN"),
+ "GOB13": Gobelet(position=(1.605, 0.045), color="RED"),
+ "GOB14": Gobelet(position=(1.665, 0.35), color="GREEN"),
+ "GOB15": Gobelet(position=(1.73, 0.8), color="RED"),
+ "GOB16": Gobelet(position=(1.9, 1.2), color="GREEN"),
+ "GOB17": Gobelet(position=(1.935, 0.35), color="RED"),
+ "GOB18": Gobelet(position=(1.995, 0.045), color="GREEN"),
+ "GOB19": Gobelet(position=(2.05, 1.6), color="RED"),
+ "GOB20": Gobelet(position=(2.33, 1.9), color="GREEN"),
+ "GOB21": Gobelet(position=(2.55, 0.92), color="GREEN"),
+ "GOB22": Gobelet(position=(2.55, 1.49), color="RED"),
+ "GOB23": Gobelet(position=(2.7, 0.8), color="RED"),
+ "GOB24": Gobelet(position=(2.7, 1.6), color="GREEN"),
+ # "GOB25": Gobelet(
+ # position=(0, 1), color="RED", tags={"IN_ECUEIL": True, "ONLY_SIDE": "blue"}
+ # ),
}
+
+# actions = {
+# "CHENAL_BLEU_VERT_1": {"ONLY_SIDE": "blue", "STATUS": "PREEMPTED"},
+# # "CHENAL_BLEU_VERT_2": {"ONLY_SIDE": "blue"},
+# "CHENAL_BLEU_ROUGE_1": {"ONLY_SIDE": "blue", "STATUS": "PREEMPTED"},
+# # "CHENAL_BLEU_ROUGE_2": {"ONLY_SIDE": "blue"},
+# "CHENAL_JAUNE_VERT_1": {"ONLY_SIDE": "yellow", "STATUS": "PREEMPTED"},
+# # "CHENAL_JAUNE_VERT_2": {"ONLY_SIDE": "yellow"},
+# "CHENAL_JAUNE_ROUGE_1": {"ONLY_SIDE": "yellow", "STATUS": "PREEMPTED"},
+# # "CHENAL_JAUNE_ROUGE_2": {"ONLY_SIDE": "yellow"},
+# # Red Cups
+# # Cups in ECUEIL_BLEU:
+# "GOB25": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
+# "GOB27": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
+# "GOB29": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
+# # Cups in ECUEIL_JAUNE:
+# "GOB31": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
+# "GOB33": {"COLOR": "RED", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
+# # Cups in ECUEIL_1 & ECUEIL_2 following the Scenario 1:
+# "GOB36": {"COLOR": "RED", "IN_ECUEIL": True},
+# "GOB39": {"COLOR": "RED", "IN_ECUEIL": True},
+# "GOB41": {"COLOR": "RED", "IN_ECUEIL": True},
+# "GOB42": {"COLOR": "RED", "IN_ECUEIL": True},
+# "GOB44": {"COLOR": "RED", "IN_ECUEIL": True},
+# # Green Cups
+# # Cups in ECUEIL_BLEU:
+# "GOB26": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
+# "GOB28": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "blue"},
+# # Cups in ECUEIL_JAUNE:
+# "GOB30": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
+# "GOB32": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
+# "GOB34": {"COLOR": "GREEN", "IN_ECUEIL": True, "ONLY_SIDE": "yellow"},
+# # Cups in ECUEIL_1 & ECUEIL_2 following the Scenario 1:
+# "GOB35": {"COLOR": "GREEN", "IN_ECUEIL": True},
+# "GOB37": {"COLOR": "GREEN", "IN_ECUEIL": True},
+# "GOB38": {"COLOR": "GREEN", "IN_ECUEIL": True},
+# "GOB40": {"COLOR": "GREEN", "IN_ECUEIL": True},
+# "GOB43": {"COLOR": "GREEN", "IN_ECUEIL": True},
+# }
diff --git a/src/assurancetourix/strategix/strategix/score.py b/src/assurancetourix/strategix/strategix/score.py
deleted file mode 100644
index c9d44510..00000000
--- a/src/assurancetourix/strategix/strategix/score.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from strategix.actions import actions
-
-
-class Score:
- def __init__(self):
- pass
- # self.bonPortGros = BonPort('BonPortGros')
- # self.bonPortPetit = BonPort('BonPortPetit')
-
- def get_score(self):
- score = 0
- num_manche_air = 0
- num_red_cups = 0
- num_green_cups = 0
- phare_raised = False
- pavillon_raised = False
- for action in actions:
- if actions[action].get("STATUS") == "FINISHED":
- if "MANCHE" in action:
- num_manche_air += 1
- if "GOB" in action:
- if actions[action].get("COLOR") == "RED":
- num_red_cups += 1
- else:
- num_green_cups += 1
- if "PHARE" in action:
- phare_raised = True
- if "PAVILLON" in action:
- pavillon_raised = True
- # Bon Port
- # if self.bonPortGros.pos == self.bonPortPetit.pos == 'Good':
- # self.score += 10
- # elif self.bonPortGros.pos == self.bonPortPetit.pos == 'Wrong':
- # self.score += 5
- # elif self.bonPortGros.pos == 'Good' and self.bonPortPetit.pos == 'Out':
- # self.score += 5
- # elif self.bonPortGros.pos == 'Out' and self.bonPortPetit.pos == 'Good':
- # self.score += 5
- # else:
- # self.score += 0
- # Pavillon
- score += 15 if num_manche_air == 2 else 5 if num_manche_air == 1 else 0
- pair = 2 * num_red_cups if num_red_cups < num_green_cups else 2 * num_green_cups
- score += 2 * (num_red_cups + num_green_cups) + pair
- score += 15 if phare_raised else 2
- score += 10 if pavillon_raised else 0
- return score
-
- def preempt(self, action, sender):
- actions[action]["STATUS"] = "PREEMPTED"
- actions[action]["EXECUTER"] = sender
- if "ECUEIL" in action:
- for gob in actions[action]["GOBS"]:
- actions[action]["STATUS"] = "PREEMPTED"
- actions[gob]["EXECUTER"] = sender
- return True
-
- def release(self, action, sender):
- actions[action]["STATUS"] = "RELEASED"
- actions[action]["EXECUTER"] = None
- if "ECUEIL" in action:
- for gob in actions[action]["GOBS"]:
- actions[action]["STATUS"] = "RELEASED"
- actions[gob]["EXECUTER"] = None
- # Don't add failing action again
- return True
-
- def finish(self, action, sender):
- actions[action]["STATUS"] = "FINISHED"
- actions[action]["EXECUTER"] = sender
- if "ECUEIL" in action:
- for gob in actions[action]["GOBS"]:
- actions[action]["STATUS"] = "FINISHED"
- actions[gob]["EXECUTER"] = sender
- return True
diff --git a/src/assurancetourix/strategix/strategix/strategix.py b/src/assurancetourix/strategix/strategix/strategix.py
index f0214bbe..6aed6039 100755
--- a/src/assurancetourix/strategix/strategix/strategix.py
+++ b/src/assurancetourix/strategix/strategix/strategix.py
@@ -11,8 +11,8 @@
from rcl_interfaces.msg import SetParametersResult
from rclpy.node import Node
from strategix.actions import actions
+from strategix.action_objects import Phare, MancheAir, Gobelet
from strategix.exceptions import MatchStartedException
-from strategix.score import Score
from strategix_msgs.srv import ChangeActionStatus, GetAvailableActions
@@ -21,7 +21,6 @@ def __init__(self):
super().__init__("strategix_action_server")
self.side = self.declare_parameter("side", "blue")
self.add_on_set_parameters_callback(self._on_set_parameters)
- self.score = Score()
self.todo_srv = self.create_service(
GetAvailableActions, "/strategix/available", self.available_callback
)
@@ -52,19 +51,19 @@ def available_callback(self, request, response):
"""Callback function when a robot needs the list of available actions"""
self.get_logger().info(f"GET {request.sender}")
available = []
- for action in actions:
- if actions[action].get("STATUS") is None and not actions[action].get(
+ for action_id, action_object in actions.items():
+ if action_object.tags.get("STATUS") is None and not action_object.tags.get(
"IN_ECUEIL"
):
if (
- actions[action].get("ONLY_SIDE") is None
- or actions[action].get("ONLY_SIDE") == self.side.value
+ action_object.tags.get("ONLY_SIDE") is None
+ or action_object.tags.get("ONLY_SIDE") == self.side.value
):
if (
- actions[action].get("ONLY_ROBOT") is None
- or actions[action].get("ONLY_ROBOT") == request.sender
+ action_object.tags.get("ONLY_ROBOT") is None
+ or action_object.tags.get("ONLY_ROBOT") == request.sender
):
- available.append(action)
+ available.append(action_id)
response.available = available
self.get_logger().info(f"AVAILABLE: {response.available}")
return response
@@ -76,11 +75,11 @@ def action_callback(self, request, response):
f"{request.sender} {request.request} {request.action}"
)
if request.request == "PREEMPT":
- response.success = self.score.preempt(request.action, request.sender)
+ response.success = self.preempt(request.action, request.sender)
elif request.request == "DROP":
- response.success = self.score.release(request.action, request.sender)
+ response.success = self.release(request.action, request.sender)
elif request.request == "CONFIRM":
- response.success = self.score.finish(request.action, request.sender)
+ response.success = self.finish(request.action, request.sender)
# Detach update_score coroutine
Thread(target=self.update_score).start()
else:
@@ -94,15 +93,82 @@ def action_callback(self, request, response):
def update_score(self):
"""Update global score."""
- score = self.score.get_score()
+ score = self.get_score()
lcd_msg = Lcd()
lcd_msg.line = 1
lcd_msg.text = f"Score: {score}"
+ self.get_logger().info(f"Score: {score}")
score_msg = UInt8()
score_msg.data = score
self.score_publisher.publish(score_msg)
self.lcd_driver.publish(lcd_msg)
+ def get_score(self):
+ score = 0
+ num_manche_air = 0
+ num_red_cups = 0
+ num_green_cups = 0
+ phare_raised = False
+ pavillon_raised = False
+ for action_id, action_object in actions.items():
+ if action_object.tags.get("STATUS") == "FINISHED":
+ if type(action_object) is MancheAir:
+ num_manche_air += 1
+ elif type(action_object) is Gobelet:
+ if action_object.color == "RED":
+ num_red_cups += 1
+ else:
+ num_green_cups += 1
+ elif type(action_object) is Phare:
+ phare_raised = True
+ elif action_id == "PAVILLON":
+ pavillon_raised = True
+ # Bon Port
+ # if self.bonPortGros.pos == self.bonPortPetit.pos == 'Good':
+ # self.score += 10
+ # elif self.bonPortGros.pos == self.bonPortPetit.pos == 'Wrong':
+ # self.score += 5
+ # elif self.bonPortGros.pos == 'Good' and self.bonPortPetit.pos == 'Out':
+ # self.score += 5
+ # elif self.bonPortGros.pos == 'Out' and self.bonPortPetit.pos == 'Good':
+ # self.score += 5
+ # else:
+ # self.score += 0
+ score += 15 if num_manche_air == 2 else 5 if num_manche_air == 1 else 0
+ pair = 2 * num_red_cups if num_red_cups < num_green_cups else 2 * num_green_cups
+ score += 2 * (num_red_cups + num_green_cups) + pair
+ score += 15 if phare_raised else 2
+ score += 10 if pavillon_raised else 0
+ return score
+
+ def preempt(self, action, sender):
+ actions.get(action).tags["STATUS"] = "PREEMPT"
+ actions.get(action).tags["EXECUTER"] = sender
+ # if "ECUEIL" in action:
+ # for gob in actions[action]["GOBS"]:
+ # actions[action]["STATUS"] = "PREEMPTED"
+ # actions[gob]["EXECUTER"] = sender
+ return True
+
+ def release(self, action, sender):
+ actions.get(action).tags["STATUS"] = "RELEASE"
+ actions.get(action).tags["EXECUTER"] = sender
+ # if "ECUEIL" in action:
+ # for gob in actions[action]["GOBS"]:
+ # actions[action]["STATUS"] = "RELEASED"
+ # actions[gob]["EXECUTER"] = None
+ # Don't add failing action again
+ return True
+
+ def finish(self, action, sender):
+ actions.get(action).tags["STATUS"] = "FINISH"
+ actions.get(action).tags["EXECUTER"] = sender
+ # if "ECUEIL" in action:
+ # for gob in actions[action]["GOBS"]:
+ # actions[action]["STATUS"] = "FINISHED"
+ # actions[gob]["EXECUTER"] = sender
+ return True
+
def main(args=None):
rclpy.init(args=args)
diff --git a/src/assurancetourix/strategix/strategix/strategy_modes.py b/src/assurancetourix/strategix/strategix/strategy_modes.py
new file mode 100644
index 00000000..f8e359fc
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/strategy_modes.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+
+from strategix.action_objects import Phare, Ecueil
+
+# Strategy points are set using (time, coefficient) coordinates
+strategies = {
+ "NORMAL": {
+ "ECUEIL_COULEUR": [(0, 0), (25, 0), (50, 50), (100, 50)],
+ "ECUEIL_GENERAL": [(0, 50), (100, 50)],
+ "PHARE": [(0, 0), (25, 0), (75, 100), (100, 100)],
+ "MANCHE": [(0, 0), (100, 75)],
+ }
+}
+
+
+def get_time_coeff(time, action, strategy):
+ if type(action) is Phare:
+ action = "PHARE"
+ elif type(action) is Ecueil:
+ if action.tags.get("ONLY_SIDE") is None:
+ action = "ECUEIL_GENERAL"
+ else:
+ action = "ECUEIL_COULEUR"
+ action_time_coeff_list = strategies.get(strategy).get(action)
+ if action_time_coeff_list is None:
+ return 0
+ for i in range(len(action_time_coeff_list)):
+ prev_time_coeff = action_time_coeff_list[i - 1]
+ time_coeff = action_time_coeff_list[i]
+ if time < time_coeff[0]:
+ # Calculate a score value between 0 and 100 depending on the time value using linear extrapolation
+ score = prev_time_coeff[1] + (time - prev_time_coeff[0]) * (
+ time_coeff[1] - prev_time_coeff[1]
+ ) / (time_coeff[0] - prev_time_coeff[0])
+ return score
+ return action_time_coeff_list[i][-1]
diff --git a/src/modules/cetautomatix/cetautomatix/utils/__init__.py b/src/assurancetourix/strategix/strategix/utils/__init__.py
similarity index 100%
rename from src/modules/cetautomatix/cetautomatix/utils/__init__.py
rename to src/assurancetourix/strategix/strategix/utils/__init__.py
diff --git a/src/assurancetourix/strategix/strategix/utils/plot_actions_map.py b/src/assurancetourix/strategix/strategix/utils/plot_actions_map.py
new file mode 100644
index 00000000..d20025f7
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/utils/plot_actions_map.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+
+import matplotlib.pyplot as plt
+from strategix.action_objects import Ecueil, MancheAir, Phare, Gobelet
+from strategix.actions import actions
+
+plt.axes()
+plt.xlabel("X")
+plt.ylabel("Y")
+border = plt.Rectangle((0, 0), 3, 2, ec="black", fc="w", lw=5)
+plt.gca().add_patch(border)
+for action in actions.values():
+ if type(action) is Ecueil:
+ if action.tags.get("ONLY_SIDE") == "blue":
+ line = plt.Line2D(
+ (action.position[0], action.position[0]),
+ (action.position[1] - 0.2095, action.position[1] + 0.2095),
+ lw=5,
+ color="b",
+ )
+ plt.gca().add_line(line)
+ elif action.tags.get("ONLY_SIDE") == "yellow":
+ line = plt.Line2D(
+ (action.position[0], action.position[0]),
+ (action.position[1] - 0.2095, action.position[1] + 0.2095),
+ lw=5,
+ color="y",
+ )
+ plt.gca().add_line(line)
+ else:
+ line = plt.Line2D(
+ (action.position[0] - 0.2095, action.position[0] + 0.2095),
+ (action.position[1], action.position[1]),
+ lw=5,
+ color="grey",
+ )
+ plt.gca().add_line(line)
+ elif type(action) is MancheAir:
+ line = plt.Line2D(
+ (action.position[0] - 0.05, action.position[0] + 0.05),
+ (action.position[1], action.position[1]),
+ lw=5,
+ color="grey",
+ )
+ plt.gca().add_line(line)
+ elif type(action) is Phare:
+ if action.tags.get("ONLY_SIDE") == "blue":
+ line = plt.Line2D(
+ (action.position[0] - 0.225, action.position[0] + 0.225),
+ (action.position[1], action.position[1]),
+ lw=5,
+ color="b",
+ )
+ plt.gca().add_line(line)
+ else:
+ line = plt.Line2D(
+ (action.position[0] - 0.225, action.position[0] + 0.225),
+ (action.position[1], action.position[1]),
+ lw=5,
+ color="y",
+ )
+ plt.gca().add_line(line)
+ elif type(action) is Gobelet:
+ if action.color == "GREEN":
+ circle = plt.Circle(action.position, radius=0.03, fc="g")
+ plt.gca().add_patch(circle)
+ else:
+ circle = plt.Circle(action.position, radius=0.03, fc="r")
+ plt.gca().add_patch(circle)
+ # elif "ZONES" in key:
+ # for key2, value2 in value.items():
+ # if "CHENAL" in key2:
+ # if "ROUGE" in key2:
+ # chenal = plt.Rectangle(
+ # (value2[0], value2[1]),
+ # value2[2] - value2[0],
+ # value2[3] - value2[1],
+ # fc="r",
+ # )
+ # plt.gca().add_patch(chenal)
+ # else:
+ # chenal = plt.Rectangle(
+ # (value2[0], value2[1]),
+ # value2[2] - value2[0],
+ # value2[3] - value2[1],
+ # fc="g",
+ # )
+ # plt.gca().add_patch(chenal)
+ # elif "PORT" in key2:
+ # if "BLEU" in key2:
+ # port = plt.Rectangle(
+ # (value2[0], value2[1]),
+ # value2[2] - value2[0],
+ # value2[3] - value2[1],
+ # ec="b",
+ # fc="w",
+ # )
+ # plt.gca().add_patch(port)
+ # else:
+ # port = plt.Rectangle(
+ # (value2[0], value2[1]),
+ # value2[2] - value2[0],
+ # value2[3] - value2[1],
+ # ec="y",
+ # fc="w",
+ # )
+ # plt.gca().add_patch(port)
+
+plt.axis("scaled")
+plt.show()
diff --git a/src/assurancetourix/strategix/strategix/utils/plot_strategy_modes.py b/src/assurancetourix/strategix/strategix/utils/plot_strategy_modes.py
new file mode 100644
index 00000000..351a4afa
--- /dev/null
+++ b/src/assurancetourix/strategix/strategix/utils/plot_strategy_modes.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+
+import matplotlib.pyplot as plt
+from strategix.strategy_modes import strategies, get_time_coeff
+
+strategy_mode = "NORMAL"
+
+plt.axes()
+plt.xlabel("Time")
+plt.ylabel("Coefficient")
+lw = len(strategies.get(strategy_mode))
+for action, time_coeff in strategies.get(strategy_mode).items():
+ x, y = [], []
+ for i in range(101):
+ x.append(i)
+ y.append(get_time_coeff(i, action, strategy_mode))
+ plt.plot(x, y, lw=lw, label=action)
+ lw -= 1
+plt.axis("scaled")
+plt.legend()
+plt.show()
diff --git a/src/modules/cetautomatix/cetautomatix/custom_behaviours.py b/src/modules/cetautomatix/cetautomatix/custom_behaviours.py
index 29ef465c..be2dfec2 100644
--- a/src/modules/cetautomatix/cetautomatix/custom_behaviours.py
+++ b/src/modules/cetautomatix/cetautomatix/custom_behaviours.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python3
-
import py_trees
try:
@@ -16,16 +15,15 @@ def __init__(self, name, robot):
self.robot = robot
def update(self):
- if self.robot._current_action is None:
- available = self.robot.fetch_available_actions()
- best_action = self.robot.compute_best_action(available)
- return (
- py_trees.common.Status.SUCCESS
- if self.robot.preempt_action(best_action)
- else py_trees.common.Status.FAILURE
- )
- else:
- return py_trees.common.Status.SUCCESS
+ if not self.robot.action_list: # No actions left to realize
+ available_actions = self.robot.fetch_available_actions()
+ best_action = self.robot.compute_best_action(available_actions)
+ self.robot.action_list.append(best_action)
+ return (
+ py_trees.common.Status.SUCCESS
+ if self.robot.preempt_action(self.robot.action_list.pop(0))
+ else py_trees.common.Status.FAILURE
+ )
class ConfirmAction(py_trees.behaviour.Behaviour):
@@ -78,7 +76,6 @@ def update(self):
if not self.visited:
self.visited = True
# Setup Timers
- self.robot._start_robot_callback(None, None)
for action, duration in self.actions.items():
action.time = self.robot.get_clock().now().nanoseconds * 1e-9 + duration
return py_trees.common.Status.SUCCESS
@@ -105,7 +102,7 @@ class EndOfGameAction(py_trees.behaviour.Behaviour):
def __init__(self, name, robot):
super().__init__(name=name)
self.robot = robot
- self.time = 1e5
+ self.time = 100000
self.visited = False
def update(self):
@@ -133,8 +130,8 @@ def setup(self, node):
def update(self):
"""Guard condition for goupille."""
if GPIO is None or self.robot.robot == "asterix":
- if self.robot.triggered():
+ if self.robot.triggered:
return py_trees.common.Status.SUCCESS
- elif not GPIO.input(17) or self.robot.triggered():
+ elif not GPIO.input(17) or self.robot.triggered:
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.RUNNING
diff --git a/src/modules/cetautomatix/cetautomatix/old_robot.py b/src/modules/cetautomatix/cetautomatix/old_robot.py
new file mode 100644
index 00000000..1aea4f6c
--- /dev/null
+++ b/src/modules/cetautomatix/cetautomatix/old_robot.py
@@ -0,0 +1,503 @@
+#!/usr/bin/env python3
+
+
+import math
+from importlib import import_module
+from signal import SIGINT
+from subprocess import call
+from platform import machine
+
+import numpy as np
+import psutil
+
+import py_trees
+import rclpy
+from actuators.actuators import NC, NO
+from actuators_srvs.srv import Slider
+from cetautomatix import tf2_geometry_msgs
+from cetautomatix.magic_points import elements
+from cetautomatix.selftest import Selftest
+from cetautomatix.strategy_modes import get_time_coeff
+from lcd_msgs.msg import Lcd
+from nav2_msgs.action._navigate_to_pose import NavigateToPose_Goal
+from nav_msgs.msg import Odometry
+from rclpy.node import Node
+from rclpy.time import Duration, Time
+from std_srvs.srv import SetBool, Trigger
+from strategix.actions import actions
+from strategix_msgs.srv import ChangeActionStatus, GetAvailableActions
+from tf2_ros import LookupException, ConnectivityException
+from tf2_ros.buffer import Buffer
+from tf2_ros.transform_listener import TransformListener
+
+
+class Robot(Node):
+ def __init__(self):
+ super().__init__(node_name="cetautomatix")
+ # Detect simulation mode
+ self.simulation = True if machine() != "aarch64" else False
+ self.i = 0
+ self.stupid_actions = ["STUPID_1", "STUPID_2", "STUPID_3"]
+ self._triggered = False
+ self._position = None
+ self._orientation = None
+ self._start_time = None
+ self._current_action = None
+ self.robot = self.get_namespace().strip("/")
+ # parameters interfaces
+ self.side = self.declare_parameter("side", "blue")
+ self.declare_parameter("length")
+ self.declare_parameter("width")
+ self.declare_parameter("strategy_mode")
+ self.length_param = self.get_parameter("length")
+ self.width_param = self.get_parameter("width")
+ self.strategy_mode_param = self.get_parameter("strategy_mode")
+ # Bind actuators
+ self.actuators = import_module(f"actuators.{self.robot}").actuators
+ # Do selftest
+ self.selftest = Selftest(self)
+ # Prechill engines
+ self.actuators.setFansEnabled(True)
+ # Stop ros service
+ self._stop_ros_service = self.create_service(
+ Trigger, "stop", self._stop_robot_callback
+ )
+ # strategix client interfaces
+ self._get_available_request = GetAvailableActions.Request()
+ self._get_available_request.sender = self.robot
+ self._get_available_client = self.create_client(
+ GetAvailableActions, "/strategix/available"
+ )
+ self._change_action_status_request = ChangeActionStatus.Request()
+ self._change_action_status_request.sender = self.robot
+ self._change_action_status_client = self.create_client(
+ ChangeActionStatus, "/strategix/action"
+ )
+ # Phararon delploy client interfaces
+ self._get_trigger_deploy_pharaon_request = Trigger.Request()
+ self._get_trigger_deploy_pharaon_client = self.create_client(
+ Trigger, "/pharaon/deploy"
+ )
+ # Slider driver
+ self._set_slider_position_request = Slider.Request()
+ self._set_slider_position_client = self.create_client(Slider, "slider_position")
+ # Odometry subscriber
+ self._tf_buffer = Buffer()
+ self._tf_listener = TransformListener(self._tf_buffer, self)
+ self._odom_pose_stamped = tf2_geometry_msgs.PoseStamped()
+ self._odom_callback(self._odom_pose_stamped)
+ self._odom_sub = self.create_subscription(
+ Odometry, "odom", self._odom_callback, 1
+ )
+ # Py-Trees blackboard to send NavigateToPose actions
+ self.blackboard = py_trees.blackboard.Client(name="NavigateToPose")
+ self.blackboard.register_key(key="goal", access=py_trees.common.Access.WRITE)
+ # LCD driver direct access
+ self._lcd_driver_pub = self.create_publisher(Lcd, "/obelix/lcd", 1)
+ # Wait for strategix as this can block the behavior Tree
+ while not self._get_available_client.wait_for_service(timeout_sec=5):
+ self.get_logger().warn(
+ "Failed to contact strategix services ! Has it been started ?"
+ )
+ # Enable stepper drivers
+ if not self.simulation:
+ self._get_enable_drivers_request = SetBool.Request()
+ self._get_enable_drivers_request.data = True
+ self._get_enable_drivers_client = self.create_client(
+ SetBool, "enable_drivers"
+ )
+ self._synchronous_call(
+ self._get_enable_drivers_client, self._get_enable_drivers_request
+ )
+ # Robot trigger service
+ self._trigger_start_robot_server = self.create_service(
+ Trigger, "start", self._start_robot_callback
+ )
+ if self.robot == "obelix":
+ self._trigger_start_asterix_request = Trigger.Request()
+ self._trigger_start_asterix_client = self.create_client(
+ Trigger, "/asterix/start"
+ )
+
+ # Reached initialized state
+ self.get_logger().info("Cetautomatix ROS node has been started")
+
+ def _synchronous_call(self, client, request):
+ """Call service synchronously."""
+ future = client.call_async(request)
+ rclpy.spin_until_future_complete(self, future)
+ try:
+ response = future.result()
+ except BaseException:
+ response = None
+ return response
+
+ def set_slider_position(self, position: int):
+ """Set slider position with position in range 0 to 255."""
+ self._set_slider_position_request.position = position
+ return self._synchronous_call(
+ self._set_slider_position_client, self._set_slider_position_request
+ )
+
+ def fetch_available_actions(self):
+ """Fetch available actions for BT."""
+ response = self._synchronous_call(
+ self._get_available_client, self._get_available_request
+ )
+ return response.available if response is not None else []
+
+ def preempt_action(self, action):
+ """Preempt an action for the BT."""
+ if action is None:
+ return False
+ self._change_action_status_request.action = str(action)
+ self._change_action_status_request.request = "PREEMPT"
+ response = self._synchronous_call(
+ self._change_action_status_client, self._change_action_status_request
+ )
+ if response is None:
+ return False
+ self._current_action = action if response.success else None
+ if self._current_action is not None:
+ self.get_goal_pose()
+ return response.success
+
+ def drop_current_action(self):
+ """Drop an action for the BT."""
+ if self._current_action is None:
+ return False
+ self._change_action_status_request.action = self._current_action
+ self._change_action_status_request.request = "DROP"
+ response = self._synchronous_call(
+ self._change_action_status_client, self._change_action_status_request
+ )
+ if response is None:
+ return False
+ self._current_action = None
+ return response.success
+
+ def confirm_current_action(self):
+ """Confirm an action for the BT."""
+ if self._current_action is None:
+ return False
+ self._change_action_status_request.action = self._current_action
+ self._change_action_status_request.request = "CONFIRM"
+ response = self._synchronous_call(
+ self._change_action_status_client, self._change_action_status_request
+ )
+ if response is None:
+ return False
+ self._current_action = None
+ return response.success
+
+ def start_actuator_action(self):
+ self.get_logger().info(f"START ACTUATOR {self._current_action}")
+ if "PHARE" in self._current_action:
+ response = self._synchronous_call(
+ self._get_trigger_deploy_pharaon_client,
+ self._get_trigger_deploy_pharaon_request,
+ )
+ return response.success
+ elif "GOB" in self._current_action:
+ self.actuators.PUMPS[self.current_pump]["STATUS"] = self._current_action
+ servo = self.actuators.DYNAMIXELS[self.current_pump]
+ self.actuators.arbotix.setPosition(self.current_pump, servo.get("up"))
+ return True
+ elif "ECUEIL" in self._current_action:
+ pump_list = []
+ i = 0
+ for pump_id, pump_dict in self.actuators.PUMPS.items():
+ if pump_dict.get("type") == NO:
+ pump_list.append(pump_id)
+ self.actuators.PUMPS[pump_id]["STATUS"] = actions[
+ self._current_action
+ ].get("GOBS")[i]
+ i += 1
+ self.actuators.setPumpsEnabled(True, pump_list)
+ # TODO: Fermer Herse
+ self.set_slider_position(255)
+ return True
+ elif "CHENAL" in self._current_action:
+ color = "GREEN" if "VERT" in self._current_action else "RED"
+ pump_list = []
+ for pump_id, pump_dict in self.actuators.PUMPS.items():
+ if (
+ pump_dict.get("type") == self.drop
+ and pump_dict.get("STATUS") == color
+ ):
+ pump_list.append(pump_id)
+ if self.drop == NC:
+ for pump in pump_list:
+ servo = self.actuators.DYNAMIXELS[pump]
+ self.actuators.arbotix.setPosition(pump, servo.get("down"))
+ else:
+ self.set_slider_position(100)
+ # TODO: Ouvrir herse
+ self.actuators.setPumpsEnabled(False, pump_list)
+ for pump in pump_list:
+ del self.actuators.PUMPS[pump]["STATUS"]
+ return True
+ else:
+ return True
+
+ def trigger_pavillons(self):
+ self.get_logger().info("Triggered pavillons")
+ self.actuators.raiseTheFlag()
+ self._change_action_status_request.action = "PAVILLON"
+ self._change_action_status_request.request = "CONFIRM"
+ response = self._synchronous_call(
+ self._change_action_status_client, self._change_action_status_request
+ )
+ if response is None:
+ return False
+ return response.success
+
+ def _start_robot_callback(self, req, resp):
+ """Start robot."""
+ self._triggered = True
+ self.get_logger().info("Triggered robot starter")
+ if self.robot == "obelix":
+ # TODO : Fix sync call
+ # Thread(target=self._synchronous_call, args=[self._trigger_start_asterix_client, self._trigger_start_asterix_request]).start()
+ self.get_logger().info("Obelix triggered Asterix")
+ self._start_time = self.get_clock().now().nanoseconds * 1e-9
+ lcd_msg = Lcd()
+ lcd_msg.line = 0
+ lcd_msg.text = f"{self.robot.capitalize()} running".ljust(16)
+ self._lcd_driver_pub.publish(lcd_msg)
+ lcd_msg.line = 1
+ lcd_msg.text = "Score: 0".ljust(16)
+ self._lcd_driver_pub.publish(lcd_msg)
+ if hasattr(resp, "success"):
+ resp.success = True
+ return resp
+
+ def _stop_robot_callback(self, req, resp):
+ """Stop robot / ROS."""
+ self.stop_ros(shutdown=False)
+ resp.sucess = True
+ return resp
+
+ def triggered(self):
+ """Triggered var."""
+ return self._triggered
+
+ def _odom_callback(self, msg):
+ try:
+ # Get latest transform
+ tf = self._tf_buffer.lookup_transform(
+ "map", "base_link", Time(), timeout=Duration(seconds=1.0)
+ )
+ self._odom_pose_stamped.header = msg.header
+ self._odom_pose_stamped.pose = msg.pose.pose
+ tf_pose = tf2_geometry_msgs.do_transform_pose(self._odom_pose_stamped, tf)
+ except LookupException:
+ self.get_logger().warn("Failed to lookup_transform from map to odom")
+ return
+ except ConnectivityException:
+ self.get_logger().warn(
+ "ConnectivityException, 'map' and 'base_link' are not part of the same tree"
+ )
+ return
+ self._position = (tf_pose.pose.position.x, tf_pose.pose.position.y)
+ self._orientation = self.quaternion_to_euler(
+ tf_pose.pose.orientation.x,
+ tf_pose.pose.orientation.y,
+ tf_pose.pose.orientation.z,
+ tf_pose.pose.orientation.w,
+ )
+
+ def euler_to_quaternion(self, yaw, pitch=0, roll=0):
+ """Conversion between euler angles and quaternions."""
+ qx = np.sin(roll / 2) * np.cos(pitch / 2) * np.cos(yaw / 2) - np.cos(
+ roll / 2
+ ) * np.sin(pitch / 2) * np.sin(yaw / 2)
+ qy = np.cos(roll / 2) * np.sin(pitch / 2) * np.cos(yaw / 2) + np.sin(
+ roll / 2
+ ) * np.cos(pitch / 2) * np.sin(yaw / 2)
+ qz = np.cos(roll / 2) * np.cos(pitch / 2) * np.sin(yaw / 2) - np.sin(
+ roll / 2
+ ) * np.sin(pitch / 2) * np.cos(yaw / 2)
+ qw = np.cos(roll / 2) * np.cos(pitch / 2) * np.cos(yaw / 2) + np.sin(
+ roll / 2
+ ) * np.sin(pitch / 2) * np.sin(yaw / 2)
+ return (qx, qy, qz, qw)
+
+ def quaternion_to_euler(self, x, y, z, w):
+ """Conversion between quaternions and euler angles."""
+ t0 = +2.0 * (w * x + y * z)
+ t1 = +1.0 - 2.0 * (x * x + y * y)
+ X = math.atan2(t0, t1)
+ t2 = +2.0 * (w * y - z * x)
+ t2 = +1.0 if t2 > +1.0 else t2
+ t2 = -1.0 if t2 < -1.0 else t2
+ Y = math.asin(t2)
+ t3 = +2.0 * (w * z + x * y)
+ t4 = +1.0 - 2.0 * (y * y + z * z)
+ Z = math.atan2(t3, t4)
+ return (X, Y, Z)
+
+ def get_position(self, element, initial_orientation, diff_orientation, offset):
+ element = elements[element]
+ element_x, element_y = element.get("X"), element.get("Y")
+ offset_x, offset_y = offset[0], offset[1]
+
+ alpha = initial_orientation[0] - np.pi / 2
+ v = (
+ offset_x * np.cos(alpha) - offset_y * np.sin(alpha),
+ offset_x * np.sin(alpha) + offset_y * np.cos(alpha),
+ )
+ v = (
+ v[0] * np.cos(diff_orientation) - v[1] * np.sin(diff_orientation),
+ v[0] * np.sin(diff_orientation) + v[1] * np.cos(diff_orientation),
+ )
+ return (element_x - v[0], element_y - v[1])
+
+ def get_orientation(self, element, robot_pos, initial_orientation):
+ element = elements[element]
+ element_x, element_y = element.get("X"), element.get("Y")
+ robot_pos_x, robot_pos_y = robot_pos[0], robot_pos[1]
+
+ u = (np.cos(initial_orientation[0]), np.sin(initial_orientation[0]))
+ u = u / (np.sqrt(u[0] ** 2 + u[1] ** 2))
+ if element.get("Rot") is None:
+ v = (element_x - robot_pos_x, element_y - robot_pos_y)
+ v = v / (np.sqrt(v[0] ** 2 + v[1] ** 2))
+ theta = np.arccos(
+ np.dot(u, v)
+ / (np.sqrt(u[0] ** 2 + u[1] ** 2) * np.sqrt(v[0] ** 2 + v[1] ** 2))
+ )
+ if robot_pos_y <= element_y:
+ theta = theta
+ else:
+ theta = -theta
+ else:
+ angle = element.get("Rot")
+ theta = np.deg2rad(angle) - initial_orientation[0]
+ phi = initial_orientation[0] + theta
+ return (theta, phi)
+
+ def compute_best_action(self, action_list):
+ """Calculate best action to choose from its distance to the robot."""
+ return self.stupid_actions[self.i % len(self.stupid_actions)]
+ check = self.check_to_empty()
+ self.get_logger().info(f"{check}")
+ if check is not None:
+ action_list = check
+ if not action_list:
+ return None
+ coefficient_list = []
+ for action in action_list:
+ coefficient = 0
+ element = elements[action]
+ distance = np.sqrt(
+ (element["X"] - self._position[0]) ** 2
+ + (element["Y"] - self._position[1]) ** 2
+ )
+ coefficient += 100 * (1 - distance / 3.6)
+ coefficient += get_time_coeff(
+ self.get_clock().now().nanoseconds * 1e-9 - self._start_time,
+ action,
+ self.strategy_mode_param.value,
+ )
+ coefficient_list.append(coefficient)
+ best_action = action_list[coefficient_list.index(max(coefficient_list))]
+ return best_action
+
+ def check_to_empty(self):
+ empty_behind, empty_front = True, True
+ for pump_id, pump_dict in self.actuators.PUMPS.items():
+ if pump_dict.get("STATUS") is None:
+ if pump_dict.get("type") == NO:
+ empty_behind = False
+ else:
+ empty_front = False
+ self.get_logger().info(f"behind: {empty_behind}, front: {empty_front}")
+ if empty_behind or empty_front:
+ if self.side.value == "blue":
+ return ["CHENAL_BLEU_VERT_1", "CHENAL_BLEU_ROUGE_1"]
+ else:
+ return ["CHENAL_JAUNE_VERT_1", "CHENAL_JAUNE_ROUGE_1"]
+ return None
+
+ def get_goal_pose(self):
+ """Get goal pose for action."""
+ msg = NavigateToPose_Goal()
+ msg.pose.header.frame_id = "map"
+ # if self._current_action is not None:
+ # offset = (0, 0)
+ # if 'GOB' in self._current_action:
+ # for pump_id, pump_dict in self.actuators.PUMPS.items():
+ # if pump_dict.get('type') == NC and not pump_dict.get('STATUS'):
+ # self.current_pump = pump_id
+ # offset = pump_dict.get("pos")
+ # self.actuators.setPumpsEnabled(True, [pump_id])
+ # break
+ # elif 'ECUEIL' in self._current_action:
+ # for pump_id, pump_dict in self.actuators.PUMPS.items():
+ # if pump_dict.get('type') == NO and pump_dict.get('STATUS') is not None:
+ # offset = pump_dict.get("pos")
+ # break
+ # self.set_slider_position(0)
+ # elif 'CHENAL' in self._current_action:
+ # arriere = False
+ # for pump_id, pump_dict in self.actuators.PUMPS.items():
+ # if pump_dict.get('type') == NO and pump_dict.get('STATUS') is not None:
+ # arriere = True
+ # if arriere is True:
+ # offset = (0, -0.1)
+ # elements[self._current_action]["Rot"] = -90
+ # self.drop = NO
+ # else:
+ # offset = (0, 0.1)
+ # elements[self._current_action]["Rot"] = 90
+ # self.drop = NC
+ if self._position is not None:
+ offset = (0, 0)
+ (theta, phi) = self.get_orientation(
+ self._current_action, self._position, self._orientation
+ )
+ position = self.get_position(
+ self._current_action, self._orientation, theta, offset
+ )
+ msg.pose.pose.position.x = position[0]
+ msg.pose.pose.position.y = position[1]
+ msg.pose.pose.position.z = 0.0
+ q = self.euler_to_quaternion(phi)
+ msg.pose.pose.orientation.x = q[0]
+ msg.pose.pose.orientation.y = q[1]
+ msg.pose.pose.orientation.z = q[2]
+ msg.pose.pose.orientation.w = q[3]
+ self.blackboard.goal = msg
+ self.i += 1
+
+ def stop_ros(self, shutdown=True):
+ """Stop ros launch processes."""
+ # Publish "Robot is done"
+ lcd_msg = Lcd()
+ lcd_msg.line = 0
+ lcd_msg.text = f"{self.robot.capitalize()} is done!".ljust(16)
+ self._lcd_driver_pub.publish(lcd_msg)
+ if not self.simulation:
+ # Disable stepper drivers
+ self._get_enable_drivers_request.data = False
+ self._synchronous_call(
+ self._get_enable_drivers_client, self._get_enable_drivers_request
+ )
+ # Stop fans and relax servos
+ self.actuators.disableDynamixels()
+ self.actuators.setFansEnabled(False)
+ # Shutdown ROS
+ for p in psutil.process_iter(["pid", "name", "cmdline"]):
+ if "ros2" in p.name() and "launch" in p.cmdline():
+ self.get_logger().warn(f"Sent SIGINT to ros2 launch {p.pid}")
+ p.send_signal(SIGINT)
+ # shutdown linux
+ if shutdown:
+ call(["shutdown", "-h", "now"])
+
+ def destroy_node(self):
+ """Handle SIGINT/global destructor."""
+ self.actuators.disableDynamixels()
+ self.actuators.setFansEnabled(False)
+ super().destroy_node()
diff --git a/src/modules/cetautomatix/cetautomatix/robot.py b/src/modules/cetautomatix/cetautomatix/robot.py
index ae2cdafc..a66cff97 100644
--- a/src/modules/cetautomatix/cetautomatix/robot.py
+++ b/src/modules/cetautomatix/cetautomatix/robot.py
@@ -1,36 +1,23 @@
#!/usr/bin/env python3
-
+import rclpy
+import py_trees
+import psutil
import math
-import time
-from importlib import import_module
+import numpy as np
from signal import SIGINT
from subprocess import call
-from threading import Thread
+from importlib import import_module
from platform import machine
-
-import numpy as np
-import psutil
-
-import py_trees
-import rclpy
-from actuators.actuators import NC, NO
-from actuators_srvs.srv import Slider
-from cetautomatix import tf2_geometry_msgs
-from cetautomatix.magic_points import elements
-from cetautomatix.selftest import Selftest
-from cetautomatix.strategy_modes import get_time_coeff
-from lcd_msgs.msg import Lcd
-from nav2_msgs.action._navigate_to_pose import NavigateToPose_Goal
-from nav_msgs.msg import Odometry
from rclpy.node import Node
-from rclpy.time import Duration, Time
+from lcd_msgs.msg import Lcd
from std_srvs.srv import SetBool, Trigger
+from geometry_msgs.msg import PoseStamped
+from nav2_msgs.action._navigate_to_pose import NavigateToPose_Goal
+from strategix_msgs.srv import GetAvailableActions, ChangeActionStatus
from strategix.actions import actions
-from strategix_msgs.srv import ChangeActionStatus, GetAvailableActions
-from tf2_ros import LookupException, ConnectivityException
-from tf2_ros.buffer import Buffer
-from tf2_ros.transform_listener import TransformListener
+from strategix.strategy_modes import get_time_coeff
+from cetautomatix.selftest import Selftest
class Robot(Node):
@@ -38,93 +25,84 @@ def __init__(self):
super().__init__(node_name="cetautomatix")
# Detect simulation mode
self.simulation = True if machine() != "aarch64" else False
- self.i = 0
- self.stupid_actions = ["STUPID_1", "STUPID_2", "STUPID_3"]
- self._triggered = False
- self._position = None
- self._orientation = None
- self._start_time = None
- self._current_action = None
- self.robot = self.get_namespace().strip("/")
- # parameters interfaces
- self.side = self.declare_parameter("side", "blue")
+ self.name = self.get_namespace().strip("/")
+ # Declare parameters
+ self.triggered = False
self.declare_parameter("length")
self.declare_parameter("width")
self.declare_parameter("strategy_mode")
- self.length_param = self.get_parameter("length")
- self.width_param = self.get_parameter("width")
- self.strategy_mode_param = self.get_parameter("strategy_mode")
+ self.length = self.get_parameter("length")
+ self.width = self.get_parameter("width")
+ self.strategy_mode = self.get_parameter("strategy_mode")
# Bind actuators
- self.actuators = import_module(f"actuators.{self.robot}").actuators
+ self.actuators = import_module(f"actuators.{self.name}").actuators
# Do selftest
self.selftest = Selftest(self)
# Prechill engines
self.actuators.setFansEnabled(True)
- # Stop ros service
- self._stop_ros_service = self.create_service(
- Trigger, "stop", self._stop_robot_callback
+ # Create empty action queue
+ self.action_list = []
+ self.current_action = None
+ # Stop ROS service
+ self.stop_ros_service = self.create_service(
+ Trigger, "stop", self.stop_robot_callback
)
- # strategix client interfaces
- self._get_available_request = GetAvailableActions.Request()
- self._get_available_request.sender = self.robot
- self._get_available_client = self.create_client(
+ # Strategix client to get available actions
+ self.get_available_actions_request = GetAvailableActions.Request()
+ self.get_available_actions_request.sender = self.name
+ self.get_available_actions_client = self.create_client(
GetAvailableActions, "/strategix/available"
)
- self._change_action_status_request = ChangeActionStatus.Request()
- self._change_action_status_request.sender = self.robot
- self._change_action_status_client = self.create_client(
+ # Strategix client to change the status of an action
+ self.change_action_status_request = ChangeActionStatus.Request()
+ self.change_action_status_request.sender = self.name
+ self.change_action_status_client = self.create_client(
ChangeActionStatus, "/strategix/action"
)
- # Phararon delploy client interfaces
- self._get_trigger_deploy_pharaon_request = Trigger.Request()
- self._get_trigger_deploy_pharaon_client = self.create_client(
- Trigger, "/pharaon/deploy"
- )
- # Slider driver
- self._set_slider_position_request = Slider.Request()
- self._set_slider_position_client = self.create_client(Slider, "slider_position")
# Odometry subscriber
- self._tf_buffer = Buffer()
- self._tf_listener = TransformListener(self._tf_buffer, self)
- self._odom_pose_stamped = tf2_geometry_msgs.PoseStamped()
- self._odom_callback(self._odom_pose_stamped)
- self._odom_sub = self.create_subscription(
- Odometry, "odom", self._odom_callback, 1
+ self.odom_sub = self.create_subscription(
+ PoseStamped, "odom_map_relative", self.odom_callback, 1
)
# Py-Trees blackboard to send NavigateToPose actions
self.blackboard = py_trees.blackboard.Client(name="NavigateToPose")
self.blackboard.register_key(key="goal", access=py_trees.common.Access.WRITE)
# LCD driver direct access
- self._lcd_driver_pub = self.create_publisher(Lcd, "/obelix/lcd", 1)
- # Wait for strategix as this can block the behavior Tree
- while not self._get_available_client.wait_for_service(timeout_sec=5):
+ self.lcd_driver_pub = self.create_publisher(Lcd, "/obelix/lcd", 1)
+ # Wait for Strategix as this can block the whole Behaviour Tree
+ while not self.get_available_actions_client.wait_for_service(timeout_sec=5):
self.get_logger().warn(
"Failed to contact strategix services ! Has it been started ?"
)
# Enable stepper drivers
if not self.simulation:
- self._get_enable_drivers_request = SetBool.Request()
- self._get_enable_drivers_request.data = True
- self._get_enable_drivers_client = self.create_client(
+ self.get_enable_drivers_request = SetBool.Request()
+ self.get_enable_drivers_request.data = True
+ self.get_enable_drivers_client = self.create_client(
SetBool, "enable_drivers"
)
- self._synchronous_call(
- self._get_enable_drivers_client, self._get_enable_drivers_request
+ self.synchronous_call(
+ self.get_enable_drivers_client, self.get_enable_drivers_request
)
- # Robot trigger service
- self._trigger_start_robot_server = self.create_service(
- Trigger, "start", self._start_robot_callback
+ # Pharaon trigger client
+ self.trigger_deploy_pharaon_request = Trigger.Request()
+ self.trigger_deploy_pharaon_client = self.create_client(
+ Trigger, "/pharaon/deploy"
+ )
+ # Robot start trigger service
+ self.trigger_start_robot_server = self.create_service(
+ Trigger, "start", self.start_robot_callback
)
- if self.robot == "obelix":
- self._trigger_start_asterix_request = Trigger.Request()
- self._trigger_start_asterix_client = self.create_client(
+ if self.name == "obelix":
+ self.trigger_start_asterix_request = Trigger.Request()
+ self.trigger_start_asterix_client = self.create_client(
Trigger, "/asterix/start"
)
-
# Reached initialized state
self.get_logger().info("Cetautomatix ROS node has been started")
- def _synchronous_call(self, client, request):
+ """~~~~~~~~~~~~~~ BASE FUNCTIONS ~~~~~~~~~~~~~~"""
+
+ def synchronous_call(self, client, request):
"""Call service synchronously."""
future = client.call_async(request)
rclpy.spin_until_future_complete(self, future)
@@ -134,17 +112,10 @@ def _synchronous_call(self, client, request):
response = None
return response
- def set_slider_position(self, position: int):
- """Set slider position with position in range 0 to 255."""
- self._set_slider_position_request.position = position
- return self._synchronous_call(
- self._set_slider_position_client, self._set_slider_position_request
- )
-
def fetch_available_actions(self):
- """Fetch available actions for BT."""
- response = self._synchronous_call(
- self._get_available_client, self._get_available_request
+ """Fetch available actions from Strategix."""
+ response = self.synchronous_call(
+ self.get_available_actions_client, self.get_available_actions_request
)
return response.available if response is not None else []
@@ -152,165 +123,114 @@ def preempt_action(self, action):
"""Preempt an action for the BT."""
if action is None:
return False
- self._change_action_status_request.action = str(action)
- self._change_action_status_request.request = "PREEMPT"
- response = self._synchronous_call(
- self._change_action_status_client, self._change_action_status_request
+ self.change_action_status_request.action = str(action)
+ self.change_action_status_request.request = "PREEMPT"
+ response = self.synchronous_call(
+ self.change_action_status_client, self.change_action_status_request
)
if response is None:
return False
- self._current_action = action if response.success else None
- if self._current_action is not None:
- self.get_goal_pose()
+ self.current_action = action if response.success else None
+ if self.current_action is not None:
+ self.set_goal_pose()
+ actions.get(self.current_action).preempt_action(self, actions)
return response.success
def drop_current_action(self):
"""Drop an action for the BT."""
- if self._current_action is None:
+ if self.current_action is None:
return False
- self._change_action_status_request.action = self._current_action
- self._change_action_status_request.request = "DROP"
- response = self._synchronous_call(
- self._change_action_status_client, self._change_action_status_request
+ self.change_action_status_request.action = self.current_action
+ self.change_action_status_request.request = "DROP"
+ response = self.synchronous_call(
+ self.change_action_status_client, self.change_action_status_request
)
if response is None:
return False
- self._current_action = None
+ actions.get(self.current_action).release_action(self)
+ self.current_action = None
return response.success
def confirm_current_action(self):
"""Confirm an action for the BT."""
- if self._current_action is None:
+ if self.current_action is None:
return False
- self._change_action_status_request.action = self._current_action
- self._change_action_status_request.request = "CONFIRM"
- response = self._synchronous_call(
- self._change_action_status_client, self._change_action_status_request
+ self.change_action_status_request.action = self.current_action
+ self.change_action_status_request.request = "CONFIRM"
+ response = self.synchronous_call(
+ self.change_action_status_client, self.change_action_status_request
)
if response is None:
return False
- self._current_action = None
+ actions.get(self.current_action).finish_action(self)
+ self.current_action = None
return response.success
def start_actuator_action(self):
- self.get_logger().info(f"START ACTUATOR {self._current_action}")
- if "PHARE" in self._current_action:
- response = self._synchronous_call(
- self._get_trigger_deploy_pharaon_client,
- self._get_trigger_deploy_pharaon_request,
- )
- return response.success
- elif "GOB" in self._current_action:
- self.actuators.PUMPS[self.current_pump]["STATUS"] = self._current_action
- servo = self.actuators.DYNAMIXELS[self.current_pump]
- self.actuators.arbotix.setPosition(self.current_pump, servo.get("up"))
- return True
- elif "ECUEIL" in self._current_action:
- pump_list = []
- i = 0
- for pump_id, pump_dict in self.actuators.PUMPS.items():
- if pump_dict.get("type") == NO:
- pump_list.append(pump_id)
- self.actuators.PUMPS[pump_id]["STATUS"] = actions[
- self._current_action
- ].get("GOBS")[i]
- i += 1
- self.actuators.setPumpsEnabled(True, pump_list)
- # TODO: Fermer Herse
- self.set_slider_position(255)
- return True
- elif "CHENAL" in self._current_action:
- color = "GREEN" if "VERT" in self._current_action else "RED"
- pump_list = []
- for pump_id, pump_dict in self.actuators.PUMPS.items():
- if (
- pump_dict.get("type") == self.drop
- and pump_dict.get("STATUS") == color
- ):
- pump_list.append(pump_id)
- if self.drop == NC:
- for pump in pump_list:
- servo = self.actuators.DYNAMIXELS[pump]
- self.actuators.arbotix.setPosition(pump, servo.get("down"))
- else:
- self.set_slider_position(100)
- # TODO: Ouvrir herse
- self.actuators.setPumpsEnabled(False, pump_list)
- for pump in pump_list:
- del self.actuators.PUMPS[pump]["STATUS"]
- return True
- else:
- return True
+ """Start the actuator action after the robot has reached its destination."""
+ self.get_logger().info(f"START ACTUATOR {self.current_action}")
+ return actions.get(self.current_action).start_actuator(self)
- def trigger_pavillons(self):
- self.get_logger().info("Triggered pavillons")
- self.actuators.raiseTheFlag()
- self._change_action_status_request.action = "PAVILLON"
- self._change_action_status_request.request = "CONFIRM"
- response = self._synchronous_call(
- self._change_action_status_client, self._change_action_status_request
- )
- if response is None:
- return False
- return response.success
-
- def _start_robot_callback(self, req, resp):
+ def start_robot_callback(self, req, resp):
"""Start robot."""
- self._triggered = True
+ self.triggered = True
self.get_logger().info("Triggered robot starter")
- if self.robot == "obelix":
+ if self.name == "obelix":
# TODO : Fix sync call
- # Thread(target=self._synchronous_call, args=[self._trigger_start_asterix_client, self._trigger_start_asterix_request]).start()
+ # Thread(target=self.synchronous_call, args=[self.trigger_start_asterix_client, self.trigger_start_asterix_request]).start()
self.get_logger().info("Obelix triggered Asterix")
- self._start_time = self.get_clock().now().nanoseconds * 1e-9
+ self.start_time = self.get_clock().now().nanoseconds * 1e-9
lcd_msg = Lcd()
lcd_msg.line = 0
- lcd_msg.text = f"{self.robot.capitalize()} running".ljust(16)
- self._lcd_driver_pub.publish(lcd_msg)
+ lcd_msg.text = f"{self.name.capitalize()} running".ljust(16)
+ self.lcd_driver_pub.publish(lcd_msg)
lcd_msg.line = 1
lcd_msg.text = "Score: 0".ljust(16)
- self._lcd_driver_pub.publish(lcd_msg)
+ self.lcd_driver_pub.publish(lcd_msg)
if hasattr(resp, "success"):
resp.success = True
return resp
- def _stop_robot_callback(self, req, resp):
+ def odom_callback(self, msg):
+ """Odom callback"""
+ q = msg.pose.orientation
+ self.position = (msg.pose.position.x, msg.pose.position.y)
+ self.orientation = self.quaternion_to_euler(q.x, q.y, q.z, q.w)[2]
+
+ def stop_robot_callback(self, req, resp):
"""Stop robot / ROS."""
self.stop_ros(shutdown=False)
resp.sucess = True
return resp
- def triggered(self):
- """Triggered var."""
- return self._triggered
-
- def _odom_callback(self, msg):
- try:
- # Get latest transform
- tf = self._tf_buffer.lookup_transform(
- "map", "base_link", Time(), timeout=Duration(seconds=1.0)
- )
- self._odom_pose_stamped.header = msg.header
- self._odom_pose_stamped.pose = msg.pose.pose
- tf_pose = tf2_geometry_msgs.do_transform_pose(self._odom_pose_stamped, tf)
- except LookupException:
- self.get_logger().warn("Failed to lookup_transform from map to odom")
- return
- except ConnectivityException:
- self.get_logger().warn(
- "ConnectivityException, 'map' and 'base_link' are not part of the same tree"
+ def stop_ros(self, shutdown=True):
+ """Stop ros launch processes."""
+ # Publish "Robot is done"
+ lcd_msg = Lcd()
+ lcd_msg.line = 0
+ lcd_msg.text = f"{self.name.capitalize()} is done!".ljust(16)
+ self.lcd_driver_pub.publish(lcd_msg)
+ if not self.simulation:
+ # Disable stepper drivers
+ self.get_enable_drivers_request.data = False
+ self.synchronous_call(
+ self.get_enable_drivers_client, self.get_enable_drivers_request
)
- return
- self._position = (tf_pose.pose.position.x, tf_pose.pose.position.y)
- self._orientation = self.quaternion_to_euler(
- tf_pose.pose.orientation.x,
- tf_pose.pose.orientation.y,
- tf_pose.pose.orientation.z,
- tf_pose.pose.orientation.w,
- )
+ # Stop fans and relax servos
+ self.actuators.disableDynamixels()
+ self.actuators.setFansEnabled(False)
+ # Shutdown ROS
+ for p in psutil.process_iter(["pid", "name", "cmdline"]):
+ if "ros2" in p.name() and "launch" in p.cmdline():
+ self.get_logger().warn(f"Sent SIGINT to ros2 launch {p.pid}")
+ p.send_signal(SIGINT)
+ # Shutdown Linux
+ if shutdown:
+ call(["shutdown", "-h", "now"])
def euler_to_quaternion(self, yaw, pitch=0, roll=0):
"""Conversion between euler angles and quaternions."""
+ yaw, pitch, roll = np.deg2rad(yaw), np.deg2rad(pitch), np.deg2rad(roll)
qx = np.sin(roll / 2) * np.cos(pitch / 2) * np.cos(yaw / 2) - np.cos(
roll / 2
) * np.sin(pitch / 2) * np.sin(yaw / 2)
@@ -339,167 +259,62 @@ def quaternion_to_euler(self, x, y, z, w):
Z = math.atan2(t3, t4)
return (X, Y, Z)
- def get_position(self, element, initial_orientation, diff_orientation, offset):
- element = elements[element]
- element_x, element_y = element.get("X"), element.get("Y")
- offset_x, offset_y = offset[0], offset[1]
+ def destroy_node(self):
+ """Handle SIGINT/global destructor."""
+ self.actuators.disableDynamixels()
+ self.actuators.setFansEnabled(False)
+ super().destroy_node()
- alpha = initial_orientation[0] - np.pi / 2
- v = (
- offset_x * np.cos(alpha) - offset_y * np.sin(alpha),
- offset_x * np.sin(alpha) + offset_y * np.cos(alpha),
- )
- v = (
- v[0] * np.cos(diff_orientation) - v[1] * np.sin(diff_orientation),
- v[0] * np.sin(diff_orientation) + v[1] * np.cos(diff_orientation),
- )
- return (element_x - v[0], element_y - v[1])
+ def set_goal_pose(self):
+ """Set goal pose of action in blackboard."""
+ msg = NavigateToPose_Goal()
+ msg.pose.header.frame_id = "map"
+ orientation = actions.get(self.current_action).get_initial_orientation(self)
+ position = actions.get(self.current_action).get_initial_position(self)
+ msg.pose.pose.position.x = position[0]
+ msg.pose.pose.position.y = position[1]
+ msg.pose.pose.position.z = 0.0
+ q = self.euler_to_quaternion(orientation)
+ msg.pose.pose.orientation.x = q[0]
+ msg.pose.pose.orientation.y = q[1]
+ msg.pose.pose.orientation.z = q[2]
+ msg.pose.pose.orientation.w = q[3]
+ self.blackboard.goal = msg
- def get_orientation(self, element, robot_pos, initial_orientation):
- element = elements[element]
- element_x, element_y = element.get("X"), element.get("Y")
- robot_pos_x, robot_pos_y = robot_pos[0], robot_pos[1]
+ """~~~~~~~~~~~~~~ CUSTOM FUNCTIONS ~~~~~~~~~~~~~~"""
- u = (np.cos(initial_orientation[0]), np.sin(initial_orientation[0]))
- u = u / (np.sqrt(u[0] ** 2 + u[1] ** 2))
- if element.get("Rot") is None:
- v = (element_x - robot_pos_x, element_y - robot_pos_y)
- v = v / (np.sqrt(v[0] ** 2 + v[1] ** 2))
- theta = np.arccos(
- np.dot(u, v)
- / (np.sqrt(u[0] ** 2 + u[1] ** 2) * np.sqrt(v[0] ** 2 + v[1] ** 2))
- )
- if robot_pos_y <= element_y:
- theta = theta
- else:
- theta = -theta
- else:
- angle = element.get("Rot")
- theta = np.deg2rad(angle) - initial_orientation[0]
- phi = initial_orientation[0] + theta
- return (theta, phi)
+ def trigger_pavillons(self):
+ """Function to trigger the Pavillon after 95 seconds."""
+ self.get_logger().info("Triggered pavillons")
+ self.actuators.raiseTheFlag()
+ self.change_action_status_request.action = "PAVILLON"
+ self.change_action_status_request.request = "CONFIRM"
+ response = self.synchronous_call(
+ self.change_action_status_client, self.change_action_status_request
+ )
+ if response is None:
+ return False
+ return response.success
def compute_best_action(self, action_list):
- """Calculate best action to choose from its distance to the robot."""
- return self.stupid_actions[self.i % len(self.stupid_actions)]
- check = self.check_to_empty()
- self.get_logger().info(f"{check}")
- if check is not None:
- action_list = check
+ """Calculate best action to choose from its distance to the robot and the time passed."""
if not action_list:
return None
coefficient_list = []
- for action in action_list:
+ for action_id in action_list:
+ action = actions.get(action_id)
coefficient = 0
- element = elements[action]
distance = np.sqrt(
- (element["X"] - self._position[0]) ** 2
- + (element["Y"] - self._position[1]) ** 2
+ (action.position[0] - self.position[0]) ** 2
+ + (action.position[1] - self.position[1]) ** 2
)
coefficient += 100 * (1 - distance / 3.6)
coefficient += get_time_coeff(
- self.get_clock().now().nanoseconds * 1e-9 - self._start_time,
+ self.get_clock().now().nanoseconds * 1e-9 - self.start_time,
action,
- self.strategy_mode_param.value,
+ self.strategy_mode.value,
)
coefficient_list.append(coefficient)
+ # Return best action based on the one with a maximum coefficient
best_action = action_list[coefficient_list.index(max(coefficient_list))]
return best_action
-
- def check_to_empty(self):
- empty_behind, empty_front = True, True
- for pump_id, pump_dict in self.actuators.PUMPS.items():
- if pump_dict.get("STATUS") is None:
- if pump_dict.get("type") == NO:
- empty_behind = False
- else:
- empty_front = False
- self.get_logger().info(f"behind: {empty_behind}, front: {empty_front}")
- if empty_behind or empty_front:
- if self.side.value == "blue":
- return ["CHENAL_BLEU_VERT_1", "CHENAL_BLEU_ROUGE_1"]
- else:
- return ["CHENAL_JAUNE_VERT_1", "CHENAL_JAUNE_ROUGE_1"]
- return None
-
- def get_goal_pose(self):
- """Get goal pose for action."""
- msg = NavigateToPose_Goal()
- msg.pose.header.frame_id = "map"
- # if self._current_action is not None:
- # offset = (0, 0)
- # if 'GOB' in self._current_action:
- # for pump_id, pump_dict in self.actuators.PUMPS.items():
- # if pump_dict.get('type') == NC and not pump_dict.get('STATUS'):
- # self.current_pump = pump_id
- # offset = pump_dict.get("pos")
- # self.actuators.setPumpsEnabled(True, [pump_id])
- # break
- # elif 'ECUEIL' in self._current_action:
- # for pump_id, pump_dict in self.actuators.PUMPS.items():
- # if pump_dict.get('type') == NO and pump_dict.get('STATUS') is not None:
- # offset = pump_dict.get("pos")
- # break
- # self.set_slider_position(0)
- # elif 'CHENAL' in self._current_action:
- # arriere = False
- # for pump_id, pump_dict in self.actuators.PUMPS.items():
- # if pump_dict.get('type') == NO and pump_dict.get('STATUS') is not None:
- # arriere = True
- # if arriere is True:
- # offset = (0, -0.1)
- # elements[self._current_action]["Rot"] = -90
- # self.drop = NO
- # else:
- # offset = (0, 0.1)
- # elements[self._current_action]["Rot"] = 90
- # self.drop = NC
- if self._position is not None:
- offset = (0, 0)
- (theta, phi) = self.get_orientation(
- self._current_action, self._position, self._orientation
- )
- position = self.get_position(
- self._current_action, self._orientation, theta, offset
- )
- msg.pose.pose.position.x = position[0]
- msg.pose.pose.position.y = position[1]
- msg.pose.pose.position.z = 0.0
- q = self.euler_to_quaternion(phi)
- msg.pose.pose.orientation.x = q[0]
- msg.pose.pose.orientation.y = q[1]
- msg.pose.pose.orientation.z = q[2]
- msg.pose.pose.orientation.w = q[3]
- self.blackboard.goal = msg
- self.i += 1
-
- def stop_ros(self, shutdown=True):
- """Stop ros launch processes."""
- # Publish "Robot is done"
- lcd_msg = Lcd()
- lcd_msg.line = 0
- lcd_msg.text = f"{self.robot.capitalize()} is done!".ljust(16)
- self._lcd_driver_pub.publish(lcd_msg)
- if not self.simulation:
- # Disable stepper drivers
- self._get_enable_drivers_request.data = False
- self._synchronous_call(
- self._get_enable_drivers_client, self._get_enable_drivers_request
- )
- # Stop fans and relax servos
- self.actuators.disableDynamixels()
- self.actuators.setFansEnabled(False)
- # Shutdown ROS
- for p in psutil.process_iter(["pid", "name", "cmdline"]):
- if "ros2" in p.name() and "launch" in p.cmdline():
- self.get_logger().warn(f"Sent SIGINT to ros2 launch {p.pid}")
- p.send_signal(SIGINT)
- # shutdown linux
- if shutdown:
- call(["shutdown", "-h", "now"])
-
- def destroy_node(self):
- """Handle SIGINT/global destructor."""
- self.actuators.disableDynamixels()
- self.actuators.setFansEnabled(False)
- super().destroy_node()
diff --git a/src/modules/cetautomatix/cetautomatix/strategy_modes.py b/src/modules/cetautomatix/cetautomatix/strategy_modes.py
deleted file mode 100644
index 67bae792..00000000
--- a/src/modules/cetautomatix/cetautomatix/strategy_modes.py
+++ /dev/null
@@ -1,27 +0,0 @@
-strategies = {
- "NORMAL": {
- "ECUEIL_COULEUR": [[0, 0], [25, 0], [50, 50], [100, 50]],
- "ECUEIL_GENERAL": [[0, 50], [100, 50]],
- "PHARE": [[0, 0], [25, 0], [75, 100], [100, 100]],
- }
-}
-
-
-def get_time_coeff(time, action, strategy):
- if action == "ECUEIL_BLEU" or action == "ECUEIL_JAUNE":
- action = "ECUEIL_COULEUR"
- elif action == "ECUEIL_1" or action == "ECUEIL_2":
- action = "ECUEIL_GENERAL"
- elif "PHARE" in action:
- action = "PHARE"
- element = strategies.get(strategy).get(action)
- if element is None:
- return 0
- for i in range(len(element)):
- if time < element[i][0]:
- # Calculate a score value between 0 and 100 depending on the time value
- score = element[i - 1][1] + (time - element[i - 1][0]) * (
- element[i][1] - element[i - 1][1]
- ) / (element[i][0] - element[i - 1][0])
- return score
- return element[i][-1]
diff --git a/src/modules/cetautomatix/cetautomatix/tree.py b/src/modules/cetautomatix/cetautomatix/tree.py
index 23227d6c..374d28d2 100644
--- a/src/modules/cetautomatix/cetautomatix/tree.py
+++ b/src/modules/cetautomatix/cetautomatix/tree.py
@@ -57,7 +57,7 @@ def create_tree(robot) -> py_trees.behaviour.Behaviour:
pavillon = PavillonAction(name="Pavillon Action", robot=robot)
- # Asterix Root
+ # Robot Root
all_actions = py_trees.composites.Parallel(
name="All Actions",
policy=py_trees.common.ParallelPolicy.SuccessOnOne(),
diff --git a/src/modules/cetautomatix/cetautomatix/utils/plot.py b/src/modules/cetautomatix/cetautomatix/utils/plot.py
deleted file mode 100644
index ffe6afaa..00000000
--- a/src/modules/cetautomatix/cetautomatix/utils/plot.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#!/usr/bin/env python3
-
-
-import matplotlib.pyplot as plt
-
-from cetautomatix import magic_points
-
-plt.axes()
-border = plt.Rectangle((0, 0), 3, 2, ec="black", fc="w", lw=5)
-plt.gca().add_patch(border)
-for key, value in magic_points.elements.items():
- if "ECUEIL" in key:
- if "BLEU" in key:
- line = plt.Line2D(
- (value[0], value[0]),
- (value[1] - 0.2095, value[1] + 0.2095),
- lw=5,
- color="b",
- )
- plt.gca().add_line(line)
- elif "JAUNE" in key:
- line = plt.Line2D(
- (value[0], value[0]),
- (value[1] - 0.2095, value[1] + 0.2095),
- lw=5,
- color="y",
- )
- plt.gca().add_line(line)
- else:
- line = plt.Line2D(
- (value[0] - 0.2095, value[0] + 0.2095),
- (value[1], value[1]),
- lw=5,
- color="grey",
- )
- plt.gca().add_line(line)
- elif "ARUCO" in key:
- aruco = plt.Rectangle(
- (value[0] - 0.05, value[1] - 0.05), 0.1, 0.1, ec="black", fc="w", lw=2
- )
- plt.gca().add_patch(aruco)
- elif "MANCHE" in key:
- line = plt.Line2D(
- (value[0] - 0.05, value[0] + 0.05), (value[1], value[1]), lw=5, color="grey"
- )
- plt.gca().add_line(line)
- elif "PHARE" in key:
- if "BLEU" in key:
- line = plt.Line2D(
- (value[0] - 0.225, value[0] + 0.225),
- (value[1], value[1]),
- lw=5,
- color="b",
- )
- plt.gca().add_line(line)
- else:
- line = plt.Line2D(
- (value[0] - 0.225, value[0] + 0.225),
- (value[1], value[1]),
- lw=5,
- color="y",
- )
- plt.gca().add_line(line)
- elif "GOB" in key:
- for key2, value2 in value.items():
- if "VERT" in key:
- circle = plt.Circle(value2, radius=0.03, fc="g")
- plt.gca().add_patch(circle)
- else:
- circle = plt.Circle(value2, radius=0.03, fc="r")
- plt.gca().add_patch(circle)
- elif "ZONES" in key:
- for key2, value2 in value.items():
- if "CHENAL" in key2:
- if "ROUGE" in key2:
- chenal = plt.Rectangle(
- (value2[0], value2[1]),
- value2[2] - value2[0],
- value2[3] - value2[1],
- fc="r",
- )
- plt.gca().add_patch(chenal)
- else:
- chenal = plt.Rectangle(
- (value2[0], value2[1]),
- value2[2] - value2[0],
- value2[3] - value2[1],
- fc="g",
- )
- plt.gca().add_patch(chenal)
- elif "PORT" in key2:
- if "BLEU" in key2:
- port = plt.Rectangle(
- (value2[0], value2[1]),
- value2[2] - value2[0],
- value2[3] - value2[1],
- ec="b",
- fc="w",
- )
- plt.gca().add_patch(port)
- else:
- port = plt.Rectangle(
- (value2[0], value2[1]),
- value2[2] - value2[0],
- value2[3] - value2[1],
- ec="y",
- fc="w",
- )
- plt.gca().add_patch(port)
-
-plt.axis("scaled")
-plt.show()
diff --git a/src/modules/cetautomatix/package.xml b/src/modules/cetautomatix/package.xml
index 1f456c7b..3c73139a 100644
--- a/src/modules/cetautomatix/package.xml
+++ b/src/modules/cetautomatix/package.xml
@@ -10,6 +10,7 @@
rclpy
std_msgs
strategix_msgs
+ python3-tf2_kdl
ament_python