In [1]:
# ROS imports
import rospy
import actionlib
from geometry_msgs.msg import Pose, Twist
from nav_msgs.msg import Odometry
from sensor_msgs.msg import LaserScan
from assignment_2_2024.msg import PlanningAction, PlanningGoal, PositionVelocity
from actionlib_msgs.msg import GoalStatus

# Widgets and plotting
from ipywidgets import BoundedFloatText, Button, VBox, HBox, Box, Label, Output, Layout
from IPython.display import display
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

import time
from gazebo_msgs.srv import SetModelState
from gazebo_msgs.msg import ModelState
import tf
from tf.transformations import euler_from_quaternion

In [2]:
# ---- Initialize ROS Node ----
rospy.init_node('notebook_client', anonymous=True)

# Action client
client = actionlib.SimpleActionClient('/reaching_goal', PlanningAction)
client.wait_for_server()

True

In [3]:
# Global variables to store robot info
current_pos = {"x": 0.0, "y": 0.0}
robot_path = []
obstacle_distance = float('inf')
reached_goals = 0
cancelled_goals = 0
goal_start_time = None
# Global velocity status
last_movement_time = time.time()
last_correct_or_time = time.time()
is_stuck = False
vel_threshold = 0.1  # threshold below which we consider velocity zero
is_flipped = False

In [4]:
# ---- Widgets ----
x_input = BoundedFloatText(
    description='Target X:',
    value=0.0,
    min=-10.0,
    max=10.0,
    step=0.1
)

y_input = BoundedFloatText(
    description='Target Y:',
    value=0.0,
    min=-10.0,
    max=10.0,
    step=0.1
)
btn_set = Button(description='Set Goal', button_style='success')
btn_cancel = Button(description='Cancel Goal', button_style='danger')
btn_respawn = Button(description='Respawn', button_style='info')

info_pos = Label("Current Pos: (0.00, 0.00)")
info_vel = Label("Velocity: 0.00 lin / 0.00 ang")
info_obs = Label("Closest Obstacle: N/A")
goal_output = Output()

