In [10]:
import torch
import numpy as np
import cv2
import pafy #take videos from yt and pass to model
from time import time
import rosbag
import cv2
from cv_bridge import CvBridge

#note: base code developed using: https://www.youtube.com/watch?v=3wdqO_vYMpA&t=0s

class ObjectDetection:
    """
    Implements the YOLO V5 Model on a YT video, webcam or local file using OpenCV 
    """

    def __init__(self, url, inp_typ, out_file):
        """
        Initialises the class with the YT Url and the Output File
        :param url: A valid YT URL OR Local file location
        :paral inp_typ: User defined either 'Webcam', 'Local' or 'YT'
        :out_file: A valid output file name.
        :r type: None
        """
        #initilising attributes
        self.input_t = inp_typ 
        self._URL = url  #can be YT url or local file path
        self.model = self.load_model()
        self.classes = self.model.names
        self.out_file = out_file
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu :(' #checks if cuda is available and uses it if it is
        print("\n \nDevice Used (if its not cuda gl)", self.device)

    def get_video_from_url(self):
        """
        Generates video streaming object. Frame by frame extraction will be done in order to make predictions
        :return: openCV2 video capture object, with lowest quality frame available for video
        """
        #distingish between local file and YT input
        
        
        if self.input_t == "Webcam":
            print("Opening Webcam")
            cap = cv2.VideoCapture(0)
            # Set resolution of input frames to 640x480
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

            # Set frame rate of input frames to 30 frames per second
            cap.set(cv2.CAP_PROP_FPS, 30)
            return cap
        
        elif self.input_t == "Local":
            print("Loading local video file")
            input_file = self._URL #test for mp4
            cap = cv2.VideoCapture(input_file)
            # Set resolution of input frames to 640x480
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) #doesnt seem to work yet, check

            # Set frame rate of input rames to 30 frames per second
            cap.set(cv2.CAP_PROP_FPS, 30)
            return cap

        elif self.input_t == "YT":
            print("Loading YT Video")
            play = pafy.new(self._URL).streams[-1]
            input_file = play.url
            cap = cv2.VideoCapture(input_file)
            # Set resolution of input frames to 640x480
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 5)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 5)

            # Set frame rate of input frames to 30 frames per second
            cap.set(cv2.CAP_PROP_FPS, 100)
            return cap
        
        elif self.input_t == "Rosbag":
            bag = rosbag.Bag(self._URL)
            bridge = CvBridge()
            cap = cv2.VideoCapture(bag.get_type_and_topic_info('/davis/left/image_raw'))
        

    
    def load_model(self):
        """
        Loads YOLO V5 Model from PyTorch
        :return: Train model from PyTorch
        """
        #you can also train your own model
        model = torch.hub.load('ultralytics/yolov5', 'yolov5n', pretrained=True)
        #specity directory, additionally YOLO V5 small model specified (pretrained)
        #if you specify custom, path for weights need to be provided
        return model
    
    def score_frame(self, frame):
        """
        Takes a single frame as input, scores frame using the model
        :param frame: Input frame in numpy/tuple/list format.
        :return: Labels and Coordinates of obj detected by model in that frame
        ::
        """
        #take a frame and do a forward pass
        self.model.to(self.device) #setting device
        if self.input_t == "Rosbag":
            frame = bridge.imgmsg_to_cv2(bag.read_messages('/davis/left/image_raw')[0].data)
        else:
            frame = [frame]
        #frame = [frame]
        results = self.model(frame) #for each frame the boundraies and labels will be stored

        labels, cord = results.xyxyn[0][:, -1], results.xyxyn[0][:, :-1]
        #keeps labels/coords of boundary boxes so they can be drawn later
        #take all val of first col, and last index in [:, -1]

    

        return labels, cord
    
    def class_to_label(self, x):
        """
        For given value of label, return string label
        :param x: numeric label
        :return: corresponding string label
        :r type: string
        """
        return self.classes[int(x)]
    
    def plot_boxes(self, results, frame):
        """
        Takes a given frame and results as input and then overlays bounding boxes and labels on the frame.
        :param results: Contains labels and coords predicted by model on frame.
        :param frame: Frame that has been scored.
        :return: Frame with bounding boxes and labels overlayed on it
        """
        labels, cord = results
        n = len(labels) #number of detected labels
        x_shape, y_shape = frame.shape[1], frame.shape[0]

        for i in range(n): #running through all the detections
            row = cord[i]
            if row[4]>=0.2:
                x1, y1, x2, y2 = int(row[0]*x_shape), int(row[1]*y_shape), int(row[2]*x_shape), int(row[3]*y_shape)
                bgr =  (0, 0, 255) #colour of boundary box, currently red
                label = self.class_to_label(labels[i])
                confidence = row[4]
                text = f"{label}: {confidence:.2f}" #label and confidence text to be shown
                cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 2) #draw rectangle around object
                cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, bgr, 2) #displaying correspoding label
        return frame 

    def __call__(self):
        """
        This function is called when the class is executed. Runs loop to read video frame by frame and outputs the result to a new file
        :return: void
        """
    

        player = self.get_video_from_url()
        assert player.isOpened()
        x_shape = int(player.get(cv2.CAP_PROP_FRAME_WIDTH))
        y_shape = int(player.get(cv2.CAP_PROP_FRAME_HEIGHT)) #output resolution
        four_cc = cv2.VideoWriter_fourcc(*"MJPG")
        out = cv2.VideoWriter(self.out_file, four_cc, 60, (x_shape, y_shape))

        while True: #as long as you have frames in video
            start_time = time() #timer
            if self.input_t == "Rosbag":
                ret, frame = cap.read()
            else:
                ret, frame = player.read()
            #ret, frame = player.read() #load frame from video
            if not ret:
                break
            results = self.score_frame(frame) #get results
            frame = self.plot_boxes(results, frame) #plot boxes
            
            # Display the frame with bounding boxes and labels in real-time
            cv2.imshow('Object Detection', frame)
            cv2.waitKey(1)  # Wait for a key event (1 millisecond delay)

            # Check for 'q' key press to exit the video processing
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            end_time = time()
            fps = 1/np.round(end_time-start_time, 3) #calculate fps
            print(f"FPS:{fps}")
            print(x_shape, y_shape)
            out.write(frame)
        # Release the video capture and close the window
    cv2.destroyAllWindows()


