In [None]:
import torch.utils.data as data
import numpy as np
from pandas import read_csv
from torchvision.io import read_image
from torchvision.transforms import Resize, Compose, ToTensor, Normalize, ToPILImage
from PIL import Image
from torch.utils.data.sampler import SubsetRandomSampler


class NYUv2Dataset(data.Dataset):
    """RGB / depth image pairs read from the Kaggle NYU Depth v2 directory.

    The relative RGB image paths come from the first column of
    ``data/nyu2_train.csv`` or ``data/nyu2_test.csv`` below ``self.root``.
    The matching depth path is derived from the RGB path (see ``__getitem__``).

    Args:
        train: when True the training CSV is read, otherwise the test CSV.
    """

    def __init__(self, train=True):
        self.root = "/kaggle/input/nyu-depth-v2/nyu_data"
        self.train = train

        # Both splits share the same layout; only the CSV file name differs.
        split_name = "train" if train else "test"
        csv_data = read_csv(f"{self.root}/data/nyu2_{split_name}.csv", header=None)
        self.rgb_paths = csv_data[0]

        self.length = len(self.rgb_paths)

        # RGB input: resize to 240x320, convert to a tensor, then normalise each
        # channel with the mean/std below.
        self.transform = Compose(
            [
                Resize((240, 320)),
                ToTensor(),
                Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ]
        )

        # Depth target: resize to 120x160 and convert to a tensor (no normalisation).
        # NOTE(review): the depth files are passed through as stored; confirm that
        # the train and test depth encodings share the same scale.
        self.target_transform = Compose(
            [
                Resize((120, 160)),
                ToTensor(),
            ]
        )

    def __getitem__(self, index):
        """Return ``(rgb_tensor, depth_tensor)`` for the sample at position ``index``.

        Raises:
            IndexError: if ``index`` is outside the dataset.
        """
        # Positional lookup: unlike label-based ``Series[index]`` this supports
        # negative indices and raises IndexError (not KeyError) past the end,
        # which is what Python's sequence iteration protocol relies on.
        rgb_path = self.root + "/" + self.rgb_paths.iloc[index]

        # Train depth maps share the RGB file name with a .png extension; test
        # depth maps replace "colors" with "depth" in the path.
        if self.train:
            depth_path = rgb_path.replace(".jpg", ".png")
        else:
            depth_path = rgb_path.replace("colors", "depth")

        # Context managers close the underlying files as soon as the tensors exist.
        with Image.open(rgb_path) as rgb_image:
            # Normalize expects exactly three channels; coerce grayscale / RGBA inputs.
            rgb = self.transform(rgb_image.convert("RGB"))
        with Image.open(depth_path) as depth_image:
            depth = self.target_transform(depth_image)

        return rgb, depth

    def __len__(self):
        """Number of samples listed in the CSV for this split."""
        return self.length


if __name__ == "__main__":
    # Smoke test: build both splits, report their sizes, then print the tensor
    # sizes of the first (rgb, depth) pair from each split.
    train_dataset = NYUv2Dataset(train=True)
    test_dataset = NYUv2Dataset(train=False)

    for split_name, split_dataset in (("train", train_dataset), ("test", test_dataset)):
        print(f"{split_name} dataset", len(split_dataset))

    for split_dataset in (train_dataset, test_dataset):
        for item in split_dataset[0]:
            print(item.size())


In [None]:
import torchvision
import torch
from torch import nn

# Backbone network, created without pretrained weights.
DenseNet161 = torchvision.models.densenet161(pretrained=False)

# Mini-batch size shared by the data loaders.
batch_size = 10

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("Using device:", device.type.upper())



In [None]:
# Datasets backed by the train / test CSV listings.
train_dataset = NYUv2Dataset(train=True)
test_dataset = NYUv2Dataset(train=False)

# Hold-out configuration: a fraction of the *training* set is set aside and
# served by `test_loader` below.
test_split = .2
shuffle_dataset = True
random_seed = 42

dataset_size = len(train_dataset)
indices = list(range(dataset_size))

# Number of held-out samples (truncation equals floor for non-negative sizes).
split = int(test_split * dataset_size)

if shuffle_dataset:
    # Seed first so the shuffled order (and therefore the split) is reproducible.
    np.random.seed(random_seed)
    np.random.shuffle(indices)

# The first `split` shuffled indices are held out; the rest are used for training.
test_indices = indices[:split]
train_indices = indices[split:]

train_sampler = SubsetRandomSampler(train_indices)
test_sampler = SubsetRandomSampler(test_indices)

# Both loaders draw from `train_dataset`; only the index subsets differ.
train_loader = torch.utils.data.DataLoader(
    train_dataset, sampler=train_sampler, batch_size=batch_size
)
test_loader = torch.utils.data.DataLoader(
    train_dataset, sampler=test_sampler, batch_size=batch_size
)

# Number of batches per loader.
print(len(train_loader))
print(len(test_loader))


