In [28]:
# Package imports
import cv2
import json
import numpy as np
import chess
import chess.svg
from cairosvg import svg2png
import random 
import chess.engine
import time
from Arm_lib import Arm_Device
import asyncio

In [None]:
# Bring up the robotic arm; the short pause presumably lets the controller settle
# before the first command (TODO confirm against the Arm_lib documentation).
Arm = Arm_Device()
time.sleep(0.1)

# Second argument of every servo write below; rest_arm() treats it as
# milliseconds when computing how long to wait.
speed = 1700

# UCI chess engine that chooses the robot's moves
engine = chess.engine.SimpleEngine.popen_uci('Stockfish/src/./stockfish')

# square name -> polygon corner points of that square in camera pixels
with open('chessboard_mapping.json', 'r') as mapping_file:
    square_map = json.load(mapping_file)

# pose name (board squares plus "rest") -> list of servo angles
with open('arm_position.json', 'r') as pose_file:
    arm_position = json.load(pose_file)

In [None]:
# Visualizes the chessboard and game states in real time
def visualize_game(board: chess.Board, size=400, move = None) -> None:
    """Render ``board`` to ``output.png`` and show it in the 'Game' window.

    Args:
        board: Position to draw.
        size: Size forwarded to ``chess.svg.board``.
        move: Optional UCI string. Its first four characters are read as the
            origin and target squares, and both are tinted ``#ced264``.

    Note:
        ``cv2.imshow`` only repaints once the caller pumps the GUI event loop
        with ``cv2.waitKey``.
    """
    render_options = {'size': size}
    if move is not None:
        origin = chess.parse_square(move[:2])
        target = chess.parse_square(move[2:4])
        render_options['fill'] = {origin: '#ced264', target: '#ced264'}

    board_svg = chess.svg.board(board, **render_options)

    # Rasterise through a file on disk, then load it back for display.
    svg2png(board_svg, write_to='output.png')
    rendered = cv2.imread('output.png')
    cv2.imshow('Game', rendered)

In [23]:
# Determines a chessboard square at a point
def identify_square(x: float, y: float, square_map: dict):
    """Return the name of the square whose polygon strictly contains (x, y).

    Args:
        x, y: Point to locate, in the same pixel coordinates as ``square_map``.
        square_map: Mapping of square name -> sequence of polygon corner points.

    Returns:
        The first square name (in mapping order) whose polygon contains the
        point, or ``None`` if no polygon does. A point lying exactly on a
        polygon edge is not considered inside.
    """
    # Some OpenCV builds refuse NumPy integer scalars for the point argument
    # ("Can't parse 'pt'"), so always hand over plain Python floats.
    probe = (float(x), float(y))
    for name, corners in square_map.items():
        polygon = np.array(corners, np.int32)
        if cv2.pointPolygonTest(polygon, probe, False) > 0:
            return name
    return None

