In [None]:
import pandas as pd
from PIL import Image, ImageDraw
from pathlib import Path
import os
import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv

In [None]:
def get_frame_from_video(frame, video):
    """
    Extract a single frame from a video with ffmpeg and return it as a PIL image.

    Args:
        frame: 1-based frame number; it is converted to the 0-based index `n`
            used by ffmpeg's `select` filter.
        video: path to the video file.

    Returns:
        The decoded frame as a PIL image with its pixel data already loaded.

    Raises:
        ValueError: if `frame` is smaller than 1.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if ffmpeg ran but produced no image (e.g. the frame
            number is past the end of the video).
    """
    # Local imports keep this block self-contained; both are standard library.
    import subprocess
    import tempfile

    if frame < 1:
        raise ValueError(f'frame must be >= 1, got {frame}')
    frame_index = int(frame) - 1

    # A private temporary directory avoids clobbering (or being blocked by) a
    # stale 'frame.png' in the working directory and makes parallel calls safe.
    with tempfile.TemporaryDirectory() as tmp_dir:
        out_path = os.path.join(tmp_dir, 'frame.png')
        # Argument list instead of a shell string: paths containing spaces or
        # shell metacharacters are passed to ffmpeg verbatim.
        subprocess.run(
            [
                'ffmpeg',
                '-hide_banner',
                '-loglevel', 'fatal',
                '-nostats',
                '-i', str(video),
                # The comma is escaped because ',' separates filters in ffmpeg.
                '-vf', f'select=eq(n\\,{frame_index})',
                '-vframes', '1',
                out_path,
            ],
            check=True,
        )
        if not os.path.exists(out_path):
            raise FileNotFoundError(f'ffmpeg produced no image for frame {frame} of {video}')
        # Image.open is lazy; force the pixel data into memory before the
        # temporary directory (and the file in it) is removed.
        with Image.open(out_path) as img:
            img.load()
    return img

def annotate_frame(img, xc, yc, r, col = (57, 255, 20)):
    """
    Draw one filled circle of radius `r` per (x, y) centre onto `img`.

    The image is drawn on in place and also returned. Circles are filled with
    `col` and outlined in black.
    """
    canvas = ImageDraw.Draw(img)
    for cx, cy in zip(xc, yc):
        bounding_box = (cx - r, cy - r, cx + r, cy + r)
        canvas.ellipse(bounding_box, fill=col, outline='black')
    return img

# code from: https://www.kaggle.com/robikscube/nfl-helmet-assignment-getting-started-guide
def add_track_features(tracks, fps=59.94, snap_frame=10):
    """
    Add column features helpful for syncing with video data.

    Works on a copy; the input frame is not modified.

    Args:
        tracks: tracking data with columns `gameKey`, `playID`, `time`,
            `event` and `player`.
        fps: video frame rate used to convert seconds into frames.
        snap_frame: video frame at which the ball snap is assumed to occur.

    Returns:
        A copy of `tracks` with the added columns `game_play`, `snap`,
        `isSnap`, `team`, `snap_offset` (seconds relative to the snap) and
        `est_frame` (estimated video frame), and with `time` parsed to
        datetimes.
    """
    tracks = tracks.copy()
    tracks["game_play"] = (
        tracks["gameKey"].astype("str")
        + "_"
        + tracks["playID"].astype("str").str.zfill(6)
    )
    tracks["time"] = pd.to_datetime(tracks["time"])
    # First "ball_snap" timestamp of every play.
    snap_dict = (
        tracks.query('event == "ball_snap"')
        .groupby("game_play")["time"]
        .first()
        .to_dict()
    )
    tracks["snap"] = tracks["game_play"].map(snap_dict)
    tracks["isSnap"] = tracks["snap"] == tracks["time"]
    tracks["team"] = tracks["player"].str[0].replace("H", "Home").replace("V", "Away")
    # total_seconds() yields float seconds on every pandas version; casting to
    # "timedelta64[ms]" keeps a timedelta dtype on pandas >= 2, which breaks
    # the frame arithmetic below.
    tracks["snap_offset"] = (tracks["time"] - tracks["snap"]).dt.total_seconds()
    # Estimated video frame
    tracks["est_frame"] = (
        ((tracks["snap_offset"] * fps) + snap_frame).round().astype("int")
    )
    return tracks

