In [1]:
import cv2
import numpy as np
from ultralytics import YOLO
from PIL import Image
import easyocr
from rapidfuzz.distance import JaroWinkler
import supervision as sv
from tqdm import tqdm
from Levenshtein import distance as levenshtein_distance
from paddleocr import PaddleOCR

In [2]:
# Super-resolution upscaler used on the cropped number plates before OCR:
# OpenCV's dnn_superres implementation, loaded with ESPCN weights at a 4x scale.
espcn_weights_path = '/Users/shravanp/Coding/Robotics/AIScouter/src/models/ESPCN_x4.pb'

upscale_model = cv2.dnn_superres.DnnSuperResImpl_create()
upscale_model.readModel(espcn_weights_path)
upscale_model.setModel('espcn', 4)

In [2]:
# Object detector: an Ultralytics YOLO checkpoint loaded from disk.
yolo_weights_path = '/Users/shravanp/Coding/Robotics/AIScouter/src/models/y8v7.pt'
model = YOLO(yolo_weights_path)

# Text recognition is done with PaddleOCR (English, angle classifier enabled).
# The EasyOCR reader (easyocr.Reader(['en'], gpu=True)) is not used.
ocr = PaddleOCR(use_angle_cls=True, lang='en')

# Team numbers per alliance for the match being processed.
# Replace these with the actual red / blue team numbers of the match.
original_red_team_numbers = [
    "75",
    "2722",
    "1391",
]
original_blue_team_numbers = [
    "56",
    "5401",
    "8513",
]

