## Simulate a webcam that will replay the previous x seconds of recording after a basket.  Works on windows, must modify for pi.  

In [None]:
import pygame
import cv2
import time
from collections import deque

# Constants
WINDOW_WIDTH, WINDOW_HEIGHT = 640, 480
FPS = 30
REPLAY_DURATION = 2  # seconds
MAX_FRAMES = REPLAY_DURATION * FPS  # Ensure buffer holds enough frames for the replay duration
MAX_CAMERAS = 10

def list_cameras():
    """
    List all available cameras and print their information.
    Returns a list of available camera indices.
    """
    print("Detecting cameras...")
    available_cameras = []
    for i in range(MAX_CAMERAS):
        cap = cv2.VideoCapture(i)
        if cap.isOpened():
            print(f"Camera {i}: AVAILABLE")
            available_cameras.append(i)
            cap.release()
        else:
            print(f"Camera {i}: NOT AVAILABLE")
    return available_cameras

def main():
    pygame.init()
    available_cameras = list_cameras()
    if not available_cameras:
        print("No cameras detected. Exiting.")
        return

    screen = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))
    pygame.display.set_caption("Instant Replay")
    clock = pygame.time.Clock()
    font = pygame.font.Font(None, 36)  # Default font for "Instant Replay" text
    countdown_font = pygame.font.Font(None, 28)  # Smaller font for the countdown timer

    # Initialize video capture for each available camera
    cameras = {}
    buffers = {}
    for cam_index in available_cameras:
        cap = cv2.VideoCapture(cam_index)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, WINDOW_WIDTH)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, WINDOW_HEIGHT)
        cameras[cam_index] = cap
        buffers[cam_index] = deque(maxlen=MAX_FRAMES)

    active_camera = available_cameras[0]
    replaying = False
    replay_start_time = 0
    replay_end_time = 0

    running = True
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            elif event.type == pygame.KEYDOWN:
                if event.unicode.isdigit():
                    cam_number = int(event.unicode)
                    if cam_number in cameras:
                        print(f"Switched to Camera {cam_number}")
                        active_camera = cam_number
                elif event.key == pygame.K_r:
                    print(f"Instant Replay for Camera {active_camera}!")
                    replaying = True
                    replay_start_time = time.time()
                    replay_end_time = replay_start_time + REPLAY_DURATION

        screen.fill((0, 0, 0))  # Clear the screen with a black background

        if replaying:
            # Replay the buffered frames for the active camera
            elapsed = time.time() - replay_start_time
            remaining_time = replay_end_time - time.time()

            if remaining_time > 0:
                frame_index = int(elapsed * FPS)

                # Check if we have enough frames in the buffer
                if frame_index < len(buffers[active_camera]):
                    frame = buffers[active_camera][frame_index]
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pygame_frame = pygame.surfarray.make_surface(frame.swapaxes(0, 1))
                    screen.blit(pygame_frame, (0, 0))

                    # Render countdown timer in the lower-right corner
                    countdown_surface = countdown_font.render(f"{int(remaining_time)}s left", True, (255, 255, 255))
                    screen.blit(countdown_surface, (WINDOW_WIDTH - 100, WINDOW_HEIGHT - 40))

                else:
                    # If not enough frames, stop replay and return to live feed
                    print("Exhausted frame buffer!  Returning to live feed.")
                    replaying = False  # End replay
                    buffers[active_camera].clear()  # Clear the buffer to stop further playback
                    # Optionally, reset the replay start and end times to prepare for next replay
                    replay_start_time = 0
                    replay_end_time = 0

            else:
                # End the replay when time is over
                replaying = False
                buffers[active_camera].clear()  # Clear the buffer to stop further playback


        else:
            # Regular capture and buffering for the active camera
            for cam_index, cap in cameras.items():
                ret, frame = cap.read()
                if ret:
                    buffers[cam_index].append(frame)
                    if cam_index == active_camera:
                        # Display only the active camera's feed
                        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        pygame_frame = pygame.surfarray.make_surface(frame.swapaxes(0, 1))
                        screen.blit(pygame_frame, (0, 0))

        if not replaying:
            # Render "Instant Replay" text after the video feed if not replaying
            text_surface = font.render(f"Instant Replay - Camera {active_camera}", True, (255, 255, 255))
            screen.blit(text_surface, (10, 10))

        pygame.display.flip()
        clock.tick(FPS)

    # Release all cameras
    for cap in cameras.values():
        cap.release()
    pygame.quit()


