In [1]:
import os
import numpy as np
import torch
import torchvision.transforms as transforms
import torch.utils.data as data
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.metrics import accuracy_score
import pandas as pd
import pickle

In [2]:
def labels2cat(label_encoder, list):
    return label_encoder.transform(list)

def labels2onehot(OneHotEncoder, label_encoder, list):
    return OneHotEncoder.transform(label_encoder.transform(list).reshape(-1, 1)).toarray()

def onehot2labels(label_encoder, y_onehot):
    return label_encoder.inverse_transform(np.where(y_onehot == 1)[1]).tolist()

def cat2labels(label_encoder, y_cat):
    return label_encoder.inverse_transform(y_cat).tolist()

In [3]:
# for CRNN
class Dataset_CRNN(data.Dataset):
    "Characterizes a dataset for PyTorch"
    def __init__(self, data_path, folders, labels, frames, transform=None):
        "Initialization"
        self.data_path = data_path
        self.labels = labels
        self.folders = folders
        self.transform = transform
        self.frames = frames

    def __len__(self):
        "Denotes the total number of samples"
        return len(self.folders)

    def read_images(self, path, selected_folder, use_transform):
        X = []
        for i in self.frames:
            image = Image.open(os.path.join(path, selected_folder, '{:05d}.jpg'.format(i)))

            if use_transform is not None:
                image = use_transform(image)

            X.append(image)
        X = torch.stack(X, dim=0)

        return X

    def __getitem__(self, index):
        "Generates one sample of data"
        # Select sample
        folder = self.folders[index]

        # Load data
        X = self.read_images(self.data_path, folder, self.transform)     # (input) spatial images
        y = torch.LongTensor([self.labels[index]])                  # (labels) LongTensor are for int64 instead of FloatTensor

        # print(X.shape)
        return X, y

In [48]:
def CRNN_final_prediction(model, device, loader):
    cnn_encoder, rnn_decoder = model
    cnn_encoder.eval()
    rnn_decoder.eval()

    all_y_pred = []
    with torch.no_grad():
        for batch_idx, (X, y) in enumerate(tqdm(loader)):
            # distribute data to device
            X = X.to(device)
            output = rnn_decoder(cnn_encoder(X))
            y_pred = output.max(1, keepdim=True)[1]  # location of max log-probability as prediction
            y_pred_list = y_pred.cpu().data.squeeze().numpy().tolist()
            if type(y_pred_list) == int:
                all_y_pred.append(y_pred_list)
                break
            all_y_pred.extend(y_pred_list)
            # all_y_pred.extend(y_pred)

    return all_y_pred

In [47]:
type(10) == int

True

In [21]:
def test(model, device, test_loader):
    # set model as testing mode
    cnn_encoder, rnn_decoder = model
    cnn_encoder.eval()
    rnn_decoder.eval()

    test_loss = 0
    all_y = []
    all_y_pred = []
    with torch.no_grad():
        for X, y in test_loader:
            # distribute data to device
            X, y = X.to(device), y.to(device).view(-1, )

            output = rnn_decoder(cnn_encoder(X))

            loss = F.cross_entropy(output, y, reduction='sum')
            test_loss += loss.item()                 # sum up batch loss
            y_pred = output.max(1, keepdim=True)[1]  # (y_pred != output) get the index of the max log-probability

            # collect all y and y_pred in all batches
            all_y.extend(y)
            all_y_pred.extend(y_pred)

    test_loss /= len(test_loader.dataset)

    # compute accuracy
    all_y = torch.stack(all_y, dim=0)
    all_y_pred = torch.stack(all_y_pred, dim=0)
    test_score = accuracy_score(all_y.cpu().data.squeeze().numpy(), all_y_pred.cpu().data.squeeze().numpy())

    # show information
    print('\nTest set ({:d} samples): Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(len(all_y), test_loss, 100* test_score))

    return test_loss, test_score

In [16]:
# set path
data_path = "F:/STAT453-Project/sub_dataset"    # define UCF-101 RGB data path

# action_name_path = './UCF101actions.pkl'
save_model_path = "./CRNN_ckpt/"

# EncoderCNN architecture
CNN_fc_hidden1, CNN_fc_hidden2 = 1024, 768
CNN_embed_dim = 512      # latent dim extracted by 2D CNN
img_x, img_y = 100, 176  # resize video 2d frame size
dropout_p = 0.0          # dropout probability

# DecoderRNN architecture
RNN_hidden_layers = 3
RNN_hidden_nodes = 512
RNN_FC_dim = 256

# training parameters
k = 10             # number of target category
epochs = 120        # training epochs
batch_size = 30  
learning_rate = 1e-4
log_interval = 10   # interval for displaying training info

# Select which frame to begin & end in videos
begin_frame, end_frame, skip_frame = 1, 10, 1

In [17]:
# Detect devices
use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu")   # use CPU or GPU

# Data loading parameters
params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 0, 'pin_memory': True} if use_cuda else {}

# load actions names
action_names = pd.read_csv('labels.csv')['Label'].tolist()

# convert labels -> category
le = LabelEncoder()
le.fit(action_names)

