In [None]:
# !pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cpu
import ultralytics
from ultralytics import YOLO
ultralytics.checks()
import cv2
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np

## Object detection and trail using yolov8n

In [None]:
# Load the YOLOv8 model
model = YOLO("yolov8n.pt")

# Open the video file
video_path = "videoplayback.mp4"
cap = cv2.VideoCapture(video_path)

# Store the track history
track_history = defaultdict(list)

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()

    if success:
        # Run YOLOv8 inference on the frame
        results = model.track(frame,conf=0.4,persist=True,agnostic_nms=True,classes=[0])
        annotated_frame = results[0].plot() # Visualize the results on the frame
        try:
            ids= results[0].boxes.id.int().cpu().tolist()
            boxes= results[0].boxes.xywh.cpu()

            for box, id in zip(boxes,ids):
                x,y,w,h=box
                track=track_history[id]
                track.append((float(x),float(y)))  # x, y center point
                points=np.hstack(track).astype(np.int32).reshape((-1,1,2))
                if len(points)>30:
                    points=points[20:]            
                cv2.polylines(annotated_frame,[points],isClosed=False,thickness=10,color=(240,240,240))
        except: pass
        # Display the annotated frame
        cv2.imshow("YOLOv8 Inference", annotated_frame)
        
        
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        # Break the loop if the end of the video is reached
        break

# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()

## Object tracking using deepSORT

In [2]:
from deep_sort import nn_matching
from deep_sort.detection import Detection
from deep_sort.tracker import Tracker
import cv2 
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt
from tools import generate_detections_ as gdet

In [None]:
model_d='mars-small128.pb'
metric=nn_matching.NearestNeighborDistanceMetric(metric='cosine',matching_threshold=0.4)
tracker=Tracker(metric)
encoder = gdet.create_box_encoder(model_d, batch_size=1)

# Load the YOLOv8 model
model = YOLO("yolov8n.pt")

# Open the capeo file
video_path = "videoplayback.mp4"
cap = cv2.VideoCapture(video_path)

while cap.isOpened():
    success, frame= cap.read()

    if success:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        results=model.track(frame,conf=0.4,persist=True,agnostic_nms=True,show=True,classes=[0])
        try:
            ids= results[0].boxes.id.int().cpu().tolist()
            boxes= results[0].boxes
            for box, id in zip(boxes,ids):
                cls=results[0].names[int(box.cls.item())]
                x,y,w,h=np.array(box.xywh[0])
                prob=box.conf
                print(cls)
                features=encoder(frame,boxes)
                detections = [Detection(bbox, score, class_name, feature) for bbox, score, class_name, feature in 
                              zip(boxes, prob, cls, features)]
            break
        except:
             break

        

        if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    else:
        print('Failed!!')
    

cap.release()
cv2.destroyAllWindows()

In [3]:
from PIL import Image
import cv2 
import ultralytics
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt
from tracker import Tracker
import random

In [None]:
# Load the YOLOv8 model
detection_model = YOLO("yolov8n.pt")
pose_model=YOLO('yolov8n-pose.pt')

# Open the capeo file
video_path = "videoplayback.mp4"
cap = cv2.VideoCapture(video_path)

tracker=Tracker()

colors=[(random.randint(0,255),random.randint(0,255),random.randint(0,255)) for j in range(10) ]

while cap.isOpened():
     success, frame= cap.read()
     if success:
          results=detection_model(frame,conf=0.3,classes=[0])
          pose_results=pose_model(frame,conf=0.3,classes=[0])
          detections=[]
          try:
             for result,pose in zip(results,pose_results):
               for r in result.boxes.data.tolist():
                    x1,y1,x2,y2,probs,cls=r
                    x1=int(x1)
                    y1=int(y1)
                    x2=int(x2)
                    y2=int(y2)
                    cls=int(cls)
                    detections.append([x1,y1,x2,y2,probs])
               tracker.update(frame,detections)
               for track in tracker.tracks:
                    x1,y1,x2,y2=track.bbox
                    id=track.track_id
                    cv2.rectangle(frame,(int(x1) , int(y1)),(int(x2) , int(y2)), 
                                  (colors[id % len(colors)]),3)
          except Exception as e:
               cap.release()
               cv2.destroyAllWindows()
               raise e

          cv2.imshow("YOLOv8 Inference", frame)
          #success,frame=cap.read()

          if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
     else:
          print('Failed!!')
cap.release()
cv2.destroyAllWindows()

