-
Notifications
You must be signed in to change notification settings - Fork 240
Open
Description
hi, i try to make script of drone that use realsense and move the drone at x,y origin to get the red obstacle
the problem is that the drone change its altitude after he get the first command from the camera
import asyncio
import math
from typing import Union, Tuple, Optional
import cv2
import mavsdk
import numpy as np
import pyrealsense2.pyrealsense2 as rs
from cv2 import Mat
from mavsdk.offboard import OffboardError, PositionNedYaw
MOVE_ACCURACY = 0.1
FRAME_HEIGHT = 480
FRAME_WIDTH = 640
CAMERA_X_FOV_DEGREES = 70.0
CAMERA_Y_FOV_DEGREES = 43.0
async def setup_connect() -> mavsdk.System:
"""
Connect to the drone and set up initial conditions.
"""
drone = mavsdk.System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone to have a global position estimate...")
async for health in drone.telemetry.health():
if health.is_global_position_ok and health.is_home_position_ok:
print("-- Global position estimate OK")
break
return drone # Return the connected drone object
async def setup_drone(drone: mavsdk.System):
# Rest of the function remains the same
"""
Connect to the drone and set up initial conditions.
"""
drone = mavsdk.System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone to have a global position estimate...")
async for health in drone.telemetry.health():
if health.is_global_position_ok and health.is_home_position_ok:
print("-- Global position estimate OK")
break
print("-- Arming")
await drone.action.arm()
print("-- Setting initial setpoint")
await drone.offboard.set_position_ned(PositionNedYaw(0.0, 0.0, 0.0, 0.0))
print("-- Starting offboard")
try:
await drone.offboard.start()
except OffboardError as error:
print(f"Starting offboard mode failed with error code: {error._result.result}")
print("-- Disarming")
await drone.action.disarm()
raise
return drone
async def move_drone(drone: mavsdk.System, north_m: float, east_m: float, down_m: float, yaw_deg: float = 0.0):
"""
Move the drone to a specified position.
"""
await drone.offboard.set_position_ned(PositionNedYaw(north_m, east_m, down_m, yaw_deg))
while True:
await asyncio.sleep(0.1)
position_ned = await drone.telemetry.position_velocity_ned().__aiter__().__anext__()
north_m_distance = position_ned.position.north_m - north_m
east_m_distance = position_ned.position.east_m - east_m
down_m_distance = position_ned.position.down_m - down_m
if (
MOVE_ACCURACY > north_m_distance > -MOVE_ACCURACY
and MOVE_ACCURACY > east_m_distance > -MOVE_ACCURACY
and MOVE_ACCURACY > down_m_distance > -MOVE_ACCURACY
):
break
def setup_camera() -> rs.pipeline:
"""
Set up the camera pipeline.
"""
camera = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, FRAME_WIDTH, FRAME_HEIGHT, rs.format.z16, 30)
config.enable_stream(rs.stream.color, FRAME_WIDTH, FRAME_HEIGHT, rs.format.bgr8, 30)
camera.start(config)
return camera
def wait_for_frames(camera: rs.pipeline, tries: int = 5, timeout_ms: int = 2000) -> Optional[rs.composite_frame]:
"""
Wait for frames from the camera.
"""
for _ in range(tries):
try:
frames = camera.wait_for_frames(timeout_ms=timeout_ms)
except RuntimeError:
pass
else:
if frames:
return frames
print("Found no Frames")
return None
def get_depth_frame(frames: rs.composite_frame) -> Optional[rs.depth_frame]:
"""
Get the depth frame from the composite frame.
"""
pipeline_depth_frame = frames.get_depth_frame()
return pipeline_depth_frame
def get_current_frame(frames: rs.composite_frame) -> np.ndarray:
"""
Get the current color frame from the composite frame.
"""
current_color_frame: rs.video_frame = frames.get_color_frame()
color_image = np.asanyarray(current_color_frame.get_data())
hsv_colored_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2HSV)
return hsv_colored_image
def _get_red_mask(current_image: np.ndarray) -> np.ndarray:
"""
Create a mask for red objects in the image.
"""
lower_red1 = np.array([0, 100, 100])
upper_red1 = np.array([10, 255, 255])
lower_red2 = np.array([160, 100, 100])
upper_red2 = np.array([180, 255, 255])
range_1_mask = cv2.inRange(current_image, lower_red1, upper_red1)
range_2_mask = cv2.inRange(current_image, lower_red2, upper_red2)
red_mask = cv2.bitwise_or(range_1_mask, range_2_mask)
kernel = np.ones((5, 5), np.uint8)
red_mask = cv2.erode(red_mask, kernel, iterations=1)
red_mask = cv2.dilate(red_mask, kernel, iterations=2)
return red_mask
def find_largest_obstacle(current_image: np.ndarray) -> Optional[np.ndarray]:
"""
Find the largest red object in the image.
"""
red_mask = _get_red_mask(current_image)
contours, _ = cv2.findContours(red_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
largest_area = 0
largest_obstacle = None
for contour in contours:
area = cv2.contourArea(contour)
if area > largest_area:
largest_area = area
largest_obstacle = contour
return largest_obstacle
def get_obstacle_center(obstacle: np.ndarray) -> Tuple[int, int]:
"""
Get the center coordinates of the bounding box of the obstacle.
"""
x, y, w, h = cv2.boundingRect(obstacle)
center_x = x + w // 2
center_y = y + h // 2
return center_x, center_y
def draw_obstacle(obstacle: np.ndarray, current_frame: np.ndarray, depth_value: float):
"""
Draw the bounding box and depth information of the obstacle on the current frame.
"""
x, y, w, h = cv2.boundingRect(obstacle)
cv2.rectangle(current_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
distance_text = f"Depth: {depth_value:.2f} meters"
cv2.putText(
current_frame,
distance_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 255, 0),
2,
cv2.LINE_AA
)
def calculate_distance_to_obstacle(
obstacle_location: Tuple[int, int],
current_depth_frame: rs.depth_frame
) -> Tuple[float, float]:
"""
Calculate the distance to the obstacle in NED coordinates.
"""
camera_center_x = FRAME_WIDTH // 2
camera_center_y = FRAME_HEIGHT // 2
vector_x_size = obstacle_location[0] - camera_center_x
vector_y_size = obstacle_location[1] - camera_center_y
depth_value = current_depth_frame.get_distance(*obstacle_location)
east_direction = vector_x_size * math.tan(math.radians(CAMERA_X_FOV_DEGREES / 2)) * (depth_value / FRAME_WIDTH)
north_direction = vector_y_size * math.tan(math.radians(CAMERA_Y_FOV_DEGREES / 2)) * (depth_value / FRAME_HEIGHT)
return north_direction, east_direction
async def move_to_obstacle(drone: mavsdk.System, current_depth_frame: rs.depth_frame,
obstacle_location: Tuple[int, int]):
"""
Move the drone to the obstacle's location.
"""
north_direction, east_direction = calculate_distance_to_obstacle(obstacle_location, current_depth_frame)
await move_drone(drone, north_direction, east_direction, 0)
async def start_drone(drone: mavsdk.System):
"""
Start the drone at a specified height.
"""
try:
height = float(input("Enter drone height [in meters]: "))
except ValueError:
print("Invalid input. Please enter a valid number.")
return
position_ned = await drone.telemetry.position_velocity_ned().__aiter__().__anext__()
position = position_ned.position
await move_drone(drone, position.north_m, position.east_m, -height)
async def main():
drone = await setup_connect()
await setup_drone(drone) # Call this function once to set up the drone
camera = setup_camera()
await start_drone(drone) # Pass the drone object to this function
while True:
frames = wait_for_frames(camera)
if frames:
current_depth_frame = get_depth_frame(frames)
current_color_frame = get_current_frame(frames)
largest_obstacle = find_largest_obstacle(current_color_frame)
if largest_obstacle is not None:
obstacle_location = get_obstacle_center(largest_obstacle)
draw_obstacle(largest_obstacle, current_color_frame, current_depth_frame.get_distance(*obstacle_location))
print(f">> Moving to obstacle [{obstacle_location}]")
await move_to_obstacle(drone, current_depth_frame, obstacle_location)
print(">> Moved to obstacle")
cv2.imshow("Drone Vision", current_color_frame)
key = cv2.waitKey(1)
if key == 27: # Press 'Esc' to exit
break
cv2.destroyAllWindows()
if __name__ == "__main__":
asyncio.run(main())
thanks for the help
Metadata
Metadata
Assignees
Labels
No labels