In [4]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import random
from PIL import ImageFont, ImageDraw, Image
import ssl
from yolov8 import YOLOv8

# Initialize YOLOv8 model
model = YOLOv8("yolov8n.pt", classes_path="dnn_model/classes.txt")

# Use the model for object detection
detections = model.detect(frame)

ssl._create_default_https_context = ssl._create_unverified_context

# Define the ObjectDetection class for YOLOv8 object detection
class ObjectDetection:
    def __init__(self, model_weights='yolov8n.pt', classes_path='dnn_model/classes.txt'):
        print("Loading Object Detection")
        print("Running YOLOv8")

        self.confThreshold = 0.5
        self.nmsThreshold = 0.4

        # Load the YOLOv8 model
        self.net = cv2.dnn.readNet(model_weights)
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        self.current_object_id = None  # Initialize the current_object_id to None
        # Load the class names
        self.load_class_names(classes_path)

    def load_class_names(self, classes_path):
        with open(classes_path, "r") as file_object:
            self.classes = [class_name.strip() for class_name in file_object]

        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))

    def detect(self, frame):
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
        self.net.setInput(blob)
        output_layers = self.net.getUnconnectedOutLayersNames()
        outputs = self.net.forward(output_layers)

        class_ids = []
        confidences = []
        boxes = []

        height, width = frame.shape[:2]

        for output in outputs:
            for detection in output:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]

                if confidence > self.confThreshold:
                    center_x = int(detection[0] * width)
                    center_y = int(detection[1] * height)
                    w = int(detection[2] * width)
                    h = int(detection[3] * height)
                    x = int(center_x - w / 2)
                    y = int(center_y - h / 2)

                    class_ids.append(int(class_id))
                    confidences.append(float(confidence))
                    boxes.append((x, y, w, h))

        indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confThreshold, self.nmsThreshold)
        filtered_indices = indices.flatten()
        filtered_class_ids = [class_ids[i] for i in filtered_indices]
        filtered_confidences = [confidences[i] for i in filtered_indices]
        filtered_boxes = [boxes[i] for i in filtered_indices]

        return filtered_class_ids, filtered_confidences, filtered_boxes

# Define the DQN class (you can use your existing DQN class)

# Define the ObjectTracker class
class ObjectTracker:
    def __init__(self, object_detection, dqn_model_path, video_path, output_video_path):
        self.object_detection = object_detection
        self.dqn_model_path = dqn_model_path
        self.video_path = video_path
        self.output_video_path = output_video_path

        # Initialize other parameters and variables

    def run(self):
        # Load the trained DQN model
        dqn_model = DQN(self.state_size, self.action_size)
        dqn_model.load_state_dict(torch.load(self.dqn_model_path))
        dqn_model.eval()

        # Open the video capture
        cap = cv2.VideoCapture(self.video_path)

        # Initialize the output video writer
        output_video = self.initialize_output_video_writer(cap)

        while True:
            ret, frame = cap.read()

            if not ret:
                break

            # Perform object detection
            detections = self.object_detection.detect(frame)

            # Update tracking objects using Kalman filtering and Hungarian matching
            self.update_tracking_objects(detections)

            # Perform action and update states
            action = self.choose_action()
            reward = self.calculate_reward(action)
            next_state = self.get_next_state()
            done = self.check_episode_completion()

            # Update total reward and other counters

            # Store the transition in the replay buffer

            # Perform DQN training

            # Update the target network periodically

            # Write frame with bounding boxes and labels to output video
            self.write_frame_to_output_video(frame, output_video)

            # Show the frame with the detected objects
            cv2.imshow("Frame", frame)

            if cv2.waitKey(1) == ord("q"):
                break

        # Release video capture and close windows
        cap.release()
        cv2.destroyAllWindows()

        # Print the summary of detected objects
        self.print_detected_objects_summary()

        # Print the tracked objects
        self.print_tracked_objects()

    # Implement the rest of the methods

# Create an instance of the ObjectDetection class
od = ObjectDetection(model_weights='yolov8n.pt', classes_path='dnn_model/classes.txt')

# Specify the paths
dqn_model_path = 'modell.h5'
video_path = 'output.mp4'
output_video_path = 'tracked_output.mp4'

# Create an instance of the ObjectTracker class and run the object tracking
tracker = ObjectTracker(od, dqn_model_path, video_path, output_video_path)
tracker.run()


ImportError: cannot import name 'YOLOv8' from 'yolov8' (/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/yolov8/__init__.py)

In [16]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from PIL import ImageFont, ImageDraw, Image
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
 

