In [None]:
# Notebook intended for a Google Colab session.
# Mount the user's Google Drive at /content/gdrive so that later cells can
# read the training data and write checkpoints / lifted CSVs under it.
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
cd gdrive/My\ Drive/

/content/gdrive/My Drive


In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import os

In [None]:
# Run on the GPU when one is available; otherwise fall back to the CPU so the
# notebook still executes (slowly) on a CPU-only runtime instead of failing at
# the first `.to(device)` call.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 5000000  # upper bound of the training loop; training is meant to be interrupted manually
lin_s = 1024  # passed to LinearModel as linear_size (width of the hidden layers)
nstage = 2  # passed to LinearModel as num_stage (number of residual stages)
pdrop = 0.67  # passed to LinearModel as p_dropout (dropout probability)
loss_rate = 0.0001  # NOTE: despite the name, this is the learning rate handed to Adam (lr=loss_rate)

train_2d_x_path = './mCBF/train/mouse-wheel_revisions_synthetic-v2_camera-2_calibrated_2d.csv' # path to 2D pixel coords saved by Blender
train_3d_y_path = './mCBF/train/mouse-wheel_revisions_synthetic-v2_camera-2_calibrated_3d.csv' # path to 3D relative coords saved by Blender
model_save_path = './mCBF/checkpoints/' # directory where model checkpoints are stored (must end with '/')
model_name = 'mouse-wheel_camera2_model' # name of model to save/load (without the .pth extension)
test_path = './mCBF/camera2_2d/' # path to exported .CSVs from DeepLabCut to lift to 3D
save_path = './mCBF/camera2_lifted3d/' # directory where lifted 3D coords are saved (must end with '/')
best_loss = 0.00008 # Model will begin saving once loss is below this level, and each subsequent better model is saved. We halted training at around a loss of 5e-5 to 8e-5 (interrupt execution when you want to halt)
noise_amount = 2 # Width of the uniform noise added to the 2D training coordinates, i.e. +/- noise_amount/2, for better generalizability

In [None]:
## MODEL BASED ON Martinez et al. 2017 https://github.com/una-dinosauria/3d-pose-baseline
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function


def weight_init(m):
    """Initialise the weight of an ``nn.Linear`` module with Kaiming-normal values.

    Modules of any other type are left untouched, which makes the function
    suitable for use with ``nn.Module.apply``.

    Args:
        m: the module to (possibly) initialise; modified in place.
    """
    if isinstance(m, nn.Linear):
        # `kaiming_normal` (no trailing underscore) is deprecated; the
        # underscore variant is the supported in-place initialiser.
        nn.init.kaiming_normal_(m.weight)


class Linear(nn.Module):
    """Residual block made of two Linear -> BatchNorm1d -> ReLU -> Dropout stages.

    The block keeps the feature width unchanged (``linear_size`` in and out)
    and adds its input to the output of the second stage.
    """

    def __init__(self, linear_size, p_dropout=0.5):
        super().__init__()
        self.l_size = linear_size

        # Parameter-free layers shared by both stages.
        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p_dropout)

        # Stage 1
        self.w1 = nn.Linear(linear_size, linear_size)
        self.batch_norm1 = nn.BatchNorm1d(linear_size)

        # Stage 2
        self.w2 = nn.Linear(linear_size, linear_size)
        self.batch_norm2 = nn.BatchNorm1d(linear_size)

    def forward(self, x):
        """Return ``x`` plus the result of running ``x`` through both stages."""
        stages = (
            (self.w1, self.batch_norm1),
            (self.w2, self.batch_norm2),
        )
        branch = x
        for fc, bn in stages:
            # fc() allocates a new tensor, so the in-place ReLU never touches x.
            branch = self.dropout(self.relu(bn(fc(branch))))
        return x + branch