def add_video_features(videos):
    """
    Derive play, camera, frame and helmet-centre columns from helmet boxes.

    Works on a copy so the caller's frame (possibly a filtered slice of
    another frame) is left untouched, matching `add_track_features`.

    Args:
        videos: helmet detections with a `video_frame` column of the form
            `<game>_<play>_<camera>_<frame>` and box columns `left`, `width`,
            `top`, `height`.

    Returns:
        A copy of `videos` with the added columns `game_play`, `camera`,
        `frame` (kept as a string), `xc` and `yc` (integer box centres).
    """
    videos = videos.copy()
    parts = videos['video_frame'].str.split('_')
    videos['game_play'] = parts.str[:2].str.join('_')
    videos['camera'] = parts.str[2]
    # Stays a string: downstream queries compare against quoted frame numbers.
    videos['frame'] = parts.str[-1]
    videos['xc'] = (videos['left'] + videos['width'] / 2).astype(int)
    videos['yc'] = (videos['top'] + videos['height'] / 2).astype(int)
    return videos


def annotate_field(xc, yc, player, r = 10, width = 3, col = [(27, 3, 163), (255, 7, 58)], crop = None):
    """
    Draw player positions onto the field image and optionally crop around them.

    Args:
        xc, yc: tracking coordinates; they are scaled by 120 and 53.3
            respectively (presumably the field length and width in yards --
            confirm against the tracking data description).
        player: player labels; a label starting with 'H' is drawn with
            `col[0]`, anything else with `col[1]`.
        r: circle radius in pixels.
        width: outline width passed to the ellipse call.
        col: two fill colours, indexed as described above.
        crop: when a float, the image is cropped horizontally around the
            players with a margin proportional to the image width; any other
            value returns the full image.

    Returns:
        The annotated (and possibly cropped) PIL image.
    """
    field = Image.open('../input/nflhelmet-helper-dataset/field.png')
    img_w, img_h = field.size
    # Pixel offset of the playing area inside the image, and its pixel size.
    origin_x, origin_y = 68, 68
    area_w, area_h = 2424, 1100

    px = xc * area_w / 120 + origin_x
    # The vertical axis is inverted when mapping to image rows.
    py = (1 - yc / 53.3) * area_h + origin_y

    canvas = ImageDraw.Draw(field)
    for cx, cy, label in zip(px, py, player):
        fill = col[0 if label[0] == 'H' else 1]
        canvas.ellipse((cx - r, cy - r, cx + r, cy + r), fill=fill, width=width, outline='black')

    if not isinstance(crop, float):
        return field

    # Full height is kept; the right margin is twice the left margin.
    crop_box = [px.min() - crop * img_w, 0, px.max() + crop * 2 * img_w, img_h]
    return field.crop(crop_box)
    
    
