In [None]:
%pip install requests

import cv2
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import os
from datetime import datetime
from utils import get_real_world_coordinates, transform_coordinates, create_zip_archive, process_images

def calculate_2d_angle(center_point, grasp_point):
    """Calculate 2D angle in degrees between grasp direction and positive x-axis."""
    delta_x = grasp_point[0] - center_point[0]
    delta_y = center_point[1] - grasp_point[1]  # Inverted because y increases downward
    angle_rad = np.arctan2(delta_y, delta_x)
    angle_deg = np.degrees(angle_rad)
    return (angle_deg + 360) % 360

def calculate_3d_angle(center_xyz, grasp_xyz):
    """Calculate 3D angles (theta and phi) in spherical coordinates."""
    rel_pos = grasp_xyz - center_xyz
    r = np.linalg.norm(rel_pos)
    
    # Calculate theta (azimuthal angle in x-y plane from x-axis)
    theta = np.degrees(np.arctan2(rel_pos[1], rel_pos[0]))
    theta = (theta + 360) % 360
    
    # Calculate phi (polar angle from z-axis)
    phi = np.degrees(np.arccos(rel_pos[2] / r))
    
    return theta, phi, r

def determine_pickup_mode(angle_2d):
    """Determine pickup mode based on 2D angle."""
    normalized_angle = angle_2d if angle_2d <= 180 else angle_2d - 180
    threshold = 30
    
    if (normalized_angle <= threshold or normalized_angle >= 180 - threshold):
        return "HORIZONTAL PICKUP"
    elif (90 - threshold <= normalized_angle <= 90 + threshold):
        return "VERTICAL PICKUP"
    else:
        return "UNDEFINED"