class ObjectDetection:
    def __init__(self, model_config='dnn_model/yolov4.cfg', model_weights='dnn_model/yolov4.weights',
            classes_path='dnn_model/classes.txt'):
        print("Loading Object Detection")
        print("Running YOLOv8")

        self.confThreshold = 0.5
        self.nmsThreshold = 0.4

        # Load the YOLOv4 model
        self.net = cv2.dnn.readNetFromDarknet(model_config, model_weights)
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        self.current_object_id = None  # Initialize the current_object_id to None
        # Load the class names
        self.load_class_names(classes_path)

    def load_class_names(self, classes_path):
        with open(classes_path, "r") as file_object:
            self.classes = [class_name.strip() for class_name in file_object]

        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))

    def detect(self, frame):
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
        self.net.setInput(blob)
        output_layers = self.net.getUnconnectedOutLayersNames()
        outputs = self.net.forward(output_layers)

        class_ids = []
        confidences = []
        boxes = []

        height, width = frame.shape[:2]

        for output in outputs:
            for detection in output:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]

                if confidence > self.confThreshold:
                    center_x = int(detection[0] * width)
                    center_y = int(detection[1] * height)
                    w = int(detection[2] * width)
                    h = int(detection[3] * height)
                    x = int(center_x - w / 2)
                    y = int(center_y - h / 2)

                    class_ids.append(int(class_id))
                    confidences.append(float(confidence))
                    boxes.append((x, y, w, h))

        indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confThreshold, self.nmsThreshold)
        filtered_indices = indices.flatten()
        filtered_class_ids = [class_ids[i] for i in filtered_indices]
        filtered_confidences = [confidences[i] for i in filtered_indices]
        filtered_boxes = [boxes[i] for i in filtered_indices]

        return filtered_class_ids, filtered_confidences, filtered_boxes


class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class ObjectTracker:
    def __init__(self, object_detection, dqn_model_path, video_path, output_video_path):
        self.object_detection = object_detection
        self.dqn_model_path = dqn_model_path
        self.video_path = video_path
        self.output_video_path = output_video_path

        self.state_size = 150528
        self.action_size = 2
        self.batch_size = 64
        self.learning_rate = 0.001
        self.num_episodes = 500
        self.num_steps = 100
        self.target_update_freq = 10
        self.epsilon = 1.0
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.01
        self.gamma = 0.99
        self.target_objects = ['car', 'person']

        self.tracking_objects = {}
        self.object_id_counter = 0
        self.total_reward = 0
        self.correct_target_count = 0
        self.incorrect_target_count = 0
        self.no_target_count = 0
        self.movement_count = 0

    def run(self):
        # Load the trained DQN model
        dqn_model = DQN(self.state_size, self.action_size)
        dqn_model.load_state_dict(torch.load(self.dqn_model_path))
        dqn_model.eval()

        # Open the video capture
        cap = cv2.VideoCapture(self.video_path)

        # Initialize the output video writer
        output_video = self.initialize_output_video_writer(cap)

        while True:
            ret, frame = cap.read()

            if not ret:
                break

            # Perform object detection
            detections = self.object_detection.detect(frame)

            # Update tracking objects
            self.update_tracking_objects(detections)

            # Perform action and update states
            action = self.choose_action()
            reward = self.calculate_reward(action)
            next_state = self.get_next_state()
            done = self.check_episode_completion()

            # Update total reward and other counters

            # Store the transition in the replay buffer

            # Perform DQN training

            # Update the target network periodically

            # Write frame with bounding boxes and labels to output video
            self.write_frame_to_output_video(frame, output_video)

            # Show the frame with the detected objects
            cv2.imshow("Frame", frame)

            if cv2.waitKey(1) == ord("q"):
                break

        # Release video capture and close windows
        cap.release()
        cv2.destroyAllWindows()

        # Print the summary of detected objects
        self.print_detected_objects_summary()

        # Print the tracked objects
        self.print_tracked_objects()

    def initialize_output_video_writer(self, cap):
        output_video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        output_video = cv2.VideoWriter(self.output_video_path, fourcc, output_video_fps,
 (frame_width, frame_height), True)
        return output_video

    def update_tracking_objects(self, detections):
        class_ids, confidences, boxes = detections

        if len(boxes) == 0:
            print("No objects detected.")
            return
        max_confidence_index = np.argmax(confidences)
        max_confidence_box = boxes[max_confidence_index]
        class_id = class_ids[max_confidence_index]

        class_name = self.object_detection.classes[class_id]
        object_id = self.tracking_objects.get(class_name)
        for i, box in enumerate(boxes):
            class_id = class_ids[i]
            class_name = self.object_detection.classes[class_id]
            (x, y, w, h) = box

            if class_name not in self.tracking_objects:
                self.object_id_counter += 1
                self.tracking_objects[class_name] = self.object_id_counter

            if object_id is None:
            self.object_id_counter += 1
            self.tracking_objects[class_name] = self.object_id_counter
            object_id = self.object_id_counter
            self.current_object_id = object_id
            self.current_bounding_box = max_confidence_box
            # Update object position and other information

    def choose_action(self):
        # Implement action selection logic (e.g., epsilon-greedy policy)
        if random.random() < self.epsilon:
            # Random action (exploration)
            return random.choice([0, 1])  # Example: 0 for left, 1 for right
        else:
            # Choose action based on DQN
            state = self.get_current_state()  # Implement this function
            with torch.no_grad():
                q_values = self.dqn_model(state)
            action = q_values.argmax().item()
            return action
    def calculate_reward(self, action):
        bounding_box = self.get_current_bounding_box()
        frame_width, frame_height = self.get_frame_dimensions()

        middle_50_start = frame_width // 4
        middle_50_end = frame_width - frame_width // 4
        x1 = bounding_box[0]
        x2 = bounding_box[0] + bounding_box[2]

        if middle_50_start < x1 < x2 < middle_50_end:
            # Calculate distance reward (example)
            distance_reward = self.calculate_distance_reward(bounding_box, frame_width, frame_height)
            return distance_reward
        else:
            # Apply penalty (example)
            penalty = self.calculate_penalty(bounding_box, frame_width, frame_height)
            return penalty
    def calculate_distance_reward(self, bounding_box, frame_width, frame_height):
        # Implement your logic to calculate distance-based reward
        # Example: Calculate the distance between the bounding box center and frame center
        box_center_x = bounding_box[0] + bounding_box[2] // 2
        frame_center_x = frame_width // 2
        distance = abs(box_center_x - frame_center_x)
        max_distance = frame_width // 2  # Maximum possible distance
        normalized_distance = 1 - (distance / max_distance)  # Normalize to [0, 1]
        return normalized_distance

    def calculate_penalty(self, bounding_box, frame_width, frame_height):
        # Implement your logic to calculate penalties
        # Example: Penalize for being outside the middle 50%
        middle_50_start = frame_width // 4
        middle_50_end = frame_width - frame_width // 4
        x1 = bounding_box[0]
        x2 = bounding_box[0] + bounding_box[2]

        if x1 < middle_50_start or x2 > middle_50_end:
            # Penalize for being outside the middle 50%
            return -0.5  # Example penalty value
        else:
            return 0  # No penalty

    def get_next_state(self):
        # Implement your logic to get the next state
      return
    def check_episode_completion(self):
        # Implement your logic to check if the episode is completed
      return
    def write_frame_to_output_video(self, frame, output_video):
        # Implement your logic to write frame with bounding boxes and labels to output video
        return
    def print_detected_objects_summary(self):
        # Implement your logic to print the summary of detected objects
        return
    def print_tracked_objects(self):
        # Implement your logic to print the tracked objects
        return
    # ... (other methods)
    def get_current_state(self):
        # Implement your logic to get the current state as a tensor
        # Example: Convert the current frame to a tensor
        # Note: You may need to preprocess the frame
        frame = self.get_current_frame()  # Implement this function
        state = torch.tensor(frame, dtype=torch.float32)
        return state

    def get_current_frame(self):
        # Implement your logic to get the current frame
        # Example: Capture the current frame from video
        ret, frame = self.cap.read()
        if not ret:
            return None
        return frame

    def get_frame_dimensions(self):
        # Implement your logic to get the frame dimensions
        frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        return frame_width, frame_height

    def get_current_bounding_box(self):
        # Assuming each object is represented as a dictionary with keys 'x', 'y', 'w', 'h'
        # and the current tracked object is identified by self.current_object_id
        if self.current_object_id in self.tracking_objects:
            current_object = self.tracking_objects[self.current_object_id]
            current_bounding_box = (current_object['x'], current_object['y'], current_object['w'], current_object['h'])
            return current_bounding_box
        else:
            # If the current object is not found in the tracking_objects dictionary
            # You may return a default value or handle the situation accordingly
            return None