In [None]:
# Load the YOLOv8 model
detection_model = YOLO("yolov8n.pt")
pose_model=YOLO('yolov8n-pose.pt')


In [None]:
t=np.array([[4.1655e+02, 3.8043e+02, 9.2585e-01],
        [4.2234e+02, 3.7166e+02, 6.4439e-01],
        [4.0699e+02, 3.7212e+02, 9.2182e-01],
        [0.0000e+00, 0.0000e+00, 3.3340e-01],
        [3.8414e+02, 3.7460e+02, 8.9738e-01],
        [4.1617e+02, 4.3102e+02, 9.6508e-01],
        [3.5874e+02, 4.3142e+02, 9.9606e-01],
        [4.1607e+02, 5.1359e+02, 8.0344e-01],
        [3.4776e+02, 5.0876e+02, 9.8815e-01],
        [4.3893e+02, 5.3288e+02, 7.5023e-01],
        [4.1230e+02, 5.0540e+02, 9.6717e-01],
        [4.0255e+02, 5.7523e+02, 9.8760e-01],
        [3.7483e+02, 5.7878e+02, 9.9580e-01],
        [3.7855e+02, 6.8531e+02, 9.7513e-01],
        [4.1787e+02, 6.9362e+02, 9.9192e-01],
        [3.2192e+02, 7.7031e+02, 9.3976e-01],
        [4.2578e+02, 7.8576e+02, 9.7017e-01]])
def estimate_threat(kypts):
    kypts=kypts.tolist()
    kypts=[[x,y,p] if p>0.6 else [0,0,p] for x,y,p in kypts]
    re,le=kypts[2][:2],kypts[1][:2]
    rs,ls=kypts[6][:2],kypts[5][:2]
    rw,lw=kypts[10][:2],kypts[9][:2]
    avg_e=(np.mean((re[0],le[0])),np.mean((re[1],le[1])))
    avg_s=(np.mean((rs[0],ls[0])),np.mean((rs[1],ls[1])))
    print(avg_e,avg_s,rw,lw)
    if avg_s[1]!=0:
        max_wrist_coord=np.max((rw[1],lw[1]))
        if max_wrist_coord!=0 and max_wrist_coord<avg_s[1]:            
            print('threat')
            return True
        else:
            return False
    else:
        return False
    
estimate_threat(t)

In [None]:
np.mean((2,3.9))

In [None]:
# Open the capeo file
video_path = "videoplayback.mp4"
cap = cv2.VideoCapture(0)
while cap.isOpened():
     success, frame= cap.read()
     if success:
          results=pose_model(frame,conf=0.3,classes=[0],show=True)
          pose=[]
          try:
             for result in results:
                   for r in result.keypoints.data:
                         threat=estimate_threat(r)
                         print(threat)                       
          except Exception as e:
               cap.release()
               cv2.destroyAllWindows()
               raise e

          # cv2.imshow("YOLOv8 Inference", frame)
          
          if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
     else:
          print('Failed!!')
cap.release()
cv2.destroyAllWindows()

In [None]:
cap.release()
cv2.destroyAllWindows()

In [None]:
vector1 = np.array([0, 0])
vector2 = np.array([0, 0])

angle_between(vector1, vector2)

In [None]:
relative_pos(174.7609405517578,174.7609405517578,364.7063293457031)

In [None]:
# def relative_pos(o,p2,p3):
#     p2=np.array(o)-np.array(p2)
#     p3=np.array(o)-np.array(p3)
#     return (p2,p3)

# def angle_between(v1, v2):
#   cos_theta = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
#   return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))

