# Convolutional Autoencoders as Segmenter

In [None]:
#Importing necessary packages

import torch
from torchvision import datasets
from torchvision import transforms

import matplotlib.pyplot as plt
import h5py 
import numpy as np
import pandas as pd
import json
import os

from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from tqdm import tqdm 


from sklearn.model_selection import train_test_split


# Importing Autoencoder
from autoencoder import Autoencoder

# Data Loading
The videos are stored in .h5 (HDF5) format. The frames can be accessed under the 'frame_data' key, while the time values can be accessed under the 'time' key.

In [None]:
# Read the whole 'frame_data' dataset into memory and release the file handle.
file_path = 'infrared-pulses/Infrared_RE/KLDT-E5WC_95775.h5'
with h5py.File(file_path, 'r') as file:
    dset = file['frame_data']
    # Indexing with [...] copies the data into a NumPy array (same dtype as
    # the dataset), so `video` stays usable after the file is closed.
    video = dset[...]
print(video.shape)

# Padding
The expected shape of the video is (n, 176, 120), where n is the number of frames.
If the frames are smaller than (176, 120), each frame is padded with zeros up to that size.

In [None]:
# Zero-pad every frame to 176 x 120. The original pixels are copied into the
# top-left corner; any remaining rows/columns stay 0.
temp = np.zeros((video.shape[0], 176, 120))
# Axis 1 is the frame height and axis 2 the frame width, so each slice must
# use its own axis length (using shape[1] for both breaks non-square padding).
temp[:, :video.shape[1], :video.shape[2]] = video
video = temp

In [None]:
def normalize(video):
    """
    Normalizes each frame in a video.

    Every frame is min-max scaled independently so that its values lie
    between 0 and 1. A frame whose values are all equal (max == min) has no
    range to scale; it is returned as all zeros instead of dividing by zero.

    Parameters:
    -----------
    video : numpy.ndarray
        A 3D NumPy array representing the video, where the first dimension is the
        frame index, and the second and third dimensions are the height and width
        of the frames, respectively.

    Returns:
    --------
    numpy.ndarray
        A float64 array of the same shape as the input, where each frame has been
        normalized to the range [0, 1].

    """
    # Work in float64 so integer inputs are scaled without truncation.
    frames = np.asarray(video, dtype=np.float64)
    scaled = np.zeros_like(frames)

    for index, frame in enumerate(frames):
        lowest = frame.min()
        span = frame.max() - lowest
        # Constant frames keep the zero fill; dividing by a zero span would
        # produce NaN values.
        if span > 0:
            scaled[index] = (frame - lowest) / span

    return scaled


In [None]:
# Scale every frame to [0, 1] first, then flip the intensities so that a
# normalized value v becomes 1 - v (applied to all frames at once).
video_norm = normalize(video)
video_inv = 1 - video_norm

In [None]:
# Show frame i of the padded video next to its inverted counterpart
i = 600
plt.figure(figsize=[10, 10])
panels = [(video, "Original Video"), (video_inv, "Inverted Video")]
for position, (frames, caption) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.imshow(frames[i])
    plt.title(caption)

In [None]:
class CustomDataset(Dataset):
    """
    Dataset wrapper around an indexable collection of images.

    Parameters:
    -----------
    images : numpy.ndarray or list
        The images to serve. Each item is handed to `transform` unchanged, so
        it must be in a format that transform accepts.
    transform : callable, optional
        Called on every image before it is returned. Default is None, in which
        case the stored image is returned as-is.

    Methods:
    --------
    __len__():
        Number of stored images.
    __getitem__(idx):
        Image at position `idx`, passed through `transform` when one is set.

    """
    def __init__(self, images, transform=None):
        self.transform = transform
        self.images = images

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        # Tensor indices are converted to plain Python values before indexing.
        index = idx.tolist() if torch.is_tensor(idx) else idx
        sample = self.images[index]

        if not self.transform:
            return sample
        return self.transform(sample)


In [None]:
# Fraction of the frames held out for the test set
test_size = 0.3
video_train, video_test = train_test_split(video_inv, test_size=test_size)

# Report the size of each split as a sanity check
for split_name, split_frames in (("Training", video_train), ("Testing", video_test)):
    print(f"{split_name} set shape: {split_frames.shape}")

In [None]:
batch_size = 32

# Only preprocessing step: turn each frame into a tensor
transform = transforms.Compose([transforms.ToTensor()])