if __name__ == '__main__':
    # Create an instance of the ObjectDetection class
    od = ObjectDetection(model_config='dnn_model/yolov4.cfg', model_weights='dnn_model/yolov4.weights',
                         classes_path='dnn_model/classes.txt')

    # Specify the paths
    dqn_model_path = 'modell.h5'
    video_path = 'output.mp4'
    output_video_path = 'tracked_output.mp4'

    # Create an instance of the ObjectTracker class and run the object tracking
    tracker = ObjectTracker(od, dqn_model_path, video_path, output_video_path)
    tracker.run()


IndentationError: expected an indented block after 'if' statement on line 204 (4183001720.py, line 205)

In [2]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

# Define the ObjectDetection class for YOLOv4
class ObjectDetection:
    def __init__(self, model_config='dnn_model/yolov4.cfg', model_weights='dnn_model/yolov4.weights', classes_path='dnn_model/classes.txt'):
        print("Loading Object Detection")
        print("Running YOLOv8")

        self.confThreshold = 0.5
        self.nmsThreshold = 0.4

        # Load the YOLOv4 model
        self.net = cv2.dnn.readNet(model_config, model_weights)
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

        # Load the class names
        self.load_class_names(classes_path)

    def load_class_names(self, classes_path):
        with open(classes_path, "r") as file_object:
            self.classes = [class_name.strip() for class_name in file_object]

        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))

    def detect(self, frame, target_object):
        blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
        self.net.setInput(blob)
        output_layers = self.net.getUnconnectedOutLayersNames()
        outputs = self.net.forward(output_layers)

        class_ids = []
        confidences = []
        boxes = []

        height, width = frame.shape[:2]

        for output in outputs:
            for detection in output:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]

                if confidence > self.confThreshold:
                    class_name = self.classes[class_id]

                    if class_name == target_object:
                        center_x = int(detection[0] * width)
                        center_y = int(detection[1] * height)
                        w = int(detection[2] * width)
                        h = int(detection[3] * height)
                        x = int(center_x - w / 2)
                        y = int(center_y - h / 2)

                        class_ids.append(int(class_id))
                        confidences.append(float(confidence))
                        boxes.append((x, y, w, h))

        indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confThreshold, self.nmsThreshold)
        filtered_indices = indices.flatten()
        filtered_class_ids = [class_ids[i] for i in filtered_indices]
        filtered_confidences = [confidences[i] for i in filtered_indices]
        filtered_boxes = [boxes[i] for i in filtered_indices]

        return filtered_class_ids, filtered_confidences, filtered_boxes

