CNN to predict subtrate stiffness

In [None]:
import os
import glob
import numpy as np
from PIL import Image
import torch
import torchvision
from torchvision.models import resnet50, ResNet50_Weights
import torch.optim as optim
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.model_selection import StratifiedKFold
from sklearn import preprocessing
from torch.utils.data import DataLoader
from sklearn.metrics import precision_score, recall_score, f1_score
from torchvision import transforms

img_width, img_height = 128, 128
num_classes = 1
seed = 42
cv_splits = 5
batch_size = 32
epochs = 100

do_not_keep_frozen = True

initial_lr = 1e-3
LR_patience = 5

class NumpyToTensorTransform:
    def __call__(self, pic):
        return torch.from_numpy(np.array(pic, np.float32, copy=False)).permute(2, 0, 1)


# Define the training and test transforms
train_transforms = transforms.Compose([
    NumpyToTensorTransform(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(30),
    transforms.RandomResizedCrop((img_width, img_height), scale=(0.8, 1.0), antialias= True),
])

test_transforms = transforms.Compose([
    NumpyToTensorTransform(),
])

class Dataset(torch.utils.data.Dataset):
    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = torch.LongTensor(targets)
        self.transform = transform

    def __getitem__(self, index):
        x = self.data[index]
        y = self.targets[index]

        # ensure the numpy array has the shape HxWxC
        if x.shape[0] == 1:
            x = np.transpose(x, (1, 2, 0))

        # Normalize to 0-1
        #x = x / 65535.0

        if self.transform:
            x = self.transform(x)

        return x, y

    def __len__(self):
        return len(self.data)


# Function to load images and labels
def load_images(directory):
    image_paths = glob.glob(os.path.join(directory, '*.tif'))
    images = []
    labels = []

    for image_path in image_paths:
        image = Image.open(image_path)
        image = image.resize((img_width, img_height))
        image = np.array(image)
        image = np.stack((image,)*3, axis= -1) #original images are single channel, making it 3 channels
        # Append the image to the list
        images.append(image)

        # Extract the label from the directory name
        label = os.path.basename(os.path.dirname(image_path))
        labels.append(label)

    return np.array(images), np.array(labels)

# Load the images and labels
data_dir = '/content/drive/MyDrive/Just the images/*'  # Replace with the path to your data directory
x_data, y_data = load_images(data_dir)

skf = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=seed)
fold = 1
precision_list = []
recall_list = []
f1_list = []
accuracy_list = []

