In [1]:
import os

import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms

In [2]:
class ImageDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.image_files = [
            f for f in os.listdir(image_dir) 
            if os.path.isfile(os.path.join(image_dir, f))
        ]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = os.path.join(self.image_dir, self.image_files[idx])
        image = Image.open(img_name).convert('L')
        if self.transform:
            image = self.transform(image)
        return image

In [3]:
class NumpyMatrixDataset(Dataset):
    def __init__(self, matrix_dir, transform=None):
        self.matrix_dir = matrix_dir
        self.matrix_files = [
            f for f in os.listdir(matrix_dir) 
            if os.path.isfile(os.path.join(matrix_dir, f)) and f.endswith('.npy')
        ]
        self.transform = transform

    def __len__(self):
        return len(self.matrix_files)

    def __getitem__(self, idx):
        matrix_name = os.path.join(self.matrix_dir, self.matrix_files[idx])
        matrix = np.load(matrix_name)
        matrix = torch.from_numpy(matrix).float().unsqueeze(0)  # Convert to 1xHxW tensor
        if self.transform:
            matrix = self.transform(matrix)
        return matrix

In [4]:
class Autoencoder(torch.nn.Module):
    def __init__(self, input_size):
        super(Autoencoder, self).__init__()
        self.encoder = torch.nn.Sequential(
            torch.nn.Conv2d(1, 8, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2),
            torch.nn.Conv2d(8, 8, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2),
            torch.nn.Conv2d(8, 1, kernel_size=3, stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.decoder = torch.nn.Sequential(
            torch.nn.ConvTranspose2d(1, 32, kernel_size=2, stride=2),
            torch.nn.ReLU(),
            torch.nn.ConvTranspose2d(32, 32, kernel_size=2, stride=2),
            torch.nn.ReLU(),
            torch.nn.ConvTranspose2d(32, 1, kernel_size=2, stride=2),
            # torch.nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [5]:
def train_autoencoder(model, optimizer, dataloader, epochs=20):
    criterion = nn.BCEWithLogitsLoss()    
    for epoch in range(epochs):
        for data in dataloader:
            img = data
            output = model(img)
            loss = criterion(output, img)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

In [6]:
def test_autoencoder(autoencoder, dataloader):
    autoencoder.eval()
    with torch.no_grad():
        for data in dataloader:
            encoded_data = autoencoder.encoder(data)
            decoded_data = autoencoder.decoder(encoded_data)
            fig, axes = plt.subplots(1, 3, figsize=(20, 10))
            axes[0].imshow(data[0][0].cpu().numpy(), cmap='gray')
            axes[0].set_title("Original")
            axes[0].axis('off')
            axes[1].imshow(encoded_data[0][0].cpu().numpy(), cmap='gray')
            axes[1].set_title("Encoded")
            axes[1].axis('off')
            axes[2].imshow(decoded_data[0][0].cpu().numpy(), cmap='gray')
            axes[2].set_title("Decoded")
            axes[2].axis('off')
            plt.show()
            break  

In [7]:
# if __name__ == "__main__":
#     dataset = NumpyMatrixDataset(
#         matrix_dir="/Users/rohitbokade/Desktop/projects/mappo_arterial_analysis/binary_matrix/"
#     )
#     dataloader = DataLoader(dataset, batch_size=4, shuffle=True)
#     sample_matrix = next(iter(dataloader))
#     n_channels, height, width = sample_matrix.shape[1], sample_matrix.shape[2], sample_matrix.shape[3]
#     input_size = (n_channels, height, width)
#     print(f"Sample matrix size: {input_size}")
#     autoencoder = Autoencoder(input_size)
#     optimizer = optim.Adam(autoencoder.parameters(), lr=0.0003)
#     train_autoencoder(autoencoder, optimizer, dataloader, epochs=10)
#     test_autoencoder(autoencoder, dataloader)

In [8]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=5)
# test_autoencoder(autoencoder, dataloader)

In [9]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=5)
# test_autoencoder(autoencoder, dataloader)

In [10]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=5)
# test_autoencoder(autoencoder, dataloader)

In [11]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=5)
# test_autoencoder(autoencoder, dataloader)

In [12]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=5)
# test_autoencoder(autoencoder, dataloader)

In [13]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=5)
# test_autoencoder(autoencoder, dataloader)

In [14]:
# train_autoencoder(autoencoder, optimizer, dataloader, epochs=50)
# test_autoencoder(autoencoder, dataloader)

In [15]:
class Predictor(torch.nn.Module):
    def __init__(self, input_size):
        super(Predictor, self).__init__()
        self.encoder = models.resnet18(pretrained=True)
        self.encoder.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # Replace the final fully connected layer
        num_ftrs = self.encoder.fc.in_features
        self.encoder.fc = nn.Linear(num_ftrs, 1)
            
    def forward(self, x):
        x = self.encoder(x)
        return x

In [16]:
def train_predictor(model, optimizer, dataloader, epochs=20):
    criterion = nn.MSELoss()    
    for epoch in range(epochs):
        total_loss = 0.0
        for data in dataloader:
            img = data
            target = img.sum(dim=(2, 3))  # Sum the values for target
            output = model(img)
            loss = criterion(output, target.unsqueeze(1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(dataloader):.4f}')

In [17]:
def resize_tensor(tensor, size):
    return F.interpolate(tensor.unsqueeze(0), size=size, mode='bilinear', align_corners=False).squeeze(0)

def normalize_tensor(tensor, mean, std):
    mean = torch.tensor(mean).view(1, 1, 1)
    std = torch.tensor(std).view(1, 1, 1)
    return (tensor - mean) / std

In [18]:
if __name__ == "__main__":
    transform = transforms.Compose([
        transforms.Lambda(lambda x: resize_tensor(x, (224, 224))),  # Resize to the input size expected by ResNet
        transforms.Lambda(lambda x: normalize_tensor(x, mean=[0.5], std=[0.5]))  # Normalize for single channel input
    ])
    dataset = NumpyMatrixDataset(
        matrix_dir="/Users/rohitbokade/Desktop/projects/mappo_arterial_analysis/binary_matrix/",
        transform=transform,
    )
    dataloader = DataLoader(dataset, batch_size=4, shuffle=True)
    sample_matrix = next(iter(dataloader))
    n_channels, height, width = sample_matrix.shape[1], sample_matrix.shape[2], sample_matrix.shape[3]
    input_size = (n_channels, height, width)
    print(f"Sample matrix size: {input_size}")
    # Initialize the model, optimizer, and train
    model = Predictor(input_size)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    train_predictor(model, optimizer, dataloader)

Sample matrix size: (1, 224, 224)


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/20], Loss: 2499954659.5556
Epoch [2/20], Loss: 2499222471.1111
Epoch [3/20], Loss: 2498773674.6667
Epoch [4/20], Loss: 2498211470.2222
Epoch [5/20], Loss: 2497596984.8889


KeyboardInterrupt: 

In [None]:
train_predictor(model, optimizer, dataloader, epochs=50)