# Define the ObjectTracker class
class ObjectTracker:
    def __init__(self, object_detection, video_path, output_video_path, target_object):
        self.object_detection = object_detection
        self.video_path = video_path
        self.output_video_path = output_video_path
        self.target_object = target_object

        self.tracking_objects = {}

        # Initialize the video capture (cap)
        self.cap = cv2.VideoCapture(self.video_path)

    def run(self):
        # Initialize the output video writer
        output_video = self.initialize_output_video_writer(self.cap)

        while True:
            ret, frame = self.cap.read()

            if not ret:
                break

            # Perform object detection for the target object
            detections = self.object_detection.detect(frame, self.target_object)

            # Update tracking objects
            self.update_tracking_objects(detections)

            # Draw bounding boxes and labels
            self.draw_boxes(frame)

            # Write frame with bounding boxes and labels to output video
            self.write_frame_to_output_video(frame, output_video)

            # Show the frame with the detected and tracked objects
            cv2.imshow("Frame", frame)

            if cv2.waitKey(1) == ord("q"):
                break

        # Release video capture and close windows
        self.cap.release()
        cv2.destroyAllWindows()

    def initialize_output_video_writer(self, cap):
        output_video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        output_video = cv2.VideoWriter(self.output_video_path, fourcc, output_video_fps, (frame_width, frame_height), True)
        return output_video

    def update_tracking_objects(self, detections):
        class_ids, confidences, boxes = detections

        for i, box in enumerate(boxes):
            class_id = class_ids[i]
            confidence = confidences[i]
            (x, y, w, h) = box

            # Create or update the tracking object using class_id as the identifier
            self.tracking_objects[class_id] = {
                'class_id': class_id,
                'confidence': confidence,
                'box': (x, y, w, h)
            }

    def draw_boxes(self, frame):
        for obj_id, obj_info in self.tracking_objects.items():
            class_id = obj_info['class_id']
            confidence = obj_info['confidence']
            (x, y, w, h) = obj_info['box']

            color = self.object_detection.colors[class_id]
            label = f"{self.object_detection.classes[class_id]}: {confidence:.2f}"

            cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
            cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    def write_frame_to_output_video(self, frame, output_video):
        output_video.write(frame)

if __name__ == '__main__':
    # Create an instance of the ObjectDetection class
    od = ObjectDetection(model_config='dnn_model/yolov4.cfg', model_weights='dnn_model/yolov4.weights', classes_path='dnn_model/classes.txt')

    # Specify the paths and target object
    video_path = 'output.mp4'
    output_video_path = 'tracked_output.mp4'
    target_object = 'car'  # Specify the object you want to prioritize (e.g., 'car')

    # Create an instance of the ObjectTracker class and run the object tracking
    tracker = ObjectTracker(od, video_path, output_video_path, target_object)
    tracker.run()


Loading Object Detection
Running YOLOv8


In [25]:
import cv2
import numpy as np
import torch
import random
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

# Define the ObjectDetection class for YOLOv4 using PyTorch
class ObjectDetection:
    def __init__(self, model_path='yolov8x.pt', classes_path='dnn_model/classes.txt'):
        print("Loading Object Detection")
        print("Running YOLOv8")

        self.confThreshold = 0.5
        self.nmsThreshold = 0.4

        # Load the YOLOv4 model using PyTorch
        self.net = torch.load(model_path)  # Load your pretrained YOLOv4 model
        self.net.eval()

        # Load the class names
        self.load_class_names(classes_path)

    def load_class_names(self, classes_path):
        with open(classes_path, "r") as file_object:
            self.classes = [class_name.strip() for class_name in file_object]

        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))

    def detect(self, frame, target_object):
        # Perform object detection using the YOLOv4 model
        results = self.net(frame)

        class_ids = []
        confidences = []
        boxes = []

        for result in results.pred[0]:
            class_id = int(result[-1])
            class_name = self.classes[class_id]
            confidence = float(result[4])

            if confidence > self.confThreshold and class_name == target_object:
                xyxy = result[:4].tolist()
                boxes.append([int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])])
                class_ids.append(class_id)
                confidences.append(confidence)

        return class_ids, confidences, boxes

# Rest of the ObjectTracker and main code remains the same

if __name__ == '__main__':
    # Create an instance of the ObjectDetection class
    od = ObjectDetection(model_path='yolov8x.pt', classes_path='dnn_model/classes.txt')

    # Specify the paths and target object
    video_path = 'output.mp4'
    output_video_path = 'tracked_output.mp4'
    target_object = 'car'  # Specify the object you want to prioritize (e.g., 'car')

    # Create an instance of the ObjectTracker class and run the object tracking
    tracker = ObjectTracker(od, video_path, output_video_path, target_object)
    tracker.run()


Loading Object Detection
Running YOLOv8


AttributeError: 'dict' object has no attribute 'eval'

In [46]:
import numpy as np
import random
import os
import cv2
import time
%pip install ultralytics
import ultralytics
ultralytics.checks()
from ultralytics import YOLO


Ultralytics YOLOv8.0.132 🚀 Python-3.11.1 torch-2.0.1 CPU
Setup complete ✅ (8 CPUs, 8.0 GB RAM, 182.2/228.3 GB disk)


In [47]:
target_class_ids = [2, 3]