for train_index, test_index in skf.split(x_data, y_data):
    print(f"Fold: {fold}")

    # Split the data into train and validation sets for this fold
    x_train, x_test = x_data[train_index], x_data[test_index]
    y_train, y_test = y_data[train_index], y_data[test_index]

    #Encode labels from text to integers.
    le = preprocessing.LabelEncoder()
    le.fit(y_train)
    y_train = le.transform(y_train)
    y_test = le.transform(y_test)


    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_data = Dataset(x_train, y_train, transform=train_transforms)
    test_data = Dataset(x_test, y_test, transform = test_transforms)

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
    for param in model.parameters():
      param.requires_grad = keep_frozen

    model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
    model = model.to(device)

    criterion = torch.nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=initial_lr)
    scheduler = ReduceLROnPlateau(optimizer, 'min', patience= LR_patience)

    model.train()
    for epoch in range(epochs):
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data[0].to(device)/255.0, data[1].to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs.view(-1), labels.float())
            loss.backward()
            optimizer.step()

        scheduler.step(loss)

    model.eval()
    correct = 0
    total = 0
    preds = []
    actual = []
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device)/255.0, data[1].to(device)
            outputs = model(images)
            predicted = (torch.sigmoid(outputs.view(-1)) > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Append the labels and predictions for precision, recall and F1 score
            preds += predicted.tolist()
            actual += labels.tolist()

    fold += 1
    accuracy_ = correct / total
    print(f'Accuracy: {accuracy_ * 100: 0.2f}')
    precision = precision_score(actual, preds)
    recall = recall_score(actual, preds)
    f1 = f1_score(actual, preds)
    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1 Score: {f1:.2f}')

    # Append the precision, recall, F1 score and accuracy for this fold to the lists
    precision_list.append(precision)
    recall_list.append(recall)
    f1_list.append(f1)
    accuracy_list.append(accuracy_)

# Compute the average precision, recall, F1 score and accuracy across all folds
avg_precision = sum(precision_list) / len(precision_list)
avg_recall = sum(recall_list) / len(recall_list)
avg_f1_score = sum(f1_list) / len(f1_list)
avg_accuracy = sum(accuracy_list) / len(accuracy_list)

print(f'\nAverage accuracy across all folds: {avg_accuracy*100:.2f}')
print(f'Average precision across all folds: {avg_precision:.2f}')
print(f'Average recall across all folds: {avg_recall:.2f}')
print(f'Average F1 score across all folds: {avg_f1_score:.2f}')

Fold: 1
Accuracy:  100.00
Precision: 1.00%
Recall: 1.00%
F1 Score: 1.00%
Fold: 2
Accuracy:  100.00
Precision: 1.00%
Recall: 1.00%
F1 Score: 1.00%
Fold: 3
Accuracy:  90.00
Precision: 1.00%
Recall: 0.80%
F1 Score: 0.89%
Fold: 4
Accuracy:  100.00
Precision: 1.00%
Recall: 1.00%
F1 Score: 1.00%
Fold: 5
Accuracy:  100.00
Precision: 1.00%
Recall: 1.00%
F1 Score: 1.00%

Average accuracy across all folds: 98.00%
Average precision across all folds: 1.00
Average recall across all folds: 0.96
Average F1 score across all folds: 0.98


UNET to predict traction field from phase image

In [None]:
!pip install segmentation_models_pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from torchvision.transforms import ToTensor, Normalize
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
import segmentation_models_pytorch as smp
import pandas as pd
from PIL import Image
import os
import numpy as np

class Regressor(nn.Module):
    def __init__(self, encoder_name='resnet34', encoder_weights='imagenet'):
        super(Regressor, self).__init__()
        self.unet = smp.Unet(
            encoder_name=encoder_name,
            encoder_weights=encoder_weights,
            in_channels=1,
            classes=2  # For 2D vector output
        )
        self.fc1 = nn.Linear(128 * 128 * 2, 256)  # Adjust dimensions accordingly
        self.fc2 = nn.Linear(256, 2)

    def forward(self, x):
        x = self.unet(x)
        x = x.view(x.size(0), -1)  # Flatten the output tensor
        x = nn.ReLU()(self.fc1(x))
        x = self.fc2(x)
        return x

def load_data(img_dir, csv_dir):
    img_filenames = sorted(os.listdir(img_dir))
    csv_filenames = sorted(os.listdir(csv_dir))
    images = []
    vectors = []
    vector_scaler = MinMaxScaler()  # Initialize MinMaxScaler for vector data

    for img_file, csv_file in zip(img_filenames, csv_filenames):
        img_path = os.path.join(img_dir, img_file)
        csv_path = os.path.join(csv_dir, csv_file)

        image = Image.open(img_path).convert('L')
        transform = transforms.Compose([
            transforms.Resize(128),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
        ])
        image = transform(image)
        images.append(image)

        csv_data = pd.read_csv(csv_path, usecols=[2, 3]).values
        vectors.append(csv_data)

    vector_data = np.vstack(vectors)  # Stack vectors to fit the scaler
    vector_scaler.fit(vector_data)  # Fit the MinMaxScaler
    vectors = [vector_scaler.transform(vec) for vec in vectors]  # Normalize vectors

    images = torch.stack(images)  # Convert list of tensors to a single tensor
    vectors = [torch.tensor(vec, dtype=torch.float32) for vec in vectors]  # Convert numpy arrays to tensors
    vectors = torch.cat(vectors)  # Concatenate list of tensors to a single tensor

    return images, vectors, vector_scaler

# Load data
img_dir = '/content/drive/MyDrive/Regression_images_traction/Images'
csv_dir = '/content/drive/MyDrive/Regression_images_traction/Traction'
images, vectors, vector_scaler = load_data(img_dir, csv_dir)
images_np = images.numpy()
vectors_np = vectors.numpy()

# Prepare for 5-fold cross-validation
kf = KFold(n_splits=3, shuffle=True, random_state=42)
fold_results = []

for fold, (train_idx, val_idx) in enumerate(kf.split(images)):

    train_images = torch.tensor(images_np[train_idx], dtype=torch.float32)
    val_images = torch.tensor(images_np[val_idx], dtype=torch.float32)
    train_vectors = torch.tensor(vectors_np[train_idx], dtype=torch.float32)
    val_vectors = torch.tensor(vectors_np[val_idx], dtype=torch.float32)


    train_loader = DataLoader(TensorDataset(train_images, train_vectors), batch_size=4, shuffle=True)
    val_loader = DataLoader(TensorDataset(val_images, val_vectors), batch_size=4)

    # Model, loss function, optimizer
    regressor_model = Regressor()
    criterion = nn.SmoothL1Loss()
    optimizer = optim.Adam(regressor_model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)  # Step decay LR scheduler

    # Training loop
    for epoch in range(10):
        regressor_model.train()
        for batch in train_loader:
            images, traction_fields = batch
            optimizer.zero_grad()
            traction_preds = regressor_model(images)
            loss = criterion(traction_preds, traction_fields)
            loss.backward()
            optimizer.step()
        scheduler.step()  # Step decay

    # Validation and metrics computation
    regressor_model.eval()
    val_preds = []
    with torch.no_grad():
        for batch in val_loader:
            images, _ = batch
            traction_preds = regressor_model(images)
            val_preds.append(traction_preds)
    val_preds = torch.cat(val_preds).numpy()
    val_preds = vector_scaler.inverse_transform(val_preds)  # Inverse transform
    val_vectors = vector_scaler.inverse_transform(val_vectors.numpy())  # Inverse transform

    mae = mean_absolute_error(val_vectors, val_preds)
    mape = mean_absolute_percentage_error(val_vectors, val_preds)
    fold_results.append((mae, mape))

# Output results
for fold, (mae, mape) in enumerate(fold_results):
    print(f'Fold {fold + 1}: MAE = {mae}, MAPE = {mape}%')

Fold 1: MAE = 5.904709499127136e-10, MAPE = 106655.703125%
Fold 2: MAE = 1.4391685065895388e-10, MAPE = 37179.69921875%
Fold 3: MAE = 3.0239830084433095e-10, MAPE = 107908.96875%


UNET to predict traction vectors from area and bead displacement vectors

In [None]:
!pip install segmentation_models_pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
import segmentation_models_pytorch as smp
import pandas as pd
import os
import numpy as np

class Regressor(nn.Module):
    def __init__(self, encoder_name='resnet34', encoder_weights='imagenet'):
        super(Regressor, self).__init__()
        self.unet = smp.Unet(
            encoder_name=encoder_name,
            encoder_weights=encoder_weights,
            in_channels=2,
            classes=2
        )
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1935, 2))  # Adjust the pooling layer to match target dimensions

    def forward(self, x):
        x = self.unet(x)
        x = self.adaptive_pool(x)  # Apply Adaptive Average Pooling to match target dimensions
        x = x[..., 0]  # Remove the last dimension by selecting the first index along that dimension
        return x