# show how many classes there are
list(le.classes_)

# convert category -> 1-hot
action_category = le.transform(action_names).reshape(-1, 1)
enc = OneHotEncoder()
enc.fit(action_category)

OneHotEncoder(categories='auto', drop=None, dtype=<class 'numpy.float64'>,
              handle_unknown='error', sparse=True)

In [54]:
train_df = pd.read_csv('train_set.csv')
    
train_list = list(map(str, train_df['Index'].tolist()))
train_label = labels2cat(le, train_df['Label'].tolist())
    
val_df = pd.read_csv('val_set.csv')
val_list = list(map(str, val_df['Index'].tolist()))
val_label = labels2cat(le, val_df['Label'].tolist())

test_df = pd.read_csv('test_set.csv')
test_list = list(map(str, test_df['Index'].tolist()))
test_label = labels2cat(le, test_df['Label'].tolist())

transform = transforms.Compose([transforms.Resize([img_x, img_y]),
                                    transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

selected_frames = np.arange(begin_frame, end_frame, skip_frame).tolist()

# ?
    
train_set, valid_set = Dataset_CRNN(os.path.join(data_path, "train"), train_list, train_label, selected_frames, transform=transform), \
                           Dataset_CRNN(os.path.join(data_path, "val"), val_list, val_label, selected_frames, transform=transform)
test_set= Dataset_CRNN(os.path.join(data_path, "test"), test_list, test_label, selected_frames, transform=transform)

train_loader = data.DataLoader(train_set, **params)
valid_loader = data.DataLoader(valid_set, **params)

test_data_params = {'batch_size': batch_size, 'shuffle': False, 'num_workers': 0, 'pin_memory': True} if use_cuda else {}

test_loader = data.DataLoader(test_set, **test_data_params)

# Create model
cnn_encoder = EncoderCNN(img_x=img_x, img_y=img_y, fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2,
                             drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)

rnn_decoder = DecoderRNN(CNN_embed_dim=CNN_embed_dim, h_RNN_layers=RNN_hidden_layers, h_RNN=RNN_hidden_nodes, 
                             h_FC_dim=RNN_FC_dim, drop_p=dropout_p, num_classes=k).to(device)

In [55]:
cnn_encoder.load_state_dict(torch.load(os.path.join(save_model_path, 'cnn_encoder_epoch10.pth')))
rnn_decoder.load_state_dict(torch.load(os.path.join(save_model_path, 'rnn_decoder_epoch10.pth')))
print('CRNN model reloaded!')

CRNN model reloaded!


In [56]:
test([cnn_encoder, rnn_decoder], device, test_loader)


Test set (1411 samples): Average loss: 2.0780, Accuracy: 26.44%



(2.078038921261577, 0.2643515237420269)

In [57]:
fnames = os.listdir(data_path + '/test')

In [58]:
cat2labels(le, test_label)

['Sliding Two Fingers Right',
 'Shaking Hand',
 'Shaking Hand',
 'Sliding Two Fingers Down',
 'Pushing Two Fingers Away',
 'Thumb Up',
 'Rolling Hand Forward',
 'Shaking Hand',
 'Sliding Two Fingers Left',
 'Thumb Up',
 'Thumb Up',
 'Pulling Hand In',
 'Shaking Hand',
 'Sliding Two Fingers Right',
 'Rolling Hand Forward',
 'Pushing Two Fingers Away',
 'Sliding Two Fingers Right',
 'Thumb Up',
 'Pushing Two Fingers Away',
 'Pulling Hand In',
 'Sliding Two Fingers Left',
 'Pushing Two Fingers Away',
 'Shaking Hand',
 'Shaking Hand',
 'Sliding Two Fingers Right',
 'Sliding Two Fingers Down',
 'Sliding Two Fingers Down',
 'Sliding Two Fingers Down',
 'Sliding Two Fingers Left',
 'Shaking Hand',
 'Turning Hand Counterclockwise',
 'Pulling Hand In',
 'Shaking Hand',
 'Shaking Hand',
 'Pulling Hand In',
 'Turning Hand Counterclockwise',
 'Pulling Hand In',
 'Sliding Two Fingers Right',
 'Rolling Hand Forward',
 'Thumb Up',
 'No gesture',
 'Sliding Two Fingers Left',
 'Sliding Two Fingers Righ

In [59]:
# make all video predictions by reloaded model
print('Predicting all {} videos:'.format(len(test_loader.dataset)))
all_y_pred = CRNN_final_prediction([cnn_encoder, rnn_decoder], device, test_loader)


# write in pandas dataframe
df = pd.DataFrame(data={'filename': fnames, 'y': cat2labels(le, test_label), 'y_pred': cat2labels(le, all_y_pred)})
df.to_pickle("./UCF101_videos_prediction.pkl")  # save pandas dataframe
# pd.read_pickle("./all_videos_prediction.pkl")
print('video prediction finished!')

  0%|                                                                                           | 0/48 [00:00<?, ?it/s]

Predicting all 1411 videos:


 98%|████████████████████████████████████████████████████████████████████████████████▎ | 47/48 [00:21<00:00,  2.23it/s]

video prediction finished!