In [60]:
class YOLOv8_ObjectDetector:
    """
    A class for performing object detection on images and videos using YOLOv8.

    Args:
    ------------
        model_file (str): Path to the YOLOv8 model file or yolo model variant name in ths format: [variant].pt
        labels (list[str], optional): A list of class labels for the model. If None, uses the default labels from the model file.
        classes (list[str], optional): Alias for labels. Deprecated.
        conf (float, optional): Minimum confidence threshold for object detection.
        iou (float, optional): Minimum IOU threshold for non-max suppression.

    Attributes:
    --------------
        classes (list[str]): A list of class labels for the model ( a Dict is also acceptable).
        conf (float): Minimum confidence threshold for object detection.
        iou (float): Minimum IOU threshold for non-max suppression.
        model (YOLO): The YOLOv8 model used for object detection.
        model_name (str): The name of the YOLOv8 model file (without the .pt extension).

    Methods :
    -------------
        default_display: Returns a default display (ultralytics plot implementation) of the object detection results.
        custom_display: Returns a custom display of the object detection results.
        predict_video: Predicts objects in a video and saves the results to a file.
        predict_img: Predicts objects in an image and returns the detection results.

    """

    def __init__(self, model_file = 'yolov8x.pt', labels= None, classes = None, conf = 0.25, iou = 0.45 ):

        self.classes = classes
        self.conf = conf
        self.iou = iou

        self.model = YOLO(model_file)
        self.model_name = model_file.split('.')[0]
        self.results = None

        if labels == None:
            self.labels = self.model.names

    def predict_img(self, img, verbose=True):
        """
        Runs object detection on a single image and filter objects based on target class IDs.

        Parameters
        ----------
        img (numpy.ndarray): Input image to perform object detection on.
        verbose (bool): Whether to print detection details.

        Returns:
        -----------
        'ultralytics.yolo.engine.results.Results': A YOLO results object that contains 
         details about detection results:
            - Class IDs
            - Bounding Boxes
            - Confidence score
            ...
        (please refer to https://docs.ultralytics.com/reference/results/#results-api-reference for results API reference)

        """

        # Run the model on the input image with the given parameters
        results = self.model(img, classes=self.classes, conf=self.conf, iou=self.iou, verbose=verbose)

        # Save the original image and the results for further analysis if needed
        self.orig_img = img

        # Filter objects based on target class IDs
        filtered_results = self.filter_objects_by_class_ids(results, target_class_ids)

        # Return the filtered detection results
        return filtered_results

    def filter_objects_by_class_ids(self, results, target_class_ids):
        """
        Filter detection results based on target class IDs.

        Parameters:
        -----------
        results: YOLO detection results.
        target_class_ids (list): A list of target class IDs to keep.

        Returns:
        --------
        'ultralytics.yolo.engine.results.Results': A YOLO results object containing filtered detection results.
        """
        filtered_boxes = []

        for box in results.pred[0]:
            class_id = int(box[5])

            if class_id in target_class_ids:
                filtered_boxes.append(box)

        filtered_results = results.clone()
        filtered_results.pred[0] = np.stack(filtered_boxes)

        return filtered_results


    def track_specific_objects(self, results, target_class_ids):
        """
        Track specific objects based on their class IDs.

        Args:
            results: YOLO detection results.
            target_class_ids (list): A list of target class IDs to track.

        Returns:
            list: A list of tracked objects with the specified class IDs.
        """
        tracked_objects = []

        for box in results.pred[0]:
            class_id = int(box[5])

            if class_id in target_class_ids:
                # Extract object details for tracking (e.g., bounding box coordinates)
                x1, y1, x2, y2 = map(int, box[:4])
                tracked_objects.append((class_id, (x1, y1, x2, y2)))

        return tracked_objects
    def default_display(self, show_conf=True, line_width=None, font_size=None, 
                        font='Arial.ttf', pil=False, example='abc'):
        """
        Displays the detected objects on the original input image.

        Parameters
        ----------
        show_conf : bool, optional
            Whether to show the confidence score of each detected object, by default True.
        line_width : int, optional
            The thickness of the bounding box line in pixels, by default None.
        font_size : int, optional
            The font size of the text label for each detected object, by default None.
        font : str, optional
            The font type of the text label for each detected object, by default 'Arial.ttf'.
        pil : bool, optional
            Whether to return a PIL Image object, by default False.
        example : str, optional
            A string to display on the example bounding box, by default 'abc'.

        Returns
        -------
        numpy.ndarray or PIL Image
            The original input image with the detected objects displayed as bounding boxes.
            If `pil=True`, a PIL Image object is returned instead.

        Raises
        ------
        ValueError
            If the input image has not been detected by calling the `predict_img()` method first.
        """
        # Check if the `predict_img()` method has been called before displaying the detected objects
        if self.results is None:
            raise ValueError('No detected objects to display. Call predict_img() method first.')
        
        # Call the plot() method of the `self.results` object to display the detected objects on the original image
        display_img = self.results.plot(show_conf, line_width, font_size, font, pil, example)
        
        # Return the displayed image
        return display_img

        

    def custom_display(self, colors, show_cls = True, show_conf = True):
        """
        Custom display method that draws bounding boxes and labels on the original image, 
        with additional options for showing class and confidence information.

        Parameters:
        -----------
        colors : list
            A list of tuples specifying the color of each class.
        show_cls : bool, optional
            Whether to show class information in the label text. Default is True.
        show_conf : bool, optional
            Whether to show confidence information in the label text. Default is True.

        Returns:
        --------
        numpy.ndarray
            The image with bounding boxes and labels drawn on it.
        """

        img = self.orig_img
        # calculate the bounding box thickness based on the image width and height
        bbx_thickness = (img.shape[0] + img.shape[1]) // 450

        for box in self.results.boxes:
            textString = ""

            # Extract object class and confidence score
            score = box.conf.item() * 100
            class_id = int(box.cls.item())

            x1 , y1 , x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)

            # Print detection info
            if show_cls:
                textString += f"{self.labels[class_id]}"

            if show_conf:
                textString += f" {score:,.2f}%"

            # Calculate font scale based on object size
            font = cv2.FONT_HERSHEY_COMPLEX
            fontScale = (((x2 - x1) / img.shape[0]) + ((y2 - y1) / img.shape[1])) / 2 * 2.5
            fontThickness = 1
            textSize, baseline = cv2.getTextSize(textString, font, fontScale, fontThickness)

            # Draw bounding box, a centroid and label on the image
            img = cv2.rectangle(img, (x1,y1), (x2,y2), colors[class_id], bbx_thickness)
            center_coordinates = ((x1 + x2)//2, (y1 + y2) // 2)

            img =  cv2.circle(img, center_coordinates, 5 , (0,0,255), -1)
            
             # If there are no details to show on the image
            if textString != "":
                if (y1 < textSize[1]):
                    y1 = y1 + textSize[1]
                else:
                    y1 -= 2
                # show the details text in a filled rectangle
                img = cv2.rectangle(img, (x1, y1), (x1 + textSize[0] , y1 -  textSize[1]), colors[class_id], cv2.FILLED)
                img = cv2.putText(img, textString , 
                    (x1, y1), font, 
                    fontScale,  (0, 0, 0), fontThickness, cv2.LINE_AA)

        return img


    def predict_video(self, video_path, save_dir, save_format="avi", display='custom', verbose=True, **display_args):
        """Runs object detection on each frame of a video and saves the output to a new video file.

        Args:
        ----------
            video_path (str): The path to the input video file.
            save_dir (str): The path to the directory where the output video file will be saved.
            save_format (str, optional): The format for the output video file. Defaults to "avi".
            display (str, optional): The type of display for the detection results. Defaults to 'custom'.
            verbose (bool, optional): Whether to print information about the video file and output file. Defaults to True.
            **display_args: Additional arguments to be passed to the display function.

        Returns:
        ------------
            None
        """
        # Open the input video file
        cap = cv2.VideoCapture(video_path)

        # Get the name of the input video file
        vid_name = os.path.basename(video_path)

        # Get the dimensions of each frame in the input video file
        width = int(cap.get(3))  # get `width`
        height = int(cap.get(4))  # get `height`

        # Create the directory for the output video file if it does not already exist
        if not os.path.isdir(save_dir):
            os.makedirs(save_dir)

        # Set the name and path for the output video file
        save_name = self.model_name + ' -- ' + vid_name.split('.')[0] + '.' + save_format
        save_file = os.path.join(save_dir, save_name)

        # Print information about the input and output video files if verbose is True
        if verbose:
            print("----------------------------")
            print(f"DETECTING OBJECTS IN : {vid_name} : ")
            print(f"RESOLUTION : {width}x{height}")
            print('SAVING TO :' + save_file)

        # Define an output VideoWriter object
        out = cv2.VideoWriter(save_file,
                              cv2.VideoWriter_fourcc(*"MJPG"),
                              30, (width, height))

        # Check if the input video file was opened correctly
        if not cap.isOpened():
            print("Error opening video stream or file")

        # Read each frame of the input video file
        while cap.isOpened():
            ret, frame = cap.read()

            # If the frame was not read successfully, break the loop
            if not ret:
                print("Error reading frame")
                break

            # Run object detection on the frame and calculate FPS
            beg = time.time()
            results = self.predict_img(frame, verbose=False)
            if results is None:
                print('***********************************************')
            fps = 1 / (time.time() - beg)

            # Display the detection results
            if display == 'default':
                frame = self.default_display(**display_args)
            elif display == 'custom':
                frame == self.custom_display(**display_args)

            # Display the FPS on the frame
            frame = cv2.putText(frame, f"FPS : {fps:,.2f}",
                                (5, 15), cv2.FONT_HERSHEY_COMPLEX,
                                0.5, (0, 0, 255), 1, cv2.LINE_AA)

            # Write the frame to the output video file
            out.write(frame)

            # Exit the loop if the 'q' button is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        # After the loop release the cap and video writer
        cap.release()
        out.release()

    


In [61]:
d = YOLOv8_ObjectDetector()
print(d.labels) 

{0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microw

In [62]:
# Initialize YOLOv8 detectors with different model files and confidence thresholds
yolo_names = ['yolov8x.pt']
colors = [(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255)) for _ in range(80)]
detectors = []

for yolo_name in yolo_names:
    detector = YOLOv8_ObjectDetector(yolo_name, conf=0.55)
    detectors.append(detector)

# Process a test video and track specific objects
vid_results_path = 'ultralytics/ultralytics/tracker/utils/output'
test_vids_path = 'ultralytics/ultralytics/tracker/trackers/test vids'


In [64]:
yolo_names = ['yolov8x.pt']
colors = [(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255)) for _ in range(80)]
detectors = []