# def estimate_threat(kypts):
#     flag=False
#     kypts=kypts.tolist()
#     kypts=[[x,y,p] if p>0.4 else [0,0,p] for x,y,p in kypts]
#     rs,ls=kypts[6][:2],kypts[5][:2] #shoulders c,y coord
#     rw,lw=kypts[10][:2],kypts[9][:2] #wrist x,y coord
#     rel,lel=kypts[8][:2],kypts[7][:2]
#     avg_s=(np.mean((rs[0],ls[0])),np.mean((rs[1],ls[1])))
#     avg_el=(np.mean((rel[0],lel[0])),np.mean((rel[1],lel[1])))
#     if (avg_el[1]!=0)&(avg_s[1]!=0):
#         elb_y=[rel[1],lel[1]]
#         wrist_y=[rw[1],lw[1]]
#         min_wrist_coord=min(wrist_y)
#         min_ind=wrist_y.index(min_wrist_coord)
#         max_wrist_coord=max(wrist_y)
#         max_ind=wrist_y.index(max_wrist_coord)
#         angle=0
#         if max_wrist_coord!=0:
#             if (elb_y[min_ind]!=0)&(min_wrist_coord!=0) | (elb_y[max_ind]!=0)&(max_wrist_coord!=0):
#                 if min_wrist_coord<=elb_y[min_ind]:
#                     if min_ind==0:
#                         p1,q1=relative_pos(rel,rw,avg_s)
#                         angle=angle_between(q1,p1)
#                     else:
#                         p1,q1=relative_pos(lel,lw,avg_s)
#                         angle=angle_between(q1,p1)
#                     if (angle>90):
#                         print('threat1',angle, wrist_y, elb_y, min_ind)
#                         flag=True
#                         return flag
#                 if max_wrist_coord<=elb_y[max_ind]:
#                     if max_ind==0:
#                         p1,q1=relative_pos(rel,rw,avg_s)
#                         angle=angle_between(q1,p1)
#                     else:
#                         p1,q1=relative_pos(lel,lw,avg_s)
#                         angle=angle_between(q1,p1)
#                     if (angle>90):
#                         print('threat2',angle, wrist_y, elb_y, max_ind)
#                         flag=True
#                         return flag  
#     return flag

# '''threat2 97.86680876173493 [0, 454.80377197265625] [0, 457.8333435058594] 1'''

In [None]:
'''
Function to create a local coordinate system
'''
def relative_pos(o,p2,p3):
    p2=np.array(o)-np.array(p2)
    p3=np.array(o)-np.array(p3)
    return (p2,p3)

'''
Function to calculate angle between two vectors
'''
def angle_between(v1, v2):
  cos_theta = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
  return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))

'''
Function to estimate whether a person is a threat or not
based on location of his joints
'''
def estimate_threat(kypts):
    flag=False
    kypts=[[x,y] if p>0.5 else [0,0] for x,y,p in kypts]
    rs,ls=kypts[6],kypts[5] #shoulders c,y coord
    rw,lw=kypts[10],kypts[9] #wrist x,y coord
    rel,lel=kypts[8],kypts[7] #elbow x,y coord
    
    '''
    Check if any of the wrist is raised above the elbow level and the angle 
    formed between the shoulder and hand of the respective hand is greater than 90
    then we may consider it as a threating gesture.
    '''
    if (rel!=[0,0]) & (rs!=[0,0]) & (rw!=[0,0]):
        if rw[1]<=rel[1]:
            p1,q1=relative_pos(rel,rw,rs)
            angle=angle_between(q1,p1)
            if angle>90:
                flag=True
                return flag
    if (lel!=[0,0]) & (ls!=[0,0]) & (lw!=[0,0]):
        if lw[1]<=lel[1]:
            p1,q1=relative_pos(lel,lw,ls)
            angle=angle_between(q1,p1)
            if angle>90:
                flag=True
                return flag
    return flag


In [None]:
# Load the YOLOv8-pose model for object detection & gathering coordinate data of joints
pose_model=YOLO('yolov8n-pose.pt')

# Create a tracker object of DeepSORT
tracker=Tracker()

# List of random colors (BGR)
colors=[(random.randint(0,255),random.randint(0,255),random.randint(0,200)) for j in range(10)]

# Load input file
video_path = "robbery2.mp4"
cap = cv2.VideoCapture(0)

while cap.isOpened():
     success, frame= cap.read()
     if success:
          results=pose_model(frame,conf=0.2,classes=[0],show=False)
          detections=[]
          threats=[]
          try:
             for result in results:
               for r,pr in zip(result.boxes.data.tolist(),result.keypoints.data.tolist()):
                    x1,y1,x2,y2,probs,cls=r
                    x1=int(x1)
                    y1=int(y1)
                    x2=int(x2)
                    y2=int(y2)
                    cls=int(cls)
                    detections.append([x1,y1,x2,y2,probs])
                    threats.append(estimate_threat(pr))
               tracker.update(frame,detections)
               for track,threat in zip(tracker.tracks,threats):
                    x1,y1,x2,y2=track.bbox
                    id=track.track_id
                    font = cv2.FONT_HERSHEY_SIMPLEX
                    if threat:
                         cv2.rectangle(frame,(int(x1) , int(y1)),(int(x2) , int(y2)), 
                                  (0,0,255),3)
                         cv2.putText(frame,'THREAT',(int(x1) , int(y1)),font,1,(0,0,255),3,cv2.LINE_AA)
                    else:
                         cv2.rectangle(frame,(int(x1) , int(y1)),(int(x2) , int(y2)), 
                                  (colors[id % len(colors)]),3)
                         cv2.putText(frame,'SAFE',(int(x1) , int(y1)),font,1,(0,255,0),3,cv2.LINE_AA)
          except Exception as e:
               cap.release()
               cv2.destroyAllWindows()
               raise e

          cv2.imshow("YOLOv8 Inference", frame)

          if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
     else:
          print('Failed!!')