class LinearModel(nn.Module):
    """Fully connected network that maps flattened 2D joints to flattened 3D joints.

    Architecture: an input Linear -> BatchNorm1d -> ReLU -> Dropout layer,
    followed by ``num_stage`` residual ``Linear`` blocks and a final linear
    projection to the output size.

    Args:
        linear_size: width of the hidden layers.
        num_stage: number of residual ``Linear`` blocks.
        p_dropout: dropout probability used throughout the network.
        num_joints: number of joints; the input has ``num_joints * 2`` features
            and the output ``num_joints * 3``. When ``None`` (the default, kept
            for backward compatibility) the notebook-level ``num_bones``
            variable is used instead.

    Raises:
        NameError: if ``num_joints`` is ``None`` and no global ``num_bones``
            has been defined yet.
    """

    def __init__(self,
                 linear_size=1024,
                 num_stage=2,
                 p_dropout=0.5,
                 num_joints=None):
        super(LinearModel, self).__init__()

        if num_joints is None:
            # Legacy behaviour: read the joint count from the notebook global
            # that the data-loading cell defines.
            try:
                num_joints = num_bones
            except NameError:
                raise NameError(
                    "LinearModel needs `num_joints`, or a notebook-level "
                    "`num_bones` defined before the model is constructed"
                ) from None

        self.linear_size = linear_size
        self.p_dropout = p_dropout
        self.num_stage = num_stage
        self.num_joints = num_joints

        # 2d joints: (x, y) per joint
        self.input_size = num_joints * 2
        # 3d joints: (x, y, z) per joint
        self.output_size = num_joints * 3

        # process input to linear size
        self.w1 = nn.Linear(self.input_size, self.linear_size)
        self.batch_norm1 = nn.BatchNorm1d(self.linear_size)

        # residual stages (registered through ModuleList so their parameters are tracked)
        self.linear_stages = nn.ModuleList(
            [Linear(self.linear_size, self.p_dropout) for _ in range(num_stage)]
        )

        # post processing
        self.w2 = nn.Linear(self.linear_size, self.output_size)

        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(self.p_dropout)

    def forward(self, x):
        """Return the network output for ``x``; the last dimension is ``output_size``."""
        # pre-processing
        y = self.w1(x)
        y = self.batch_norm1(y)
        y = self.relu(y)
        y = self.dropout(y)

        # residual stages
        for stage in self.linear_stages:
            y = stage(y)

        return self.w2(y)

In [None]:
# Both CSVs are read as strings: the first 3 rows are header rows and the first
# column is a row label, so only rows 3+ / columns 1+ hold numeric data.
train_x = np.genfromtxt(train_2d_x_path,delimiter=',',dtype=str)
train_y = np.genfromtxt(train_3d_y_path,delimiter=',',dtype=str)

if train_x.ndim != 2 or train_y.ndim != 2:
    raise ValueError("Training CSVs must contain header rows plus at least one data row")

# Inputs are the 2D marker coordinates; expected outputs are the matching 3D coordinates.
num_frames = train_x.shape[0] - 3
num_bones = int((train_x.shape[1] - 1)/2)

# Fail early, with a readable message, if the two files do not describe the
# same frames and joints (otherwise inputs and targets would be misaligned).
if num_frames < 1:
    raise ValueError("No data rows found in " + train_2d_x_path)
if train_x.shape[1] - 1 != num_bones * 2:
    raise ValueError("2D CSV must have 2 columns per bone, got {} data columns".format(train_x.shape[1] - 1))
if train_y.shape != (num_frames + 3, num_bones * 3 + 1):
    raise ValueError(
        "3D CSV shape {} does not match the 2D CSV: expected {} rows and {} columns".format(
            train_y.shape, num_frames + 3, num_bones * 3 + 1))

print("Training with " + str(num_frames) + " frames and " + str(num_bones) + " bones")

# Vectorised string -> float conversion of the data region.
x_array = train_x[3:, 1:].astype("float32")
# Uniform noise in [-noise_amount/2, +noise_amount/2) for every input coordinate.
noise_array = noise_amount * np.random.random_sample((num_frames,num_bones*2)) - (noise_amount/2)