if __name__ == "__main__":
    main()

pygame 2.6.1 (SDL 2.28.4, Python 3.13.0)
Hello from the pygame community. https://www.pygame.org/contribute.html
Detecting cameras...
Camera 0: AVAILABLE
Camera 1: AVAILABLE
Camera 2: AVAILABLE
Camera 3: NOT AVAILABLE
Camera 4: NOT AVAILABLE
Camera 5: NOT AVAILABLE
Camera 6: NOT AVAILABLE
Camera 7: NOT AVAILABLE
Camera 8: NOT AVAILABLE
Camera 9: NOT AVAILABLE
Instant Replay for Camera 0!
Instant Replay for Camera 0!
Instant Replay for Camera 0!
Instant Replay for Camera 0!
Instant Replay for Camera 0!
Not enough frames for replay. Returning to live feed.
Instant Replay for Camera 0!
Not enough frames for replay. Returning to live feed.
Instant Replay for Camera 0!
Not enough frames for replay. Returning to live feed.
Instant Replay for Camera 0!
Not enough frames for replay. Returning to live feed.
Instant Replay for Camera 0!
Not enough frames for replay. Returning to live feed.
Instant Replay for Camera 0!
Not enough frames for replay. Returning to live feed.
Instant Replay for Camer

## We can use ws2812b led panels for the feedback lights.  This will remove the relay  noise and allow customizable colors!

In [4]:
import serial
import serial.tools.list_ports
import ipywidgets as widgets
from IPython.display import display

# Global variable for the serial connection
arduino = None

# Function to list available serial ports
def list_com_ports():
    ports = list(serial.tools.list_ports.comports())
    return [port.device for port in ports]

# Function to send data to Arduino
def send_data(btn):
    global arduino
    if arduino and arduino.is_open:
        panel = panel_dropdown.value
        r = int(r_input.value)
        g = int(g_input.value)
        b = int(b_input.value)
        brightness = int(brightness_input.value)
        duration = int(duration_input.value)

        command = f"panel {panel} {r} {g} {b} {brightness} {duration}\n"
        arduino.write(command.encode())  # Send the command to Arduino
        last_sent_command_widget.value = f"Last Command: {command.strip()}"
    else:
        last_sent_command_widget.value = "Error: Arduino not connected."

# Function to handle port selection and connect
def connect_to_arduino(btn):
    global arduino
    port = port_dropdown.value
    try:
        if arduino and arduino.is_open:
            arduino.close()  # Close previous connection
        arduino = serial.Serial(port, 9600, timeout=1)  # Open serial connection
        connection_status_widget.value = f"Connected to {port}"
    except Exception as e:
        connection_status_widget.value = f"Error connecting to {port}: {str(e)}"

# Create the widgets
port_dropdown = widgets.Dropdown(
    options=list_com_ports(),
    description='Port:',
)

connect_button = widgets.Button(description="Connect")
connect_button.on_click(connect_to_arduino)

panel_dropdown = widgets.Dropdown(
    options=[(f"Panel {i+1}", i+1) for i in range(2)],
    description='Panel:',
)

r_input = widgets.IntText(value=255, description='Red:', min=0, max=255)
g_input = widgets.IntText(value=0, description='Green:', min=0, max=255)
b_input = widgets.IntText(value=0, description='Blue:', min=0, max=255)
brightness_input = widgets.IntSlider(value=25, description='Brightness:', min=0, max=255, step= 15)
duration_input = widgets.IntText(value=1000, description='Duration (ms):', min=1)

send_button = widgets.Button(description="Send Command")
send_button.on_click(send_data)

# Display widgets for connection status and last sent command
connection_status_widget = widgets.Label(value="Not connected")
last_sent_command_widget = widgets.Label(value="Last Command: None")

# Display all the widgets
display(port_dropdown, connect_button, panel_dropdown, r_input, g_input, b_input, brightness_input, duration_input, send_button, connection_status_widget, last_sent_command_widget)

Dropdown(description='Port:', options=('COM1', 'COM16'), value='COM1')

Button(description='Connect', style=ButtonStyle())

Dropdown(description='Panel:', options=(('Panel 1', 1), ('Panel 2', 2)), value=1)

IntText(value=255, description='Red:')

