<a href="https://colab.research.google.com/github/nghess/607-sensory-coding/blob/main/Copy_of_3dCNN_v1_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt
import tifffile as tiff
import pandas as pd
import numpy as np
import os

In [2]:
# Colab-only setup: mount Google Drive so the project folder is reachable.
from google.colab import drive
drive.mount('/content/drive')
# Project folder on Drive; the next cell joins the repository name onto it.
path = "/content/drive/My Drive/607_sensory_coding"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Navigate to the repository's directory so that the relative 'dataset/...'
# paths used by later cells resolve against it.
repo_path = os.path.join(path, '607-sensory-coding')
os.chdir(repo_path)

In [4]:
class TiffDataset(Dataset):
    """Dataset of TIFF image stacks described by a CSV annotation file.

    The CSV is read positionally: column 0 is the stack's path relative to
    ``root_dir``, column 1 the rotation direction and column 2 the input-type
    name (one of the keys of ``INPUT_CLASS_MAPPING``).

    Args:
        csv_file: Path of the annotation CSV.
        root_dir: Directory that the paths in column 0 are relative to.
        transform: Optional callable applied to the loaded stack. When it is
            omitted the raw array is converted to a tensor directly.
    """

    # Input-type name (CSV column 2) -> class index.
    INPUT_CLASS_MAPPING = {'cross': 0, 'tee': 1, 'elbow': 2, 'radius': 3, 'diameter': 4}

    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of annotated stacks."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load one stack and its labels.

        Returns:
            ``(image, label)`` where ``image`` is a float32 tensor and
            ``label`` is a long tensor ``[rotation_class, input_class]``
            holding two class indices (not a one-hot vector).

        Raises:
            KeyError: If the input-type name in column 2 is not a key of
                ``INPUT_CLASS_MAPPING``.
        """
        img_name = os.path.join(self.root_dir, self.annotations.iloc[index, 0])
        image = tiff.imread(img_name)  # Load stack

        if self.transform is not None:
            image = self.transform(image)

        # Without a transform (or with one that returns an array) the stack is
        # still a NumPy array; convert it here instead of failing on `.type`.
        # Casting in NumPy first also covers dtypes torch cannot wrap directly.
        if not torch.is_tensor(image):
            image = torch.from_numpy(np.ascontiguousarray(image, dtype=np.float32))

        # Ensure the image is a float32 tensor (no-op when it already is).
        image = image.float()

        # Anything other than the exact string 'clockwise' maps to class 0.
        rotation_class = 1 if self.annotations.iloc[index, 1] == 'clockwise' else 0

        input_name = self.annotations.iloc[index, 2]
        try:
            input_class = self.INPUT_CLASS_MAPPING[input_name]
        except KeyError:
            raise KeyError(
                f"Unknown input class {input_name!r} at row {index}; "
                f"expected one of {sorted(self.INPUT_CLASS_MAPPING)}"
            ) from None

        # Both class indices travel together in a single label tensor.
        label = torch.tensor([rotation_class, input_class], dtype=torch.long)

        return image, label

# Run transforms. ToTensor converts the loaded stack to a torch tensor and moves
# the array's last axis to the front (H x W x C -> C x H x W). Per the
# torchvision docs it rescales values into [0, 1] only for uint8 arrays.
# NOTE(review): confirm the TIFF dtype if 0-1 normalization is relied upon.
transform = transforms.Compose([
    transforms.ToTensor(),
])

dataset = TiffDataset(csv_file='dataset/scatter/labels_scatter.csv', root_dir='dataset/', transform=transform)
# NOTE(review): batches presumably come out 4-D (batch, *stack dims) with no
# explicit channel axis — verify the model's expected layout before raising
# batch_size above 1.
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

## Define Model

In [5]:
class Simple3DCNN(nn.Module):
    """Two-block 3-D CNN with separate rotation and input-type heads.

    Each block is ``Conv3d(kernel 3, padding 1) -> ReLU -> MaxPool3d(2)``, so
    every volume dimension is halved twice (with floor) before the features
    are flattened into a 512-unit hidden layer.

    Args:
        input_shape: The three volume dimensions, in the order the
            convolutions see them, that the network will be fed. It sizes the
            first fully connected layer. The default ``(128, 180, 128)``
            yields ``32 * 32 * 45 * 32`` features, the value that was
            previously hard-coded.
    """

    def __init__(self, input_shape=(128, 180, 128)):
        super().__init__()
        self.conv1 = nn.Conv3d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool3d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv3d(16, 32, 3, padding=1)

        # Derive the flattened size instead of keeping a magic number in sync
        # between __init__ and forward.
        self.flat_features = self._flattened_size(input_shape, self.conv2.out_channels)
        if self.flat_features <= 0:
            raise ValueError(
                f"input_shape {tuple(input_shape)} is too small: every dimension must be >= 4"
            )

        self.fc1 = nn.Linear(self.flat_features, 512)
        self.fc_rotation = nn.Linear(512, 2)  # Two classes for rotation
        self.fc_input = nn.Linear(512, 5)  # Five classes for input type

    @staticmethod
    def _flattened_size(input_shape, channels=32):
        """Return the per-sample feature count after both conv/pool blocks."""
        size = channels
        for dim in input_shape:
            # The padded convolutions keep the size; each pool floors dim / 2.
            size *= dim // 2 // 2
        return size

    def forward(self, x):
        """Compute both heads' logits.

        Args:
            x: Either a 5-D ``(batch, 1, d1, d2, d3)`` tensor, or a 4-D tensor
                that ``Conv3d`` treats as one unbatched ``(1, d1, d2, d3)``
                sample. The 4-D case is why a DataLoader batch of size 1
                without a channel axis works.

        Returns:
            ``(rotation_output, input_output)`` with shapes ``(batch, 2)`` and
            ``(batch, 5)``; ``batch`` is 1 for unbatched input.

        Raises:
            ValueError: If the pooled feature count does not match the
                ``input_shape`` the model was built for.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Features of ONE sample: the trailing (channels, d1, d2, d3) axes.
        per_sample = 1
        for extent in x.shape[-4:]:
            per_sample *= extent
        if per_sample != self.flat_features:
            # view(-1, N) would otherwise silently fold the surplus features
            # into extra "batch" rows.
            raise ValueError(
                f"Input produces {per_sample} features per sample but the model was built "
                f"for {self.flat_features}; construct Simple3DCNN with a matching input_shape"
            )

        x = x.view(-1, self.flat_features)  # Flatten the tensor
        x = F.relu(self.fc1(x))
        rotation_output = self.fc_rotation(x)
        input_output = self.fc_input(x)
        return rotation_output, input_output