# Wrap both splits so every frame goes through the same transform
train_dataset = CustomDataset(video_train, transform=transform)
test_dataset = CustomDataset(video_test, transform=transform)

# Both loaders share the same batching/shuffling settings
loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=0)
train_dataloader = DataLoader(train_dataset, **loader_options)
test_dataloader = DataLoader(test_dataset, **loader_options)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

print(f"Selected device: {device}")

In [None]:
# Hyper-parameters of the run
inputs = video.shape[1] * video.shape[2]  # Pixels per frame (height * width); not used below in this cell
layer = 2
channels = 256
lr = 1e-4
weight_decay = 1e-8

# Build the autoencoder and move it to the selected device
model = Autoencoder(c=channels, layer=layer)
model.to(device)

# The notebook only imports the top-level `torch` package, so the loss and the
# optimizer are reached through `torch.nn` / `torch.optim` (the bare names
# `nn` and `optim` are never imported and would raise NameError).
loss_function = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

In [None]:
# Output directory named after the model configuration
directory = f'autoencoder_{layer}l_{channels}c'

# exist_ok=True creates the directory when missing and is a no-op when it is
# already there, without a separate existence check that could race.
os.makedirs(directory, exist_ok=True)


In [None]:
from tqdm import tqdm  # Import tqdm for progress bars

# Number of passes over the training data and per-epoch loss history
epochs = 150
epoch_losses = []
epoch_val_losses = []

for epoch in range(epochs):
    print(f'Epoch [{epoch + 1}/{epochs}]')

    # ---- Training phase: iterate over `train_dataloader` ----
    model.train()
    losses = []
    train_progress = tqdm(train_dataloader, total=len(train_dataloader), desc='Training')
    for image in train_progress:
        image = image.to(device).float()

        optimizer.zero_grad()  # Drop gradients left over from the previous batch
        # The autoencoder is trained to reproduce its own input, so the
        # output is reshaped to the input shape and compared against it.
        output = model(image).reshape(image.shape)
        loss = loss_function(output, image)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        losses.append(batch_loss)
        train_progress.set_postfix({'loss': batch_loss})

    # ---- Validation phase: the held-out `test_dataloader` split ----
    model.eval()
    val_losses = []
    val_progress = tqdm(test_dataloader, total=len(test_dataloader), desc='Validation')
    with torch.no_grad():  # No gradients are needed for evaluation
        for image in val_progress:
            image = image.to(device).float()
            output = model(image).reshape(image.shape)
            val_loss = loss_function(output, image)

            batch_val_loss = val_loss.item()
            val_losses.append(batch_val_loss)
            val_progress.set_postfix({'val_loss': batch_val_loss})

    # Mean batch loss of this epoch for both phases
    epoch_loss = sum(losses) / len(losses)
    epoch_val_loss = sum(val_losses) / len(val_losses)

    epoch_losses.append(epoch_loss)
    epoch_val_losses.append(epoch_val_loss)

    print(f'Train Loss: {epoch_loss:.4f} | Val Loss: {epoch_val_loss:.4f}')


In [None]:
# One curve per loss history: training in red, validation in blue
loss_curves = (
    (epoch_losses, 'r', 'Training Loss'),
    (epoch_val_losses, 'b', 'Validation Loss'),
)
for history, colour, curve_label in loss_curves:
    plt.plot(history, color=colour, label=curve_label)

# Axis labels, title and legend
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Losses')
plt.legend()

# Write the figure into the run directory before showing it
loss_plot_path = os.path.join(directory, 'loss_plot.png')
plt.savefig(loss_plot_path)

plt.show()

In [None]:
# Run configuration to store next to the results
data = {
    'layers': layer,
    'channels': channels,
    'learning rate': lr,
    'epoch': epochs,
    'loss': 'mse',
    'test size': test_size,
    'batch size': batch_size,
    'weight decay': weight_decay,
}

# Destination inside the run directory
file_path = os.path.join(directory, "parameters.json")

# Serialize the configuration and write it out in one go
serialized = json.dumps(data)
with open(file_path, "w") as json_file:
    json_file.write(serialized)

print("File saved successfully at:", file_path)

In [None]:
# Store only the model's state dict (its weights) in the run directory
weights_path = os.path.join(directory, "model_weights.pth")
model_state = model.state_dict()
torch.save(model_state, weights_path)

