In [25]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms
import torchvision.models as models
import torch.optim as optim

from tensorflow.keras.layers import Conv2D, Flatten, Dense, MaxPooling2D, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model
import matplotlib.pyplot as plt
import numpy as np
import os
from tqdm import tqdm
from PIL import Image

In [26]:

class block(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super(block, self).__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        identity = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
                               
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)
            
        x += identity
        x = self.relu(x)
        return x

class ResNet(nn.Module): 
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        # ResNet layers
        self.layer1 = self._make_layers(block, layers[0], out_channels=64, stride=1)
        self.layer2 = self._make_layers(block, layers[1], out_channels=128, stride=2)
        self.layer3 = self._make_layers(block, layers[2], out_channels=256, stride=2)
        self.layer4 = self._make_layers(block, layers[3], out_channels=512, stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        
        return x
        
    def _make_layers(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []
        
        if stride != 1 or self.in_channels != out_channels * 4:
            identity_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * 4, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * 4)
            )
        
        layers.append(block(self.in_channels, out_channels, identity_downsample, stride))
        self.in_channels = out_channels * 4 
        
        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, out_channels))
                
        return nn.Sequential(*layers)

def ResNet500(img_channels=3, num_classes=3):
    model = ResNet(block, [3, 4, 6, 3], img_channels, num_classes)
    pretrained_model = models.resnet50(pretrained=True)
     # Load pre-trained weights, except for the final fully connected layer
    pretrained_dict = pretrained_model.state_dict()
    model_dict = model.state_dict()

    # Filter out the final layer parameters
    pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict and "fc" not in k}
    model_dict.update(pretrained_dict)
    model.load_state_dict(model_dict)
    return model

def ResNet101(img_channels=3, num_classes=3):
    model = ResNet(block, [3, 4, 23, 3], img_channels, num_classes)
    pretrained_model = models.resnet101(pretrained=True)
    model.load_state_dict(pretrained_model.state_dict(), strict=False)
    return model

def ResNet152(img_channels=3, num_classes=3):
    model = ResNet(block, [3, 8, 36, 3], img_channels, num_classes)
    pretrained_model = models.resnet152(pretrained=True)
    model.load_state_dict(pretrained_model.state_dict(), strict=False)
    return model


In [27]:
class CustomDataset(Dataset):

    def __init__(self, root_dir, transform_image=None, transform_mask=None):

        self.root_dir = root_dir
        self.transform_image = transform_image
        self.transform_mask = transform_mask
        self.images_dir = os.path.join(root_dir, 'imgs')
        self.masks_dir = os.path.join(root_dir, 'masks')
        self.image_filenames = os.listdir(self.images_dir)
        self.image_filenames = [name for name in self.image_filenames if not name.startswith(".ipynb")]

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        img_name = os.path.join(self.images_dir, self.image_filenames[idx])
        mask_name = os.path.join(self.masks_dir, self.image_filenames[idx].replace('img_', 'mask_').replace('.jpg', '.png'))

        image = Image.open(img_name).convert('RGB')
        mask = Image.open(mask_name).convert('RGB') 

        if self.transform_image:
            image = self.transform_image(image)
        if self.transform_mask:
            mask = self.transform_mask(mask)

        return image, mask

In [28]:
# Define Transforms
transform_image = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

transform_mask = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

In [29]:
# Defines dataset and dataloaders
train_dataset = CustomDataset(root_dir='CAVS/Main_Trail/Train', transform_image=transform_image, transform_mask=transform_mask)
test_dataset = CustomDataset(root_dir='CAVS/Main_Trail/Test', transform_image=transform_image, transform_mask=transform_mask)

train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [30]:
# Initialize model
#model = ResNet50(img_channels=3, num_classes=3) 
#device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#model = model.to(device)
#model.summary()

In [31]:
model = ResNet50(weights='imagenet', include_top=False, input_shape=(256,256,3))

2024-07-13 20:24:17.825640: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1928] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 13414 MB memory:  -> device: 0, name: Tesla V100-PCIE-16GB, pci bus id: 0000:3b:00.0, compute capability: 7.0
2024-07-13 20:24:17.826280: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1928] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 14612 MB memory:  -> device: 1, name: Tesla V100-PCIE-16GB, pci bus id: 0000:d8:00.0, compute capability: 7.0


In [32]:
# Function to calculate IoU
def calculate_iou(pred_mask, true_mask, num_classes):
    ious = []
    pred_mask = pred_mask.view(-1)
    true_mask = true_mask.view(-1)
    
    for cls in range(num_classes):
        pred_inds = (pred_mask == cls)
        target_inds = (true_mask == cls)
        
        intersection = (pred_inds[target_inds]).long().sum().item()
        union = pred_inds.long().sum().item() + target_inds.long().sum().item() - intersection
        
        if union == 0:
            ious.append(float('nan'))  # if there is no ground truth, do not include in the mean
        else:
            ious.append(intersection / union)
    
    return ious

def calculate_miou(pred_mask, true_mask, num_classes):
    ious = calculate_iou(pred_mask, true_mask, num_classes)
    miou = np.nanmean(ious)  # nanmean to ignore NaN values
    return miou, ious

In [33]:
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [39]:
model.fit(
    train_dataset,
    epochs=20,

)

ValueError: Unrecognized data type: x=<__main__.CustomDataset object at 0x7f29ddfc33d0> (of type <class '__main__.CustomDataset'>)