In [1]:
import os
import torch
import numpy as np
from torch import nn
import torch.optim as optim
from torchmetrics import Accuracy
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# device = 'cpu'

In [2]:
# path = os.path.abspath('data/')
path = os.path.abspath('../1_Simulation_Results/ML_numpy_files/')

path

'/home/upadesh/3_Codes/6_Au_Au_Laser/1_Simulation_Results/ML_numpy_files'

In [3]:
X, y = np.load(path+'/X.npy'), np.load(path+'/y.npy')

In [4]:
X.shape, y.shape

((406, 201, 401), (406, 201, 401))

In [8]:
# Reshape X and y to match PyTorch's Conv2d input format: (batch_size, channels, height, width)
X_reshaped = X[:, np.newaxis, :, :]  # Shape: (406, 1, 201, 401)
y_reshaped = y[:, np.newaxis, :, :]  # Shape: (406, 1, 201, 401)

# Split data into training and testing sets
X_, X_test, y_, y_test = train_test_split(X_reshaped, y_reshaped, test_size=0.1, random_state=69)
X_train, X_val, y_train, y_val = train_test_split(X_, y_, test_size=0.11, random_state=69)

# Convert numpy arrays to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)

In [9]:
print(f'Feature [batch, channel, width, height] => Train: {X_train.shape} | Test: {X_test.shape} | Validation {X_val.shape}')
print(f'Label   [batch, channel, width, height] => Train: {y_train.shape} | Test: {y_test.shape} | Validation {y_val.shape}')

Feature [batch, channel, width, height] => Train: torch.Size([324, 1, 201, 401]) | Test: torch.Size([41, 1, 201, 401]) | Validation torch.Size([41, 1, 201, 401])
Label   [batch, channel, width, height] => Train: torch.Size([324, 1, 201, 401]) | Test: torch.Size([41, 1, 201, 401]) | Validation torch.Size([41, 1, 201, 401])


In [None]:
import torch
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()
        
        def conv_block(in_channels, out_channels):
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True)
            )
        
        self.encoder1 = conv_block(in_channels, 64)
        self.encoder2 = conv_block(64, 128)
        self.encoder3 = conv_block(128, 256)
        self.encoder4 = conv_block(256, 512)
        
        self.pool = nn.MaxPool2d(2)
        
        self.middle = conv_block(512, 1024)
        
        self.upconv4 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.decoder4 = conv_block(1024, 512)
        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.decoder3 = conv_block(512, 256)
        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder2 = conv_block(256, 128)
        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder1 = conv_block(128, 64)
        
        self.final = nn.Conv2d(64, out_channels, kernel_size=1)
    
    def forward(self, x):
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))
        
        middle = self.middle(self.pool(enc4))
        
        dec4 = self.upconv4(middle)
        dec4 = self.pad_and_crop(enc4, dec4)
        dec4 = torch.cat((dec4, dec4), dim=1)
        dec4 = self.decoder4(dec4)
        
        dec3 = self.upconv3(dec4)
        dec3 = self.pad_and_crop(enc3, dec3)
        dec3 = torch.cat((dec3, enc3), dim=1)
        dec3 = self.decoder3(dec3)
        
        dec2 = self.upconv2(dec3)
        dec2 = self.pad_and_crop(enc2, dec2)
        dec2 = torch.cat((dec2, enc2), dim=1)
        dec2 = self.decoder2(dec2)
        
        dec1 = self.upconv1(dec2)
        dec1 = self.pad_and_crop(enc1, dec1)
        dec1 = torch.cat((dec1, enc1), dim=1)
        dec1 = self.decoder1(dec1)
        
        out = self.final(dec1)
        return out
    
    def pad_and_crop(self, target, tensor):
        """Pad the tensor and crop to match the target dimensions."""
        _, _, target_height, target_width = target.size()
        _, _, tensor_height, tensor_width = tensor.size()
        
        # Padding
        pad_h = max(0, target_height - tensor_height)
        pad_w = max(0, target_width - tensor_width)
        
        pad_left = pad_w // 2
        pad_right = pad_w - pad_left
        pad_top = pad_h // 2
        pad_bottom = pad_h - pad_top
        
        tensor = F.pad(tensor, (pad_left, pad_right, pad_top, pad_bottom), mode='constant', value=0)
        
        # Cropping
        delta_h = (tensor.size(2) - target_height) // 2
        delta_w = (tensor.size(3) - target_width) // 2
        
        return tensor[:, :, delta_h: delta_h + target_height, delta_w: delta_w + target_width]