for yolo_name in yolo_names:
    detector = YOLOv8_ObjectDetector(yolo_name, conf=0.55)
    detectors.append(detector)

# Process a test video and track specific objects
vid_results_path = 'ultralytics/ultralytics/tracker/utils/output'
test_vids_path = 'ultralytics/ultralytics/tracker/trackers/test vids'

if not os.path.isdir(vid_results_path):
    os.makedirs(vid_results_path)

for detector in detectors:
    detector.predict_video(
        video_path='ultralytics/ultralytics/tracker/trackers/test vids/output.mp4',
        save_dir=vid_results_path,
        save_format="avi",
        display='custom',
        colors=colors,
        target_class_ids=target_class_ids
    )

----------------------------
DETECTING OBJECTS IN : output.mp4 : 
RESOLUTION : 1024x540
SAVING TO :ultralytics/ultralytics/tracker/utils/output/yolov8x -- output.avi


AttributeError: 'list' object has no attribute 'pred'

In [45]:
for detector in detectors:
    detector.predict_video(video_path= 'ultralytics/ultralytics/tracker/trackers/test vids/output.mp4'
, save_dir = vid_results_path, save_format = "avi", display = 'custom', colors = colors)

----------------------------
DETECTING OBJECTS IN : output.mp4 : 
RESOLUTION : 1024x540
SAVING TO :ultralytics/ultralytics/tracker/utils/output/yolov8n -- output.avi
Error reading frame
----------------------------
DETECTING OBJECTS IN : output.mp4 : 
RESOLUTION : 1024x540
SAVING TO :ultralytics/ultralytics/tracker/utils/output/yolov8x -- output.avi
Error reading frame