In [None]:
# Store the complete model object (not just its weights) in the run directory
model_path = os.path.join(directory, "entire_model.pth")
torch.save(obj=model, f=model_path)

### Visualizing the results

In [None]:
# Switch the model to evaluation mode before running inference
model.eval()

# Run inference on the CPU
model.to(torch.device('cpu'))


In [None]:
# Frame of 'video_inv' to run through the model. The index is set once here
# and reused below, so changing `i` really changes the selected frame.
i = 600
original = video_inv[i]

# Add batch and channel axes: (height, width) -> (1, 1, height, width)
image = original.reshape(1, 1, video.shape[1], video.shape[2])

# Convert the NumPy array to a float32 PyTorch tensor
image = torch.tensor(image).float()

# Forward pass without gradient tracking
with torch.no_grad():
    output = model(image)


In [None]:
# Drop the batch/channel axes so the output has the frame's (height, width)
# layout, and convert it to a NumPy array. 'original' is a NumPy frame, so
# this keeps later plotting and arithmetic from mixing tensors with ndarrays.
output = output.reshape(video.shape[1], video.shape[2]).detach().cpu().numpy()

In [None]:
import matplotlib.pyplot as plt

# Input frame on the left, model output on the right
plt.figure()
comparison = ((original, "Original"), (output, "Filtered Image"))
for slot, (picture, heading) in enumerate(comparison, start=1):
    plt.subplot(1, 2, slot)
    plt.imshow(picture)
    plt.title(heading)

# Title for the whole figure
plt.suptitle("Original vs Filtered")

plt.show()


In [None]:
# Per-pixel residual: the model's reconstruction ('output') minus the frame
# that was fed to the model ('original'). Both have the (height, width) layout.
temp = output - original

In [None]:
import matplotlib.pyplot as plt

# Three panels in one row: input frame, model output, and their difference
plt.figure(figsize=[10, 5])
panel_specs = (
    (original, "Original"),
    (output, "Filtered Image"),
    (temp, "Filtered - Original"),
)
for column, (panel_image, panel_title) in enumerate(panel_specs, start=1):
    plt.subplot(1, 3, column)
    plt.imshow(panel_image)
    plt.title(panel_title)

# Figure-level title naming the model configuration
plt.suptitle(f'Autoencoder({layer} layers) with {channels} channels for N2V files')

# Save into the run directory, then display
figure_path = os.path.join(directory, 'fig.png')
plt.savefig(figure_path)

plt.show()


# Inference on Videos

In [None]:
from PIL import Image
from autoencoder import Autoencoder

In [None]:
# Loading the data
# Pulse number selects which recording is read
pulse_number = 95775
file_path = f'infrared-pulses/Infrared_RE/KLDT-E5WC_{pulse_number}.h5'

# The context manager closes the HDF5 file as soon as the frames are copied
with h5py.File(file_path, 'r') as data:
    # 'frame_data' holds the video frames
    dset = data['frame_data']

    # Copy the frames into an in-memory float64 NumPy array so 'video_check'
    # remains valid after the file is closed.
    video_check = dset[...].astype(np.float64)

In [None]:
# Target layout: same number of frames, each frame 176 x 120, zero-filled
target_shape = (video_check.shape[0], 176, 120)
temp = np.zeros(target_shape)

# Place the existing pixels in the top-left corner; the rest stays zero
rows, cols = video_check.shape[1], video_check.shape[2]
temp[:, :rows, :cols] = video_check

# From here on 'video_check' refers to the padded frames
video_check = temp


In [None]:
import torch
# Import the class from `autoencoder`, the module every other cell of this
# notebook imports it from.
from autoencoder import Autoencoder

# Checkpoint with the trained weights
model_path = 'results/autoencoder_3l_128c/model_weights.pth'

# Architecture settings passed to the Autoencoder constructor
# NOTE(review): presumably these must match the checkpoint named above — confirm.
channels = 128
layer = 3

# Build the model and load the stored weights onto the CPU
model = Autoencoder(c=channels, layer=layer)
device = torch.device('cpu')
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint)

# Evaluation mode for inference
model.eval()

# Keep the model on the CPU
model.to(device)


In [None]:
# Pulse number used to name the output folder
pulse_number = 95775

# Folder that receives the video as a sequence of images for tracking
output_folder = f'Segmented Images_{pulse_number}'

