In [1]:
from nav2_simple_commander.robot_navigator import BasicNavigator

import rclpy as rp
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.duration import Duration

import tf2_ros
# from geometry_msgs.msg import PoseStamped
from geometry_msgs.msg import TransformStamped
from visualization_msgs.msg import MarkerArray

import time

import cv2
# import rclpy as rp
import numpy as np
import logging

# from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseStamped, PoseArray
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
from visualization_msgs.msg import Marker, MarkerArray
from cv_bridge import CvBridge
from ament_index_python.packages import get_package_share_directory
from tf_transformations import quaternion_from_matrix

In [2]:
import rclpy as rp
import numpy as np
import os

from ultralytics import YOLO
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge

from yolo_inference.msg import InferenceResult
from yolo_inference.msg import Yolov8Inference

from ament_index_python.packages import get_package_share_directory

class ObjectDetection(Node):
	def __init__(self):

		super().__init__('object_dectection_node')
		self.cv_bridge = CvBridge()
		self.model_name_param = self.declare_parameter('model_name', 'yolov8n.pt')
		self.pkg_share_path = get_package_share_directory('yolo_pkg')
		self.model_dir = os.path.join(self.pkg_share_path, 'models')
		self.model_name = self.model_name_param.value
		self.model_path = os.path.join(self.model_dir, self.model_name)
		self.model = YOLO(self.model_path)
		self.get_logger().info(f"model : {self.model_path}")

		self.yolov8_inference = Yolov8Inference()

		self.camera_sub = self.create_subscription(
			Image,
			'image_raw',
			self.camera_callback,
			10
		)
		self.camera_sub

		self.yolov8_pub = self.create_publisher(Yolov8Inference, "/yolov8_inference", 1)
		self.img_pub = self.create_publisher(Image, "/inference_result", 1)
        
		self.img_size = 0
		self.is_detected = False



	def camera_callback(self, msg):
		self.is_detected = False

		img = self.cv_bridge.imgmsg_to_cv2(msg, "bgr8")
		results = self.model(img)
        
		if self.img_size == 0:
			self.img_size = msg.height * msg.width

		self.yolov8_inference.header.frame_id = "inference"
		self.yolov8_inference.header.stamp = self.get_clock().now().to_msg()

		for result in results:
			boxes = result.boxes.data.cpu()
			# box = np.array(boxes)
			for box in boxes:
				self.inference_result = InferenceResult()
				label = int(box[5])
				self.inference_result.label = self.model.names[label]
				self.inference_result.top = int(box[0])
				self.inference_result.top_left = int(box[1])
				self.inference_result.bottom = int(box[2])
				self.inference_result.bottom_right = int(box[3])
				self.inference_result.conf = float(box[4])
				self.yolov8_inference.yolov8_inference.append(self.inference_result)
				
				each_box_size = (self.inference_result.bottom - self.inference_result.top) * (self.inference_result.bottom_right - self.inference_result.top_left)
				print(f"each_box_size: {each_box_size}")
				print(f"img_size: {self.img_size}")
				print("self.inference_result.label type:", type(self.inference_result.label))
				print(self.inference_result.label)
				if each_box_size / self.img_size >= 0.6 and self.inference_result.label == "person":
					self.is_detected = True

		annotated_frame = results[0].plot()
		img_msg = self.cv_bridge.cv2_to_imgmsg(annotated_frame)

# 		print("img_msg in ObjectionDetection: ")
# 		print(img_msg)
# 		print("self.yolov8_inference in ObjectionDetection: ")
# 		print(self.yolov8_inference)
        
		self.img_pub.publish(img_msg)
		self.yolov8_pub.publish(self.yolov8_inference)
		self.yolov8_inference.yolov8_inference.clear()


In [3]:
rp.init()

nav = BasicNavigator()
nav.waitUntilNav2Active()

[INFO] [1691397175.647579555] [basic_navigator]: Setting initial pose
[INFO] [1691397175.649319783] [basic_navigator]: Publishing Initial Pose
[INFO] [1691397175.650269414] [basic_navigator]: Waiting for amcl_pose to be received
[INFO] [1691397175.651417036] [basic_navigator]: Setting initial pose
[INFO] [1691397175.652242388] [basic_navigator]: Publishing Initial Pose
[INFO] [1691397175.653013539] [basic_navigator]: Waiting for amcl_pose to be received
[INFO] [1691397175.653744701] [basic_navigator]: Setting initial pose
[INFO] [1691397175.654457993] [basic_navigator]: Publishing Initial Pose
[INFO] [1691397175.655168710] [basic_navigator]: Waiting for amcl_pose to be received
[INFO] [1691397177.658475386] [basic_navigator]: Nav2 is ready for use!


In [4]:
od = ObjectDetection()

[INFO] [1691397181.216918533] [object_dectection_node]: model : /home/ksh/yolo_ws/install/yolo_pkg/share/yolo_pkg/models/yolov8n.pt