In [5]:
# ---- Callbacks ----
def odom_callback(msg):
    global current_pos, robot_path
    current_pos['x'] = msg.pose.pose.position.x
    current_pos['y'] = msg.pose.pose.position.y
    vx = msg.twist.twist.linear.x
    vz = msg.twist.twist.angular.z
    robot_path.append((current_pos['x'], current_pos['y']))

    info_pos.value = f"Current Pos: ({current_pos['x']:.2f}, {current_pos['y']:.2f})"
    info_vel.value = f"Velocity: {vx:.2f} lin / {vz:.2f} ang"    
    
    global last_movement_time, last_correct_or_time, is_stuck, is_flipped
    vx = abs(msg.twist.twist.linear.x)
    wz = abs(msg.twist.twist.angular.z)

    # If moving
    if vx > vel_threshold or wz > vel_threshold:
        last_movement_time = time.time()
        is_stuck = False
    else:
        # If stationary too long
        if time.time() - last_movement_time > 10:
            is_stuck = True
            
    orientation_q = msg.pose.pose.orientation
    orientation_list = [orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
    roll, pitch, yaw = euler_from_quaternion(orientation_list)

    if abs(roll) > 0.5 or abs(pitch) > 0.5:
        if time.time() - last_correct_or_time > 10:
            is_flipped = True        
    else: 
        last_correct_or_time = time.time()
        is_flipped = False        
        

def scan_callback(msg):
    global obstacle_distance
    valid_ranges = [r for r in msg.ranges if r > 0.01]
    if valid_ranges:
        obstacle_distance = min(valid_ranges)
        info_obs.value = f"Closest Obstacle: {obstacle_distance:.2f} m"

In [6]:
# ---- Action Functions ----
def send_goal(btn):
    global goal_start_time
    goal = PlanningGoal()
    goal.target_pose.pose.position.x = x_input.value
    goal.target_pose.pose.position.y = y_input.value
    
    goal_start_time = time.time()  # Record time when goal is sent
    client.send_goal(goal, done_cb=goal_done_callback)

    with goal_output:
        print(f"Sent goal to ({x_input.value}, {y_input.value})")

def goal_done_callback(state, result):
    global reached_goals, goal_start_time
    
    time_taken = time.time() - goal_start_time if goal_start_time else 0

    # Callback when goal is done
    if state == GoalStatus.SUCCEEDED:
        reached_goals += 1
        with goal_output:
            print(f"Goal successfully reached in {time_taken:.2f} seconds")
    else:
        with goal_output:
            print(f"Goal finished with state: {state} after {time_taken:.2f} seconds")

def cancel_goal(btn):
    global cancelled_goals

    # Check if the goal is still active or pending
    state = client.get_state()
    if state in [GoalStatus.ACTIVE, GoalStatus.PENDING]:
        client.cancel_goal()
        cancelled_goals += 1
        with goal_output:
            print("Goal cancelled")
    else:
        with goal_output:
            print("No active goal to cancel")
            
def respawn_robot(btn=None):
    rospy.wait_for_service('/gazebo/set_model_state')
    try:
        set_state = rospy.ServiceProxy('/gazebo/set_model_state', SetModelState)

        state_msg = ModelState()
        state_msg.model_name = 'robot1'
        state_msg.pose.position.x = 0.0
        state_msg.pose.position.y = 1.0
        state_msg.pose.position.z = 0.0

        # Convert yaw to quaternion (yaw = 0.0)
        quaternion = tf.transformations.quaternion_from_euler(0, 0, 0)
        state_msg.pose.orientation.x = quaternion[0]
        state_msg.pose.orientation.y = quaternion[1]
        state_msg.pose.orientation.z = quaternion[2]
        state_msg.pose.orientation.w = quaternion[3]

        state_msg.twist.linear.x = 0.0
        state_msg.twist.angular.z = 0.0
        state_msg.reference_frame = 'world'

        resp = set_state(state_msg)

        with goal_output:
            print("Robot respawned at (0.0, 1.0) with yaw 0.0")
    except rospy.ServiceException as e:
        with goal_output:
            print("Failed to respawn robot:", e)


In [7]:
import os
import random
import csv
import subprocess
from actionlib_msgs.msg import GoalStatus

# Timeout for each trial
TIMEOUT = 130.0

GOAL_FILE = "goals.csv"
RESULT_FILE = "results.csv"

def load_or_generate_goals(n_goals=30):
    goals = []

    if os.path.exists(GOAL_FILE):
        with open(GOAL_FILE, mode='r') as file:
            reader = csv.DictReader(file)
            goals = [(float(row["x"]), float(row["y"])) for row in reader]
        print(f"📄 Loaded {len(goals)} goals from {GOAL_FILE}")
    else:
        with open(GOAL_FILE, mode='w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=["x", "y"])
            writer.writeheader()
            for _ in range(n_goals):
                x = random.uniform(-9, 9)
                y = random.uniform(-9, 9)
                writer.writerow({"x": x, "y": y})
                goals.append((x, y))
        print(f"📝 Generated and saved {n_goals} random goals to {GOAL_FILE}")

    return goals

def append_result_to_csv(result_dict):
    file_exists = os.path.exists(RESULT_FILE)

    with open(RESULT_FILE, mode='a', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=result_dict.keys())

        if not file_exists:
            writer.writeheader()

        writer.writerow(result_dict)

# Take screenshot of Gazebo window
def take_gazebo_screenshot(trial_id, strategy_name):
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"fail_screenshots/{strategy_name}_trial{trial_id}_{timestamp}.png"

    # Make sure directory exists
    subprocess.call(['mkdir', '-p', 'fail_screenshots'])

    # Take screenshot (this works if using Gazebo client GUI)
    subprocess.call(['import', '-window', 'Gazebo', filename])

    print(f"📸 Screenshot saved: {filename}")

# Run single trial
def run_trial(x, y, trial_id, strategy_name):
    global is_stuck, is_flipped
    is_stuck = False
    is_flipped = False
    from assignment_2_2024.msg import PlanningGoal

    goal = PlanningGoal()
    goal.target_pose.pose.position.x = x
    goal.target_pose.pose.position.y = y

    client.send_goal(goal)
    start_time = time.time()

    while not rospy.is_shutdown():
        elapsed = time.time() - start_time

        # Check for success
        if client.get_state() == GoalStatus.SUCCEEDED:
            print(f"Goal reached ({x}, {y}) with time: {round(elapsed, 2)}")
            return {
                "trial": trial_id,"strategy": strategy_name,"x": round(x, 2), "y": round(y, 2), 
                "success": 1, "time": round(elapsed, 2), "reason": ""
            }        

        # Check stuck detection
        if is_stuck:
            client.cancel_goal()
            print(f"🛑 Robot stuck at ({x}, {y}), aborted early")
            take_gazebo_screenshot(trial_id, strategy_name)
            
            return {
                 "trial": trial_id, "strategy": strategy_name, "x": round(x, 2), "y": round(y, 2),
                "success": 0, "time": round(elapsed, 2), "reason": "stuck"
            }
        
        if is_flipped:
            client.cancel_goal()
            print(f"🛑 Robot flipped ({x}, {y}), aborted early")
            take_gazebo_screenshot(trial_id, strategy_name)
            
            return {
                 "trial": trial_id, "strategy": strategy_name, "x": round(x, 2), "y": round(y, 2),
                "success": 0, "time": round(elapsed, 2), "reason": "flipped"
            }
        
        # Check timeout
        if elapsed > TIMEOUT:
            client.cancel_goal()
            print(f"🛑 Timeout elapsed: loop, aborted early")
            take_gazebo_screenshot(trial_id, strategy_name)
            
            return {
                 "trial": trial_id, "x": round(x, 2), "y": round(y, 2), "strategy": strategy_name,
                "success": 0, "time": round(elapsed, 2), "reason": "loop"
            }

        time.sleep(0.5)  # short sleep to avoid busy loop


# Run trials for one strategy
def run_all_trials(goals, strategy_name="A"):
    results = []

    for i, (x, y) in enumerate(goals):
        print(f"🚀 Trial {i+1}/{len(goals)}: Sending goal ({x:.2f}, {y:.2f})")
        
        # respawn function before each trial
        respawn_robot()
        time.sleep(1)
        
        result = run_trial(x, y, trial_id=i, strategy_name=strategy_name)
        results.append(result)
        append_result_to_csv(result)

    return results

In [8]:
# ---- Subscribers ----
rospy.Subscriber("/odom", Odometry, odom_callback)

<rospy.topics.Subscriber at 0x7c023ef70e20>

In [9]:
rospy.Subscriber("/scan", LaserScan, scan_callback)

<rospy.topics.Subscriber at 0x7c023ef708e0>

In [None]:
# ---- User Interface to set target, display info
ui = VBox([
    info_pos,info_vel,info_obs
])

display(ui)
goals = load_or_generate_goals(30)
#run_all_trials(goals, strategy_name="A")
run_all_trials(goals, strategy_name="B")

VBox(children=(Label(value='Current Pos: (0.00, 0.00)'), Label(value='Velocity: 0.00 lin / 0.00 ang'), Label(v…

📄 Loaded 30 goals from goals.csv
🚀 Trial 1/30: Sending goal (7.22, 1.48)
Goal reached (7.222576884699727, 1.4766319271340898) with time: 19.04
🚀 Trial 2/30: Sending goal (2.87, 3.53)
Goal reached (2.8732916876807817, 3.530926359757343) with time: 15.04
🚀 Trial 3/30: Sending goal (4.22, 0.70)
Goal reached (4.219997222063153, 0.7046274325070918) with time: 13.53
🚀 Trial 4/30: Sending goal (-8.07, -3.81)
🛑 Robot stuck at (-8.071364053352466, -3.8102389037789255), aborted early
📸 Screenshot saved: fail_screenshots/B_trial3_20250524-113521.png
🚀 Trial 5/30: Sending goal (-4.41, -7.47)
Goal reached (-4.413084044115051, -7.470030243490727) with time: 69.16
🚀 Trial 6/30: Sending goal (-7.67, 4.27)