In [None]:
# bottleneck layer
class Bottleneck(nn.Module):
    """BatchNorm2d -> ReLU -> Conv2d block.

    Args:
        in_channels: channels of the incoming feature map (also the BatchNorm width).
        out_channels: channels produced by the convolution.
        kernel_size, stride, padding: forwarded unchanged to ``nn.Conv2d``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(Bottleneck, self).__init__()
        # Registration order (bnorm, relu, conv) fixes the state-dict / parameter order.
        self.bnorm = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)

    def forward(self, x):
        """Normalise, activate, then convolve ``x``."""
        # The in-place ReLU overwrites the BatchNorm output, not the caller's tensor.
        return self.conv(self.relu(self.bnorm(x)))

# deconvolution layer
class DeConv(nn.Module):
    """BatchNorm2d -> ReLU -> ConvTranspose2d block.

    Args:
        in_channels: channels of the incoming feature map (also the BatchNorm width).
        out_channels: channels produced by the transposed convolution.
        kernel_size, stride, padding: forwarded unchanged to ``nn.ConvTranspose2d``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(DeConv, self).__init__()
        # Registration order (bnorm, relu, deconv) fixes the state-dict / parameter order.
        self.bnorm = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.deconv = nn.ConvTranspose2d(
            in_channels, out_channels, kernel_size, stride, padding
        )

    def forward(self, x):
        """Normalise, activate, then apply the transposed convolution to ``x``."""
        # The in-place ReLU overwrites the BatchNorm output, not the caller's tensor.
        return self.deconv(self.relu(self.bnorm(x)))


# Define model
class Main(nn.Module):
    """Depth-estimation network: DenseNet161 backbone followed by a deconvolution decoder.

    The backbone is every child of the module-level ``DenseNet161`` except the
    last one (in torchvision's DenseNet that drops the classifier and keeps the
    feature extractor). The decoder is one 1x1 ``Bottleneck`` (2208 -> 512
    channels) followed by four ``DeConv`` + ``AvgPool2d`` stages that end in a
    single output channel.

    With kernel 5 / stride 2 / padding 1 each ``DeConv`` maps a spatial size
    ``n`` to ``2n + 1``; the stride-1 average pool with a kernel extent of 2
    then trims that axis back to ``2n``. ``avgpool1`` uses a ``(1, 2)`` kernel
    and therefore only trims the width.
    """

    def __init__(self):
        # Local import keeps this block self-contained.
        import copy

        super().__init__()

        # Deep-copy the backbone stages so this instance owns its parameters.
        # Wrapping the global modules directly would make every Main() share
        # (and move / train) the same weights as the module-level DenseNet161.
        # The Sequential layout is unchanged, so state-dict keys stay the same.
        backbone_stages = list(DenseNet161.children())[:-1]
        self.densenet161 = nn.Sequential(
            *(copy.deepcopy(stage) for stage in backbone_stages)
        )

        # first bottleneck layer
        self.btl1 = Bottleneck(
            in_channels=2208, out_channels=512, kernel_size=1, stride=1, padding=0
        )

        # 1st deconvolution layer
        self.deconv1 = DeConv(
            in_channels=512, out_channels=512, kernel_size=5, stride=2, padding=1
        )
        self.avgpool1 = nn.AvgPool2d(kernel_size=(1, 2), stride=1, padding=0)

        # 2nd deconvolution layer
        self.deconv2 = DeConv(
            in_channels=512, out_channels=256, kernel_size=5, stride=2, padding=1
        )
        self.avgpool2 = nn.AvgPool2d(kernel_size=(2, 2), stride=1, padding=0)

        # 3rd deconvolution layer
        self.deconv3 = DeConv(
            in_channels=256, out_channels=128, kernel_size=5, stride=2, padding=1
        )
        self.avgpool3 = nn.AvgPool2d(kernel_size=(2, 2), stride=1, padding=0)

        # 4th deconvolution layer
        self.deconv4 = DeConv(
            in_channels=128, out_channels=1, kernel_size=5, stride=2, padding=1
        )
        self.avgpool4 = nn.AvgPool2d(kernel_size=(2, 2), stride=1, padding=0)

    def forward(self, x):
        """Run ``x`` through the backbone and the decoder; returns a 1-channel map."""
        stages = (
            self.densenet161,
            self.btl1,
            self.deconv1,
            self.avgpool1,
            self.deconv2,
            self.avgpool2,
            self.deconv3,
            self.avgpool3,
            self.deconv4,
            self.avgpool4,
        )
        for stage in stages:
            x = stage(x)
        return x



In [None]:
# Per-epoch history (training loss, validation loss, validation accuracy);
# three independent lists that the training loop appends to.
all_training_loss_1, all_val_loss_1, all_val_accuracy_1 = [], [], []

In [None]:
# Build the network and place it on the selected device.
model = Main()
model = model.to(device)

# Mean-squared-error objective, optimised with SGD + momentum.
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(params=model.parameters(), momentum=0.9, lr=0.001)

# Number of passes over the training loader.
epochs = 10

