In [1]:
import os
import torch 
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
import torch.optim as optim
from torchvision.transforms import ToTensor
import numpy as np
import cv2
import pickle as pkl
import matplotlib.pyplot as plt
from torchvision import transforms
import torchvision.models as models

In [2]:
# Prefer the first CUDA GPU when one is visible; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Record the library version and the selected device alongside the results.
print(torch.__version__, device, sep="\n")

1.10.2
cpu


In [3]:
class LazyLoadDataset(Dataset):
    """Dataset that reads one sample from disk on every ``__getitem__`` call.

    Directory layout expected below ``path``::

        <path>/<train|test>/X/<sample>/rgb/0.png, 1.png, 2.png
        <path>/<train|test>/X/<sample>/depth.npy
        <path>/<train|test>/X/<sample>/field_id.pkl
        <path>/<train|test>/Y/<sample>.npy

    Args:
        path: dataset root directory, with or without a trailing separator.
        train: read the ``train`` split when True, the ``test`` split otherwise.
        transform: optional callable applied to each RGB image and to the
            vertically stacked image (the depth array is not transformed).
    """

    def __init__(self, path, train=True, transform=None):
        self.transform = transform
        split = "train" if train else "test"

        # os.path.join keeps the loader portable across operating systems; the
        # trailing "" preserves the trailing separator the attributes always had.
        self.pathX = os.path.join(path, split, "X", "")
        self.pathY = os.path.join(path, split, "Y", "")

        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> sample mapping reproducible between runs and machines.
        self.data = sorted(os.listdir(self.pathX))

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``((img0, img1, img2, img, depth, field_id), Y)`` where ``img`` is
            the three RGB images stacked with ``np.vstack``, ``depth`` is the
            stored depth array divided by 1000 and ``Y`` is the target array.

        Raises:
            FileNotFoundError: if one of the RGB images cannot be read.
        """
        f = self.data[idx]
        sample_dir = os.path.join(self.pathX, f)

        # X
        # read rgb images
        views = []
        for view_idx in range(3):
            img_path = os.path.join(sample_dir, "rgb", "{}.png".format(view_idx))
            image = cv2.imread(img_path)
            # cv2.imread signals failure by returning None instead of raising,
            # which would otherwise surface later as an unrelated error.
            if image is None:
                raise FileNotFoundError("could not read image: {}".format(img_path))
            views.append(image)
        img0, img1, img2 = views

        img = np.vstack((img0, img1, img2))

        # read depth images
        depth = np.load(os.path.join(sample_dir, "depth.npy"))

        # scale by 1/1000 (presumably millimetres -> metres; TODO confirm units)
        depth = np.divide(depth, 1000)

        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
            img2 = self.transform(img2)
            img = self.transform(img)

        # read field ID
        # NOTE: pickle can execute arbitrary code; only load trusted dataset files.
        with open(os.path.join(sample_dir, "field_id.pkl"), "rb") as fh:
            field_id = pkl.load(fh)

        # Y
        Y = np.load(os.path.join(self.pathY, f + ".npy"))

        return (img0, img1, img2, img, depth, field_id), Y

    def __len__(self):
        """Number of sample directories found under the X folder."""
        return len(self.data)

In [4]:
input_size  = 224*224     # images are 224x224 pixels for each image
output_size = 12          # (x,y,z) coordinates for the 4 fingers

# Dataset root. Set the LAZYDATA_DIR environment variable to run on another
# machine; the default is the original location. os.path.join(..., "")
# guarantees the trailing separator the dataset class relies on.
DATA_ROOT = os.path.join(
    os.environ.get("LAZYDATA_DIR",
                   "C:\\Users\\jason_00wr0b7\\Downloads\\lazydata\\lazydata\\"),
    "")

# Fixed seed so the train/test partition is identical on every run.
SPLIT_SEED = 0

dataset = LazyLoadDataset(DATA_ROOT, train=True,
                          transform=transforms.ToTensor())

dataset_size = len(dataset)

# split train dataset into train and test data (80% / 20%)
train_size = int(0.8 * dataset_size)
test_size = dataset_size - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED))

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=34, shuffle=False)

# data loader for calculating normalization statistics
# NOTE(review): this iterates the full dataset, so the statistics also include
# the held-out 20% -- confirm whether that is intended.
normal_loader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False)

In [5]:
# Fetch the first sample so the following cells can inspect its contents.
(img0, img1, img2, img, depth, field_id), Y = dataset[0]

In [6]:
# Display the target array of the sample fetched above.
Y

array([ 0.02667237,  0.05455598,  0.12987244,  0.02514074, -0.00112136,
        0.13262471,  0.03066019, -0.05429448,  0.12896113,  0.05547796,
        0.01084437, -0.06837047])

In [7]:
# Display the field ID unpickled for the same sample.
field_id

'3320'

In [8]:
# get mean and std for 0 mean unit std normalization

def get_mean_std(loader):
    """Average the per-batch mean and std of each input stream.

    For every batch the scalar ``.mean()`` and ``.std()`` of img0, img1, img2
    and depth are accumulated, then divided by the number of batches the
    loader actually yielded (not by a notebook-global dataset size, so the
    function is correct for any loader and batch size).

    Args:
        loader: iterable yielding ``((img0, img1, img2, img, depth, field_id), Y)``.

    Returns:
        Tuple ``(mean_img0, mean_img1, mean_img2, mean_depth,
        std_img0, std_img1, std_img2, std_depth)``.

    Raises:
        ValueError: if the loader yields no batches.
    """
    # Slot order matches the return tuple: img0, img1, img2, depth.
    mean_sums = [0, 0, 0, 0]
    std_sums = [0, 0, 0, 0]
    num_batches = 0

    for (img0, img1, img2, _img, depth, _field_id), _Y in loader:
        for slot, stream in enumerate((img0, img1, img2, depth)):
            mean_sums[slot] += stream.mean()
            std_sums[slot] += stream.std()
        num_batches += 1

    if num_batches == 0:
        raise ValueError("get_mean_std: loader yielded no batches")

    means = [total / num_batches for total in mean_sums]
    stds = [total / num_batches for total in std_sums]

    return (*means, *stds)
    

In [9]:
# One pass over normal_loader; the unpacking order mirrors get_mean_std's
# return tuple (four means first, then four stds).
mean_img0, mean_img1, mean_img2, mean_depth, std_img0, std_img1, std_img2, std_depth = get_mean_std(normal_loader)

In [10]:
# Spot-check the statistics of the first RGB stream and of the depth stream.
print(mean_img0, std_img0, mean_depth, std_depth, sep="\n")

tensor(0.4161)
tensor(0.2040)
tensor(0.9125)
tensor(0.8012)


In [11]:
# custom RMSE Loss function

class RMSELoss(torch.nn.Module):
    """Root-mean-square-error loss: ``sqrt(mse(x, y) + eps)``.

    Args:
        eps: small constant added under the square root. The derivative of
            ``sqrt`` is infinite at 0, so without it a perfect prediction
            (``mse == 0``) produces NaN gradients during ``backward``.
    """

    def __init__(self, eps=1e-8):
        super().__init__()
        self.eps = eps

    def forward(self, x, y):
        """Return the scalar RMSE between prediction ``x`` and target ``y``."""
        # Functional form avoids building a new nn.MSELoss module on each call.
        mse = nn.functional.mse_loss(x, y)
        return torch.sqrt(mse + self.eps)

In [12]:
# normalization transforms

def _normalizer(mean, std):
    """Build a Compose holding one Normalize that uses a single mean/std pair."""
    return transforms.Compose([transforms.Normalize((mean,), (std,))])


# One normalizer per input stream, each using that stream's own statistics.
t0 = _normalizer(mean_img0, std_img0)
t1 = _normalizer(mean_img1, std_img1)
t2 = _normalizer(mean_img2, std_img2)
t_depth = _normalizer(mean_depth, std_depth)

In [13]:
# define train and test functions

# Single RMSE criterion shared by train() and test() below.
loss_fn = RMSELoss()

def train(epoch, model, optimizer, multiplier):
    """
    Train the model for one epoch over ``train_loader``.

    Args:
        epoch (int): current epoch (used for progress logging only)
        model (nn.Module): model to train
        optimizer (torch.optim): optimizer to use
        multiplier (int): value to multiply ground truth and prediction by
            before the loss is computed
    """
    model.train()

    for batch_idx, ((img0, img1, img2, img, depth, field_id), Y) in enumerate(train_loader):

        optimizer.zero_grad()

        # normalize every stream with its own statistics, then send to device
        img0 = t0(img0).to(device)
        img1 = t1(img1).to(device)
        img2 = t2(img2).to(device)
        depth = t_depth(depth).to(device)

        # The targets must live on the same device as the predictions;
        # otherwise the loss computation fails as soon as CUDA is used.
        Y = Y.to(device)

        # obtain output, multiply prediction and ground truth by multiplier
        output = model(img0, img1, img2, depth)
        output = torch.mul(output, multiplier)
        Y = torch.mul(Y, multiplier)

        # calculate loss (prediction cast to double to match the target dtype)
        loss = loss_fn(output.double(), Y)

        loss.backward()

        optimizer.step()

        if batch_idx % 10 == 0:
            print(output[0, :])
            print(Y[0, :])
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(img), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            
def test(model, epoch=None):
    """
    Evaluate the model on ``test_loader`` and print the average batch loss.

    Args:
        model (nn.Module): model to test
        epoch (int, optional): epoch number shown in the progress lines. When
            omitted, the notebook-global ``epoch`` is used if it exists (the
            previous behaviour), otherwise "n/a" is printed.

    Raises:
        ValueError: if ``test_loader`` yields no batches.
    """
    if epoch is None:
        # Backward compatibility with callers that rely on the global loop variable.
        epoch = globals().get("epoch", "n/a")

    model.eval()
    test_loss = 0
    num_batches = 0

    # Evaluation needs no autograd graph; skipping it saves memory and time.
    with torch.no_grad():
        for batch_idx, ((img0, img1, img2, img, depth, field_id), Y) in enumerate(test_loader):

            # normalize every stream with its own statistics, then send to device
            img0 = t0(img0).to(device)
            img1 = t1(img1).to(device)
            img2 = t2(img2).to(device)
            depth = t_depth(depth).to(device)

            # targets must be on the same device as the predictions
            Y = Y.to(device)

            output = model(img0, img1, img2, depth)

            # cast like train() does so prediction and target dtypes agree
            loss = loss_fn(output.double(), Y)
            test_loss += loss.item() # sum up batch loss
            num_batches += 1

            if batch_idx % 10 == 0:
                print(output[0, :])
                print(Y[0, :])
                print('Test Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(img), len(test_loader.dataset),
                    100. * batch_idx / len(test_loader), loss.item()))

    if num_batches == 0:
        raise ValueError("test: test_loader yielded no batches")

    test_loss /= num_batches

    print('\nTest set: Average loss: {:.4f}\n'.format(
        test_loss,
        ))

In [14]:
class CNN1(nn.Module):
    """Four independent ResNet-18 backbones feeding one fully connected head.

    Each input stream (img0, img1, img2, depth) has its own randomly
    initialised ResNet-18 whose ``fc`` layer is replaced by ``nn.Identity``,
    so every backbone emits 512 features. The four vectors are concatenated
    (512 * 4 values) and passed through the MLP in ``fc_layers``.

    Args:
        hidden_layers1: width of the first two hidden linear layers.
        hidden_layers2: width of the last two hidden linear layers.
        output_size: number of regression outputs.
    """

    def __init__(self, hidden_layers1, hidden_layers2, output_size):
        super().__init__()

        # One feature extractor per stream, created in the same order and
        # under the same attribute names as before (resnet18_<stream>).
        for stream in ("img0", "img1", "img2", "depth"):
            backbone = models.resnet18(pretrained=False)
            backbone.fc = nn.Identity()
            setattr(self, "resnet18_" + stream, backbone)

        # Head over the concatenated 4 x 512 backbone features.
        head = [
            nn.Linear(512 * 4, hidden_layers1),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_layers1, hidden_layers1),
            nn.ReLU(),
            nn.Linear(hidden_layers1, hidden_layers2),
            nn.ReLU(),
            nn.Linear(hidden_layers2, hidden_layers2),
            nn.ReLU(),
            nn.Linear(hidden_layers2, output_size),
        ]
        self.fc_layers = nn.Sequential(*head)

    def forward(self, img0, img1, img2, depth):
        """Run each stream through its own backbone and regress from the joined features."""
        pairs = (
            (self.resnet18_img0, img0),
            (self.resnet18_img1, img1),
            (self.resnet18_img2, img2),
            (self.resnet18_depth, depth),
        )
        features = [backbone(stream) for backbone, stream in pairs]

        # join along the feature axis, then apply the fully connected head
        joined = torch.cat(features, dim=-1)
        return self.fc_layers(joined)

In [33]:
class CNN2(nn.Module):
    """Four ResNet-18 branches that each regress ``output_size`` values, then a linear fusion.

    Every branch replaces the ResNet ``fc`` layer with
    ``Linear(512, hidden_layer) -> ReLU -> Linear(hidden_layer, output_size) -> ReLU``,
    so branch outputs are non-negative. The four branch outputs are
    concatenated and mapped to ``output_size`` values by one linear layer.

    Args:
        hidden_layer: width of the hidden layer inside each branch head.
        output_size: number of regression outputs (per branch and overall).
    """

    @staticmethod
    def _make_branch(hidden_layer, output_size):
        """Create a randomly initialised ResNet-18 with the small MLP head attached."""
        branch = models.resnet18(pretrained=False)
        branch.fc = nn.Sequential(
            nn.Linear(512, hidden_layer),
            nn.ReLU(),
            nn.Linear(hidden_layer, output_size),
            nn.ReLU()
        )
        return branch

    def __init__(self, hidden_layer, output_size):
        super().__init__()

        # One branch per input stream; creation order is img0, img1, img2, depth.
        self.resnet_img0 = self._make_branch(hidden_layer, output_size)
        self.resnet_img1 = self._make_branch(hidden_layer, output_size)
        self.resnet_img2 = self._make_branch(hidden_layer, output_size)
        self.resnet_depth = self._make_branch(hidden_layer, output_size)

        # Fuse the 4 x output_size branch predictions into the final prediction.
        self.fc_layers = nn.Sequential(
            nn.Linear(output_size * 4, output_size)
        )

    def forward(self, img0, img1, img2, depth):
        """Run each stream through its own branch and fuse the branch outputs."""
        branch_outputs = [
            self.resnet_img0(img0),
            self.resnet_img1(img1),
            self.resnet_img2(img2),
            self.resnet_depth(depth),
        ]

        # concatenate along the feature axis before the fusion layer
        fused_input = torch.cat(branch_outputs, dim=-1)
        return self.fc_layers(fused_input)

In [37]:
# initialize the CNN

# hidden_layers1 / hidden_layers2 are not passed to CNN2 below; their names
# match CNN1's constructor parameters. CNN2 only takes hidden_layers.
hidden_layers1, hidden_layers2 = 500, 100
hidden_layers = 100

# .to(device) returns the module itself, so construction and placement chain;
# the bare name on the last line keeps the architecture summary as cell output.
model = CNN2(hidden_layers, output_size).to(device)
model

CNN2(
  (resnet_img0): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_ru

In [38]:
# SGD with momentum over all model parameters
optimizer = optim.SGD(
    model.parameters(),
    lr=10 ** (-4),
    momentum=0.5,
)

In [39]:
# Run three epochs: one training pass followed by one evaluation pass each.
# The loop variable must stay named `epoch` because test() reads it for logging.
for epoch in range(3):
    train(epoch, model, optimizer, multiplier=1000)
    test(model)

tensor([  23.9962, -123.7691,  -88.4072,   36.4003,   73.3948, -111.6677,
          54.0759,  149.2507, -150.4455,   -2.0535,   14.1535,  -36.7543],
       grad_fn=<SliceBackward0>)
tensor([ 49.5241,  51.3114, 120.7183,  65.3726,  -2.5752,  96.9856,  71.9823,
        -48.1654,  73.5571,  56.3476, -17.6821, -55.3574], dtype=torch.float64)
tensor([nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan],
       grad_fn=<SliceBackward0>)
tensor([ 19.1967,  54.7317, 130.7740,  72.8490,  -3.0375, 109.3590,  89.8884,
        -31.0676,   0.2593,  68.7539,  68.9863,  -6.8523], dtype=torch.float64)
tensor([nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan],
       grad_fn=<SliceBackward0>)
tensor([ 19.6957,  54.7196, 131.4918,  87.7424,   3.8980,  96.7588,  52.7440,
        -48.7357,  -9.9186,  67.4000,  36.7304, -27.9597], dtype=torch.float64)


KeyboardInterrupt: 

In [42]:
import pickle
import pandas as pd

# Build submission.csv: one row per test sample with its ID and 12 predicted values.
outfile = 'submission.csv'

titles = ['ID', 'FINGER_POS_1', 'FINGER_POS_2', 'FINGER_POS_3', 'FINGER_POS_4', 'FINGER_POS_5', 'FINGER_POS_6',
         'FINGER_POS_7', 'FINGER_POS_8', 'FINGER_POS_9', 'FINGER_POS_10', 'FINGER_POS_11', 'FINGER_POS_12']
preds = []

# The saved object is indexed as [0] = RGB data, [1] = depth data, [-1] = file IDs.
test_data = torch.load('C:\\Users\\jason_00wr0b7\\Downloads\\finals_data\\test\\test\\testX.pt')
file_ids = test_data[-1]
rgb_data = test_data[0]
depth_data = test_data[1]

model.eval()

# Inference only: disable autograd so no graph is built for each sample.
with torch.no_grad():
    for i, data in enumerate(rgb_data):
        # Please remember to modify this loop, input and output based on your model/architecture
        data = torch.div(data, 255)
        depth = torch.div(depth_data[i], 1000)
        depth = depth.reshape(1,3,224,224)

        # Normalize, then move to the model's device (required when running on CUDA).
        img0 = t0(data[0:1, :, :, :]).to(device)
        img1 = t1(data[1:2, :, :, :]).to(device)
        img2 = t2(data[2:3, :, :, :]).to(device)
        depth = t_depth(depth).to(device)

        output = model(img0, img1, img2, depth)
        preds.append(output[0].cpu().detach().numpy())

# IDs in the first column, predictions in the remaining twelve.
df = pd.concat([pd.DataFrame(file_ids), pd.DataFrame.from_records(preds)], axis = 1)
df.columns = titles

# Write through a context-managed handle so the file is closed even if to_csv
# fails; newline='' is what pandas expects for text-mode file objects.
with open(outfile, 'w', newline='', encoding='utf-8') as output_file:
    df.to_csv(output_file, index = False)
print("Written to csv file {}".format(outfile))

Written to csv file submission.csv


In [25]:
# Redundant safeguard: the submission cell above already closes output_file,
# and closing an already-closed file object is a no-op.
output_file.close()