In [None]:
from torch.utils.data import Dataset, DataLoader
import numpy as np

class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        feature = torch.tensor(self.features[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return feature, label

# Example usage:
# features = np.random.randn(406, 1, 201, 401)  # Replace with your feature data
# labels = np.random.randint(0, 2, (406, 1, 201, 401))  # Replace with your label data

dataset = CustomDataset(X_train, y_train)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)


In [None]:
import torch.optim as optim
from sklearn.model_selection import train_test_split

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNet(in_channels=1, out_channels=1).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Example training loop
for epoch in range(10):  # Adjust number of epochs
    model.train()
    running_loss = 0.0
    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * inputs.size(0)
    
    epoch_loss = running_loss / len(dataset)
    print(f'Epoch {epoch+1}, Loss: {epoch_loss:.4f}')


In [None]:
torch.save(model, 'Unet_model.pth')
print('Model saved successfully!')

In [None]:
# Load the entire model
model = torch.load('Unet_model.pth')
model.eval()  # Set the model to evaluation mode


In [None]:
from sklearn.metrics import accuracy_score

model.eval()
all_preds = []
all_targets = []

with torch.no_grad():
    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        preds = torch.sigmoid(outputs).cpu().numpy()
        all_preds.append(preds)
        all_targets.append(targets.cpu().numpy())

all_preds = np.concatenate(all_preds, axis=0)
all_targets = np.concatenate(all_targets, axis=0)

# Binary predictions
all_preds = (all_preds > 0.5).astype(np.float32)

# Example evaluation metric
accuracy = accuracy_score(all_targets.flatten(), all_preds.flatten())
print(f'Accuracy: {accuracy:.4f}')


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

# Assuming your test data is stored in test_features and has shape (num_samples, 1, 201, 401)
# Replace test_features with your actual test data array
test_features = X_test[:,:,:,:]  # Example data, replace with actual test data

# Convert the test features to PyTorch tensors
test_tensor = torch.tensor(test_features, dtype=torch.float32)

# Create a DataLoader for your test set
test_dataset = TensorDataset(test_tensor)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Set the model to evaluation mode
model.eval()

# Move model to the correct device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Store predictions
all_predictions = []

# Disable gradient computation for testing
with torch.no_grad():
    for inputs in test_loader:
        inputs = inputs[0].to(device)  # Get the input tensor and move it to the device
        
        # Forward pass to get the output
        outputs = model(inputs)
        
        # Apply sigmoid to get probabilities if using BCEWithLogitsLoss
        predictions = torch.sigmoid(outputs)
        
        # Convert probabilities to binary predictions (0 or 1)
        predicted_labels = (predictions > 0.5).float()  # Threshold at 0.5
        
        # Move predictions back to the CPU and convert to numpy
        all_predictions.append(predicted_labels.cpu().numpy())

# Concatenate all predictions
all_predictions = np.concatenate(all_predictions, axis=0)

# Output predictions
print("Predictions shape:", all_predictions.shape)
# print(all_predictions)

error = all_predictions - np.array(y_test)

In [None]:
t_step = 20
# laser_pos = (125 + time[t_step]*laser_speed)* 401/1000  # Laser actual position in true dimension

fig, (ax1, ax2, ax3) = plt.subplots(3,1, figsize=(12,12), frameon=True)
cmap = plt.get_cmap('RdYlGn_r')
cmap.set_under('white', alpha=0)
hmap1 = ax1.imshow(all_predictions[t_step][0], cmap=cmap, vmin=0.5, vmax=1.0, aspect=0.5,  interpolation='quadric')
cmap = plt.get_cmap('Wistia')
cmap.set_under('white', alpha=0) 
hmap2 = ax1.imshow(1-all_predictions[t_step][0], cmap=cmap, vmin=0.5, vmax=1.5, aspect=0.5, interpolation='quadric')


cmap = plt.get_cmap('RdYlGn_r')
cmap.set_under('white', alpha=0)
hmap1 = ax2.imshow(y_test[t_step][0] , cmap=cmap, vmin=0.5, vmax=1.0, aspect=0.5,  interpolation='quadric')
cmap = plt.get_cmap('Wistia')
cmap.set_under('white', alpha=0) 
hmap2 = ax2.imshow(1-y_test[t_step][0], cmap=cmap, vmin=0.5, vmax=1.5, aspect=0.5, interpolation='quadric')

cmap = plt.get_cmap('RdYlGn_r')
cmap.set_under('white', alpha=0)
hmap1 = ax3.imshow(error[t_step][0], cmap=cmap, aspect=0.5,  interpolation='quadric')
cmap = plt.get_cmap('Wistia')
cmap.set_under('white', alpha=0) 
hmap2 = ax3.imshow(1-error[t_step][0], cmap=cmap, aspect=0.5, interpolation='quadric')


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# U-Net Model
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        
        # Encoder: Contracting path
        self.enc_conv1 = self.conv_block(1, 64)
        self.enc_conv2 = self.conv_block(64, 128)
        self.enc_conv3 = self.conv_block(128, 256)
        self.enc_conv4 = self.conv_block(256, 512)
        
        # Decoder: Expanding path
        self.upconv3 = self.upconv_block(512, 256)
        self.upconv2 = self.upconv_block(512, 128)  # Adjusted channels
        self.upconv1 = self.upconv_block(256, 64)   # Adjusted channels
        
        # Final output layer
        self.out_conv = nn.Conv2d(128, 1, kernel_size=1)  # Adjusted channels
    
    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
    
    def upconv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
    
    def crop_and_concat(self, enc_output, dec_output):
        """
        Crops enc_output to match the size of dec_output, and then concatenates along the channel axis.
        """
        enc_output_size = enc_output.size()[2:]  # Get H, W of encoder output
        dec_output_size = dec_output.size()[2:]  # Get H, W of decoder output
        
        # Compute the difference in sizes (crop to match decoder size)
        crop_h = (enc_output_size[0] - dec_output_size[0]) // 2
        crop_w = (enc_output_size[1] - dec_output_size[1]) // 2
        
        enc_output_cropped = enc_output[:, :, crop_h:enc_output_size[0] - crop_h, crop_w:enc_output_size[1] - crop_w]
        
        # In case of odd differences, crop one more pixel from the bottom/right
        if enc_output_cropped.size()[2] != dec_output.size()[2]:
            enc_output_cropped = enc_output_cropped[:, :, :-1, :]
        if enc_output_cropped.size()[3] != dec_output.size()[3]:
            enc_output_cropped = enc_output_cropped[:, :, :, :-1]
        
        return torch.cat([enc_output_cropped, dec_output], dim=1)
    
    def forward(self, x):
        # Encoding
        enc1 = self.enc_conv1(x)   # Output: (batch_size, 64, 201, 401)
        enc2 = self.enc_conv2(F.max_pool2d(enc1, 2))  # Output: (batch_size, 128, 100, 200)
        enc3 = self.enc_conv3(F.max_pool2d(enc2, 2))  # Output: (batch_size, 256, 50, 100)
        enc4 = self.enc_conv4(F.max_pool2d(enc3, 2))  # Output: (batch_size, 512, 25, 50)
        
        # Decoding
        dec3 = self.upconv3(enc4)  # Output: (batch_size, 256, 50, 100)
        dec3 = self.crop_and_concat(enc3, dec3)  # Concatenate with enc3 (batch_size, 512, 50, 100)
        
        dec2 = self.upconv2(dec3)  # Output: (batch_size, 128, 100, 200)
        dec2 = self.crop_and_concat(enc2, dec2)  # Concatenate with enc2 (batch_size, 256, 100, 200)
        
        dec1 = self.upconv1(dec2)  # Output: (batch_size, 64, 201, 401)
        dec1 = self.crop_and_concat(enc1, dec1)  # Concatenate with enc1 (batch_size, 128, 201, 401)
        
        # Output
        out = self.out_conv(dec1)  # Output: (batch_size, 1, 201, 401)
        return torch.sigmoid(out)  # Binary classification (pixel-wise)


In [None]:
# # Device configuration
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# # Initialize model
# model = UNet().to(device)

# # Loss and optimizer
# criterion = nn.BCELoss()  # Binary Cross-Entropy loss
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# X_train = X_train.to(device)  # Move input tensor to GPU if available
# y_train = y_train.to(device)  


# # Train the model
# epochs = 10
# batch_size = 2

# for epoch in range(epochs):
#     model.train()
#     total_loss = 0
#     for i in range(0, len(X_train), batch_size):
#         X_batch = X_train[i:i+batch_size]
#         y_batch = y[i:i+batch_size]
        
#         # Forward pass
#         outputs = model(X_batch)
#         loss = criterion(outputs, y_batch)
        
#         # Backward pass and optimization
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
        
#         total_loss += loss.item()
    
#     print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(X_train):.4f}')


In [None]:
import torch
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        
        # Encoder
        self.enc_conv1 = self.conv_block(1, 64)
        self.enc_conv2 = self.conv_block(64, 128)
        self.enc_conv3 = self.conv_block(128, 256)
        self.enc_conv4 = self.conv_block(256, 512)
        
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Decoder
        self.up_conv4 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec_conv4 = self.conv_block(512, 256)
        
        self.up_conv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec_conv3 = self.conv_block(256, 128)
        
        self.up_conv2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec_conv2 = self.conv_block(128, 64)
        
        self.up_conv1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2)
        self.dec_conv1 = self.conv_block(128, 64)
        
        # Output layer
        self.final_conv = nn.Conv2d(64, 1, kernel_size=1)  # Output channels: 1 for binary classification

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        # Encoder path
        enc1 = self.enc_conv1(x)
        enc2 = self.enc_conv2(self.pool(enc1))
        enc3 = self.enc_conv3(self.pool(enc2))
        enc4 = self.enc_conv4(self.pool(enc3))
        
        print(f'Encoder feature sizes: {enc1.size()}, {enc2.size()}, {enc3.size()}, {enc4.size()}')
        
        # Decoder path
        dec4 = self.up_conv4(enc4)
        dec4 = self.crop_and_concat(enc3, dec4)
        dec4 = self.dec_conv4(dec4)
        
        dec3 = self.up_conv3(dec4)
        dec3 = self.crop_and_concat(enc2, dec3)
        dec3 = self.dec_conv3(dec3)
        
        dec2 = self.up_conv2(dec3)
        dec2 = self.crop_and_concat(enc1, dec2)
        dec2 = self.dec_conv2(dec2)
        
        dec1 = self.up_conv1(dec2)
        dec1 = self.crop_and_concat(x, dec1)
        dec1 = self.dec_conv1(dec1)
        
        print(f'Decoder feature sizes: {dec4.size()}, {dec3.size()}, {dec2.size()}, {dec1.size()}')
        
        return torch.sigmoid(self.final_conv(dec1))  # Output activation for binary classification
    
    def crop_and_concat(self, enc_feature, dec_feature):
        """
        Crop the encoder feature map to match the size of the decoder feature map and concatenate.
        """
        enc_size = enc_feature.size()
        dec_size = dec_feature.size()
        print(f'Cropping: enc_feature size {enc_size}, dec_feature size {dec_size}')
        if enc_size[2] > dec_size[2] or enc_size[3] > dec_size[3]:
            # Center-crop the encoder feature map
            start_x = (enc_size[3] - dec_size[3]) // 2
            start_y = (enc_size[2] - dec_size[2]) // 2
            enc_feature = enc_feature[:, :, start_y:start_y + dec_size[2], start_x:start_x + dec_size[3]]
        elif enc_size[2] < dec_size[2] or enc_size[3] < dec_size[3]:
            # Pad the encoder feature map
            pad_x = (dec_size[3] - enc_size[3]) // 2
            pad_y = (dec_size[2] - enc_size[2]) // 2
            enc_feature = nn.functional.pad(enc_feature, (pad_x, pad_x, pad_y, pad_y))
        return torch.cat((enc_feature, dec_feature), dim=1)


