In [1]:
import pandas as pd
from numpy.linalg import inv
import numpy as np
import math
import cv2
import torch

from matplotlib import pyplot as plt
%matplotlib inline

# Loading Model

For object detection we are using You Only Look Once (YOLO) v5 with meduim size weight. The model is loaded directly from torch hub with the weights already trained on MS COCO dataset. 

The main concept behind YOLO is passing th full image to a single nueral network and letting the network decide the the regions and predict bounding boxes and probablity for each region. The following approach make it faster than even Fast R-CNN.

Here, we used YOLO to detect the sports ball with standard confidence threshold of 0.25. Then all the detected regions are passed through a non-maximum suppression (NMS) on the boxes according to their intersection-over-union (IoU). We set IoU threshold to 0.45. The model returns the confidence score accompanied by ```(xmin,ymin)``` and ```(xmax,ymax)``` for each bounding box for each frame.

In [2]:
model = torch.hub.load('ultralytics/yolov5', 'yolov5m', verbose=False)
device = torch.device('cuda')
model = model.to(device)

YOLOv5 🚀 2022-11-20 Python-3.9.13 torch-1.13.0+cu117 CUDA:0 (NVIDIA GeForce MX130, 4046MiB)

Fusing layers... 
YOLOv5m summary: 290 layers, 21172173 parameters, 0 gradients
Adding AutoShape... 


# Helper Functions