def load_data(csv_dir):
    csv_filenames = sorted(os.listdir(csv_dir))
    inputs1 = []  # For columns 0, 1
    inputs2 = []  # For columns 4, 5
    targets = []  # For columns 2, 3

    input_scaler1 = MinMaxScaler()  # Initialize MinMaxScaler for input data (columns 0, 1)
    input_scaler2 = MinMaxScaler()  # Initialize MinMaxScaler for input data (columns 4, 5)
    target_scaler = MinMaxScaler()  # Initialize MinMaxScaler for target data (columns 2, 3)

    max_rows = 0
    for csv_file in csv_filenames:
        csv_path = os.path.join(csv_dir, csv_file)
        csv_data = pd.read_csv(csv_path, usecols=[0, 1, 2, 3, 4, 5])
        max_rows = max(max_rows, len(csv_data))

    for csv_file in csv_filenames:
        csv_path = os.path.join(csv_dir, csv_file)
        csv_data = pd.read_csv(csv_path, usecols=[0, 1, 2, 3, 4, 5])
        csv_data.fillna(0, inplace=True)  # Fill NaN values with 0
        csv_data = csv_data.values

        # Pad the data with zeros to match the largest array
        padded_data = np.zeros((max_rows, csv_data.shape[1]))
        padded_data[:csv_data.shape[0], :] = csv_data

        inputs1.append(padded_data[:, :2])  # Get columns 0, 1
        inputs2.append(padded_data[:, 4:])  # Get columns 4, 5
        targets.append(padded_data[:, 2:4])  # Get columns 2, 3 as target

    # Normalize data and convert to tensors
    inputs1 = [input_scaler1.fit_transform(i) for i in inputs1]
    inputs2 = [input_scaler2.fit_transform(i) for i in inputs2]
    targets = [target_scaler.fit_transform(t) for t in targets]

    return inputs1, inputs2, targets, input_scaler1, input_scaler2, target_scaler