IntText(value=0, description='Green:')

IntText(value=0, description='Blue:')

IntSlider(value=25, description='Brightness:', max=255, step=15)

IntText(value=1000, description='Duration (ms):')

Button(description='Send Command', style=ButtonStyle())

Label(value='Not connected')

Label(value='Last Command: None')

In [None]:
import serial
import serial.tools.list_ports
import ipywidgets as widgets
from IPython.display import display

# Global variable for the serial connection
arduino = None

# Function to list available serial ports
def list_com_ports():
    ports = list(serial.tools.list_ports.comports())
    return [port.device for port in ports]

# Function to send data to Arduino
def send_data(panel, btn):
    global arduino
    if arduino and arduino.is_open:
        r = int(panel['r_input'].value)
        g = int(panel['g_input'].value)
        b = int(panel['b_input'].value)
        brightness = int(panel['brightness_input'].value)
        duration = int(panel['duration_input'].value)

        command = f"panel {panel['id']} {r} {g} {b} {brightness} {duration}\n"
        arduino.write(command.encode())  # Send the command to Arduino
        panel['last_sent_command_widget'].value = f"Last Command: {command.strip()}"
    else:
        panel['last_sent_command_widget'].value = "Error: Arduino not connected."

# Function to handle port selection and connect
def connect_to_arduino(btn):
    global arduino
    port = port_dropdown.value
    try:
        if arduino and arduino.is_open:
            arduino.close()  # Close previous connection
        arduino = serial.Serial(port, 9600, timeout=1)  # Open serial connection
        connection_status_widget.value = f"Connected to {port}"
    except Exception as e:
        connection_status_widget.value = f"Error connecting to {port}: {str(e)}"

# Create the widgets for Panel 1 and Panel 2
def create_panel_widgets(panel_id):
    return {
        'id': panel_id,
        'r_input': widgets.IntText(value=255, description='Red:', min=0, max=255, step= 15),
        'g_input': widgets.IntText(value=0, description='Green:', min=0, max=255, step= 15),
        'b_input': widgets.IntText(value=0, description='Blue:', min=0, max=255, step= 15),
        'brightness_input': widgets.IntSlider(value=30, description='Brightness:', min=0, max=255, step= 15),
        'duration_input': widgets.IntText(value=1000, description='Duration (ms):', min=1, step=100),
        'send_button': widgets.Button(description="Send Command"),
        'last_sent_command_widget': widgets.Label(value="Last Command: None")
    }

# Create the widgets
port_dropdown = widgets.Dropdown(
    options=list_com_ports(),
    description='Port:',
)

connect_button = widgets.Button(description="Connect")
connect_button.on_click(connect_to_arduino)

connection_status_widget = widgets.Label(value="Not connected")

# Panel 1 widgets
panel_1_widgets = create_panel_widgets(1)
panel_1_widgets['send_button'].on_click(lambda btn: send_data(panel_1_widgets, btn))

# Panel 2 widgets
panel_2_widgets = create_panel_widgets(2)
panel_2_widgets['send_button'].on_click(lambda btn: send_data(panel_2_widgets, btn))

# Layout for Panel 1 and Panel 2 side by side
panel_1_layout = widgets.VBox([
    panel_1_widgets['r_input'],
    panel_1_widgets['g_input'],
    panel_1_widgets['b_input'],
    panel_1_widgets['brightness_input'],
    panel_1_widgets['duration_input'],
    panel_1_widgets['send_button'],
    panel_1_widgets['last_sent_command_widget']
])

panel_2_layout = widgets.VBox([
    panel_2_widgets['r_input'],
    panel_2_widgets['g_input'],
    panel_2_widgets['b_input'],
    panel_2_widgets['brightness_input'],
    panel_2_widgets['duration_input'],
    panel_2_widgets['send_button'],
    panel_2_widgets['last_sent_command_widget']
])

# Combine panel 1 and panel 2 in side-by-side layout
panels_layout = widgets.HBox([panel_1_layout, panel_2_layout])

# Display all widgets
display(port_dropdown, connect_button, connection_status_widget, panels_layout)

Dropdown(description='Port:', options=('COM1', 'COM16'), value='COM1')

Button(description='Connect', style=ButtonStyle())

Label(value='Not connected')

HBox(children=(VBox(children=(IntText(value=255, description='Red:', step=15), IntText(value=0, description='G…