In [None]:
#a_star_maze_navigator

import csv
import cv2
import numpy as np
import heapq
import math

# Global variable to store points selected on the maze
selected_points = []

def mouse_click_handler(event, pos_x, pos_y, flags, param):
    """
    Handle mouse click events to store points on the maze image.
    """
    global selected_points
    if event == cv2.EVENT_LBUTTONDOWN:
        selected_points.append((pos_y, pos_x))  # Save point as (row, col)
        print(f"Selected Point: ({pos_x}, {pos_y})")

def clean_maze_image(maze_img):
    """
    Preprocess maze image to produce a binary black and white image.
    """
    gray_maze = cv2.cvtColor(maze_img, cv2.COLOR_BGR2GRAY)
    _, binary_maze = cv2.threshold(gray_maze, 127, 255, cv2.THRESH_BINARY)
    return binary_maze

def generate_cost_matrix(binary_maze):
    """
    Generate a cost matrix using distance transform.
    """
    dist_matrix = cv2.distanceTransform(binary_maze, cv2.DIST_L2, 5)
    cost_matrix = np.max(dist_matrix) - dist_matrix
    cost_matrix[binary_maze == 0] = np.inf
    return cost_matrix

def calculate_heuristic(current_point, goal_point):
    """
    Calculate the heuristic for A* using Manhattan distance.
    """
    return abs(current_point[0] - goal_point[0]) + abs(current_point[1] - goal_point[1])

def perform_a_star_search(binary_maze, cost_matrix, start_coord, goal_coord):
    """
    Execute A* algorithm with cost considerations.
    """
    if binary_maze[start_coord] == 0 or binary_maze[goal_coord] == 0:
        raise ValueError("Invalid start or goal point. Adjust the points.")

    rows, cols = binary_maze.shape
    open_queue = []
    heapq.heappush(open_queue, (0 + calculate_heuristic(start_coord, goal_coord), 0, start_coord))
    previous_node = {}
    g_cost = {start_coord: 0}
    f_cost = {start_coord: calculate_heuristic(start_coord, goal_coord)}
    movement_directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]

    while open_queue:
        _, current_g, current_node = heapq.heappop(open_queue)
        if current_node == goal_coord:
            final_path = []
            while current_node in previous_node:
                final_path.append(current_node)
                current_node = previous_node[current_node]
            final_path.append(start_coord)
            return final_path[::-1]

        for move in movement_directions:
            neighbor_node = (current_node[0] + move[0], current_node[1] + move[1])
            if not (0 <= neighbor_node[0] < rows and 0 <= neighbor_node[1] < cols):
                continue
            if binary_maze[neighbor_node] == 0:
                continue

            tentative_g_cost = current_g + cost_matrix[neighbor_node]
            if tentative_g_cost < g_cost.get(neighbor_node, float('inf')):
                previous_node[neighbor_node] = current_node
                g_cost[neighbor_node] = tentative_g_cost
                f_cost[neighbor_node] = tentative_g_cost + calculate_heuristic(neighbor_node, goal_coord)
                heapq.heappush(open_queue, (f_cost[neighbor_node], tentative_g_cost, neighbor_node))

    raise ValueError("No path found between the start and goal points.")

def reduce_pathway(point_list, tolerance=5):
    """
    Simplify a path using the Douglas-Peucker algorithm.
    """
    def perpendicular_dist(point, line_start, line_end):
        if line_start == line_end:
            return math.sqrt((point[0] - line_start[0])**2 + (point[1] - line_start[1])**2)
        num = abs((line_end[1] - line_start[1]) * point[0] - 
                  (line_end[0] - line_start[0]) * point[1] +
                  line_end[0] * line_start[1] - 
                  line_end[1] * line_start[0])
        denom = math.sqrt((line_end[1] - line_start[1])**2 + (line_end[0] - line_start[0])**2)
        return num / denom

    def douglas_peucker(points, tolerance):
        if len(points) < 3:
            return points
        first_point, last_point = points[0], points[-1]
        max_dist, split_index = 0, 0
        for idx in range(1, len(points) - 1):
            dist = perpendicular_dist(points[idx], first_point, last_point)
            if dist > max_dist:
                split_index, max_dist = idx, dist
        if max_dist > tolerance:
            left_side = douglas_peucker(points[:split_index + 1], tolerance)
            right_side = douglas_peucker(points[split_index:], tolerance)
            return left_side[:-1] + right_side
        return [first_point, last_point]

    return douglas_peucker(point_list, tolerance)

def save_path_to_file(final_path, file_path):
    """
    Save the path coordinates to a CSV file.
    """
    with open(file_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["Row", "Column"])
        writer.writerows(final_path)