x_array = np.add(x_array,noise_array) #Add a bit of noise to make it more generalizable

y_array = train_y[3:, 1:].astype(float)

X = torch.from_numpy(x_array).float().to(device)
Y = torch.from_numpy(y_array).float().to(device)

Training with 1000 frames and 28 bones


In [None]:
# Build the network on the target device, then the loss and the optimiser.
# (nn.Module.to returns the module itself, so the call can be chained.)
model = LinearModel(linear_size=lin_s, num_stage=nstage, p_dropout=pdrop).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=loss_rate)
epoch = 0  # training starts at iteration 0

# Checkpoint layout (epoch / model_state / optim_state) used when saving and
# loading the model. The values captured here are from before any training step.
checkpoint = dict(
    epoch=epoch,
    model_state=model.state_dict(),
    optim_state=optimizer.state_dict(),
)

In [None]:
# Make sure the checkpoint directory exists before the first save is attempted.
os.makedirs(model_save_path, exist_ok=True)
path = model_save_path + model_name + '.pth'

# Explicitly select training mode (dropout active, batch-norm uses batch statistics).
model.train()

for t in range(epoch, num_epochs):
    # Forward pass: compute predicted y by passing the full training set to the model
    epoch = t
    y_pred = model(X)
    loss = criterion(y_pred, Y)

    # Every 300 iterations: report the loss and save if it is the best so far.
    if t % 300 == 299:
        loss_value = loss.item()
        print(t, loss_value)
        if loss_value < best_loss:
            best_loss = loss_value
            # Rebuild the checkpoint at save time so it records the current
            # epoch and optimizer state rather than the pre-training snapshot.
            checkpoint = {
                "epoch": epoch,
                "model_state": model.state_dict(),
                "optim_state": optimizer.state_dict(),
            }
            torch.save(checkpoint, path)
            print("Saved checkpoint!")

    # Zero gradients, perform a backward pass, and update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

299 0.19476483762264252
599 0.06873439997434616
899 0.027795903384685516
1199 0.012249166145920753
1499 0.005494280252605677
1799 0.0026363444048911333
2099 0.0013426363002508879
2399 0.0007987627177499235
2699 0.0005343255470506847
2999 0.0004119911463931203
3299 0.0003630859137047082
3599 0.0003217408957425505
3899 0.0003168782277498394
4199 0.00029598455876111984
4499 0.00027291892911307514
4799 0.0002637551515363157
5099 0.00025639100931584835
5399 0.0002502524002920836
5699 0.00024685278185643256
5999 0.00022546196123585105
6299 0.00022497435566037893
6599 0.00022146910487208515
6899 0.0002160643634852022
7199 0.00021489635400939733
7499 0.00020762631902471185
7799 0.0002002070687012747
8099 0.00019328604685142636
8399 0.00018308908329345286
8699 0.0001700902939774096
8999 0.0001742908643791452
9299 0.000160323572345078
9599 0.0001665660965954885
9899 0.00015771920152474195
10199 0.00015271453594323248
10499 0.00015887862537056208
10799 0.00014195943367667496
11099 0.0001456620084

KeyboardInterrupt: ignored

In [None]:
# Collect every .csv file found under test_path.
# camera_paths holds the full path of each file, camera_filenames the file name
# without its extension (used later to name the output file).
camera_paths = []
camera_filenames = []

for dirName, subdirList, fileList in os.walk(test_path):
    subdirList.sort()  # in-place sort makes os.walk visit sub-directories in a stable order
    for fname in sorted(fileList):
        stem, ext = os.path.splitext(fname)
        if ext.lower() != ".csv":
            # Skip anything that is not a CSV export (it could not be parsed later).
            continue
        # os.path.join inserts the separator that plain concatenation would
        # miss for files located in sub-directories.
        camera_paths.append(os.path.join(dirName, fname))
        camera_filenames.append(stem)