In [3]:
def draw_prediction(img: np.ndarray,
                    class_name: str,
                    df: pd.core.series.Series,
                    color: tuple = (255, 0, 0)):
    '''
    Function to draw prediction around the bounding box identified by the YOLO
    The Function also displays the confidence score top of the bounding box 
    '''

    cv2.rectangle(img, (int(df.xmin), int(df.ymin)),
                  (int(df.xmax), int(df.ymax)), color, 2)
    cv2.putText(img, class_name + " " + str(round(df.confidence, 2)),
                (int(df.xmin) - 10, int(df.ymin) - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return img

In [4]:
def convert_video_to_frame(path: str):
    '''
    The function take input as video file and returns a list of images for every video
    '''

    cap = cv2.VideoCapture(path)
    fps = cap.get(cv2.CAP_PROP_FPS)

    img = []
    while cap.isOpened():
        ret, frame = cap.read()
        if ret == True:
            img.append(frame)
        else:
            break

    cap.release()
    return img, fps

# Extracting Data

In [5]:
# Converting Video to image frame by frame for a single and multiple ball

img_multi, fps_multi = convert_video_to_frame('./multiple_balls.avi')
img_sin, fps_sin = convert_video_to_frame('./single_ball.mp4')

# Predictions

Running the model on single ball and multiple ball video respectively

Storing the results in form of pandas dataframe

In [6]:
results_sin = model(img_sin)
results_multi = model(img_multi)

df_sin = results_sin.pandas().xyxy
df_multi = results_multi.pandas().xyxy

# Kalman Filter

Below, we init the Kalman filter class. The Kalman filter basically comprises of the following five equations

- State Correction Equation(S)
- Predicting next state(S_pred)
- Kalman Gain Equation(K)
- Estimate Correction Equation(P)
- Predicting next uncertainity(P_pred)

For the following problem, we assume the state to be $[x,x',x'',y,y',y'']$ where $x'$ and $x''$ denotes acceleration and velocity in x direction(same for y). Also, the Kalman filter takes into account the process noise as ```Q``` and measurement uncertainity as ```R``` 

**Note**: We are not updating the state correction equation and estimate uncertainity correction equation in case no ball is detected in the frame. In that case we are predicting the next state on the basis of the previous state in which ball is shown(which would be the case mathematically)

## Custom Kalman Filter Class

In [7]:
class KalmanFilter():
    def __init__(self,
                 xinit: int = 0,
                 yinit: int = 0,
                 fps: int = 30,
                 std_a: float = 0.001,
                 std_x: float = 0.0045,
                 std_y: float = 0.01,
                 cov: float = 100000) -> None:

        # State Matrix
        self.S = np.array([xinit, 0, 0, yinit, 0, 0])
        self.dt = 1 / fps

        # State Transition Model
        # Here, we assume that the model follow Newtonian Kinematics
        self.F = np.array([[1, self.dt, 0.5 * (self.dt * self.dt), 0, 0, 0],
                           [0, 1, self.dt, 0, 0, 0], [0, 0, 1, 0, 0, 0],
                           [0, 0, 0, 1, self.dt, 0.5 * self.dt * self.dt],
                           [0, 0, 0, 0, 1, self.dt], [0, 0, 0, 0, 0, 1]])

        self.std_a = std_a

        # Process Noise
        self.Q = np.array([
            [
                0.25 * self.dt * self.dt * self.dt * self.dt, 0.5 * self.dt *
                self.dt * self.dt, 0.5 * self.dt * self.dt, 0, 0, 0
            ],
            [
                0.5 * self.dt * self.dt * self.dt, self.dt * self.dt, self.dt,
                0, 0, 0
            ], [0.5 * self.dt * self.dt, self.dt, 1, 0, 0, 0],
            [
                0, 0, 0, 0.25 * self.dt * self.dt * self.dt * self.dt,
                0.5 * self.dt * self.dt * self.dt, 0.5 * self.dt * self.dt
            ],
            [
                0, 0, 0, 0.5 * self.dt * self.dt * self.dt, self.dt * self.dt,
                self.dt
            ], [0, 0, 0, 0.5 * self.dt * self.dt, self.dt, 1]
        ]) * self.std_a * self.std_a

        self.std_x = std_x
        self.std_y = std_y

        # Measurement Noise
        self.R = np.array([[self.std_x * self.std_x, 0],
                           [0, self.std_y * self.std_y]])

        self.cov = cov

        # Estimate Uncertainity
        self.P = np.array([[self.cov, 0, 0, 0, 0, 0],
                           [0, self.cov, 0, 0, 0, 0],
                           [0, 0, self.cov, 0, 0, 0],
                           [0, 0, 0, self.cov, 0, 0],
                           [0, 0, 0, 0, self.cov, 0],
                           [0, 0, 0, 0, 0, self.cov]])

        # Observation Matrix
        # Here, we are observing X & Y (0th index and 3rd Index)
        self.H = np.array([[1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]])

        self.I = np.array([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0],
                           [0, 0, 1, 0, 0, 0], [0, 0, 0, 1, 0, 0],
                           [0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 1]])

        # Predicting the next state and estimate uncertainity
        self.S_pred = None
        self.P_pred = None

        # Kalman Gain
        self.K = None

        # Storing all the State, Kalman Gain and Estimate Uncertainity
        self.S_hist = [self.S]
        self.K_hist = []
        self.P_hist = [self.P]

    def pred_new_state(self):
        self.S_pred = self.F.dot(self.S)

    def pred_next_uncertainity(self):
        self.P_pred = self.F.dot(self.P).dot(self.F.T) + self.Q

    def get_Kalman_gain(self):
        self.K = self.P_pred.dot(self.H.T).dot(
            inv(self.H.dot(self.P_pred).dot(self.H.T) + self.R))
        self.K_hist.append(self.K)

    def state_correction(self, z):
        if z == [None, None]:
            self.S = self.S_pred
        else:
            self.S = self.S_pred + +self.K.dot(z - self.H.dot(self.S_pred))

        self.S_hist.append(self.S)

    def uncertainity_correction(self, z):
        if z != [None, None]:
            self.l1 = self.I - self.K.dot(self.H)
            self.P = self.l1.dot(self.P_pred).dot(self.l1.T) + self.K.dot(
                self.R).dot(self.K.T)
        self.P_hist.append(self.P)

### Init Filters

Initializing Kalman Filters for both single and multi object. In case of multiple objects we have two balls, therefore two filters are initialized

In [8]:
filter_sin = KalmanFilter(fps=fps_sin, xinit=60,
                          yinit=150, std_x=0.000025, std_y=0.0001)

filter_multi = [
    KalmanFilter(fps=fps_multi, xinit=60, yinit=150,
                 std_x=0.000025, std_y=0.0001),
    KalmanFilter(fps=fps_multi, xinit=620, yinit=150,
                 std_x=0.000025, std_y=0.0001)
]

## Kalman Filter for single sports ball

### Running

Running the Kalman filter for single ball video

**Note**: Here Kalman filters predict the center of the bounding box which is equal to center of the ball in most cases

In [9]:
for df in df_sin:
    df = df.loc[df['name'] == 'sports ball']
    x_cen, y_cen = None, None

    if len(df) > 0:
        x_cen = (df.xmin.values[0] + df.xmax.values[0]) / 2
        y_cen = (df.ymin.values[0] + df.ymax.values[0]) / 2

    filter_sin.pred_new_state()
    filter_sin.pred_next_uncertainity()
    filter_sin.get_Kalman_gain()
    filter_sin.state_correction([x_cen, y_cen])
    filter_sin.uncertainity_correction([x_cen, y_cen])

### Visualization

Visualzing the bounding box and Kalman filter prediction. Note that the bounding box has been represented in blue color and Kalman Filter has been represented as a blue dot on the ball

The resulting video is save in file ```single_ball_kalman.avi```

In [19]:
out = cv2.VideoWriter('single_ball_kalman.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 10,
                      (img_sin[0].shape[1], img_sin[0].shape[0]))

for i in range(len(img_sin)):
    x, y = filter_sin.S_hist[i][0], filter_sin.S_hist[i][3]
    df = df_sin[i].loc[df_sin[i]['name'] == 'sports ball']
    tmp_img = img_sin[i]

    for j in df.index.values:
        tmp_img = draw_prediction(tmp_img, 'Ball', df.loc[j])

    tmp_img = cv2.circle(tmp_img, (math.floor(
        filter_sin.S_hist[i][0]), math.floor(filter_sin.S_hist[i][3])),
        radius=1,
        color=(255, 0, 0),
        thickness=3)

    out.write(tmp_img)

out.release()

In [24]:
from IPython.display import Video
Video("single_ball_kalman.mp4")

## Kalman Filter for Multiple Balls (2 balls)

### Running 

Running the Kalman filter for multiple ball video

Here we used a cost function to choose the correct filter for the ball. The cost function here uses the euclidean distance for filter assignment. For multiple balls the below approach can be modified with the help of hungarian algorithm.

**Note**: Here Kalman filters predict the center of the bounding box which is equal to center of the ball in most cases

In [11]:
def cost_fun(a, b):
    '''
    Cost function for filter Assignment
    Uses euclidean distance for choosing the filter
    '''

    sm = 0
    for i in range(len(a)):
        sm += (a[i] - b[i])**2
    return sm

In [12]:
assig = []

for df in df_multi:
    df = df.loc[df['name'] == 'sports ball']
    x_cen, y_cen = [None, None], [None, None]

    for i in df.index.values:
        coord = [(df.at[i, 'xmin'] + df.at[i, 'xmax']) / 2,
                 (df.at[i, 'ymin'] + df.at[i, 'ymax']) / 2]

        if cost_fun([
                filter_multi[0].S_hist[-1][0], filter_multi[0].S_hist[-1][3]
        ], coord) < cost_fun(
            [filter_multi[1].S_hist[-1][0], filter_multi[1].S_hist[-1][3]],
                coord) and x_cen[0] == None and y_cen[0] == None:
            x_cen[0], y_cen[0] = coord[0], coord[1]
            assig.append(0)
        else:
            x_cen[1], y_cen[1] = coord[0], coord[1]
            assig.append(1)

    for i in range(2):
        filter_multi[i].pred_new_state()
        filter_multi[i].pred_next_uncertainity()
        filter_multi[i].get_Kalman_gain()
        filter_multi[i].state_correction([x_cen[i], y_cen[i]])
        filter_multi[i].uncertainity_correction([x_cen[i], y_cen[i]])

### Visualization

Visualzing the bounding box and Kalman filter prediction. Note that the bounding box has been represented in blue color and Kalman Filter has been represented as a blue dot on the ball

The resulting video is save in file ```multiple_balls_kalman.avi```

In [27]:
ind = 0
out = cv2.VideoWriter('multiple_balls_kalman.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 10,
                      (img_multi[0].shape[1], img_multi[0].shape[0]))

for i in range(len(img_multi)):
    tmp_img = img_multi[i]
    df = df_multi[i].loc[df_multi[i]['name'] == 'sports ball']

    tmp_img = cv2.circle(tmp_img, (math.floor(filter_multi[0].S_hist[i][0]),
                                   math.floor(filter_multi[0].S_hist[i][3])),
                         radius=1,
                         color=(255, 0, 0),
                         thickness=3)
    tmp_img = cv2.circle(tmp_img, (math.floor(filter_multi[1].S_hist[i][0]),
                                   math.floor(filter_multi[1].S_hist[i][3])),
                         radius=1,
                         color=(0, 0, 255),
                         thickness=3)

    for j in df.index.values:
        if assig[ind] == 0:
            tmp_img = draw_prediction(tmp_img,
                                      'Ball 1',
                                      df.loc[j],
                                      color=(255, 0, 0))
        else:
            tmp_img = draw_prediction(tmp_img,
                                      'Ball 2',
                                      df.loc[j],
                                      color=(0, 0, 255))
        ind += 1

    out.write(tmp_img)

out.release()

In [28]:
from IPython.display import Video
Video("multiple_balls_kalman.mp4",embed=True)