# Fresh, untrained network; re-running this cell discards any learned weights.
model = Simple3DCNN()

## Train Model

In [None]:
epochs = 100

# Pick the compute device first; the model and every batch are placed on it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("CUDA is active.")
model = model.to(device)

# A single cross-entropy criterion serves both classification heads.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Mean loss of each completed epoch, in order.
loss_values = []

for epoch in range(epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Column 0 holds the rotation class, column 1 the input-type class.
        rotation_labels, input_labels = labels[:, 0], labels[:, 1]

        # Clear gradients left over from the previous step, then run forward.
        optimizer.zero_grad()
        outputs = model(inputs)

        # The two heads are scored separately; their unweighted sum is what
        # gets back-propagated.
        loss_rotation = criterion(outputs[0], rotation_labels)
        loss_input = criterion(outputs[1], input_labels)
        loss = loss_rotation + loss_input

        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Average over the number of batches seen this epoch.
    epoch_loss = running_loss / len(dataloader)
    loss_values.append(epoch_loss)
    print(f'Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}')

CUDA is active.
Epoch [1/100], Loss: 2.3362
Epoch [2/100], Loss: 2.3229
Epoch [3/100], Loss: 2.2806
Epoch [4/100], Loss: 2.1236
Epoch [5/100], Loss: 1.3884
Epoch [6/100], Loss: 1.2566
Epoch [7/100], Loss: 1.0779
Epoch [8/100], Loss: 0.8261
Epoch [9/100], Loss: 0.4619
Epoch [10/100], Loss: 1.0847
Epoch [11/100], Loss: 0.9883
Epoch [12/100], Loss: 0.5957
Epoch [13/100], Loss: 0.2838
Epoch [14/100], Loss: 0.1377
Epoch [15/100], Loss: 0.0841
Epoch [16/100], Loss: 0.0389
Epoch [17/100], Loss: 0.0074
Epoch [18/100], Loss: 0.0017
Epoch [19/100], Loss: 0.0010
Epoch [20/100], Loss: 0.0008
Epoch [21/100], Loss: 0.0007
Epoch [22/100], Loss: 0.0006
Epoch [23/100], Loss: 0.0005
Epoch [24/100], Loss: 0.0004
Epoch [25/100], Loss: 0.0004
Epoch [26/100], Loss: 0.0004
Epoch [27/100], Loss: 0.0003
Epoch [28/100], Loss: 0.0003
Epoch [29/100], Loss: 0.0003
Epoch [30/100], Loss: 0.0003
Epoch [31/100], Loss: 0.0002
Epoch [32/100], Loss: 0.0002
Epoch [33/100], Loss: 0.0002
Epoch [34/100], Loss: 0.0002
Epoch [

### Save Model to File

In [None]:
#torch.save(model, '/content/drive/My Drive/607_sensory_coding/test_model_2.pt')

In [None]:
# Show the layer-by-layer architecture summary.
print(model)

## Test Model

In [None]:
# NOTE(review): this "test" set is built from the same CSV as the training
# `dataset` above, so the accuracy reported below is measured on the training
# samples. Point csv_file at a held-out split for a real test.
test_dataset = TiffDataset(csv_file='dataset/scatter/labels_scatter.csv', root_dir='dataset/', transform=transform)
# Fixed order, one stack per batch.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
# Test label prediction performance
def test_model(model, test_loader, device):
    model.eval()  # Set the model to evaluation mode
    correct_rotation, correct_input, total = 0, 0, 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            rotation_labels, input_labels = labels[:, 0], labels[:, 1]

            rotation_output, input_output = model(inputs)

            _, predicted_rotation = torch.max(rotation_output.data, 1)
            _, predicted_input = torch.max(input_output.data, 1)

            total += labels.size(0)
            correct_rotation += (predicted_rotation == rotation_labels).sum().item()
            correct_input += (predicted_input == input_labels).sum().item()

    print(f'Accuracy of the network on rotation prediction: {100 * correct_rotation / total}%')
    print(f'Accuracy of the network on input type prediction: {100 * correct_input / total}%')

# Report both heads' accuracy for the current model on test_loader.
test_model(model, test_loader, device)

## Visualize Trained Layers

In [None]:
# Function to pull a single sample input out for layer visualization
def get_sample_input(data_loader):
    """Return the first element of the first batch yielded by ``data_loader``.

    Each item of ``data_loader`` is unpacked as ``(inputs, target)``; only
    ``inputs[0]`` of the very first item is used. Returns ``None`` when the
    loader yields nothing.
    """
    first_batch = next(iter(data_loader), None)
    if first_batch is None:
        return None

    batch_inputs, _ = first_batch
    # Select one instance in the batch
    return batch_inputs[0]

# test_loader is not shuffled, so this is its first sample on every run.
sample_input = get_sample_input(test_loader)

In [None]:
def plot_feature_maps(model, input_tensor, selected_layers, ncols=6):
    """Save PNG grids of the activations of selected layers for one input.

    Forward hooks capture the output of every module whose name (as reported
    by ``model.named_modules()``) is in ``selected_layers``. For each captured
    array, one figure is written per index along its first axis; the panels of
    that figure are the 2-D slices along its second axis.

    Files are named ``epoch_{epochs}_{layer}[{index}].png`` in the current
    working directory. ``epochs`` is the notebook-level variable set by the
    training cell, so that cell must have been run first.

    Args:
        model: Network to probe; it is put in evaluation mode.
        input_tensor: A single sample; a leading axis is added with
            ``unsqueeze(0)`` before it is passed to the model.
        selected_layers: Names of the modules whose output is plotted.
        ncols: Number of panel columns per figure.
    """
    model.eval()

    # (layer name, activation array) pairs, in the order the hooks fire.
    captured = []

    def make_hook(layer_name):
        # Bind the name so each figure can be labelled with its own layer.
        def get_features_map(layer, input, output):
            captured.append((layer_name, output.detach().cpu().numpy()))
        return get_features_map

    # Attach hooks to the selected layers
    hooks = [
        layer.register_forward_hook(make_hook(name))
        for name, layer in model.named_modules()
        if name in selected_layers
    ]

    # Run on whatever device the model lives on rather than a notebook global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    try:
        with torch.no_grad():
            model(input_tensor.unsqueeze(0).to(target_device))
    finally:
        # Remove hooks even if the forward pass fails, so they do not pile up
        # on the model across repeated calls.
        for hook in hooks:
            hook.remove()

    # Plotting
    for layer_name, layer_maps in captured:
        # A batched forward pass yields a leading axis of length 1; drop it so
        # the indexing below matches the unbatched layout.
        if layer_maps.ndim == 5 and layer_maps.shape[0] == 1:
            layer_maps = layer_maps[0]

        n_features = layer_maps.shape[1]
        nrows = n_features // ncols + int(n_features % ncols != 0)

        for ii in range(layer_maps.shape[0]):
            # squeeze=False keeps `axes` 2-D for every nrows/ncols combination.
            fig, axes = plt.subplots(nrows, ncols, figsize=(20, 2 * nrows), squeeze=False)

            # Hide all frames first so unused trailing panels stay blank.
            for ax in axes.flat:
                ax.axis('off')

            for i in range(n_features):
                axes[i // ncols, i % ncols].imshow(layer_maps[ii, i, :, :], cmap='gray')

            # Save the figure and release it; many figures are created here.
            fig.savefig(f"epoch_{epochs}_{layer_name}[{ii}].png")
            plt.close(fig)

# Pull out a layer and plot slices
selected_layers = ['conv2']
plot_feature_maps(model, sample_input, selected_layers, ncols=6)


In [None]:
# Release any figures still open from earlier plotting cells.
plt.close('all')

In [None]:
# Plot a grid of single slices from the first samples of `dataset`, each
# titled with that sample's own "rotation class, input class" label.
ncols = 4
nrows = 4
slice_index = 30  # index along the stack tensor's first axis to display
fig, axes = plt.subplots(nrows, ncols, figsize=(20, 2 * nrows))

# Hide every frame first so panels stay blank if the dataset is smaller
# than the grid.
for ax in axes.flat:
    ax.axis('off')

n_shown = min(len(dataset), nrows * ncols)
for i in range(n_shown):
    # Read the image and ITS label together; the titles previously reused the
    # labels left over from the last training batch for every panel.
    image, label = dataset[i]
    ax = axes.flat[i]  # row-major: row = i // ncols, col = i % ncols
    ax.set_title(f"{label[0].item()}, {label[1].item()}")
    ax.imshow(image[slice_index], cmap='gray')