<a href="https://colab.research.google.com/github/lisaong/hss/blob/master/06_dozing_or_not.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Video Activity Classifier Workshop

In this workshop, we will be training a classifier using body positions extracted from video.

![photo](https://github.com/lisaong/hss/blob/master/assets/istockphoto-476741742.jpg?raw=1)

This follows up from [04_pose_estimation.ipynb](04_pose_estimation.ipynb).

In [1]:
# to run in colab, uncomment this
!git clone https://github.com/nicholashojunhui/hss

Cloning into 'hss'...
remote: Enumerating objects: 773, done.[K
remote: Counting objects: 100% (367/367), done.[K
remote: Compressing objects: 100% (304/304), done.[K
remote: Total 773 (delta 79), reused 72 (delta 29), pack-reused 406 (from 1)[K
Receiving objects: 100% (773/773), 72.57 MiB | 15.60 MiB/s, done.
Resolving deltas: 100% (123/123), done.


In [2]:
# to run in colab, uncomment this
HSS_DIR='/content/hss/PartC_OpenPose'

In [3]:
import json
import matplotlib.pyplot as plt
import pandas as pd
import glob
import os
import sys
import numpy as np
from sklearn.metrics.pairwise import paired_distances

# requires: conda install opencv
import cv2

#plt.style.use('seaborn-white')

In [4]:
# Source: https://github.com/CMU-Perceptual-Computing-Lab/openpose/blob/master/doc/output.md#pose-output-format-coco
Pose_part_pairs = [
    (1,8), (1,2), (1,5), (2,3), (3,4), (5,6), (6,7), (8,9), (9,10), (10,11),
    (8,12), (12,13), (13,14), (1,0), (0,15), (15,17), (0,16), (16,18), (2,17), (5,18),
    (14,19), (19,20), (14,21), (11,22), (22,23), (11,24)
]

def draw_skeleton(ax, df):
    """Connects keypoints into a skeleton"""
    for p, q in Pose_part_pairs:
        if df.x[p] != 0 and df.x[q] != 0 and df.y[p] != 0 and df.y[q] != 0:
            ax.plot([df.x[p], df.x[q]], [df.y[p], df.y[q]], color='red')

def keypoints_to_dataframe(keypoints):
    """Converts a flat keypoints list (x1, y1, c1, x2, y2, c2) into a pandas DataFrame"""
    return pd.DataFrame({'x': keypoints[::3], 'y': keypoints[1::3], 'c': keypoints[2::3]})

def get_centroid(coordinates, threshold=0.1):
    """Computes the centroid of a given 2 dimensional vector"""
    x = coordinates[coordinates.c > threshold].x
    y = coordinates[coordinates.c > threshold].y

    return [sum(x)/len(x), sum(y)/len(y)]

def get_centroids(frame):
    """Returns the centroid for each person as a list of (x, y) coordinates"""
    return np.array([get_centroid(keypoints_to_dataframe(person['pose_keypoints_2d'])) for person in frame['people']])

def get_closest_index(centroid, other_frame):
    """Find closest index in other_frame from a given centroid"""
    other_centroids = get_centroids(other_frame)
    return np.argmin(paired_distances(np.ones(other_centroids.shape) * centroid, other_centroids))

def plot_keypoints(video_path, keypoints_path, frame_first, frame_step, max_frame_last):
    """Displays the keypoints overlaid on the video"""
    video = cv2.VideoCapture(video_path)

    # fast forward video to frame_first
    for i in range(frame_first):
        _, image = video.read()

    frame = json.load(open(f'{keypoints_path}_{frame_first:012d}_keypoints.json', 'rb'))
    df = keypoints_to_dataframe(frame['people'][0]['pose_keypoints_2d'])
    centroid = get_centroid(df)

    # Create the matplotlib axes
    fig, ax = plt.subplots(nrows=3, ncols=2, figsize=(15, 15), sharex=True, sharey=True)
    ax = ax.flatten()

    for i, t in zip(range(len(ax)), range(frame_first, max_frame_last, frame_step)):
        _, image = video.read()

        frame = json.load(open(f'{keypoints_path}_{t:012d}_keypoints.json', 'rb'))
        index = get_closest_index(centroid, frame) # find the closest person

        # load keypoints for the closest person
        df = keypoints_to_dataframe(frame['people'][index]['pose_keypoints_2d'])

        axis = ax[i]
        axis.scatter(df.x, df.y, s=df.c*10, color='yellow')
        axis.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        axis.set(title=f'Frame {t}')
        draw_skeleton(axis, df)

        centroid = get_centroid(df) # update centroid since person may have moved

## Examine the keypoints

Plot a sample from each class, with 6 frames and skeleton overlay.

**Important**: make sure `HSS_DIR` is set correctly so that the subsequent code can find the keypoint JSON files.

In [5]:
# Use this path if running on Windows
#HSS_DIR = r'D:\S-HSS\Workshop\hss'

# Use this path if running on Colab
HSS_DIR = r'/content/hss/PartC_OpenPose'

In [6]:
def plot_sample(classname, sample_index):
    """Plots first few frames of a sample video"""
    data_path = os.path.join(HSS_DIR, 'data', 'speed', classname)
    video_path = os.path.join(data_path, f'{sample_index}.mp4')
    keypoints_path = os.path.join(data_path, str(sample_index), str(sample_index))

    frame_first = 20
    frame_step = 1
    frame_last = 50

    plot_keypoints(video_path, keypoints_path, frame_first, frame_step,
                   max_frame_last=frame_last)

plot_sample('run', 1)

FileNotFoundError: [Errno 2] No such file or directory: '/content/hss/PartC_OpenPose/data/speed/run/1/1_000000000020_keypoints.json'

In [None]:
plot_sample('walk', 1)

## Extract Features

This part and the part below can be run from any environment.

The next step is to convert the keypoints into features for training a model.


In [None]:
def get_part_candidates_as_features(keypoints_path, frame_first, frame_step,
                                    max_frame_last):
    """Convert keypoints into features for body parts

    BODY_25 format:
    https://github.com/CMU-Perceptual-Computing-Lab/openpose/blob/master/doc/output.md#keypoint-ordering-in-cpython
        //     {0,  "Nose"},
        //     {1,  "Neck"},
        //     {2,  "RShoulder"},
        //     {3,  "RElbow"},
        //     {4,  "RWrist"},
        //     {5,  "LShoulder"},
        //     {6,  "LElbow"},
        //     {7,  "LWrist"},
        //     {8,  "MidHip"},
        //     {9,  "RHip"},
        //     {10, "RKnee"},
        //     {11, "RAnkle"},
        //     {12, "LHip"},
        //     {13, "LKnee"},
        //     {14, "LAnkle"},
        //     {15, "REye"},
        //     {16, "LEye"},
        //     {17, "REar"},
        //     {18, "LEar"},
        //     {19, "LBigToe"},
        //     {20, "LSmallToe"},
        //     {21, "LHeel"},
        //     {22, "RBigToe"},
        //     {23, "RSmallToe"},
        //     {24, "RHeel"},
        //     {25, "Background"}

    Result:
        pandas Dataframe: 3 columns for each part, 1 row per frame
        Nose_x, Nose_y, Nose_conf, Neck_x, Neck_y, Neck_confidence, ..
    """
    PARTS = ["Nose", "Neck", "RShoulder", "RElbow", "RWrist",
             "LShoulder", "LElbow", "LWrist", "MidHip", "RHip",
             "RKnee", "RAnkle", "LHip", "LKnee", "LAnkle", "REye",
             "LEye", "REar", "LEar", "LBigToe", "LSmallToe",
             "LHeel", "RBigToe", "RSmallToe", "RHeel", "Background"]
    PARTS_INDEX = {PARTS[i]: i for i in range(len(PARTS))}

    selected_parts = ["RWrist", "RShoulder", "RElbow", "RWrist",
             "LShoulder", "LElbow", "LWrist", "MidHip", "RHip",
             "RKnee", "RAnkle", "LHip", "LKnee", "LAnkle", "LBigToe",
             "LSmallToe", "LHeel", "RBigToe", "RSmallToe", "RHeel"]

    selected_indices = [PARTS_INDEX[p] for p in selected_parts]

    colnames = []
    for p in selected_parts:
        colnames += [f'{p}_x', f'{p}_y', f'{p}_c']

    rows = []

    for t in range(frame_first, max_frame_last, frame_step):
        frame = json.load(open(f'{keypoints_path}_{t:012d}_keypoints.json', 'rb'))
        part_candidates = frame['part_candidates'][0]

        row = []
        # list comprehension can't be used as some part may be missing
        for i in selected_indices:
            part = part_candidates[str(i)]
            if len(part) == 0:
                # part not found
                part = [0, 0, 0]
            row += part[:3] # some parts appear twice, just pick the first entry
        rows.append(row)

    return pd.DataFrame(rows, columns=colnames)

def get_features(classname, sample_index):
    data_path = os.path.join(HSS_DIR, 'data', 'speed', classname)
    video_path = os.path.join(data_path, f'{sample_index}.mp4')
    keypoints_path = os.path.join(data_path, str(sample_index), str(sample_index))

    frame_first = 20
    frame_step = 1
    frame_last = 30

    return get_part_candidates_as_features(keypoints_path, frame_first,
                                    frame_step, max_frame_last=frame_last)

In [None]:
df_run = get_features('run', 1)
df_run.head()

In [None]:
df_walk = get_features('walk', 1)
df_walk.head()

##<font color='red'>Exercise</font>

Currently the body parts selected are "Nose", "Neck", "Right Wrist", and "Left Wrist".

1. Consider whether other body parts are more suitable for distinguishing between the two classes (sleeping or awake).

2. Under `get_part_candidates_as_features` function above, update `selected_parts` to use those body parts. Once you are done, re-run the above cells and proceed to the next section to create the dataset.

## Create our dataset

* X - features
* y - target

In [None]:
num_samples = 3

# row-wise concat
df_run = pd.concat([get_features('run', i) for i in range(1, num_samples+1)])
df_walk = pd.concat([get_features('walk', i) for i in range(1, num_samples+1)])

X = pd.concat([df_run, df_walk])
X.shape

In [None]:
classes = ['run', 'walk']
y = np.array([0] * df_run.shape[0] + [1] * df_walk.shape[0])
y.shape

## Visualise our training set

We will use a technique called PCA to reduce the features into 2 dimensions, then plot the two classes.

This is useful to indicate whether the model used is going perform terribly. If there is overlap for the samples, then a model cannot tell between them. If there is spacing and clear separation between the samples (social distancing!), then the model can distinguish them by drawing boundary curves, lines, etc.


In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

pca = PCA(n_components=2)
scaler = StandardScaler()

X_2d = pca.fit_transform(scaler.fit_transform(X))

In [None]:
fig, ax = plt.subplots()
ax.set_title('2-d visualization of data')
ax.scatter(X_2d[y==0, 0], X_2d[y==0, 1], label=classes[0])
ax.scatter(X_2d[y==1, 0], X_2d[y==1, 1], label=classes[1])
ax.legend()
plt.show()

In [None]:
# analogous to the degree of 2-d "compression". A high number (close to 1)
# indicates that the 2-d PCA is non-lossy (so we can trust the above diagram more).
# above 50% is quite "okay", meaning that about 50% loss. Typically, for these
# dimensionality reduction methods, it can be as low as 20-30%.

pca.explained_variance_ratio_.sum()

## Random Forest Classifier

Looks like a Random Forest should be able to distinguish between the orange and blue samples, because there are some boundaries that can be drawn to separate them. You can imagine a classifier as a program that automatically tries to draw some line between the clusters.

Let's give it a try.

https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Train-test split into test and training sets
# stratify: preserves the proportions of the classes
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y)

rf = RandomForestClassifier(n_estimators=10, max_depth=3, random_state=25)
rf.fit(X_train, y_train)
rf.score(X_test, y_test)

In [None]:
# Get predictions from the classifier
y_pred = rf.predict(X_test)

# classification report:
#   support: number of samples in each class
#   accuracy - how many are correct
#   precision - how well the classifier avoided false positives
#   recall - how well the classifier avoided false negatives
#   weighted avg - average weighted based on the proportion of samples in each class
#   macro avg - global average

print(classification_report(y_test, y_pred))

In [None]:
X_test

In [None]:
y_test

In [None]:
y_pred

## <font color='red'> Exercise </font>

1. Instead of asleep vs. awake, replace the videos with two other types of activity.

2. Train a classifier to predict between your chosen activities.

### <font color='red'> Submission </font>
Submit your modified .ipynb to the Workshop 4 folder by the submission deadline.

## Appendix: Hidden Markov Models

*This section is for your reference*

Instead of a Random Forest Classifier, which is a machine learning model that tries to look at the spatial features of each frame (row) independently to predict the classes, we can try a model that looks at the time-sequence, treating each row as a step in time.

HMMs are statistical models that can learn sequences. Alternatively, an RNN (Recurrent Neural Network) is a deep learning model that learns sequences.

To use HMMs, we need to preserve the sequence ordering in the rows. Previously, when we did `train_test_split`, the rows are shuffled so ordering is lost.

In [None]:
!pip install hmmlearn

In [None]:
# Rows in their original frame (row) order
X.head()

In [None]:
# (rows, features)
X.shape

In [None]:
# split into separate classes to build separate HMMs, one per class

X_class0 = X[y==0]
X_class1 = X[y==1]

X_class0.shape, X_class1.shape

In [None]:
# split into train and test without shuffling
X_class0_train, X_class0_test = train_test_split(X_class0,
                                                 shuffle=False,
                                                 test_size=.1)
X_class0_train.shape, X_class0_test.shape

In [None]:
# split into train and test without shuffling
X_class1_train, X_class1_test = train_test_split(X_class1,
                                                 shuffle=False,
                                                 test_size=.1)
X_class1_train.shape, X_class1_test.shape

In [None]:
# Train our hmms
import hmmlearn.hmm as hmm

def fit_hmm(X, window_size):
  """Fits a Gaussian HMM using training set
  X: training set in (rows, features)
  window_size: length of each sequence to consider, number of rows
               should be divisible by it
  """
  rows = X.shape[0]
  lengths = [window_size] * (rows//window_size) # integer division
  return hmm.GaussianHMM(n_components=window_size,
                         algorithm='viterbi',
                         random_state=42,
                         verbose=True).fit(X, lengths)

window_size = 2
training_set = [X_class0_train, X_class1_train]
hmms = [fit_hmm(X_t, window_size) for X_t in training_set]
hmms

In [None]:
def predict(hmms, X_test, window_size):
  """Predict the class that the test set belongs to
  hmms: list of already fitted HMMs
  X_test: test set in (rows, features)
  window_size: length of each sequence to consider. Should be
               same as what was used during fitting
  """
  lengths = [window_size] * (X_test.shape[0]//window_size)
  scores = np.array([hmm.score(X_test, lengths) for hmm in hmms])
  print(scores)
  print(f'Prediction: class {scores.argmax()}, log probability: {scores.max()}')

#
# Check if the correct class is predicted.
# To tune HMMs, you would play around with the window_size, the algorithm
# ('map' or 'viterbi'), and so on
# https://hmmlearn.readthedocs.io/en/latest/api.html#hmmlearn.hmm.GaussianHMM
#
predict(hmms, X_class0_test, window_size=window_size)
predict(hmms, X_class1_test, window_size=window_size)