cap.release()
cv2.destroyAllWindows()

In [7]:
from PIL import Image
import cv2 
import ultralytics
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt
from tracker import Tracker
import random

'''
Function to create a local coordinate system
'''
def relative_pos(o,p2,p3):
    p2=np.array(o)-np.array(p2)
    p3=np.array(o)-np.array(p3)
    return (p2,p3)

'''
Function to calculate angle between two vectors
'''
def angle_between(v1, v2):
  cos_theta = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
  return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))

'''
Function to estimate whether a person is a threat or not
based on location of his joints
'''
def estimate_threat(kypts):
    flag=False
    kypts=[[x,y] if p>0.5 else [0,0] for x,y,p in kypts]
    rs,ls=kypts[6],kypts[5] #shoulders c,y coord
    rw,lw=kypts[10],kypts[9] #wrist x,y coord
    rel,lel=kypts[8],kypts[7] #elbow x,y coord
    
    '''
    Check if any of the wrist is raised above the elbow level and the angle 
    formed between the shoulder and hand of the respective hand is greater than 90
    then we may consider it as a threating gesture.
    '''
    if (rel!=[0,0]) & (rs!=[0,0]) & (rw!=[0,0]):
        if rw[1]<=rel[1]:
            p1,q1=relative_pos(rel,rw,rs)
            angle=angle_between(q1,p1)
            if angle>90:
                flag=True
                return flag
    if (lel!=[0,0]) & (ls!=[0,0]) & (lw!=[0,0]):
        if lw[1]<=lel[1]:
            p1,q1=relative_pos(lel,lw,ls)
            angle=angle_between(q1,p1)
            if angle>90:
                flag=True
                return flag
    return flag


# Load the YOLOv8-pose model for object detection & gathering coordinate data of joints
pose_model=YOLO('yolov8n-pose.pt')

# Create a tracker object of DeepSORT
tracker=Tracker()

# List of random colors (BGR)
colors=[(random.randint(0,255),random.randint(0,255),random.randint(0,200)) for j in range(10)]

# Load input file
cap = cv2.VideoCapture(0)
success, frame= cap.read()

while cap.isOpened():
     success, frame= cap.read()
     if success:
          results=pose_model(frame,conf=0.2,classes=[0],show=False)
          detections=[]
          font = cv2.FONT_HERSHEY_SIMPLEX
          try:
             for result in results:
               for r,pr in zip(result.boxes.data.tolist(),result.keypoints.data.tolist()):
                    x1,y1,x2,y2,probs,cls=r
                    x1=int(x1)
                    y1=int(y1)
                    x2=int(x2)
                    y2=int(y2)
                    cls=int(cls)
                    detections.append([x1,y1,x2,y2,probs])
                    threat=estimate_threat(pr)
                    if threat:
                        cv2.putText(frame,'THREAT',(int(x1) , int(y1)),font,1,(0,0,255),3,cv2.LINE_AA)
                    else:
                        cv2.putText(frame,'SAFE',(int(x1) , int(y1)),font,1,(0,255,0),3,cv2.LINE_AA)

               tracker.update(frame,detections)
               for track in tracker.tracks:
                    x1,y1,x2,y2=track.bbox
                    id=track.track_id
                    cv2.rectangle(frame,(int(x1) , int(y1)),(int(x2) , int(y2)), 
                            (colors[id % len(colors)]),3)
                         
          except Exception as e:
               cap.release()
               cv2.destroyAllWindows()
               raise e
           
          cv2.imshow("YOLOv8 Inference", frame)

          if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
     else:
          print('Failed!!')
cap.release()
cv2.destroyAllWindows()