csv_dir = '/content/drive/MyDrive/Clean_only_CSV/Regression'
inputs1, inputs2, targets, input_scaler1, input_scaler2, target_scaler = load_data(csv_dir)

max_len = max(max(len(i) for i in inputs1), max(len(i) for i in inputs2))
# Pad to ensure dimensions are divisible by 32
max_len_padded = ((max_len + 31) // 32) * 32
inputs1 = [np.pad(i, ((0, max_len_padded - len(i)), (0, 30))) for i in inputs1]  # Pad width to 32
inputs2 = [np.pad(i, ((0, max_len_padded - len(i)), (0, 30))) for i in inputs2]  # Pad width to 32



# Convert lists to numpy arrays
inputs1_np = np.array(inputs1, dtype=np.float32)
inputs2_np = np.array(inputs2, dtype=np.float32)
inputs1_np = inputs1_np.reshape(-1, 1, max_len_padded, 32)
inputs2_np = inputs2_np.reshape(-1, 1, max_len_padded, 32)
targets_np = np.array(targets, dtype=np.float32)

# Reshape the input data to have a single channel (to match the U-Net input requirements)

# Concatenate along the channel dimension
inputs_np = np.concatenate((inputs1_np, inputs2_np), axis=1)

# Check the shapes of your arrays
print(f'inputs_np shape: {inputs_np.shape}')
print(f'targets_np shape: {targets_np.shape}')

# Ensure the number of samples matches between inputs and targets
assert inputs_np.shape[0] == targets_np.shape[0], "Mismatched number of samples between inputs and targets"

# Prepare for 5-fold cross-validation
kf = KFold(n_splits=3, shuffle=True, random_state=42)
fold_results = []

def pad_to_match_shapes(tensor1, tensor2):
    # Get the maximum dimensions
    max_height = max(tensor1.size(1), tensor2.size(1))
    max_width = max(tensor1.size(2), tensor2.size(2))

    # Pad tensors
    pad_tensor1 = nn.functional.pad(tensor1, (0, max_width - tensor1.size(2), 0, max_height - tensor1.size(1)))
    pad_tensor2 = nn.functional.pad(tensor2, (0, max_width - tensor2.size(2), 0, max_height - tensor2.size(1)))

    return pad_tensor1, pad_tensor2

for fold, (train_idx, val_idx) in enumerate(kf.split(inputs_np)):
    # Get training and validation data
    train_inputs = torch.tensor(inputs_np[train_idx], dtype=torch.float32)
    val_inputs = torch.tensor(inputs_np[val_idx], dtype=torch.float32)
    train_targets = torch.tensor(targets_np[train_idx], dtype=torch.float32)
    val_targets = torch.tensor(targets_np[val_idx], dtype=torch.float32)

    # Create DataLoader instances
    train_loader = DataLoader(TensorDataset(train_inputs, train_targets), batch_size=4, shuffle=True)
    val_loader = DataLoader(TensorDataset(val_inputs, val_targets), batch_size=4)

    # Model, loss function, optimizer
    regressor_model = Regressor()
    criterion = nn.SmoothL1Loss()
    optimizer = optim.Adam(regressor_model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)  # Step decay LR scheduler

    # Training loop
    for epoch in range(10):
      regressor_model.train()
      for batch in train_loader:
        images, traction_fields = batch
        optimizer.zero_grad()
        traction_preds = regressor_model(images)

        # Remove the extra dimension from traction_preds
        traction_preds = traction_preds.squeeze(-1)

        # Now traction_preds should have a shape of [4, 1935, 2]

        # Pad outputs and targets to match shapes
        traction_preds, traction_fields = pad_to_match_shapes(traction_preds, traction_fields)
        #print(f'traction_preds dimensions: {traction_preds.size()}')
        #print(f'traction_fields dimensions: {traction_fields.size()}')
        # Calculate loss
        loss = criterion(traction_preds, traction_fields)
        loss.backward()
        optimizer.step()

      scheduler.step()  # Step decay

    # Validation and metrics computation
    regressor_model.eval()
    val_preds = []
    with torch.no_grad():
        for batch in val_loader:
            images, _ = batch
            traction_preds = regressor_model(images)
            val_preds.append(traction_preds)
    val_preds = torch.cat(val_preds).numpy()

    # Inverse transform the predictions and targets for metric calculations
    val_preds = target_scaler.inverse_transform(val_preds.reshape(-1, 2))
    val_targets = target_scaler.inverse_transform(val_targets.numpy().reshape(-1, 2))

    mae = mean_absolute_error(val_targets, val_preds)
    mape = mean_absolute_percentage_error(val_targets, val_preds)
    fold_results.append((mae, mape))

# Output results
for fold, (mae, mape) in enumerate(fold_results):
    print(f'Fold {fold + 1}: MAE = {mae}, MAPE = {mape}%')

inputs_np shape: (52, 2, 1952, 32)
targets_np shape: (52, 1935, 2)
Fold 1: MAE = 9.832324190939268e-12, MAPE = 17.874073028564453%
Fold 2: MAE = 1.015167290507879e-11, MAPE = 16.227169036865234%
Fold 3: MAE = 1.0226283361780553e-11, MAPE = 1704.95703125%


UNET to predict traction vectors from both phase image and bead displacement and area

In [None]:
#!pip install segmentation_models_pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from torchvision.transforms import ToTensor, Normalize
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
import segmentation_models_pytorch as smp
import pandas as pd
from PIL import Image
import os
import numpy as np

class MultiModalRegressor(nn.Module):
    def __init__(self):
        super(MultiModalRegressor, self).__init__()
        # Image U-Net
        self.image_unet = smp.Unet(encoder_name='resnet34', encoder_weights='imagenet', in_channels=1, classes=2)
        # CSV U-Net
        self.csv_unet = smp.Unet(encoder_name='resnet34', encoder_weights='imagenet', in_channels=2, classes=2)
        # Fusion and Prediction
        self.image_fc = nn.Linear(2, 4)
        self.fc_fusion = nn.Linear(6, 256)  # Adjust dimensions accordingly
        self.fc_prediction = nn.Linear(256, 2)


    def forward(self, image_input, csv_input):
      image_features = self.image_unet(image_input)
      image_features = torch.mean(image_features, dim=[2, 3])
      image_features = self.image_fc(image_features)
      csv_features = self.csv_unet(csv_input)
      csv_features = torch.mean(csv_features, dim=[2, 3])



      # Concatenate features
      fused_features = torch.cat((image_features, csv_features), dim=1)

      # Fusion layer
      fused_features = nn.ReLU()(self.fc_fusion(fused_features))

      # Prediction
      predictions = self.fc_prediction(fused_features)

      return predictions


def load_data_img(img_dir, csv_dir):
    img_filenames = sorted(os.listdir(img_dir))
    csv_filenames = sorted(os.listdir(csv_dir))
    images = []
    vectors = []
    vector_scaler = MinMaxScaler()  # Initialize MinMaxScaler for vector data

    for img_file, csv_file in zip(img_filenames, csv_filenames):
        img_path = os.path.join(img_dir, img_file)
        csv_path = os.path.join(csv_dir, csv_file)

        image = Image.open(img_path).convert('L')
        transform = transforms.Compose([
            transforms.Resize(128),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
        ])
        image = transform(image)
        images.append(image)

        csv_data = pd.read_csv(csv_path, usecols=[2, 3]).values
        vectors.append(csv_data)

    vector_data = np.vstack(vectors)  # Stack vectors to fit the scaler
    vector_scaler.fit(vector_data)  # Fit the MinMaxScaler
    vectors = [vector_scaler.transform(vec) for vec in vectors]  # Normalize vectors

    images = torch.stack(images)  # Convert list of tensors to a single tensor
    vectors = [torch.tensor(vec, dtype=torch.float32) for vec in vectors]  # Convert numpy arrays to tensors
    vectors = torch.cat(vectors)  # Concatenate list of tensors to a single tensor

    return images, vectors, vector_scaler


def load_data_csv(csv_dir):
    csv_filenames = sorted(os.listdir(csv_dir))
    inputs1 = []  # For columns 0, 1
    inputs2 = []  # For columns 4, 5
    targets = []  # For columns 2, 3

    input_scaler1 = MinMaxScaler()  # Initialize MinMaxScaler for input data (columns 0, 1)
    input_scaler2 = MinMaxScaler()  # Initialize MinMaxScaler for input data (columns 4, 5)
    target_scaler1 = MinMaxScaler()  # For column 2
    target_scaler2 = MinMaxScaler()  # Initialize MinMaxScaler for target data (columns 2, 3)

    max_rows = 0
    for csv_file in csv_filenames:
        csv_path = os.path.join(csv_dir, csv_file)
        csv_data = pd.read_csv(csv_path, usecols=[0, 1, 2, 3, 4, 5])
        max_rows = max(max_rows, len(csv_data))

    for csv_file in csv_filenames:
        csv_path = os.path.join(csv_dir, csv_file)
        csv_data = pd.read_csv(csv_path, usecols=[0, 1, 2, 3, 4, 5])
        csv_data.fillna(0, inplace=True)  # Fill NaN values with 0
        csv_data = csv_data.values

        # Pad the data with zeros to match the largest array
        padded_data = np.zeros((max_rows, csv_data.shape[1]))
        padded_data[:csv_data.shape[0], :] = csv_data

        inputs1.append(padded_data[:, :2])  # Get columns 0, 1
        inputs2.append(padded_data[:, 4:])  # Get columns 4, 5
        targets.append(padded_data[:, 2:4])  # Get columns 2, 3 as target

    # Normalize data and convert to tensors
    inputs1 = [input_scaler1.fit_transform(i) for i in inputs1]
    inputs2 = [input_scaler2.fit_transform(i) for i in inputs2]
    targets1 = [target_scaler1.fit_transform(t[:, 0].reshape(-1, 1)) for t in targets]
    targets2 = [target_scaler2.fit_transform(t[:, 1].reshape(-1, 1)) for t in targets]
    targets = [np.hstack((t1, t2)) for t1, t2 in zip(targets1, targets2)]

    return inputs1, inputs2, targets, input_scaler1, input_scaler2, target_scaler1, target_scaler2


csv_dir = '/content/drive/MyDrive/Clean_only_CSV/Regression'
inputs1, inputs2, targets, input_scaler1, input_scaler2, target_scaler1, target_scaler2 = load_data_csv(csv_dir)

max_len = max(max(len(i) for i in inputs1), max(len(i) for i in inputs2))
# Pad to ensure dimensions are divisible by 32
max_len_padded = ((max_len + 31) // 32) * 32
inputs1 = [np.pad(i, ((0, max_len_padded - len(i)), (0, 30))) for i in inputs1]  # Pad width to 32
inputs2 = [np.pad(i, ((0, max_len_padded - len(i)), (0, 30))) for i in inputs2]  # Pad width to 32



# Convert lists to numpy arrays
inputs1_np = np.array(inputs1, dtype=np.float32)
inputs2_np = np.array(inputs2, dtype=np.float32)
inputs1_np = inputs1_np.reshape(-1, 1, max_len_padded, 32)
inputs2_np = inputs2_np.reshape(-1, 1, max_len_padded, 32)
targets_np = np.array(targets, dtype=np.float32)

# Reshape the input data to have a single channel (to match the U-Net input requirements)

# Concatenate along the channel dimension
inputs_np = np.concatenate((inputs1_np, inputs2_np), axis=1)

# Check the shapes of your arrays
print(f'inputs_np shape: {inputs_np.shape}')
print(f'targets_np shape: {targets_np.shape}')

# Ensure the number of samples matches between inputs and targets
assert inputs_np.shape[0] == targets_np.shape[0], "Mismatched number of samples between inputs and targets"


# Load data
img_dir = '/content/drive/MyDrive/Regression_images_traction/Images'
csv_dir = '/content/drive/MyDrive/Regression_images_traction/Traction'
images, vectors, vector_scaler = load_data_img(img_dir, csv_dir)
images_np = images.numpy()
vectors_np = vectors.numpy()


kf = KFold(n_splits=3, shuffle=True, random_state=42)
fold_results = []

for fold, (train_idx, val_idx) in enumerate(kf.split(inputs_np)):
    # Get training and validation data
    train_inputs = torch.tensor(inputs_np[train_idx], dtype=torch.float32)
    val_inputs = torch.tensor(inputs_np[val_idx], dtype=torch.float32)
    train_targets = torch.tensor(targets_np[train_idx], dtype=torch.float32)
    val_targets = torch.tensor(targets_np[val_idx], dtype=torch.float32)

    # Create DataLoader instances
    train_loader = DataLoader(TensorDataset(images[train_idx], train_inputs, train_targets), batch_size=4, shuffle=True)
    val_loader = DataLoader(TensorDataset(images[val_idx], val_inputs, val_targets), batch_size=4)
    # Model, loss function, optimizer
    multi_modal_regressor = MultiModalRegressor()
    criterion = nn.SmoothL1Loss()
    optimizer = optim.Adam(multi_modal_regressor.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)  # Step decay LR scheduler

    # Training loop
    for epoch in range(1):
      multi_modal_regressor.train()
      for img_data, csv_data, targets in train_loader:
          optimizer.zero_grad()
          predictions = multi_modal_regressor(img_data, csv_data)

          # Assuming your targets are in the csv_data (if not, adjust accordingly)
          loss = criterion(predictions, targets.mean(dim=1))
          loss.backward()
          optimizer.step()

      scheduler.step()

# Evaluation
    multi_modal_regressor.eval()
    val_preds_list = []
    val_targets_list = []

    with torch.no_grad():
      for img_data, csv_data, targets in val_loader:
        targets = targets[:, 0, :]
        predictions = multi_modal_regressor(img_data, csv_data)
        print(f"Predictions shape for this batch: {predictions.shape}")
        print(f"Targets shape for this batch: {targets.shape}")
        val_preds_list.append(predictions)
        val_targets_list.append(targets)
        print(f"Total number of predictions so far: {len(val_preds_list) * predictions.shape[0]}")
        print(f"Total number of targets so far: {len(val_targets_list) * targets.shape[0]}")


    val_preds = torch.cat(val_preds_list, dim=0)
    print(f"val_preds shape: {val_preds.shape}")  # Add this line
    val_targets = torch.cat(val_targets_list, dim=0)


    # Stack predictions and targets
    val_preds_aggregated1 = val_preds[:, 0].numpy()
    val_preds_aggregated2 = val_preds[:, 1].numpy()
    val_targets_aggregated1 = val_targets[:, 0].numpy()
    val_targets_aggregated2 = val_targets[:, 1].numpy()

    val_preds_aggregated1 = target_scaler1.inverse_transform(val_preds_aggregated1.reshape(-1, 1)).flatten()
    val_preds_aggregated2 = target_scaler2.inverse_transform(val_preds_aggregated2.reshape(-1, 1)).flatten()
    val_targets_aggregated1 = target_scaler1.inverse_transform(val_targets_aggregated1.reshape(-1, 1)).flatten()
    val_targets_aggregated2 = target_scaler2.inverse_transform(val_targets_aggregated2.reshape(-1, 1)).flatten()

    val_preds_aggregated = np.column_stack((val_preds_aggregated1, val_preds_aggregated2))
    val_targets_aggregated = np.column_stack((val_targets_aggregated1, val_targets_aggregated2))

    assert val_preds_aggregated.shape == val_targets_aggregated.shape, f"Mismatched shapes: {val_preds_aggregated.shape} vs {val_targets_aggregated.shape}"

    # Now compute the error
    mae = mean_absolute_error(val_targets_aggregated, val_preds_aggregated)
    mape = mean_absolute_percentage_error(val_targets_aggregated, val_preds_aggregated)
    fold_results.append((mae, mape))
# Output results
for fold, (mae, mape) in enumerate(fold_results):
    print(f'Fold {fold + 1}: MAE = {mae}, MAPE = {mape}%')


inputs_np shape: (52, 2, 1952, 32)
targets_np shape: (52, 1935, 2)
Predictions shape for this batch: torch.Size([4, 2])
Targets shape for this batch: torch.Size([4, 2])
Total number of predictions so far: 4
Total number of targets so far: 4
Predictions shape for this batch: torch.Size([4, 2])
Targets shape for this batch: torch.Size([4, 2])
Total number of predictions so far: 8
Total number of targets so far: 8
Predictions shape for this batch: torch.Size([4, 2])
Targets shape for this batch: torch.Size([4, 2])
Total number of predictions so far: 12
Total number of targets so far: 12
Predictions shape for this batch: torch.Size([4, 2])
Targets shape for this batch: torch.Size([4, 2])
Total number of predictions so far: 16
Total number of targets so far: 16
Predictions shape for this batch: torch.Size([2, 2])
Targets shape for this batch: torch.Size([2, 2])
Total number of predictions so far: 10
Total number of targets so far: 10
val_preds shape: torch.Size([18, 2])
Predictions shape fo