In [65]:
import cv2
import numpy as np
import random
import os
import time
import ssl
from ultralytics import YOLO

ssl._create_default_https_context = ssl._create_unverified_context

# Load YOLOv8 model
yolo_names = ['yolov8x.pt']
yolov8_models = {}
for yolo_name in yolo_names:
    yolov8_models[yolo_name[:-3]] = YOLO(yolo_name)

class YOLOv8_ObjectDetector:
    def __init__(self, model_file='yolov8x.pt', conf=0.25, iou=0.45):
        self.conf = conf
        self.iou = iou
        self.model = yolov8_models[model_file.split('.')[0]]
        self.model_name = model_file.split('.')[0]
        self.results = None

    def predict_img(self, img, verbose=True):
        results = self.model(img, conf=self.conf, iou=self.iou, verbose=verbose)
        self.orig_img = img
        self.results = results[0]
        return results[0]

    def custom_display(self, colors, show_cls=True, show_conf=True):
        img = self.orig_img
        bbx_thickness = (img.shape[0] + img.shape[1]) // 450

        for box in self.results.boxes:
            textString = ""
            score = box.conf.item() * 100
            class_id = int(box.cls.item())
            x1, y1, x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)

            if show_cls:
                textString += f"{self.model.names[class_id]}"

            if show_conf:
                textString += f" {score:,.2f}%"

            font = cv2.FONT_HERSHEY_COMPLEX
            fontScale = (((x2 - x1) / img.shape[0]) + ((y2 - y1) / img.shape[1])) / 2 * 2.5
            fontThickness = 1
            textSize, baseline = cv2.getTextSize(textString, font, fontScale, fontThickness)

            img = cv2.rectangle(img, (x1, y1), (x2, y2), colors[class_id], bbx_thickness)
            center_coordinates = ((x1 + x2) // 2, (y1 + y2) // 2)
            img = cv2.circle(img, center_coordinates, 5, (0, 0, 255), -1)

            if textString != "":
                if (y1 < textSize[1]):
                    y1 = y1 + textSize[1]
                else:
                    y1 -= 2
                img = cv2.rectangle(img, (x1, y1), (x1 + textSize[0], y1 - textSize[1]), colors[class_id], cv2.FILLED)
                img = cv2.putText(img, textString, (x1, y1), font, fontScale, (0, 0, 0), fontThickness, cv2.LINE_AA)

        return img

    def predict_video(self, video_path, save_dir, save_format="avi", display='custom', verbose=True, **display_args):
        cap = cv2.VideoCapture(video_path)
        vid_name = os.path.basename(video_path)
        width = int(cap.get(3))
        height = int(cap.get(4))

        if not os.path.isdir(save_dir):
            os.makedirs(save_dir)

        save_name = self.model_name + ' -- ' + vid_name.split('.')[0] + '.' + save_format
        save_file = os.path.join(save_dir, save_name)

        if verbose:
            print("----------------------------")
            print(f"DETECTING OBJECTS IN: {vid_name}")
            print(f"RESOLUTION: {width}x{height}")
            print('SAVING TO: ' + save_file)

        out = cv2.VideoWriter(save_file, cv2.VideoWriter_fourcc(*"MJPG"), 30, (width, height), True)

        if not cap.isOpened():
            print("Error opening video stream or file")

        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                print("Error reading frame")
                break

            beg = time.time()
            results = self.predict_img(frame, verbose=False)
            if results is None:
                print('***********************************************')
            fps = 1 / (time.time() - beg)

            if display == 'custom':
                frame = self.custom_display(**display_args)

            frame = cv2.putText(frame, f"FPS: {fps:,.2f}", (5, 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

            out.write(frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        out.release()

# Initialize YOLOv8 detectors with different model files and confidence thresholds
yolo_names = ['yolov8x.pt']
colors = [(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255)) for _ in range(80)]
detectors = []

for yolo_name in yolo_names:
    detector = YOLOv8_ObjectDetector(yolo_name, conf=0.55)
    detectors.append(detector)

# Process a test video and track specific objects
vid_results_path = 'ultralytics/ultralytics/tracker/utils/output'
test_vids_path = 'ultralytics/ultralytics/tracker/trackers/test vids'

if not os.path.isdir(vid_results_path):
    os.makedirs(vid_results_path)

for detector in detectors:
    detector.predict_video(
        video_path='ultralytics/ultralytics/tracker/trackers/test vids/output.mp4',
        save_dir=vid_results_path,
        save_format="avi",
        display='custom',
        colors=colors
    )


----------------------------
DETECTING OBJECTS IN: output.mp4
RESOLUTION: 1024x540
SAVING TO: ultralytics/ultralytics/tracker/utils/output/yolov8x -- output.avi
Error reading frame


In [72]:
import cv2
import numpy as np
import random
import os
import time
import ssl
from ultralytics import YOLO
import torch
from collections import deque
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise

ssl._create_default_https_context = ssl._create_unverified_context

# Load YOLOv8 model
yolo_names = ['yolov8x.pt']
yolov8_models = {}
for yolo_name in yolo_names:
    yolov8_models[yolo_name[:-3]] = YOLO(yolo_name)

# Define colors for bounding boxes
colors = [(random.randint(50, 255), random.randint(50, 255), random.randint(50, 255)) for _ in range(80)]

# Initialize DQN model
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)
    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x
state_size = 150528  # Define the state size based on your specific requirements
action_size = 2  # Define the action size based on your specific requirements

dqn_model_path = 'modell.h5'
dqn_model = DQN(state_size, action_size)
dqn_model.load_state_dict(torch.load(dqn_model_path))
dqn_model.eval()

# Initialize Kalman filters for tracking
kalman_filters = {}  # Dictionary to store Kalman filters for each object

# Initialize a deque to store the last N Kalman filter predictions
N_HISTORY = 10  # Define the number of history states to store for each object
kalman_history = {}  # Dictionary to store Kalman prediction history for each object

# Initialize Hungarian algorithm cost matrix
cost_matrix = None

# Initialize prioritization weights for tracked objects
prioritization_weights = {}  # Dictionary to store weights for each tracked object

# Define YOLOv8 object detection class
class YOLOv8_ObjectDetector:
    def __init__(self, model_file='yolov8x.pt', conf=0.25, iou=0.45):
        self.conf = conf
        self.iou = iou
        self.model = yolov8_models[model_file.split('.')[0]]
        self.model_name = model_file.split('.')[0]
        self.results = None

    def predict_img(self, img, verbose=True):
        results = self.model(img, conf=self.conf, iou=self.iou, verbose=verbose)
        self.orig_img = img
        self.results = results[0]
        return results[0]

    def custom_display(self, show_cls=True, show_conf=True):
        img = self.orig_img
        bbx_thickness = (img.shape[0] + img.shape[1]) // 450

        for box in self.results.boxes:
            textString = ""
            score = box.conf.item() * 100
            class_id = int(box.cls.item())
            x1, y1, x2, y2 = np.squeeze(box.xyxy.numpy()).astype(int)

            if show_cls:
                textString += f"{self.model.names[class_id]}"

            if show_conf:
                textString += f" {score:,.2f}%"

            font = cv2.FONT_HERSHEY_COMPLEX
            fontScale = (((x2 - x1) / img.shape[0]) + ((y2 - y1) / img.shape[1])) / 2 * 2.5
            fontThickness = 1
            textSize, baseline = cv2.getTextSize(textString, font, fontScale, fontThickness)

            img = cv2.rectangle(img, (x1, y1), (x2, y2), colors[class_id], bbx_thickness)
            center_coordinates = ((x1 + x2) // 2, (y1 + y2) // 2)
            img = cv2.circle(img, center_coordinates, 5, (0, 0, 255), -1)

            if textString != "":
                if (y1 < textSize[1]):
                    y1 = y1 + textSize[1]
                else:
                    y1 -= 2
                img = cv2.rectangle(img, (x1, y1), (x1 + textSize[0], y1 - textSize[1]), colors[class_id], cv2.FILLED)
                img = cv2.putText(img, textString, (x1, y1), font, fontScale, (0, 0, 0), fontThickness, cv2.LINE_AA)

        return img

# Initialize YOLOv8 detectors with different model files and confidence thresholds
yolo_names = ['yolov8x.pt']
detectors = []

for yolo_name in yolo_names:
    detector = YOLOv8_ObjectDetector(yolo_name, conf=0.55)
    detectors.append(detector)

# Process a test video and track specific objects
vid_results_path = 'ultralytics/ultralytics/tracker/utils/output'
test_vids_path = 'ultralytics/ultralytics/tracker/trackers/test vids'

if not os.path.isdir(vid_results_path):
    os.makedirs(vid_results_path)

# Initialize video capture
cap = cv2.VideoCapture('ultralytics/ultralytics/tracker/trackers/test vids/output.mp4')

while True:
    ret, frame = cap.read()

    if not ret:
        print("Error reading frame")
        break

    for detector in detectors:
        results = detector.predict_img(frame, verbose=False)
        if results is None:
            print('***********************************************')

    # Implement tracking, Kalman filtering, Hungarian algorithm, and prioritization logic here

    # Display the frame with tracked objects and prioritization weights

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


Error reading frame


: 