0: 480x640 1 person, 227.1ms
Speed: 5.0ms preprocess, 227.1ms inference, 15.6ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 271.7ms
Speed: 0.0ms preprocess, 271.7ms inference, 4.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 226.7ms
Speed: 0.0ms preprocess, 226.7ms inference, 2.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 231.2ms
Speed: 0.0ms preprocess, 231.2ms inference, 3.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 243.6ms
Speed: 2.5ms preprocess, 243.6ms inference, 8.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 308.0ms
Speed: 6.1ms preprocess, 308.0ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 268.4ms
Speed: 4.0ms preprocess, 268.4ms inference, 2.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 234.3ms
Speed: 5.3ms preprocess, 234.3ms inference, 3.0ms postprocess per image a

In [None]:
'''
Function to create a local coordinate system
'''
def relative_pos(o,p2,p3):
    p2=np.array(o)-np.array(p2)
    p3=np.array(o)-np.array(p3)
    return (p2,p3)

'''
Function to calculate angle between two vectors
'''
def angle_between(v1, v2):
  cos_theta = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
  return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))

'''
Function to estimate whether a person is a threat or not
based on location of his joints
'''
def estimate_threat(kypts):
    flag=False
    kypts=[[x,y] if p>0.5 else [0,0] for x,y,p in kypts]
    rs,ls=kypts[6],kypts[5] #shoulders c,y coord
    rw,lw=kypts[10],kypts[9] #wrist x,y coord
    rel,lel=kypts[8],kypts[7] #elbow x,y coord
    
    '''
    Check if any of the wrist is raised above the elbow level and the angle 
    formed between the shoulder and hand of the respective hand is greater than 90
    then we may consider it as a threating gesture.
    '''
    if (rel!=[0,0]) & (rs!=[0,0]) & (rw!=[0,0]):
        if rw[1]<=rel[1]:
            p1,q1=relative_pos(rel,rw,rs)
            angle=angle_between(q1,p1)
            if angle>90:
                flag=True
                return flag
    if (lel!=[0,0]) & (ls!=[0,0]) & (lw!=[0,0]):
        if lw[1]<=lel[1]:
            p1,q1=relative_pos(lel,lw,ls)
            angle=angle_between(q1,p1)
            if angle>90:
                flag=True
                return flag
    return flag


# Load the YOLOv8-pose model for object detection & gathering coordinate data of joints
pose_model=YOLO('yolov8n-pose.pt')

# Create a tracker object of DeepSORT
tracker=Tracker()

# List of random colors (BGR)
colors=[(random.randint(0,255),random.randint(0,255),random.randint(0,200)) for j in range(10)]

# Load input file
success, frame= cap.read()
video_path = "robbery2.mp4"
cap = cv2.VideoCapture(video_path)
success, frame= cap.read()
sav = cv2.VideoWriter('videoplayback_out.mp4',  
                         cv2.VideoWriter_fourcc(*'MP4V'), 
                         cap.get(cv2.CAP_PROP_FPS),(frame.shape[1],frame.shape[0]))

while cap.isOpened():
     success, frame= cap.read()
     if success:
          results=pose_model(frame,conf=0.2,classes=[0],show=False)
          detections=[]
          font = cv2.FONT_HERSHEY_SIMPLEX
          try:
             for result in results:
               for r,pr in zip(result.boxes.data.tolist(),result.keypoints.data.tolist()):
                    x1,y1,x2,y2,probs,cls=r
                    x1=int(x1)
                    y1=int(y1)
                    x2=int(x2)
                    y2=int(y2)
                    cls=int(cls)
                    detections.append([x1,y1,x2,y2,probs])
                    threat=estimate_threat(pr)
                    if threat:
                        cv2.putText(frame,'THREAT',(int(x1) , int(y1)),font,0.5,(0,0,255),3,cv2.LINE_AA)
                    else:
                        cv2.putText(frame,'SAFE',(int(x1) , int(y1)),font,0.5,(0,255,0),3,cv2.LINE_AA)

               tracker.update(frame,detections)
               for track in tracker.tracks:
                    x1,y1,x2,y2=track.bbox
                    id=track.track_id
                    cv2.rectangle(frame,(int(x1) , int(y1)),(int(x2) , int(y2)), 
                            (colors[id % len(colors)]),3)
                         
          except Exception as e:
               cap.release()
               cv2.destroyAllWindows()
               raise e
          
          sav.write(frame) 

          if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
     else:
          print('Failed!!')
cap.release()
sav.release()
cv2.destroyAllWindows()

In [None]:
cap.release()
sav.release()
cv2.destroyAllWindows()