def remap_angle(theta, in_min=250, in_max=150, out_min=30, out_max=125):
    """Remap theta angle to robot angle range."""
    robot_angle = (theta - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
    return 180 - robot_angle

def capture_and_process_grasp(target_classes=['bottle'], grasp_object='bottle'):
    # Initialize RealSense
    pipeline = rs.pipeline()
    config = rs.config()
    config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
    config.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 30)
    
    # Start streaming
    profile = pipeline.start(config)
    align = rs.align(rs.stream.color)
    depth_profile = rs.video_stream_profile(profile.get_stream(rs.stream.depth))
    depth_intrinsics = depth_profile.get_intrinsics()

    # Initialize YOLO for object detection
    yolo_model = YOLO('yolov8l-world.pt')
    yolo_model.set_classes(target_classes)

    try:
        while True:
            # Show preview frames
            frames = pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                continue

            # Convert to numpy array for display
            color_image = np.asanyarray(color_frame.get_data())
            cv2.imshow('Press "s" to capture, "q" to quit', color_image)
            
            key = cv2.waitKey(1)
            if key == ord('s'):
                # Get aligned frames
                frames = pipeline.wait_for_frames()
                aligned_frames = align.process(frames)
                depth_frame = aligned_frames.get_depth_frame()
                color_frame = aligned_frames.get_color_frame()
                
                if not depth_frame or not color_frame:
                    print("Failed to capture frames")
                    continue

                # Create directories and save frames
                root_dir = "captured_frames"
                current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
                folder_name = f"{root_dir}/captured_frame_{current_time}"
                
                os.makedirs(f'{folder_name}/rgb_image', exist_ok=True)
                os.makedirs(f'{folder_name}/depth_image', exist_ok=True)
                os.makedirs(f'{folder_name}/output', exist_ok=True)

                color_image = np.asanyarray(color_frame.get_data())
                depth_image = np.asanyarray(depth_frame.get_data())
                
                cv2.imwrite(f'{folder_name}/rgb_image/image_0.jpg', color_image)
                np.save(f'{folder_name}/depth_image/image_0.npy', depth_image)

                # Create zip archives
                rgb_zip_path = f"{folder_name}/rgb.zip"
                depth_zip_path = f"{folder_name}/depth.zip"
                create_zip_archive(f'{folder_name}/rgb_image', rgb_zip_path)
                create_zip_archive(f'{folder_name}/depth_image', depth_zip_path)

                # Process frames for grasp detection
                decoded_csv_data, keypoints = process_images(rgb_zip_path, depth_zip_path, output_dir=f"{folder_name}/output")
                
                # Detect object
                results = yolo_model(color_image)
                object_center = None
                for result in results[0].boxes.data:
                    x1, y1, x2, y2, conf, class_id = result
                    if yolo_model.names[int(class_id)] == grasp_object:
                        center_x = int((x1 + x2) / 2)
                        center_y = int((y1 + y2) / 2)
                        object_center = (center_x, center_y)
                        break

                if keypoints and len(keypoints) > 0 and object_center:
                    keypoint_3, keypoint_7 = keypoints[0][3], keypoints[0][7]
                    
                    # Get real world coordinates
                    real_world_point_3 = get_real_world_coordinates(folder_name, keypoint_3[0], keypoint_3[1])
                    real_world_point_7 = get_real_world_coordinates(folder_name, keypoint_7[0], keypoint_7[1])
                    real_world_object = get_real_world_coordinates(folder_name, object_center[0], object_center[1])
                    
                    # Transform coordinates
                    transformed_point_3 = transform_coordinates(*real_world_point_3)
                    transformed_point_7 = transform_coordinates(*real_world_point_7)
                    transformed_object = transform_coordinates(*real_world_object)

                    # Calculate grasp center
                    center_x = (keypoint_3[0] + keypoint_7[0]) // 2
                    center_y = (keypoint_3[1] + keypoint_7[1]) // 2
                    grasp_center = (center_x, center_y)
                    
                    # Get real world coordinates for grasp center
                    real_world_grasp = get_real_world_coordinates(folder_name, center_x, center_y)
                    transformed_grasp = transform_coordinates(*real_world_grasp)

                    # Calculate angles
                    angle_2d = calculate_2d_angle(object_center, grasp_center)
                    theta, phi, radius = calculate_3d_angle(
                        np.array(transformed_object), 
                        np.array(transformed_grasp)
                    )
                    robot_angle = remap_angle(theta)
                    pickup_mode = determine_pickup_mode(angle_2d)

                    # Visualize results
                    vis_img = color_image.copy()
                    
                    # Draw keypoints and grasp line
                    cv2.circle(vis_img, (int(keypoint_3[0]), int(keypoint_3[1])), 5, (0, 0, 255), -1)  # Red
                    cv2.circle(vis_img, (int(keypoint_7[0]), int(keypoint_7[1])), 5, (255, 0, 0), -1)  # Blue
                    cv2.line(vis_img, (int(keypoint_3[0]), int(keypoint_3[1])), 
                            (int(keypoint_7[0]), int(keypoint_7[1])), (0, 255, 0), 2)  # Green line
                    
                    # Draw object center and grasp center
                    cv2.circle(vis_img, object_center, 5, (255, 255, 0), -1)  # Cyan
                    cv2.circle(vis_img, grasp_center, 5, (0, 255, 255), -1)  # Yellow

                    # Add text for angles and modes
                    cv2.putText(vis_img, f"2D Angle: {angle_2d:.2f}°", (10, 30), 
                              cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                    cv2.putText(vis_img, f"3D Theta: {theta:.2f}°", (10, 60), 
                              cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                    cv2.putText(vis_img, f"Robot Angle: {robot_angle:.2f}°", (10, 90), 
                              cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                    cv2.putText(vis_img, f"Mode: {pickup_mode}", (10, 120), 
                              cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

                    # Save results to JSON
                    results_data = {
                        'timestamp': current_time,
                        'angles': {
                            '2d_angle': float(angle_2d),
                            '3d_theta': float(theta),
                            '3d_phi': float(phi),
                            'robot_angle': float(robot_angle)
                        },
                        'pickup_mode': pickup_mode,
                        'keypoints': {
                            'point_3': transformed_point_3.tolist(),
                            'point_7': transformed_point_7.tolist(),
                            'grasp_center': transformed_grasp.tolist(),
                            'object_center': transformed_object.tolist()
                        }
                    }
                    
                    import json
                    with open(f'{folder_name}/output/grasp_analysis.json', 'w') as f:
                        json.dump(results_data, f, indent=4)

                    cv2.imshow('Grasp Analysis', vis_img)
                    cv2.waitKey(0)

                    # Save visualization
                    cv2.imwrite(f'{folder_name}/output/analyzed_grasp.jpg', vis_img)
                    print(f"Results saved in {folder_name}")
                    break
                else:
                    print("No keypoints or object detected")

            elif key == ord('q'):
                break

    finally:
        pipeline.stop()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    capture_and_process_grasp()

Note: you may need to restart the kernel to use updated packages.


RuntimeError: Couldn't resolve requests