class show_play_with_tracking():
    """
    Callable that renders one frame of a play as a single composite image:
    the annotated field (tracking data) next to the annotated Sideline and
    Endzone video frames (helmet detections).
    """

    def __init__(self, video_df = None, track_df = None):
        """
        Args:
            video_df: helmet detections; loaded from the competition CSV when
                None. `add_video_features` is always applied to it.
            track_df: player tracking data; loaded from the competition CSV
                when None. `add_track_features` is applied unless the frame
                already has an `est_frame` column.
        """
        if video_df is None:
            video_df = pd.read_csv('../input/nfl-health-and-safety-helmet-assignment/train_baseline_helmets.csv')
        self.video_df = add_video_features(video_df)
        if track_df is None:
            track_df = pd.read_csv('../input/nfl-health-and-safety-helmet-assignment/train_player_tracking.csv')
        # A caller-supplied frame used to hit a NameError here because only the
        # freshly loaded frame was bound to a name.
        if 'est_frame' not in track_df.columns:
            track_df = add_track_features(track_df)
        self.tracking_df = track_df.query("est_frame > 0")

    def _annotated_camera_frame(self, game_play, frame, camera, video_folder):
        """Return the video frame of one camera with helmet centres drawn on it."""
        img = get_frame_from_video(frame, video_folder + game_play + '_' + camera + '.mp4')
        df = self.video_df.query(f"game_play == '{game_play}' and frame == '{frame}' and camera == '{camera}'")
        return annotate_frame(img, df.xc, df.yc, 10)

    def __call__(self, game_play, frame, img_size = 800, video_folder = '../input/nfl-health-and-safety-helmet-assignment/train/'):
        """
        Build the composite image for `game_play` at video frame `frame`.

        Args:
            game_play: play identifier such as '57583_000082'.
            frame: 1-based video frame number.
            img_size: maximum width/height of the returned thumbnail.
            video_folder: folder containing `<game_play>_<camera>.mp4` files.

        Returns:
            A PIL RGB image with the field on the left and the Sideline
            (top) and Endzone (bottom) frames on the right.

        Raises:
            ValueError: if there is no tracking data for `game_play`.
        """
        frame_side = self._annotated_camera_frame(game_play, frame, 'Sideline', video_folder)
        frame_end = self._annotated_camera_frame(game_play, frame, 'Endzone', video_folder)

        # Restrict to this play BEFORE looking for the nearest tracked frame;
        # searching all plays can select a frame that only exists elsewhere.
        play_tracks = self.tracking_df.query(f"game_play == '{game_play}'")
        frames = play_tracks['est_frame'].values
        if len(frames) == 0:
            raise ValueError(f"no tracking data for game_play '{game_play}'")
        if frame not in frames:
            frame = frames[np.absolute(frames - frame).argmin()]
        df = play_tracks[play_tracks['est_frame'] == frame]
        field = annotate_field(df.x, df.y, df.player, 20, crop = 0.01)

        # Scale the field to the combined height of the two stacked frames.
        wf, hf = field.size
        wc, hc = frame_side.size
        field = field.resize((int(wf*2*hc/hf), 2*hc))
        wf, hf = field.size

        img = Image.new('RGB', (wf+wc+20, 2*hc+20))
        img.paste(im=field, box=(5, 10))
        img.paste(im=frame_side, box=(wf+15, 5))
        img.paste(im=frame_end, box=(wf+15, hc+15))
        img.thumbnail((img_size,img_size))
        return img
    
# Shared instance used by later cells; with no arguments the constructor reads
# both competition CSVs from ../input.
spwt = show_play_with_tracking()

# TODO, add interpolation of tracking_df and replace nearest
class get_keypoints():
    """
    Callable that collects the 2-D keypoints of one play/frame from the two
    camera views (helmet centres) and from the tracking data.
    """

    def __init__(self, video_df = None, track_df = None):
        """
        Args:
            video_df: helmet detections; loaded from the competition CSV when
                None. `add_video_features` is always applied to it.
            track_df: player tracking data; loaded from the competition CSV
                when None. `add_track_features` is applied unless the frame
                already has an `est_frame` column.
        """
        if video_df is None:
            video_df = pd.read_csv('../input/nfl-health-and-safety-helmet-assignment/train_baseline_helmets.csv')
        self.video_df = add_video_features(video_df)
        if track_df is None:
            track_df = pd.read_csv('../input/nfl-health-and-safety-helmet-assignment/train_player_tracking.csv')
        # A caller-supplied frame used to hit a NameError here because only the
        # freshly loaded frame was bound to a name.
        if 'est_frame' not in track_df.columns:
            track_df = add_track_features(track_df)
        self.tracking_df = track_df.query("est_frame > 0")

    @staticmethod
    def _normalize(points):
        """Min-max scale each column to [0, 1]; empty input is returned as is."""
        if len(points) == 0:
            return points
        lowest = points.min(axis = 0)
        span = points.max(axis = 0) - lowest
        # A constant column has zero range; divide by 1 so it maps to 0
        # instead of NaN/inf.
        span = np.where(span == 0, 1, span)
        return (points - lowest) / span

    def __call__(self, game_play, frame, normalized = True):
        """
        Collect keypoints for `game_play` at video frame `frame`.

        Args:
            game_play: play identifier such as '57583_000082'.
            frame: video frame number. Tracking data uses the nearest
                `est_frame` of the same play when there is no exact match.
            normalized: min-max scale every point set to the unit square.

        Returns:
            dict with keys 'Sideline', 'Endzone' and 'Tracking', each an
            (n, 2) array. The Sideline y axis is flipped (`1 - y`) regardless
            of `normalized`. The dict is also stored on `self.keypoints`.
        """
        keypoints = dict()
        for camera in ('Sideline', 'Endzone'):
            keypoints[camera] = self.video_df.query(
                f"game_play == '{game_play}' and frame == '{frame}' and camera == '{camera}'")[['xc', 'yc']].values

        # Restrict to this play BEFORE looking for the nearest tracked frame;
        # searching all plays can select a frame that only exists elsewhere.
        play_tracks = self.tracking_df.query(f"game_play == '{game_play}'")
        frames = play_tracks['est_frame'].values
        if len(frames) and frame not in frames:
            frame = frames[np.absolute(frames - frame).argmin()]
        keypoints['Tracking'] = play_tracks[play_tracks['est_frame'] == frame][['x', 'y']].values

        if normalized:
            for k, v in keypoints.items():
                keypoints[k] = self._normalize(v)

        # Flip on a copy: the array returned by `.values` may be read-only or
        # share memory with a DataFrame.
        sideline = keypoints['Sideline'].copy()
        sideline[:,1] = 1 - sideline[:,1]
        keypoints['Sideline'] = sideline

        self.keypoints = keypoints

        return keypoints

    def plot(self, add_no = False):
        """
        Scatter-plot the keypoints of the most recent call.

        Args:
            add_no: currently unused.
        """
        if not hasattr(self, 'keypoints'):
            print('you must run the function first...')
        else:
            kp = self.keypoints
            plt.figure(figsize=(6, 6))
            plt.scatter(kp['Endzone'][:,0], kp['Endzone'][:,1], marker = 'x', color = 'red', label = 'Endzone')
            plt.scatter(kp['Sideline'][:,0], kp['Sideline'][:,1], marker = '^', color = 'red', label = 'Sideline')
            plt.scatter(kp['Tracking'][:,0], kp['Tracking'][:,1], marker = 'o', color = 'green', label = 'Tracking')
            plt.legend();
    