In [21]:
# Draws the chessboard grid squares on the frame
def map_grid(square_map: dict, frame, show_text = False) -> None:
    """Outline every mapped square on ``frame`` (modified in place).

    Args:
        square_map: Mapping of square name -> sequence of polygon corner points.
        frame: Image to draw on.
        show_text: When true, also write each square's name, anchored at the
            centre of the polygon's bounding rectangle.
    """
    for name, corners in square_map.items():
        outline = np.array(corners, dtype=np.int32)
        cv2.polylines(frame, [outline], True, (255, 255, 255), thickness=1)
        if not show_text:
            # The bounding box is only needed to position the label.
            continue
        left, top, width, height = cv2.boundingRect(outline)
        anchor = (left + width // 2, top + height // 2)
        cv2.putText(frame, name, anchor, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

In [None]:
# Moves the arm to the rest position
async def rest_arm(array, speed):
    """Send the pose ``array`` to the arm, then wait for the move to finish.

    Args:
        array: Servo angles forwarded unchanged to the arm.
        speed: Forwarded to the arm as the second write argument; the wait
            afterwards is ``speed / 1000`` seconds.
    """
    Arm.Arm_serial_servo_write6_array(array, speed)
    settle_seconds = speed / 1000
    await asyncio.sleep(settle_seconds)

In [None]:
# Moves the arm to pick up a piece 
async def pick_piece(array, speed):
    """Move to the pose ``array``, then repeat it with servo index 5 set to 179.

    Index 5 is presumably the gripper and 179 its closed position (TODO confirm
    against the arm's servo layout).

    The caller's list is never modified: the grip pose is built on a private
    copy, so a shared pose table stays intact even if a write fails part-way.

    Args:
        array: Servo angles of the pick-up pose (at least six entries).
        speed: Forwarded to the arm as the second write argument.
    """
    try:
        # Approach using the pose exactly as stored, then give the arm time to arrive.
        Arm.Arm_serial_servo_write6_array(array, speed)
        await asyncio.sleep(5)

        grip_pose = list(array)
        grip_pose[5] = 179
        Arm.Arm_serial_servo_write6_array(grip_pose, speed)
        await asyncio.sleep(1)
    except ValueError as e:
        print(f"Invalid move at index ('{array}'): {e}")

In [None]:
# Moves the arm to place a piece 
async def place_piece(array, speed):
    """Move to the pose ``array``, then repeat it with servo index 5 set to 159.

    Index 5 is presumably the gripper and 159 its open position (TODO confirm
    against the arm's servo layout).

    The release pose is built on a private copy. Writing 159 into ``array``
    itself would permanently alter the caller's pose table, changing the
    starting pose of every later move that uses the same square.

    Args:
        array: Servo angles of the drop-off pose (at least six entries).
        speed: Forwarded to the arm as the second write argument.
    """
    try:
        Arm.Arm_serial_servo_write6_array(array, speed)
        await asyncio.sleep(3)

        release_pose = list(array)
        release_pose[5] = 159
        Arm.Arm_serial_servo_write6_array(release_pose, speed)
    except ValueError as e:
        print(f"Invalid move at index ('{array}'): {e}")

In [None]:
def initialize_camera(camera_index=0):
    """Open a video capture device.

    Args:
        camera_index: Device index passed to ``cv2.VideoCapture`` (default 0).

    Returns:
        The opened capture object, or ``None`` if the device could not be
        opened. On failure the handle is released before returning.
    """
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        print("Error: Unable to access the camera.")
        cap.release()
        return None
    return cap

In [None]:
# Capture a frame from the webcam and overlay the chessboard grid.
def capture_and_display_frame(cap, square_map):
    """Read one frame, draw the board grid on it and show it in the 'Frame' window.

    Args:
        cap: Capture object providing ``read()``.
        square_map: Square polygons handed to ``map_grid``.

    Returns:
        The frame with the grid drawn on it, or ``None`` when the read failed.
    """
    grabbed, frame = cap.read()
    if grabbed:
        map_grid(square_map, frame)
        cv2.imshow('Frame', frame)
        return frame

    print("Failed to capture frame.")
    return None

In [None]:
# Handle the computer's move using the chess engine and robotic arm.
async def handle_computer_move(engine, board, arm_position, speed):
    """Ask the engine for a move, execute it with the arm and update ``board``.

    This is a coroutine: it is awaited from the game loop, which already runs
    inside an event loop, so the arm motions are awaited directly.
    ``asyncio.run()`` cannot be used here because it raises ``RuntimeError``
    when called from a running loop.

    Args:
        engine: Engine object providing ``play(board, limit)``.
        board: Current position; the chosen move is pushed onto it.
        arm_position: Mapping of square name -> servo angles for that square.
        speed: Forwarded to the arm motion helpers.

    Returns:
        ``False``, signalling that it is no longer the computer's turn.

    NOTE(review): only the origin and target squares are visited; a captured
    piece on the target square is not removed first.
    """
    # Thinking time is drawn uniformly from [0, 1) seconds.
    result = engine.play(board, chess.engine.Limit(time=random.random()))
    engine_move = result.move
    uci = engine_move.uci()
    source_square, target_square = uci[:2], uci[2:4]

    # Move the robotic arm: lift from the origin, then drop on the target.
    await pick_piece(arm_position[source_square], speed)
    await place_piece(arm_position[target_square], speed)

    # Update the chessboard
    board.push(engine_move)
    print('Robot plays', uci)
    visualize_game(board, move=uci)
    return False  # Switch to human's turn

In [None]:
# Record the initial and final frames for human move detection.
def record_move_frames(frame, initial):
    """Store ``frame`` as the 'before' or the 'after' image of a human move.

    Args:
        frame: The frame that was just captured.
        initial: The previously stored 'before' frame, or an empty sequence
            when none has been recorded yet.

    Returns:
        ``(frame, [])`` when ``initial`` was empty (the frame becomes the
        'before' image), otherwise ``(initial, frame)``.
    """
    have_initial = len(initial) != 0
    if have_initial:
        print("Recording final frame")
        return initial, frame  # Both frames recorded

    print("Recording initial frame")
    return frame, []  # Initial frame recorded

In [None]:
# Validate and execute a UCI move on the chessboard.
async def validate_and_push_move(sq1, sq2, board):
    """Turn two changed squares into a move and play it if it is legal.

    The square holding a piece of the side to move is taken as the origin; the
    other one becomes the target.

    Args:
        sq1, sq2: Square names such as ``'e2'``, in any order.
        board: Position to play the move on.

    Returns:
        The UCI string that was pushed, or ``None`` when the squares could not
        be parsed or the move is not legal.
    """
    try:
        first = chess.parse_square(sq1)
        second = chess.parse_square(sq2)
    except (ValueError, TypeError):
        # e.g. a square that could not be located on the board (None)
        print(f"Invalid move detected: {sq1!r} -> {sq2!r}")
        return None

    if board.color_at(first) == board.turn:
        start, end, start_idx, end_idx = sq1, sq2, first, second
    else:
        start, end, start_idx, end_idx = sq2, sq1, second, first

    move = start + end

    # The camera cannot tell which piece a pawn was promoted to, and a bare
    # four-character UCI move is rejected for a promoting pawn. Assume a queen.
    if board.piece_type_at(start_idx) == chess.PAWN and chess.square_rank(end_idx) in (0, 7):
        move += 'q'

    try:
        board.push_uci(move)
    except ValueError:
        print(f"Invalid move detected: {move}")
        return None

    # Kept outside the try block so a rendering error is not reported as an
    # invalid move after the board has already been updated.
    print(f"Move played: {move}")
    visualize_game(board, move=move)
    return move

In [None]:
# Process the human's move by comparing the initial and final frames.
async def process_human_move(initial, final, square_map, board):
    """Detect the human's move from two frames and play it on ``board``.

    The frames are converted to grayscale, differenced, thresholded and
    cleaned up with a dilate/erode pass. The two largest changed regions are
    mapped to board squares and handed to ``validate_and_push_move``.

    Args:
        initial: Frame captured before the move.
        final: Frame captured after the move.
        square_map: Square polygons used to locate the changed regions.
        board: Position to play the detected move on.

    Returns:
        The UCI string of the move that was played, or ``None`` when no valid
        move could be detected. Every failure path returns plain ``None`` so
        that callers can rely on a simple truthiness check.
    """
    gray_before = cv2.cvtColor(initial, cv2.COLOR_BGR2GRAY)
    gray_after = cv2.cvtColor(final, cv2.COLOR_BGR2GRAY)
    diff = cv2.absdiff(gray_before, gray_after)
    _, diff = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
    diff = cv2.dilate(diff, None, iterations=4)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    diff = cv2.erode(diff, kernel, iterations=6)

    contours, _ = cv2.findContours(diff, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    largest = sorted(contours, key=cv2.contourArea, reverse=True)[:2]

    if len(largest) < 2:
        print("Invalid move detected: Not enough contours.")
        return None

    # Map the centre of each region's bounding box to a square, ignoring
    # regions that fall outside every mapped square.
    squares = set()
    for contour in largest:
        x, y, w, h = cv2.boundingRect(contour)
        square = identify_square(x + w // 2, y + h // 2, square_map)
        if square is not None:
            squares.add(square)

    # Validate and execute the move
    if len(squares) == 2:
        # Sorted only for a deterministic call order; the validator decides
        # which square is the origin.
        sq1, sq2 = sorted(squares)
        move = await validate_and_push_move(sq1, sq2, board)
        if move:
            return move
    print("Invalid move detected. Try again.")
    return None

In [None]:
# Quit the game and release resources.
def quit_game(cap):
    """Release the capture device and close every OpenCV window.

    Args:
        cap: Capture object to release. ``None`` is accepted, so cleanup can be
            called even when no camera was opened.
    """
    if cap is not None:
        cap.release()
    cv2.destroyAllWindows()

In [None]:
# Main loop
async def main_loop():
    """Run one game: the robot and the human alternate until the game is over.

    Keys (read from the OpenCV window, once per loop iteration):
        r: record a frame. The first press stores the 'before' image and the
           second press stores the 'after' image and triggers move detection.
        q: quit.

    The camera and all windows are released exactly once on the way out, even
    if an exception escapes the loop.
    """
    # Initialize webcam capture
    cap = initialize_camera()
    if cap is None:
        return

    try:
        board = chess.Board()

        # Move the arm to the rest position
        await rest_arm(arm_position["rest"], speed)

        initial = []
        comp_move = True
        visualize_game(board)
        cv2.waitKey(1)

        # Main game loop
        while not board.is_game_over():
            frame = capture_and_display_frame(cap, square_map)
            if frame is None:
                break

            if comp_move:
                await handle_computer_move(engine, board, arm_position, speed)
                comp_move = False
                # The frame grabbed above predates the robot's move, so fetch a
                # fresh one before accepting any key press.
                continue

            # Poll the keyboard once per iteration: a second waitKey() call
            # would silently drop whatever key the first one consumed.
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break

            if key == ord('r'):
                initial, final = record_move_frames(frame, initial)
                # len() rather than truthiness: `final` is either an empty list
                # or an image array, and an array has no single truth value.
                if len(final) > 0:
                    move = await process_human_move(initial, final, square_map, board)
                    # Only an actual UCI string counts as a played move.
                    if isinstance(move, str) and move:
                        comp_move = True  # Switch to computer's turn
                    initial = []  # Reset frames

        visualize_game(board)
    finally:
        quit_game(cap)

# Start the game. asyncio.run() raises RuntimeError when an event loop is
# already running in this thread, which is the case inside a notebook kernel.
# There the coroutine is scheduled on the existing loop instead (in a notebook
# cell, `await main_loop()` is an equivalent alternative).
try:
    asyncio.get_running_loop()
except RuntimeError:
    # No loop is running: plain script execution.
    asyncio.run(main_loop())
else:
    # Keep a reference so the task is not garbage-collected while it runs.
    game_task = asyncio.ensure_future(main_loop())