In [3]:
import socket
import cv2
import numpy as np
import time
from ultralytics import YOLO

# YOLO Model Load
model = YOLO('best.pt')  # Adjust if needed

# TCP/IP settings for simulation
HOST = '127.0.0.1'
PORT = 2000
SIMULATION_MODE = True  # Simulation mode flag

# Global connection variable
tcp_connected = False

# Drop coordinates for different battery types
drop_coordinates = {
    '9v': (12.434, 464.768, 314.318),  # P0
    'D type': (-275.251, 374.701, 314.318),  # P1
    'AA': (313.466, 343.370, 314.318)  # P2
}

# Load corner points calibration data
corner_points = np.load('corner_points.npz')['corner_points']
tl, tr, bl, br = corner_points[:4]

# Define class names (must match your trained model)
class_names = ['AA', 'D type', '9v']

# Initialize TCP connection for simulation
if SIMULATION_MODE:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((HOST, PORT))
        tcp_connected = True
        print(f"Connected to {HOST} on port {PORT}")
    except ConnectionRefusedError:
        print(f"Connection to {HOST}:{PORT} refused. Make sure the simulator is running.")
        exit()

# Function to move robot via TCP/IP in simulation mode
def move_to(x, y, z):
    command = f"MOVE {x} {y} {z}\r\n"
    if tcp_connected:
        s.sendall(command.encode('utf-8'))
        print(f"Command sent: {command.strip()}")
    else:
        print("Error: Not connected to the simulator.")
    time.sleep(20)  # Simulate movement delay of 20 seconds

# Function to pick a battery
def pick_battery(x, y, z_pick):
    move_to(x, y, z_pick)  # Move to pick position
    time.sleep(1)  # Simulate pick delay
    print("Battery picked.")

# Function to drop the battery
def drop_battery(battery_type):
    if battery_type in drop_coordinates:
        x_drop, y_drop, z_drop = drop_coordinates[battery_type]
        move_to(x_drop, y_drop, z_drop)  # Move to the specified drop point
        print(f"Battery dropped at {battery_type} position: {x_drop}, {y_drop}, {z_drop}")
    else:
        print(f"Error: Unknown battery type {battery_type}")

# Function to map detected position to normalized grid coordinates
def map_to_grid(x, y, tl, tr, bl, br):
    # Compute vectors for mapping to normalized grid coordinates
    v1 = np.array([tr[0] - tl[0], tr[1] - tl[1]])
    v2 = np.array([bl[0] - tl[0], bl[1] - tl[1]])
    vp = np.array([x - tl[0], y - tl[1]])

    # Calculate normalized grid coordinates (u, v)
    u = np.dot(vp, v1) / np.dot(v1, v1)
    v = np.dot(vp, v2) / np.dot(v2, v2)

    return u, v

# Function to convert grid coordinates to robot coordinates
def grid_to_robot(u, v, tl, tr, bl, br):
    # Calculate the robot coordinates (X, Y, Z) from the normalized grid coordinates (u, v)
    x_robot = tl[0] + u * (tr[0] - tl[0]) + v * (bl[0] - tl[0])
    y_robot = tl[1] + u * (tr[1] - tl[1]) + v * (bl[1] - tl[1])
    z_robot = tl[2] + u * (tr[2] - tl[2]) + v * (bl[2] - tl[2])  # Adjust for height if necessary

    return x_robot, y_robot, z_robot

# Main function to detect batteries and command the robot to pick and drop them
def detect_and_sort_batteries():
    camera_index = 1  # Adjust based on your camera setup
    cap = cv2.VideoCapture(camera_index)

    if not cap.isOpened():
        print("Error: Could not open camera.")
        exit()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Failed to capture image.")
                break

            # YOLO detection
            results = model(frame)[0]

            # Iterate through the detected objects
            detections = []
            for result in results.boxes.data:
                x1, y1, x2, y2, confidence, class_id = result.tolist()
                x_center = (x1 + x2) / 2
                y_center = (y1 + y2) / 2

                # Map detected position to grid and convert to robot coordinates
                u, v = map_to_grid(x_center, y_center, tl, tr, bl, br)
                x_robot, y_robot, z_robot = grid_to_robot(u, v, tl, tr, bl, br)

                # Round robot coordinates
                x_robot, y_robot, z_robot = round(x_robot), round(y_robot), round(z_robot)

                battery_type = class_names[int(class_id)]

                # Save detection information
                detections.append({
                    'battery_type': battery_type,
                    'robot_coordinates': (x_robot, y_robot, z_robot)
                })

                # Draw bounding box and labels
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
                label = f'{battery_type} {confidence:.2f}'
                cv2.putText(frame, label, (int(x1), int(y1 - 10)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                # Display robot coordinates
                coords_label = f'X: {x_robot}, Y: {y_robot}, Z: {z_robot}'
                cv2.putText(frame, coords_label, (int(x_center), int(y_center)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                # Simulate robot moving to pick and drop
                move_to(x_robot, y_robot, z_robot + 50)  # Move above battery
                pick_battery(x_robot, y_robot, z_robot)  # Pick the battery
                drop_battery(battery_type)  # Drop at appropriate location

            # Display the frame with detections
            cv2.imshow('Battery Detection', frame)

            # Print detected information
            for detection in detections:
                print(f"Detected {detection['battery_type']} at robot coordinates {detection['robot_coordinates']}")

            # Add a 20-second delay after robot action
            time.sleep(20)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    except KeyboardInterrupt:
        print("Detection stopped by user.")
    finally:
        cap.release()
        cv2.destroyAllWindows()

        # Close TCP connection after sorting
        if tcp_connected:
            s.close()
            print("TCP connection closed.")

# Main function to start detection and sorting
def main():
    print("Starting battery detection and sorting...")
    detect_and_sort_batteries()

if __name__ == '__main__':
    main()


Connection to 127.0.0.1:2000 refused. Make sure the simulator is running.
Starting battery detection and sorting...


KeyboardInterrupt: 