[2024/08/18 19:04:49] ppocr DEBUG: Namespace(help='==SUPPRESS==', use_gpu=False, use_xpu=False, use_npu=False, use_mlu=False, ir_optim=True, use_tensorrt=False, min_subgraph_size=15, precision='fp32', gpu_mem=500, gpu_id=0, image_dir=None, page_num=0, det_algorithm='DB', det_model_dir='/Users/shravanp/.paddleocr/whl/det/en/en_PP-OCRv3_det_infer', det_limit_side_len=960, det_limit_type='max', det_box_type='quad', det_db_thresh=0.3, det_db_box_thresh=0.6, det_db_unclip_ratio=1.5, max_batch_size=10, use_dilation=False, det_db_score_mode='fast', det_east_score_thresh=0.8, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_sast_score_thresh=0.5, det_sast_nms_thresh=0.2, det_pse_thresh=0, det_pse_box_thresh=0.85, det_pse_min_area=16, det_pse_scale=1, scales=[8, 16, 32], alpha=1.0, beta=1.0, fourier_degree=5, rec_algorithm='SVTR_LCNet', rec_model_dir='/Users/shravanp/.paddleocr/whl/rec/en/en_PP-OCRv4_rec_infer', rec_image_inverse=True, rec_image_shape='3, 48, 320', rec_batch_num=6, max_t

In [None]:
import cv2
import numpy as np
from PIL import Image
from tqdm import tqdm

# `levenshtein_distance`, `upscale_model`, `ocr`, `model` and the
# `original_red_team_numbers` / `original_blue_team_numbers` lists are defined by
# the earlier cells of this notebook. The `from your_module import ...` line
# that used to sit here was a placeholder (its own comment said to replace it)
# and would raise ImportError unless a module with that literal name exists.

video_path = "/notebooks/videos/dcmp58-3sec.mp4"
output_path = "/notebooks/videos/output/dcmp58-3sec-test.mp4"

# Drawing colours in OpenCV's BGR channel order.
BLUE_BGR = (255, 0, 0)
RED_BGR = (0, 0, 255)

# Label used when every team number of an alliance has already been handed out.
UNKNOWN_TEAM = "Unknown"


def read_team_number_text(crop, clahe):
    """Run OCR on a BGR crop and return the recognised text without spaces.

    The crop is upscaled with `upscale_model`, converted to grayscale and
    contrast-enhanced with `clahe` before being passed to `ocr`.

    Args:
        crop: BGR image region (NumPy array) expected to contain the number.
        clahe: CLAHE object (from `cv2.createCLAHE`) used for enhancement.

    Returns:
        The text of the first OCR hit with spaces removed, or "" when the OCR
        output contains no usable hit.
    """
    upscaled = upscale_model.upsample(crop)
    gray = cv2.cvtColor(upscaled, cv2.COLOR_BGR2GRAY)
    enhanced_gray = clahe.apply(gray)

    ocr_output = ocr.ocr(np.array(enhanced_gray), cls=False)
    try:
        # First page -> first line -> (text, confidence) -> text.
        text = ocr_output[0][0][1][0]
    except (IndexError, TypeError):
        # No detection: the nested structure is empty or contains None
        # (presumably PaddleOCR's "nothing found" shapes — verify against the
        # installed version).
        return ""
    return text.replace(" ", "") if text else ""


def assign_team_numbers(boxes, team_numbers, previous_assignments=None):
    """Give every detected box of one alliance a team number.

    Boxes are processed in detection order. A box with OCR text takes the
    still-available number with the smallest Levenshtein distance to that text
    (ties go to the earlier entry of `team_numbers`); a box without text takes
    the next available number. When no number is left the box is labelled
    "Unknown". Each number is handed out at most once in this first pass.

    If `previous_assignments` is given, each box is then paired with the
    nearest not-yet-paired box of the previous frame (squared distance over
    the four corner coordinates). The previous number is inherited when the
    previous frame had an OCR distance and the current one has none or a
    strictly worse one.

    NOTE(review): inheriting a number does not re-check uniqueness, so two
    boxes can still end up with the same number in one frame.

    Args:
        boxes: iterable of (x1, y1, x2, y2, detected_text, color) tuples.
        team_numbers: list of team-number strings for this alliance; it is
            not modified.
        previous_assignments: mapping returned by this function for the
            previous frame, or None. It is not modified.

    Returns:
        dict mapping (x1, y1, x2, y2) -> (team_number, levenshtein_distance),
        where the distance is None if the number was not chosen via OCR.
    """
    available = list(team_numbers)
    assignments = {}

    for x1, y1, x2, y2, detected_text, _color in boxes:
        team_number = UNKNOWN_TEAM
        distance_used = None

        if available:
            if detected_text:
                distances = {
                    num: levenshtein_distance(detected_text, num)
                    for num in available
                }
                # min() keeps the first minimum, matching a stable sort.
                team_number = min(distances, key=distances.get)
                distance_used = distances[team_number]
            else:
                team_number = available[0]
            available.remove(team_number)

        assignments[(x1, y1, x2, y2)] = (team_number, distance_used)

    if previous_assignments:
        used_previous = set()
        # Iterate over a snapshot because values are overwritten in the loop.
        for box_key, (_team_number, distance_used) in list(assignments.items()):
            x1, y1, x2, y2 = box_key
            best_key = None
            best_gap = float('inf')

            for prev_key in previous_assignments:
                if prev_key in used_previous:
                    continue
                px1, py1, px2, py2 = prev_key
                # Squared Euclidean distance orders candidates the same way as
                # the true distance, so the square root is not needed.
                gap = (
                    (x1 - px1) ** 2 + (y1 - py1) ** 2
                    + (x2 - px2) ** 2 + (y2 - py2) ** 2
                )
                if gap < best_gap:
                    best_gap = gap
                    best_key = prev_key

            if best_key is None:
                continue

            # A previous box can back at most one current box.
            used_previous.add(best_key)
            prev_team_number, prev_distance = previous_assignments[best_key]

            if prev_distance is not None and (
                distance_used is None or prev_distance < distance_used
            ):
                assignments[box_key] = (prev_team_number, prev_distance)

    return assignments


def draw_team_labels(canvas, assignments, color):
    """Draw each assigned box and its team number onto `canvas` in place.

    Args:
        canvas: BGR frame to draw on.
        assignments: mapping as returned by `assign_team_numbers`.
        color: BGR colour tuple used for both rectangle and text.
    """
    for (x1, y1, x2, y2), (team_number, _distance) in assignments.items():
        cv2.rectangle(canvas, (x1, y1), (x2, y2), color, 2)
        cv2.putText(canvas, f"{team_number}", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)


cap = cv2.VideoCapture(video_path)
out = None
frame_count = 0
previous_frame_assignments = {}

try:
    # Raise instead of calling exit(): this stops the cell with a clear error.
    if not cap.isOpened():
        raise RuntimeError(f"Error: Could not open video: {video_path}")

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        raise RuntimeError(f"Error: Could not open output video for writing: {output_path}")

    # Built once; the settings are the same for every crop.
    clahe = cv2.createCLAHE(clipLimit=2, tileGridSize=(30, 30))

    with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT))) as pbar:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Finished processing all frames.")
                break

            if frame is None:
                print("Empty frame encountered.")
                continue

            # Convert the frame from BGR (OpenCV) to RGB (PIL)
            image_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            results = model(image_pil)

            blue_boxes = []
            red_boxes = []

            # Separate boxes by team color
            for detection in results:
                for box in detection.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    start_row = (y1 + y2) // 2
                    bottom_half = frame[start_row:y2, x1:x2]

                    # A degenerate box gives an empty crop, which has no mean
                    # colour and cannot be upscaled; skip it.
                    if bottom_half.size == 0:
                        continue

                    # `frame` is BGR: channel 0 is blue, channel 2 is red.
                    mean_bgr = np.mean(bottom_half, axis=(0, 1))
                    is_blue = mean_bgr[0] > mean_bgr[2]

                    # OCR: Recognize text in the bottom half of the box
                    detected_text = read_team_number_text(bottom_half, clahe)

                    if is_blue:
                        blue_boxes.append((x1, y1, x2, y2, detected_text, BLUE_BGR))
                    else:
                        red_boxes.append((x1, y1, x2, y2, detected_text, RED_BGR))

            # Assign team numbers to Blue and Red boxes
            current_frame_assignments_blue = assign_team_numbers(
                blue_boxes, original_blue_team_numbers, previous_frame_assignments.get('blue'))
            current_frame_assignments_red = assign_team_numbers(
                red_boxes, original_red_team_numbers, previous_frame_assignments.get('red'))

            # Each alliance is drawn in its own colour.
            draw_team_labels(frame, current_frame_assignments_blue, BLUE_BGR)
            draw_team_labels(frame, current_frame_assignments_red, RED_BGR)

            # Save current frame assignments for the next iteration
            previous_frame_assignments['blue'] = current_frame_assignments_blue
            previous_frame_assignments['red'] = current_frame_assignments_red

            # Write the frame with annotations to the output video
            out.write(frame)
            frame_count += 1
            pbar.update(1)
finally:
    # Release resources even if processing failed part-way through.
    cap.release()
    if out is not None:
        out.release()
    cv2.destroyAllWindows()

print(f"Video processing complete. Output saved to {output_path}")