# exist_ok=True creates the folder when missing and does nothing when it is
# already there, without a separate existence check that could race.
os.makedirs(output_folder, exist_ok=True)

In [None]:
# Preprocessing applied to each frame before it is given to the model:
# the only step converts the input to a PyTorch tensor.
preprocessing_steps = [transforms.ToTensor()]
transform = transforms.Compose(preprocessing_steps)

In [None]:

def _rescale_to_unit(frame):
    """Min-max scale an array to [0, 1]; a constant array maps to all zeros."""
    lowest = np.min(frame)
    span = np.max(frame) - lowest
    if span > 0:
        return (frame - lowest) / span
    # No value range to stretch: avoid dividing by zero.
    return np.zeros_like(frame)


# Normalize the entire video and allocate one output array per processing stage
normalized_video = normalize(video_check)
inverted_video = np.zeros(video_check.shape)
filtered_video = np.zeros(video_check.shape)
processed_video = np.zeros(video_check.shape)
temp_video = np.zeros(video_check.shape)

num_frames = len(video_check)
frame_height, frame_width = video_check.shape[1], video_check.shape[2]

# Only frames 590-609 are processed. The bounds are clamped to the video
# length so a shorter video does not raise an IndexError; frames outside the
# range stay zero in every output array.
first_frame = min(590, num_frames)
last_frame = min(610, num_frames)

with torch.no_grad():
    for frame_idx in range(first_frame, last_frame):
        # Invert the normalized frame
        inverted_video[frame_idx] = 1 - normalized_video[frame_idx]

        # Convert to a float tensor with batch and channel axes: (1, 1, H, W)
        transformed_frame = transform(inverted_video[frame_idx]).float()
        inverted_tensor = transformed_frame.reshape(1, 1, frame_height, frame_width)

        # Run the autoencoder and rescale its output to [0, 1]
        filtered_tensor = model(inverted_tensor)
        filtered_frame = filtered_tensor.cpu().numpy().reshape(frame_height, frame_width)
        filtered_video[frame_idx] = _rescale_to_unit(filtered_frame)

        # Segmentation signal: model output minus the inverted input
        processed_video[frame_idx] = filtered_video[frame_idx] - inverted_video[frame_idx]

        # Same signal rescaled to [0, 1]
        temp_frame = _rescale_to_unit(processed_video[frame_idx])
        temp_video[frame_idx] = temp_frame

        # Destination of this frame as a TIFF file. Saving is currently
        # disabled; the path is still computed for when it is re-enabled.
        frame_path = os.path.join(output_folder, f"{pulse_number}_img_{frame_idx:04d}.tif")
        # Image.fromarray(processed_video[frame_idx]).save(frame_path)



In [None]:
# Show one frame at every processing stage, left to right:
# original, inverted, model output (filtered) and the segmented result
i = 605
plt.figure(figsize=[10, 10])

stages = (
    (video_check, "Original"),
    (inverted_video, "Inverted"),
    (filtered_video, "Filtered"),
    (processed_video, "Segmented Image"),
)
for column, (stage_frames, stage_title) in enumerate(stages, start=1):
    plt.subplot(1, 4, column)
    plt.imshow(stage_frames[i])
    plt.title(stage_title)

plt.show()


##### For visualization purposes, the sequence of frames stored in the .h5 file is converted into a video/animation and saved as an .mp4 file.

In [None]:
from matplotlib.animation import ArtistAnimation, writers
import numpy as np

# NOTE(review): `video_save` was never assigned anywhere in this notebook.
# The segmented frames (`processed_video`) are assumed to be the intended
# source for the right-hand panel — confirm.
video_save = processed_video

# Left axis: original frame, right axis: segmented frame
fig, (ax1, ax2) = plt.subplots(1, 2)

# One [left_artist, right_artist] pair per animation frame
im = []

for frame_id, frame in enumerate(video_save):
    plot1 = ax1.imshow(video_check[frame_id])
    plot2 = ax2.imshow(frame)

    im.append([plot1, plot2])

animation = ArtistAnimation(fig=fig, artists=im, repeat=False, interval=50)
plt.draw()
plt.show()

# f-string so the pulse number is substituted into the file name instead of
# the literal text "{pulse_number}".
animation.save(f'autoencoder_segmenter_trained_{pulse_number}.mp4', writer=writers['ffmpeg'](fps=1))