# Shared instance used by later cells; with no arguments the constructor reads
# both competition CSVs from ../input.
get_kp = get_keypoints()

In [None]:
# Composite view (field + Sideline + Endzone) of play 57583_000082 at video frame 10.
spwt('57583_000082', 10)

In [None]:
# NOTE(review): `videoframes` is not defined anywhere in the visible notebook, so
# this cell raises NameError on Restart & Run All -- confirm intent or remove.
videoframes

What I wanted to do is to map each player from tracking data to the camera data like this:

<img src="https://media.discordapp.net/attachments/874736660103962726/878082285667237898/unknown.png?width=911&height=702" width=800 height=800 />


The same data can be normalized and viewed on the same plot:

In [None]:
# Normalized keypoints (the default) for play 57583_000082 at frame 12, then a
# scatter plot of all three point sets on the same axes.
k = get_kp('57583_000082', 12)
get_kp.plot(True)

# Point Cloud Registration

So, a good portion of this comp comes down to point-cloud registration. There are plenty of methods that perform this task. For example, OpenCV has `cv.findHomography`. As you can see below, this doesn't perform well. The main reason is that, although the function uses random sample consensus (RANSAC) to filter outliers, the point clouds must already be roughly aligned.

In [None]:
# Reshape both point sets to the (n, 1, 2) float32 layout and estimate a
# homography Tracking -> Sideline with RANSAC.
# NOTE(review): this pairs srcPoints[i] with dstPoints[i]; nothing visible here
# guarantees the two sets have equal length or matching order -- confirm.
srcPoints = k['Tracking'].astype('float32').reshape(-1,1,2)
dstPoints = k['Sideline'].astype('float32').reshape(-1,1,2)
M, mask = cv.findHomography(srcPoints, dstPoints, cv.RANSAC)
# Sum of the mask returned by findHomography (presumably the RANSAC inlier count).
print(mask.sum())
tfmdPoints = cv.perspectiveTransform(srcPoints,M)
plt.scatter(srcPoints[:,0,0], srcPoints[:,0,1], marker = 'o', color = 'red', label = 'source')
plt.scatter(dstPoints[:,0,0], dstPoints[:,0,1], marker = '^', color = 'green', label = 'target')  
plt.scatter(tfmdPoints[:,0,0], tfmdPoints[:,0,1], marker = 'o', color = 'blue', label = 'result')
plt.legend();

# Pure Pytorch implementation (using gradient descent)

So, instead of using OpenCV, I decided to do this on my own using PyTorch. The idea here is to solve the following problem:

\begin{equation}
\begin{bmatrix} x^{'} \\ y^{'} \\ 1 \end{bmatrix} = H \begin{bmatrix} x \\ y \\ 1 \end{bmatrix} = \begin{bmatrix} h_{11} & h_{12} & h_{13} \\ h_{21} & h_{22} & h_{23} \\ h_{31} & h_{32} & h_{33} \end{bmatrix} \begin{bmatrix} x \\ y \\ 1 \end{bmatrix}
\end{equation}

where $H$ is a transformation matrix (that I want to find) and $(x', y')$ are the transformed coordinates of a point $(x, y)$.

The optimization problem is to find $H$ such that $(x', y')$ is as close as possible to some ground-truth point cloud $(x_t, y_t)$.

In [None]:
import torch

In [None]:
def min_mse(preds, targets):
    """
    Root of the mean squared distance from each prediction to its nearest target.

    Both inputs have their dimension 2 squeezed before the pairwise distance
    matrix is computed; the result is a scalar tensor.
    """
    pairwise = torch.cdist(preds.squeeze(2), targets.squeeze(2))
    # For every predicted point keep only the distance to its closest target.
    nearest, _ = pairwise.min(dim = 1)
    return nearest.pow(2).mean().sqrt()

In [None]:
def step(src, trg, m, lr = 3e-3, prt = True):
    """
    Run one plain gradient-descent update of the transform matrix `m`, in place.

    Args:
        src: source points that `m` is applied to via matrix multiplication.
        trg: target points the transformed source is compared against.
        m: transform matrix tensor with gradients enabled; updated in place.
        lr: learning rate.
        prt: print the loss of this step when true.
    """
    transformed = torch.matmul(m, src)  # apply the current transform
    loss = min_mse(transformed, trg)    # distance to the closest target points
    if prt:
        print(f'loss: {(loss.item()):.5f}')
    loss.backward()
    # Manual SGD update outside of autograd, then drop the gradient so the
    # next step starts from a clean slate.
    with torch.no_grad():
        m -= lr * m.grad
    m.grad = None