#create new obj and execute
#give video url and output file name

detection = ObjectDetection("/home/shashank/Downloads/outdoor_day1_data.bag", "Rosbag", "video_t7.avi")
detection()
#choose between 'Local', 'Webcam' and 'YT' for input
#either give URL or path for YT and Local respectively




Using cache found in /home/shashank/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2023-5-20 Python-3.8.10 torch-2.0.1+cu117 CUDA:0 (Quadro T1000 with Max-Q Design, 3912MiB)

Fusing layers... 
YOLOv5n summary: 213 layers, 1867405 parameters, 0 gradients
Adding AutoShape... 


[31m[1mrequirements:[0m /home/shashank/.local/lib/python3.8/site-packages/requirements.txt not found, check failed.

 
Device Used (if its not cuda gl) cuda


error: OpenCV(4.7.0) :-1: error: (-5:Bad argument) in function 'VideoCapture'
> Overload resolution failed:
>  - Can't convert object to 'str' for 'filename'
>  - VideoCapture() missing required argument 'apiPreference' (pos 2)
>  - Argument 'index' is required to be an integer
>  - VideoCapture() missing required argument 'apiPreference' (pos 2)


In [6]:
import rosbag
import cv2
from cv_bridge import CvBridge

bag = rosbag.Bag('/home/shashank/Downloads/outdoor_day1_data.bag')
bridge = CvBridge()

for topic, msg, t in bag.read_messages(topics=['/davis/left/image_raw']):
    cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
    cv2.imshow('Image', cv_image)
    cv2.waitKey(1)

cv2.destroyAllWindows()
bag.close()


KeyboardInterrupt: 

: 

In [2]:
import rosbag
import cv2
from cv_bridge import CvBridge

bag = rosbag.Bag('/home/shashank/Downloads/outdoor_day1_data.bag')
bridge = CvBridge()

# Create a VideoWriter object to save the video
output_file = 'output_video.avi'
output_fps = 30  # Set the desired output frames per second (FPS)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter(output_file, fourcc, output_fps, (640, 480))  # Update the resolution if needed

for topic, msg, t in bag.read_messages(topics=['/davis/left/image_raw']):
    cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
    
    # Write the frame to the video file
    video_writer.write(cv_image)
    
    cv2.imshow('Image', cv_image)
    cv2.waitKey(1)

cv2.destroyAllWindows()
bag.close()
video_writer.release()  # Release the video writer when done


In [1]:
import cv2
import rosbag
from cv_bridge import CvBridge

bag = rosbag.Bag('/home/shashank/Downloads/outdoor_day1_data.bag')
bridge = CvBridge()

# Create a cv2.VideoCapture object and pass the rosbag file to the constructor.
cap = cv2.VideoCapture("/dev/video0")

# Set the camera settings.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

# Create a virtual webcam.
webcam = cv2.VideoWriter("/dev/video1", cv2.VideoWriter_fourcc('M','J','P','G'), 30, (640, 480))

# Loop over the messages in the bag.
while cap.isOpened():

    # Read the next image from the bag.
    ret, frame = cap.read()

    # Write the image to the virtual webcam.
    webcam.write(frame)

    # Display the image on the screen.
    cv2.imshow("Image", frame)

    # Wait for a key press.
    key = cv2.waitKey(1)

    # If the user presses ESC, close the window.
    if key == 27:
        break

# Close the rosbag file.
bag.close()

# Close the virtual webcam.
webcam.release()





KeyboardInterrupt: 

In [4]:
import torch
import numpy as np
import cv2
from cv_bridge import CvBridge
import rosbag

class ObjectDetection:
    """
    Implements the YOLO V5 Model on ROS bag images using OpenCV
    """

    def __init__(self, bag_file, out_file):
        """
        Initializes the class with the ROS bag file and the output file
        :param bag_file: Path to the ROS bag file
        :param out_file: A valid output file name.
        """
        self.bag_file = bag_file
        self.out_file = out_file
        self.model = self.load_model()
        self.classes = self.model.names
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print("Device Used:", self.device)

    def load_model(self):
        """
        Loads YOLO V5 Model from PyTorch
        :return: Trained model from PyTorch
        """
        model = torch.hub.load('ultralytics/yolov5', 'yolov5n', pretrained=True)
        return model

    def score_frame(self, frame):
        """
        Takes a single frame as input and scores the frame using the model
        :param frame: Input frame in numpy/tuple/list format.
        :return: Labels and coordinates of objects detected by the model in that frame
        """
        self.model.to(self.device)
        frame = [frame]
        results = self.model(frame)

        labels, cord = results.xyxyn[0][:, -1], results.xyxyn[0][:, :-1]
        return labels, cord

    def class_to_label(self, x):
        """
        For a given value of label, returns the corresponding string label
        :param x: Numeric label
        :return: Corresponding string label
        """
        return self.classes[int(x)]

    def plot_boxes(self, results, frame):
        """
        Takes a given frame and results as input and overlays bounding boxes and labels on the frame.
        :param results: Contains labels and coordinates predicted by the model on the frame.
        :param frame: Frame that has been scored.
        :return: Frame with bounding boxes and labels overlaid on it
        """
        labels, cord = results
        n = len(labels)
        x_shape, y_shape = frame.shape[1], frame.shape[0]

        for i in range(n):
            row = cord[i]
            if row[4] >= 0.2:
                x1, y1, x2, y2 = int(row[0] * x_shape), int(row[1] * y_shape), int(row[2] * x_shape), int(row[3] * y_shape)
                bgr = (0, 0, 255)
                label = self.class_to_label(labels[i])
                confidence = row[4]
                text = f"{label}: {confidence:.2f}"
                cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 2)
                cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, bgr, 2)
        return frame

    def __call__(self):
        """
        This function is called when the class is executed. Reads images from the ROS bag file and outputs the result to a new file.
        """
        bag = rosbag.Bag(self.bag_file)
        bridge = CvBridge()

        # Create a VideoWriter object to save the video
        output_fps = 30  # Set the desired output frames per second (FPS)
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        video_writer = cv2.VideoWriter(self.out_file, fourcc, output_fps, (1280, 720))  # Update the resolution if needed

        for topic, msg, t in bag.read_messages(topics=['/davis/left/image_raw']):
            cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

            results = self.score_frame(cv_image)
            cv_image = self.plot_boxes(results, cv_image)

            cv2.imshow('Object Detection', cv_image)
            cv2.waitKey(1)

            # Write the frame to the video file
            video_writer.write(cv_image)

        cv2.destroyAllWindows()
        bag.close()
        video_writer.release()


# Create an instance of the ObjectDetection class and execute
detection = ObjectDetection("/home/shashank/Downloads/outdoor_day1_data.bag", "output_video.avi")
detection()


Using cache found in /home/shashank/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2023-5-20 Python-3.8.10 torch-2.0.1+cu117 CUDA:0 (Quadro T1000 with Max-Q Design, 3912MiB)

Fusing layers... 
YOLOv5n summary: 213 layers, 1867405 parameters, 0 gradients
Adding AutoShape... 


[31m[1mrequirements:[0m /home/shashank/.local/lib/python3.8/site-packages/requirements.txt not found, check failed.
Device Used: cuda


KeyboardInterrupt: 

In [6]:
import torch
import numpy as np
import cv2
import pafy #take videos from yt and pass to model
from time import time
import rosbag
import cv2
from cv_bridge import CvBridge

#note: base code developed using: https://www.youtube.com/watch?v=3wdqO_vYMpA&t=0s

class ObjectDetection:
    """
    Implements the YOLO V5 Model on a YT video, webcam or local file using OpenCV 
    """

    def __init__(self, url, inp_typ, out_file):
        """
        Initialises the class with the YT Url and the Output File
        :param url: A valid YT URL OR Local file location
        :paral inp_typ: User defined either 'Webcam', 'Local' or 'YT'
        :out_file: A valid output file name.
        :r type: None
        """
        #initilising attributes
        self.input_t = inp_typ 
        self._URL = url  #can be YT url or local file path
        self.model = self.load_model()
        self.classes = self.model.names
        self.out_file = out_file
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu :(' #checks if cuda is available and uses it if it is
        print("\n \nDevice Used (if its not cuda gl)", self.device)

    def get_video_from_url(self):
        """
        Generates video streaming object. Frame by frame extraction will be done in order to make predictions
        :return: openCV2 video capture object, with lowest quality frame available for video
        """
        #distingish between local file and YT input
        
        
        if self.input_t == "Webcam":
            print("Opening Webcam")
            cap = cv2.VideoCapture(0)
            # Set resolution of input frames to 640x480
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

            # Set frame rate of input frames to 30 frames per second
            cap.set(cv2.CAP_PROP_FPS, 30)
            return cap
        
        elif self.input_t == "Local":
            print("Loading local video file")
            input_file = self._URL #test for mp4
            cap = cv2.VideoCapture(input_file)
            # Set resolution of input frames to 640x480
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) #doesnt seem to work yet, check

            # Set frame rate of input rames to 30 frames per second
            cap.set(cv2.CAP_PROP_FPS, 30)
            return cap

        elif self.input_t == "YT":
            print("Loading YT Video")
            play = pafy.new(self._URL).streams[-1]
            input_file = play.url
            cap = cv2.VideoCapture(input_file)
            # Set resolution of input frames to 640x480
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 5)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 5)

            # Set frame rate of input frames to 30 frames per second
            cap.set(cv2.CAP_PROP_FPS, 100)
            return cap
        

    
    def load_model(self):
        """
        Loads YOLO V5 Model from PyTorch
        :return: Train model from PyTorch
        """
        #you can also train your own model
        model = torch.hub.load('ultralytics/yolov5', 'yolov5n', pretrained=True)
        #specity directory, additionally YOLO V5 small model specified (pretrained)
        #if you specify custom, path for weights need to be provided
        return model
    
    def score_frame(self, frame):
        """
        Takes a single frame as input, scores frame using the model
        :param frame: Input frame in numpy/tuple/list format.
        :return: Labels and Coordinates of obj detected by model in that frame
        ::
        """
        #take a frame and do a forward pass
        self.model.to(self.device) #setting device
        frame = [frame]
        results = self.model(frame) #for each frame the boundraies and labels will be stored

        labels, cord = results.xyxyn[0][:, -1], results.xyxyn[0][:, :-1]
        #keeps labels/coords of boundary boxes so they can be drawn later
        #take all val of first col, and last index in [:, -1]

    

        return labels, cord
    
    def class_to_label(self, x):
        """
        For given value of label, return string label
        :param x: numeric label
        :return: corresponding string label
        :r type: string
        """
        return self.classes[int(x)]
    
    def plot_boxes(self, results, frame):
        """
        Takes a given frame and results as input and then overlays bounding boxes and labels on the frame.
        :param results: Contains labels and coords predicted by model on frame.
        :param frame: Frame that has been scored.
        :return: Frame with bounding boxes and labels overlayed on it
        """
        labels, cord = results
        n = len(labels) #number of detected labels
        x_shape, y_shape = frame.shape[1], frame.shape[0]

        for i in range(n): #running through all the detections
            row = cord[i]
            if row[4]>=0.2:
                x1, y1, x2, y2 = int(row[0]*x_shape), int(row[1]*y_shape), int(row[2]*x_shape), int(row[3]*y_shape)
                bgr =  (0, 0, 255) #colour of boundary box, currently red
                label = self.class_to_label(labels[i])
                confidence = row[4]
                text = f"{label}: {confidence:.2f}" #label and confidence text to be shown
                cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 2) #draw rectangle around object
                cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, bgr, 2) #displaying correspoding label
        return frame 

    def __call__(self):
        """
        This function is called when the class is executed. Runs loop to read video frame by frame and outputs the result to a new file
        :return: void
        """
    

        player = self.get_video_from_url()
        assert player.isOpened()
        x_shape = int(player.get(cv2.CAP_PROP_FRAME_WIDTH))
        y_shape = int(player.get(cv2.CAP_PROP_FRAME_HEIGHT)) #output resolution
        four_cc = cv2.VideoWriter_fourcc(*"MJPG")
        out = cv2.VideoWriter(self.out_file, four_cc, 60, (x_shape, y_shape))

        while True: #as long as you have frames in video
            start_time = time() #timer
            ret, frame = player.read() #load frame from video
            if not ret:
                break
            results = self.score_frame(frame) #get results
            frame = self.plot_boxes(results, frame) #plot boxes
            
            # Display the frame with bounding boxes and labels in real-time
            cv2.imshow('Object Detection', frame)
            cv2.waitKey(1)  # Wait for a key event (1 millisecond delay)

            # Check for 'q' key press to exit the video processing
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            end_time = time()
            fps = 1/np.round(end_time-start_time, 3) #calculate fps
            print(f"FPS:{fps}")
            print(x_shape, y_shape)
            out.write(frame)
        # Release the video capture and close the window
    cv2.destroyAllWindows()