print("There are {} files for the single camera".format(len(camera_filenames)))
print(camera_filenames)

There are 2 files for the single camera
['2020_12_2_MK1_1606954279_camera-1DLC_resnet50_camera_2_3Dec11shuffle1_1030000filtered', '2020_12_2_MK1_1606954279_camera-1DLC_resnet50_camera_2_3Dec11shuffle1_600000filtered']


In [None]:
import csv  # used only by this cell to write the lifted coordinates

# ---- Restore the trained model -------------------------------------------
model_path = model_save_path + model_name + '.pth'
# map_location=device lets a checkpoint saved on one device type load on another.
loaded = torch.load(model_path, map_location=device)
epoch = loaded["epoch"]

# Move the model to its device before creating the optimizer so that any
# restored optimizer state ends up on the same device as the parameters.
model = LinearModel(linear_size=lin_s,num_stage=nstage,p_dropout=pdrop).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=loss_rate)

model.load_state_dict(loaded["model_state"])
optimizer.load_state_dict(loaded["optim_state"])

# Inference mode: disables dropout and makes batch-norm use its running
# statistics, so predictions are deterministic and independent of batch content.
model.eval()

os.makedirs(save_path, exist_ok=True)

# ---- Lift every collected 2D file to 3D ----------------------------------
for i in range(len(camera_filenames)):
    print("Lifting file number: ", (i+1))
    filename = camera_filenames[i]
    filepath = camera_paths[i]

    # Read everything as strings: rows 0-2 are header rows, column 0 is a row
    # label, and each joint then occupies 3 consecutive columns of which the
    # first two are used as the 2D input (x, y).
    dlc_in_cam1 = np.genfromtxt(filepath,delimiter=',',dtype=str)

    test_rows = dlc_in_cam1.shape[0]-3 if dlc_in_cam1.ndim == 2 else 0
    if test_rows <= 1:
        # As before, files with at most one data row are skipped.
        print("single frame video ignoring..." + filename)
        continue

    data_cols = dlc_in_cam1.shape[1] - 1
    if data_cols != num_bones * 3:
        raise ValueError(
            "{}: expected {} data columns (3 per joint for {} joints), found {}".format(
                filepath, num_bones * 3, num_bones, data_cols))

    # Joint names come from the second header row (one name every 3 columns).
    joints = dlc_in_cam1[1, 1::3]

    # Interleave the coordinates as x0, y0, x1, y1, ... ; the third column of
    # each joint is not fed to the model.
    x_test = np.zeros((test_rows, num_bones*2),dtype=float)
    x_test[:, 0::2] = dlc_in_cam1[3:, 1::3].astype(float)
    x_test[:, 1::2] = dlc_in_cam1[3:, 2::3].astype(float)

    X_test = torch.from_numpy(x_test).float().to(device)

    # Compute the 3D predictions; no gradients are needed for inference.
    with torch.no_grad():
        y_pred_test = model(X_test)
    predictions = y_pred_test.cpu().numpy()  # converted once, not once per row

    # Build the output table: 3 header rows followed by one row per frame.
    dlc_out = np.zeros((test_rows+3, num_bones*3+1), dtype=object)
    dlc_out[0, 0] = "scorer"
    dlc_out[1, 0] = "bodypart"
    dlc_out[2, 0] = "relatice coordinates"  # (sic) spelling kept so the file format is unchanged
    dlc_out[0, 1:] = "pose-estimator"
    dlc_out[1, 1:] = np.repeat(joints, 3)
    dlc_out[2, 1:] = np.tile(["x", "y", "z"], num_bones)
    dlc_out[3:, 0] = ["frame_"+str(r) for r in range(test_rows)]
    dlc_out[3:, 1:] = predictions

    outputfile = save_path + filename + "_LIFTED-3d.csv"

    with open(outputfile, mode='w',newline='') as f:
        w = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        w.writerows(dlc_out)

print("Done!")

Lifting file number:  1
Lifting file number:  2