In [5]:
def move_with_long_delay(arg_x, arg_y, arg_w, arg_z):
    
    print(f"Move to pose: x {arg_x}, y {arg_y}, orientation: w {arg_w}, z {arg_z}")
    
    goal_pose1 = PoseStamped()
    goal_pose1.header.frame_id = 'map'
    goal_pose1.header.stamp = nav.get_clock().now().to_msg()
    goal_pose1.pose.position.x = arg_x
    goal_pose1.pose.position.y = arg_y
    goal_pose1.pose.orientation.w = arg_w
    goal_pose1.pose.orientation.z = arg_z

    nav.goToPose(goal_pose1)

    i = 0
    while not nav.isTaskComplete():
        i = i + 1

        rp.spin_once(od)
        if (od.is_detected == True):
            print("Is detected!!!!!!")
            time.sleep(2)
            nav.spin(1.57)
            time.sleep(4)
            nav.backup(0.5, 0.5)
            nav.goToPose(goal_pose1)
        
        feedback = nav.getFeedback()
        if feedback and i % 5 == 0:
            print('Distance remaining: ' + '{:.2f}'.format(
            feedback.distance_remaining) + 'meters.')
#             print(feedback.navigation_time)

            if feedback.distance_remaining < 0.3 and Duration.from_msg(feedback.navigation_time) > Duration(seconds=5.0):
                nav.cancelTask()
            

def move_with_short_delay(arg_x, arg_y, arg_w, arg_z):
    
    print(f"Move to pose: x {arg_x}, y {arg_y}, orientation: w {arg_w}, z {arg_z}")
    
    goal_pose1 = PoseStamped()
    goal_pose1.header.frame_id = 'map'
    goal_pose1.header.stamp = nav.get_clock().now().to_msg()
    goal_pose1.pose.position.x = arg_x
    goal_pose1.pose.position.y = arg_y
    goal_pose1.pose.orientation.w = arg_w
    goal_pose1.pose.orientation.z = arg_z

    nav.goToPose(goal_pose1)

    i = 0
    while not nav.isTaskComplete():
        i = i + 1

        feedback = nav.getFeedback()
        if feedback and i % 5 == 0:
            print('Distance remaining: ' + '{:.2f}'.format(
            feedback.distance_remaining) + 'meters.')
#             print(feedback.navigation_time)

            if feedback.distance_remaining < 0.3 and Duration.from_msg(feedback.navigation_time) > Duration(seconds=5.0):
                nav.cancelTask()

In [None]:

print("Move to B")
move_with_long_delay(1.0, 0.15, 0., 0.) # to B
# turn back
print("Turning right(90)")
time.sleep(2)
nav.spin(-1.57)
time.sleep(4)
# move_with_short_delay(1.15, 0.15, 1., -1.) # turn 90

# go to C
print("Move to C")
move_with_long_delay(1.7, -0.15, 0., 0.) # to C
print("Turning spin(360)")
# robot spin
time.sleep(2)
nav.spin(6.28)
time.sleep(8)
# move_with_short_delay(1.5, 1., 1., 1.)
# move_with_short_delay(1.5, 1., 0., 0.)
# move_with_short_delay(1.5, 1., 1., -1.)
# move_with_short_delay(1.5, 1., 0., 1.)
# move_with_short_delay(1.5, 1., 1., 1.)

print("Move to D")
# go to D
move_with_long_delay(1.5, 1., 1., 1.) # to D

# print end
print("End!!!!!!!")


In [None]:
nav.spin(6.28)
time.sleep(4)
move_with_long_delay(1.5, 1., 1., 1.) # to C

In [None]:
nav.spin(1.57)
time.sleep(2)
move_with_long_delay(0., 0., 0., 0.)

In [None]:
move_with_long_delay(0., 0., 0., 0.)

In [None]:
nav.backup(0.15, 0.25)

In [None]:
import threading

# Define your goToPose() function here

def spin_nav2():
    # Replace this with the actual code to start and run nav2.spin()
    # For example, you might have a function like nav2_run() that starts and runs the navigation stack.
#     nav2_run()
    nav.spin(6.28)

# Create a thread for the nav2.spin()
nav2_thread = threading.Thread(target=spin_nav2)

# Start the thread
nav2_thread.start()


# nav.spin(6.28)
move_with_long_delay(1.5, 1., 1., 1.) # to C

# Now, you can call your goToPose() function here
# goToPose()

# Wait for the nav2_thread to finish (optional)
nav2_thread.join()

# Rest of your code...


In [19]:
time.sleep(2)
nav.backup(0.25, 0.5)
nav.spin(-0.77)
time.sleep(4)
nav.backup(0.25, 0.5)

[INFO] [1691397546.563633804] [basic_navigator]: Backing up 0.25 m at 0.5 m/s....
[INFO] [1691397546.568538801] [basic_navigator]: Spinning to angle -0.77....
[INFO] [1691397550.576777317] [basic_navigator]: Backing up 0.25 m at 0.5 m/s....


True