# 2D to 3D Lifting Network

## Imports and Setup

In [59]:
import os

import numpy as np
import pandas as pd
from skimage import io, transform
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import matplotlib.pyplot as plt
plt.ion()   # interactive mode

<matplotlib.pyplot._IonContext at 0x247a763d1c0>

In [60]:
%matplotlib widget
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def set_axes_equal(ax):
    # workaround, as matplotlib's 3D plot has no option for equisised axes (10/2021)
    x_limits = ax.get_xlim3d()
    y_limits = ax.get_ylim3d()
    z_limits = ax.get_zlim3d()

    x_range = abs(x_limits[1] - x_limits[0])
    x_middle = np.mean(x_limits)
    y_range = abs(y_limits[1] - y_limits[0])
    y_middle = np.mean(y_limits)
    z_range = abs(z_limits[1] - z_limits[0])
    z_middle = np.mean(z_limits)

    plot_radius = 0.5*max([x_range, y_range, z_range])

    ax.set_xlim3d([x_middle - plot_radius, x_middle + plot_radius])
    ax.set_ylim3d([y_middle - plot_radius, y_middle + plot_radius])
    ax.set_zlim3d([z_middle - plot_radius, z_middle + plot_radius])


In [61]:
class BugDataset(Dataset):
    """Bug dataset."""

    def __init__(self, df, root_dir, transform=None):
        """
        Args:
            hdf_file (string): Path to the hdf file with annotations.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.bugs_frame = df
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.bugs_frame)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.root_dir,
                                self.bugs_frame.iloc[idx, 0])
        image = io.imread(img_name)
        df_columns = self.bugs_frame.columns.values.tolist()
        sample = {'image':image}
 
        for x in range(len(df_columns)):    
            sample[df_columns[x]] = self.bugs_frame.iloc[idx,x]

        if self.transform:
            sample = self.transform(sample)

        return sample
    
class ToTensor(object):
    """Convert ndarrays in sample to Tensors."""

    def __call__(self, sample):
        image = sample['image']
        # swap color axis because
        # numpy image: H x W x C
        # torch image: C X H X W
        image = image.transpose((2, 0, 1))
        sample_keys = list(sample.keys())
        sample_data = list(sample.values())

        dic ={'image': torch.from_numpy(image)}
        dic[sample_keys[1]] =  sample_data[1]
        for x in range(2,len(sample_keys)):
            dic[sample_keys[x]] = torch.FloatTensor(sample_data[x])
        return dic
class Normalize(object):
    def __init__(self,means_2, means_3, std_2, std_3):
        self.means_2 =means_2
        self.means_3 = means_3
        self.std_2 = std_2
        self.std_3 = std_3
    def __call__(self, sample):
        image = sample['image']
        dic ={'image': image}
        sample_keys = list(sample.keys())
        sample_data = list(sample.values())

        dic[sample_keys[1]] =  sample_data[1]
        for x in range(2,len(sample_keys)):
            if sample_keys[x]== 'key_points_2D':
                dic[sample_keys[x]] = (sample_data[x]-self.means_2)/self.std_2
            elif sample_keys[x]== 'key_points_3D':
                dic[sample_keys[x]] = (sample_data[x]-self.means_3)/self.std_3
            else:
                dic[sample_keys[x]] = sample_data[x]
        return dic

class Centralize(object):
    def __init__(self, keypoint=3):
        self.center_keypoint = keypoint
    def __call__(self, sample):
        image = sample['image']
        dic ={'image': image}
        sample_keys = list(sample.keys())
        sample_data = list(sample.values())

        dic[sample_keys[1]] =  sample_data[1]
        for x in range(2,len(sample_keys)):
            if sample_keys[x]== 'key_points_2D':
                x_diff,y_diff = sample_data[x][self.center_keypoint-1][0],sample_data[x][self.center_keypoint-1][1]
                # print(x_diff,y_diff)
                # print("2D Init",sample_data[x][0])
                for i in range(len(sample_data[x])):
                    sample_data[x][i][0] = sample_data[x][i][0] - x_diff
                    sample_data[x][i][1] = sample_data[x][i][1] - y_diff
                # print("2D Out", sample_data[x][0])
                dic[sample_keys[x]] = sample_data[x]

            elif sample_keys[x]== 'key_points_3D':
                x_diff,y_diff,z_diff = sample_data[x][self.center_keypoint-1][0],sample_data[x][self.center_keypoint-1][1],sample_data[x][self.center_keypoint-1][2]
                # print(x_diff,y_diff, z_diff)
                # print("3D Init",sample_data[x][0])
                for i in range(len(sample_data[x])):
                    sample_data[x][i][0] = sample_data[x][i][0] - x_diff
                    sample_data[x][i][1] = sample_data[x][i][1] - y_diff
                    sample_data[x][i][2] = sample_data[x][i][2] - z_diff
                # print("3D Out", sample_data[x][0])
                dic[sample_keys[x]] = sample_data[x]
            else:
                dic[sample_keys[x]] = sample_data[x]
        return dic


In [62]:
target_dir = "../data/single_sungaya/"
out_df = pd.read_hdf(os.path.join(target_dir, "Data_3D_Pose.hdf5"))
new_df = pd.DataFrame()
for col, x in out_df.iterrows():
    if x['visibility'][2]==1:
        new_df = new_df.append(x)

In [63]:
array_2d = np.array(new_df['key_points_2D'].to_numpy())
array_3d = np.array(new_df['key_points_3D'].to_numpy())

for x in range(len(array_2d)):
    array_2d[x] = np.array(array_2d[x])
    array_3d[x] = np.array(array_3d[x])

fixed_array_2d = np.empty((len(array_2d),124))
fixed_array_3d = np.empty((len(array_3d),186))
for x in range(len(fixed_array_2d)):
    z = array_2d[x].reshape(1,124)
    fixed_array_2d[x] = z
for x in range(len(fixed_array_3d)):
    z = array_3d[x].reshape(1,186)
    fixed_array_3d[x] = z

means_2d = np.mean(fixed_array_2d, axis=0).reshape((62,2))

means_3d = np.mean(fixed_array_3d, axis=0).reshape((62,3))
std_2d =  np.std(fixed_array_2d, axis=0).reshape((62,2))

std_3d = np.std(fixed_array_3d, axis=0).reshape((62,3))

In [64]:
sungaya_dataset = BugDataset(df=new_df,
                             root_dir=target_dir,transform=transforms.Compose([
                                Centralize(),
                                Normalize(means_2d, means_3d, std_2d, std_3d),
                                ToTensor()
                                   ]))

In [65]:
sample = sungaya_dataset[1]
# print(sample)
image = sample["file_name"]
plt.figure()
plt.imshow(io.imread(os.path.join(target_dir,image)))
plt.show()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [66]:
train_size = int(0.8 * len(sungaya_dataset))
test_size = len(sungaya_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(sungaya_dataset, [train_size, test_size])

In [67]:
batch_size = 64

train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

In [68]:
# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))    
class Net(nn.Module):
    
    def __init__(self,n_inputs,hidden,n_output):
        super(Net, self).__init__()
        self.flatten = nn.Flatten()
        self.seq1 = nn.Sequential(
            nn.Linear(n_inputs, hidden),
            nn.BatchNorm1d(hidden),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(hidden, hidden),
            nn.BatchNorm1d(hidden),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(hidden, n_inputs)
        )
        self.seq2 = nn.Sequential(
            nn.Linear(n_inputs, hidden),
            nn.BatchNorm1d(hidden),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(hidden, hidden),
            nn.BatchNorm1d(hidden),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(hidden, n_inputs)
        )
        self.out = nn.Linear(n_inputs, n_output)
    def forward(self, x):
        x = self.flatten(x)
        residual = x
        out = self.seq1(x)
        out+=residual
        out = self.seq2(out)
        return self.out(x)

# model = NeuralNetwork().to(device)

Using cuda device


In [69]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, data in enumerate(dataloader):
        X = data['key_points_2D']
        y = data['key_points_3D'][:,:,2]
        X, y = X.to(device, dtype=torch.float), y.to(device, dtype=torch.float)
        
        # Compute prediction error
        # print(X.shape)
        pred = model(X)
        loss = loss_fn(pred, y)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for data in dataloader:
#             print(data['key_points_2D'])
            X = data['key_points_2D']
            y = data['key_points_3D'][:,:,2]
            # print(y)
            X, y = X.to(device, dtype=torch.float), y.to(device, dtype=torch.float)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            
            correct += (abs(pred - y)<0.72).type(torch.float).sum().item()
    test_loss /= num_batches
    print(f"Test Error: \n Accuracy: {(correct / size):>4f}%, Avg loss: {test_loss:>8f} \n")

In [70]:
model = Net(2*62,2054,62).to(device)

In [71]:
epochs = 5
learning_rate =5e-3
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 4421144987204023517970432.000000  [    0/ 2612]
Test Error: 
 Accuracy: 37.585627%, Avg loss: 105427526430778252591104.000000 

Epoch 2
-------------------------------
loss: 82560952139276189958144.000000  [    0/ 2612]
Test Error: 
 Accuracy: 39.561162%, Avg loss: 92196403132617222782976.000000 

Epoch 3
-------------------------------
loss: 62472591958194564104192.000000  [    0/ 2612]
Test Error: 
 Accuracy: 40.056575%, Avg loss: 85594101513803188404224.000000 

Epoch 4
-------------------------------
loss: 56061952075813286313984.000000  [    0/ 2612]
Test Error: 
 Accuracy: 40.567278%, Avg loss: 79875874516240508649472.000000 

Epoch 5
-------------------------------
loss: 50749357336579346006016.000000  [    0/ 2612]
Test Error: 
 Accuracy: 41.032110%, Avg loss: 74859165044270363049984.000000 

Done!


In [80]:
model.eval()
with torch.no_grad():
    for data in test_dataloader:
        image = data['file_name']
        print(image)
        vis = data['visibility'].numpy()
        X = data['key_points_2D']
        y = data['key_points_3D']
        # print(y)
        X, y = X.to(device, dtype=torch.float), y.to(device, dtype=torch.float)
        pred = model(X)
        break


1370_Img_synth.png


In [81]:
def plot_stick_bug(ax, points, simple=False):
    limb_ranges=[[0,7],[8,14],[15,21],[22,28],[29,35],[36,42],[43,49],[53,56],[59,62]]
    if len(points) < 62:
        return
    elif len(points) == 62:
        points = np.array(points).T
        if not simple:
            for (fr,end) in limb_ranges:
                ax.plot(points[0][fr:end], points[1][fr:end], points[2][fr:end])
                ax.scatter(points[0][fr:end], points[1][fr:end], points[2][fr:end], marker='o',s=10)
            return ax


In [85]:
sample = 59
print(image[sample])
prediction = (pred[sample].cpu()).numpy()
actual = (y[sample].cpu()).numpy()


mean = means_3d[:,2]
std = std_3d[:,2]

unnormalised_est = (prediction*std)+mean
unnormalised_acc = (actual*std_3d)+means_3d
# print(unnormalised_est)

fig = plt.figure()
ax = fig.add_subplot(projection='3d')

for i in range(len(unnormalised_acc)):
    # ax.scatter(unnormalised_acc[i,0], unnormalised_acc[i,1], unnormalised_acc[i,2], marker='o',s=10)
    ax.scatter(unnormalised_acc[i,0], unnormalised_acc[i,1], unnormalised_est[i], marker='x',s=10)
    # print("Differences Point:", i, np.abs(unnormalised_acc[i,2]-unnormalised_est[i]))

plot_stick_bug(ax, unnormalised_acc)

ax.set_xlabel('X axis')
ax.set_ylabel('Y axis')
ax.set_zlabel('Z axis')

# use custom function to ensure equal axis proportions
set_axes_equal(ax)

# opens external plot
plt.show()

3274_Img_synth.png


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [75]:
(50-mean[0])/std[0]

-1.2019235329074087