In [None]:
# ####################### TRAINING AND VALIDATION ###################
# Each epoch: one pass over `train_loader` with gradient updates, then one pass
# over `test_loader` (the held-out part of the training set) without gradients.
# Per-epoch averages are printed and appended to the history lists; the final
# weights are written to "model.pth".

# A prediction counts as accurate when max(pred / target, target / pred) is
# below this ratio (the usual "delta < 1.25" depth-estimation metric). This
# replaces a classification-style exp/topk accuracy, which on a 1-channel
# output only measured the fraction of zero-valued target pixels.
ACCURACY_RATIO_THRESHOLD = 1.25
# Lower clamp that keeps the ratio finite for zero / negative depths.
DEPTH_EPS = 1e-6

for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}\n-------------------------------")
    training_loss_1 = 0
    val_loss_1 = 0
    val_accuracy_1 = 0

    # Explicitly (re-)enter training mode at the start of every epoch.
    model.train()
    for batch_no, (images_1, labels_1) in enumerate(train_loader):
        # `.to(device)` works on CPU and GPU alike; the previous CUDA-only branch
        # left `images` / `labels` undefined on CPU-only machines.
        images = images_1.to(device).float()
        labels = labels_1.to(device)

        optimizer.zero_grad()
        output_1 = model(images)
        loss_1 = loss_fn(output_1, labels)
        loss_1.backward()
        optimizer.step()

        training_loss_1 += loss_1.item()
        if batch_no % 100 == 0:
            print(f"-> TRAIN batch_no: {batch_no}, batch_loss: {loss_1.item()}")

    # entering evaluation mode
    model.eval()
    with torch.no_grad():
        for val_images_1, val_labels_1 in test_loader:
            val_images = val_images_1.to(device).float()
            val_labels = val_labels_1.to(device)

            val_output_1 = model(val_images)
            val_loss_1 += loss_fn(val_output_1, val_labels).item()

            # Threshold accuracy: share of pixels whose prediction is within a
            # factor of ACCURACY_RATIO_THRESHOLD of the target.
            pred_depth = val_output_1.clamp(min=DEPTH_EPS)
            true_depth = val_labels.clamp(min=DEPTH_EPS)
            ratio = torch.max(pred_depth / true_depth, true_depth / pred_depth)
            val_accuracy_1 += (ratio < ACCURACY_RATIO_THRESHOLD).float().mean().item()

    log = f"\nTraining loss: {training_loss_1/len(train_loader):.4f}\t Test loss {val_loss_1/len(test_loader):.4f}\t"
    log += f"\tTest accuracy: {val_accuracy_1/len(test_loader):.4f}"

    print(log)

    all_training_loss_1.append(training_loss_1/len(train_loader))
    all_val_loss_1.append(val_loss_1/len(test_loader))
    all_val_accuracy_1.append(val_accuracy_1/len(test_loader))

torch.save(model.state_dict(),"model.pth")
print("Saved this model state model.pth")


In [None]:
# Rebuild the network and restore the weights saved by the training cell.
model = Main()
model.load_state_dict(torch.load("model.pth", map_location=torch.device('cpu')))
# load_state_dict copies values into the existing parameters and does not move
# them, so pin the whole model to the CPU explicitly: the visualisation cell
# feeds CPU tensors and no submodule may be left on another device.
model = model.to(torch.device('cpu'))

In [None]:
# import matplotlib.pyplot as plt 

# plt.plot(all_training_loss_1, label="training loss") 
# plt.plot(all_val_loss_1, label="validation loss") 
# #plt.plot(all_test_accuracy, label="testing accuracy")
# plt.legend()

In [None]:
import matplotlib.pyplot as plt
from torchvision.transforms import functional as F
from torchvision import transforms


# Qualitative check: show one sample's RGB input, the predicted depth map and
# the ground-truth depth map side by side.
model.eval()
idx = 2342
# Load the sample once (indexing the dataset twice decodes both images twice).
x, y = train_dataset[idx]

# Channel statistics used to undo the dataset's Normalize step for display;
# they must match the values in NYUv2Dataset.transform.
norm_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
norm_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

with torch.no_grad():
    x_prev = x
    x = x.unsqueeze(0)      # add the batch dimension expected by the model
    pred = model(x)
    pred = pred.squeeze(0)  # drop the batch dimension again

# Without de-normalising, imshow clips the out-of-range values and the colours look wrong.
rgb_preview = (x_prev * norm_std + norm_mean).clamp(0, 1)

fig = plt.figure(figsize=(10, 7))

rows = 2
columns = 2

ax_rgb = fig.add_subplot(rows, columns, 1)
ax_rgb.imshow(rgb_preview.permute(1, 2, 0).numpy())
ax_rgb.set_title("Input RGB")

# Index channel 0 so imshow receives a plain (H, W) array.
ax_pred = fig.add_subplot(rows, columns, 2)
ax_pred.imshow(pred[0].cpu().numpy())
ax_pred.set_title("Predicted depth")

ax_true = fig.add_subplot(rows, columns, 3)
ax_true.imshow(y[0].numpy())
ax_true.set_title("Ground-truth depth")






    