In [None]:
def fit_predict(src, trg, init_rot = 0, lr = 3e-3, n_steps = 10000, verbose = True):
    """
    Fit a 3x3 transform that maps `src` onto `trg` by gradient descent and
    return the transformed source points.

    Args:
        src: source points as homogeneous column vectors (the notebook cells
            build them with shape (n, 3, 1)).
        trg: target points in the same layout.
        init_rot: initial rotation of the transform in degrees.
        lr: learning rate passed to `step`.
        n_steps: number of gradient-descent steps.
        verbose: when true, print the loss about ten times over the run and
            plot source, target and result; when false, stay silent.

    Returns:
        `m @ src` for the fitted matrix `m`, computed without gradients.
    """
    t = np.pi * init_rot / 180
    # Start from a pure rotation by `init_rot` degrees.
    m = torch.tensor([[np.cos(t),-np.sin(t), 0],
                      [np.sin(t), np.cos(t), 0],
                      [        0,         0, 1]], dtype = torch.double)
    m.requires_grad_()

    # max(1, ...) avoids a modulo by zero when n_steps < 10.
    print_every = max(1, n_steps // 10)
    for i in range(n_steps):
        # Print only when verbose; the previous condition printed on every
        # step when verbose was False.
        step(src, trg, m, lr=lr, prt=verbose and i % print_every == 0)

    with torch.no_grad():
        tfm = torch.matmul(m, src)

    if verbose:
        plt.scatter(src[:,0], src[:,1], marker = 'o', color = 'red', label = 'source')
        plt.scatter(trg[:,0], trg[:,1], marker = '^', color = 'green', label = 'target')
        plt.scatter(tfm[:,0], tfm[:,1], marker = 'o', color = 'blue', label = 'result')
        plt.legend();

    return tfm

In [None]:
# Play 57583_000082, frame 1: append a column of ones to the normalized
# keypoints (homogeneous coordinates), add a trailing axis, and fit
# Tracking -> Sideline with the default settings.
k = get_kp('57583_000082', 1, True)
src = torch.cat([torch.tensor(k['Tracking']), torch.ones(len(k['Tracking'])).unsqueeze(1)], axis = -1).unsqueeze(2)
trg = torch.cat([torch.tensor(k['Sideline']), torch.ones(len(k['Sideline'])).unsqueeze(1)], axis = -1).unsqueeze(2)
tfm = fit_predict(src, trg)

In [None]:
# Same Tracking -> Sideline fit for play 57597_000658, frame 1.
k = get_kp('57597_000658', 1, True)
src = torch.cat([torch.tensor(k['Tracking']), torch.ones(len(k['Tracking'])).unsqueeze(1)], axis = -1).unsqueeze(2)
trg = torch.cat([torch.tensor(k['Sideline']), torch.ones(len(k['Sideline'])).unsqueeze(1)], axis = -1).unsqueeze(2)
tfm = fit_predict(src, trg)

In [None]:
# Same Tracking -> Sideline fit for play 57781_000252, frame 1, with an
# explicit initial rotation of 0 degrees (the default).
k = get_kp('57781_000252', 1, True)
src = torch.cat([torch.tensor(k['Tracking']), torch.ones(len(k['Tracking'])).unsqueeze(1)], axis = -1).unsqueeze(2)
trg = torch.cat([torch.tensor(k['Sideline']), torch.ones(len(k['Sideline'])).unsqueeze(1)], axis = -1).unsqueeze(2)
tfm = fit_predict(src, trg, 0)

In [None]:
# Composite view (field + Sideline + Endzone) of play 57781_000252 at video frame 1.
spwt('57781_000252', 1)

In [None]:
# NOTE(review): a bare name only displays the object's repr; this looks like a
# leftover cell -- confirm or remove.
spwt

In [None]:
# Reload the baseline helmet detections, add the derived columns, and show the
# first 10 Sideline detections of play 57781_000252 at frame '1' (the frame
# column holds strings, hence the quotes in the query).
video_df = pd.read_csv('../input/nfl-health-and-safety-helmet-assignment/train_baseline_helmets.csv')
video_df = add_video_features(video_df)
video_df.query(f"game_play == '57781_000252' and frame == '1' and camera == 'Sideline'").head(10)

In [None]:
# Keep only detections with conf > 0.8 and build a keypoint extractor on them.
# NOTE(review): add_video_features assigns new columns on the result of
# query(), which may emit a SettingWithCopyWarning -- confirm.
# get_keypoints.__init__ applies add_video_features again to the frame it gets.
video_df = pd.read_csv('../input/nfl-health-and-safety-helmet-assignment/train_baseline_helmets.csv')
video_df = video_df.query('conf > 0.8')
video_df = add_video_features(video_df)
get_kp_highconf = get_keypoints(video_df)

In [None]:
# Tracking -> Sideline fit for play 57781_000252, frame 1, using only the
# high-confidence helmet detections.
k = get_kp_highconf('57781_000252', 1, True)
src = torch.cat([torch.tensor(k['Tracking']), torch.ones(len(k['Tracking'])).unsqueeze(1)], axis = -1).unsqueeze(2)
trg = torch.cat([torch.tensor(k['Sideline']), torch.ones(len(k['Sideline'])).unsqueeze(1)], axis = -1).unsqueeze(2)
tfm = fit_predict(src, trg, 0)

In [None]:
# Tracking -> Endzone fit for play 57583_000082, frame 1, starting from the
# default initial rotation (0 degrees). The shapes are printed first.
k = get_kp('57583_000082', 1, True)
src = torch.cat([torch.tensor(k['Tracking']), torch.ones(len(k['Tracking'])).unsqueeze(1)], axis = -1).unsqueeze(2)
trg = torch.cat([torch.tensor(k['Endzone']), torch.ones(len(k['Endzone'])).unsqueeze(1)], axis = -1).unsqueeze(2)
print(src.shape, trg.shape)
tfm = fit_predict(src, trg)

In [None]:
# Same Tracking -> Endzone fit, but starting from a 90 degree initial rotation.
k = get_kp('57583_000082', 1, True)
src = torch.cat([torch.tensor(k['Tracking']), torch.ones(len(k['Tracking'])).unsqueeze(1)], axis = -1).unsqueeze(2)
trg = torch.cat([torch.tensor(k['Endzone']), torch.ones(len(k['Endzone'])).unsqueeze(1)], axis = -1).unsqueeze(2)
print(src.shape, trg.shape)
tfm = fit_predict(src, trg, 90)