def solve_maze_and_export(csv_output_path):
    global selected_points
    print("Open camera. Press 's' to capture the maze image.")
    video_capture = cv2.VideoCapture(1)
    if not video_capture.isOpened():
        raise RuntimeError("Unable to access the camera.")

    while True:
        success, video_frame = video_capture.read()
        if not success:
            raise RuntimeError("Failed to read camera feed.")

        cv2.imshow("Camera Feed", video_frame)
        key_press = cv2.waitKey(1)
        if key_press == ord('s'):
            maze_snapshot = video_frame.copy()
            print("Maze image captured!")
            break
        elif key_press == ord('q'):
            print("Exiting...")
            video_capture.release()
            cv2.destroyAllWindows()
            return

    video_capture.release()
    cv2.destroyAllWindows()

    processed_maze = clean_maze_image(maze_snapshot)
    cost_matrix = generate_cost_matrix(processed_maze)
    print("Click start and end points on the maze image.")
    cv2.imshow("Click Points", maze_snapshot)
    cv2.setMouseCallback("Click Points", mouse_click_handler)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

    if len(selected_points) != 2:
        raise ValueError("Please select exactly two points.")

    maze_start, maze_goal = selected_points
    calculated_path = perform_a_star_search(processed_maze, cost_matrix, maze_start, maze_goal)
    simplified_path = reduce_pathway(calculated_path, tolerance=10)
    save_path_to_file(simplified_path, csv_output_path)

    print(f"Path saved to {csv_output_path}")

# Main execution
output_csv_file = r"D:\Ajith\ASU\2ND SEMESTER\RAS 545 (ROBOTIC SYSTEMS 1)\Project\solution\path_points.csv"
solve_maze_and_export(output_csv_file)


In [None]:
#pixel_to_robot_mapper

import csv
import pandas as pd

def compute_slope_and_offset(x1, y1, x2, y2):
    if x2 - x1 == 0:
        raise ValueError("Vertical line detected, slope is undefined.")
    gradient = (y2 - y1) / (x2 - x1)
    offset = y1 - gradient * x1
    return gradient, offset

def transform_to_robot_coords(pix_x, pix_y, gradient_x, offset_x, gradient_y, offset_y):
    return (pix_x * gradient_x + offset_x, pix_y * gradient_y + offset_y)

def initialize_detection_system():
    pix_x_min, pix_x_max = 0, 639
    pix_y_min, pix_y_max = 0, 479

    try:
        robot_top_left_x = float(input("Enter the robot's top-left x-coordinate: "))
        robot_bottom_right_x = float(input("Enter the robot's bottom-right x-coordinate: "))
        gradient_x, offset_x = compute_slope_and_offset(pix_x_min, robot_top_left_x, pix_x_max, robot_bottom_right_x)

        robot_top_left_y = float(input("Enter the robot's top-left y-coordinate: "))
        robot_bottom_right_y = float(input("Enter the robot's bottom-right y-coordinate: "))
        gradient_y, offset_y = compute_slope_and_offset(pix_y_min, robot_top_left_y, pix_y_max, robot_bottom_right_y)

        print(f"gradient_x={gradient_x}, offset_x={offset_x}, gradient_y={gradient_y}, offset_y={offset_y}")

    except ValueError as e:
        print(f"Error occurred: {e}")
        return

    # Read pixel coordinates from CSV file
    waypoint_file = input("Enter the path to the CSV file with waypoints: ")
    result_file = "robot_coordinates_output.txt"

    try:
        waypoint_table = pd.read_csv(waypoint_file)

        print("Transforming pixel coordinates to robot coordinates:")
        with open(result_file, "w") as result:
            result.write("Pixel Coordinates -> Robot Coordinates\n")
            for idx, waypoint in waypoint_table.iterrows():
                try:
                    pix_x = float(waypoint['Column'])
                    pix_y = float(waypoint['Row'])
                    robot_coords = transform_to_robot_coords(pix_x, pix_y, gradient_x, offset_x, gradient_y, offset_y)
                    result.write(f"Pixel ({pix_x}, {pix_y}) -> Robot {robot_coords}\n")
                    print(f"Pixel ({pix_x}, {pix_y}) -> Robot {robot_coords}")
                except ValueError:
                    print(f"Invalid data in row {idx}. Skipping.")

        print(f"Transformed coordinates saved to {result_file}")

    except FileNotFoundError:
        print("CSV file not found. Please verify the file path.")
    except KeyError:
        print("The CSV file must contain 'Row' and 'Column' headers.")

if __name__ == "__main__":
    initialize_detection_system()