#create new obj and execute
#give video url and output file name

detection = ObjectDetection("/home/shashank/Downloads/outdoor_day1_data.bag", "Webcam", "video_t7.avi")
detection()
#choose between 'Local', 'Webcam' and 'YT' for input
#either give URL or path for YT and Local respectively




Using cache found in /home/shashank/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2023-5-20 Python-3.8.10 torch-2.0.1+cu117 CUDA:0 (Quadro T1000 with Max-Q Design, 3912MiB)

Fusing layers... 
YOLOv5n summary: 213 layers, 1867405 parameters, 0 gradients
Adding AutoShape... 


[31m[1mrequirements:[0m /home/shashank/.local/lib/python3.8/site-packages/requirements.txt not found, check failed.

 
Device Used (if its not cuda gl) cuda
Opening Webcam
FPS:1.5552099533437014
1280 720
FPS:43.47826086956522
1280 720
FPS:10.416666666666666
1280 720
FPS:10.204081632653061
1280 720
FPS:12.345679012345679
1280 720
FPS:10.526315789473685
1280 720
FPS:13.333333333333334
1280 720
FPS:10.416666666666666
1280 720
FPS:10.416666666666666
1280 720
FPS:11.76470588235294
1280 720
FPS:10.638297872340425
1280 720
FPS:12.345679012345679
1280 720
FPS:13.333333333333334
1280 720
FPS:11.904761904761903
1280 720
FPS:13.698630136986303
1280 720
FPS:11.49425287356322
1280 720
FPS:11.11111111111111
1280 720
FPS:11.904761904761903
1280 720
FPS:10.526315789473685
1280 720
FPS:12.820512820512821
1280 720
FPS:10.1010101010101
1280 720
FPS:11.363636363636365
1280 720
FPS:9.900990099009901
1280 720
FPS:11.904761904761903
1280 720
FPS:14.285714285714285
1280 720
FPS:9.523809523809524
1280 720
F

KeyboardInterrupt: 