In [None]:
batch_size = 8
dataset = TensorDataset(X_train, y_train)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast

# Instantiate and move the model to the appropriate device
model = UNet().to(device)

# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler()

# Training loop
num_epochs = 10
accumulation_steps = 2

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for i, (X_batch, y_batch) in enumerate(data_loader):
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        
        optimizer.zero_grad()
        
        with autocast():
            output = model(X_batch)
            loss = criterion(output, y_batch)
        
        scaler.scale(loss).backward()
        
        if (i + 1) % accumulation_steps == 0 or (i + 1) == len(data_loader):
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
        
        running_loss += loss.item()
    
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss/len(data_loader)}')

print("Training complete.")


In [None]:
X_train.shape, y_train.shape

In [None]:
torch.cuda.empty_cache()

In [None]:
test_predictions_np.shape

In [None]:
np.unique(test_predictions_np), np.unique(y_test_np)

In [None]:
X_test_tensor_np = X_test_tensor.cpu().numpy()

In [None]:
t_step = 5
# laser_pos = (125 + time[t_step]*laser_speed)* 401/1000  # Laser actual position in true dimension

fig, (ax1, ax2, ax3) = plt.subplots(3,1, figsize=(12,12), frameon=True)
cmap = plt.get_cmap('RdYlGn_r')
cmap.set_under('white', alpha=0)
hmap1 = ax1.imshow(test_predictions_np[t_step][0], cmap=cmap, vmin=0.5, vmax=1.0, aspect=0.5,  interpolation='quadric')
cmap = plt.get_cmap('Wistia')
cmap.set_under('white', alpha=0) 
hmap2 = ax1.imshow(1-test_predictions_np[t_step][0], cmap=cmap, vmin=0.5, vmax=1.5, aspect=0.5, interpolation='quadric')


cmap = plt.get_cmap('RdYlGn_r')
cmap.set_under('white', alpha=0)
hmap1 = ax2.imshow(y_test_np[t_step][0], cmap=cmap, vmin=0.5, vmax=1.0, aspect=0.5,  interpolation='quadric')
cmap = plt.get_cmap('Wistia')
cmap.set_under('white', alpha=0) 
hmap2 = ax2.imshow(1-y_test_np[t_step][0], cmap=cmap, vmin=0.5, vmax=1.5, aspect=0.5, interpolation='quadric')

cmap = plt.get_cmap('RdYlGn_r')
cmap.set_under('white', alpha=0)
hmap1 = ax3.imshow(X_test_tensor_np[t_step][0], cmap=cmap, vmin=0.5, vmax=1.0, aspect=0.5,  interpolation='quadric')
cmap = plt.get_cmap('Wistia')
cmap.set_under('white', alpha=0) 
hmap2 = ax3.imshow(1-X_test_tensor_np[t_step][0], cmap=cmap, vmin=0.5, vmax=1.5, aspect=0.5, interpolation='quadric')
