In [1]:
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torch.utils.data import random_split
import torch.optim as optim
import torch
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# Define dataset class
class StressDataset(Dataset):
    def __init__(self, grid_folder, stress_file):
        self.grid_folder = grid_folder
        self.stress_data = pd.read_csv(stress_file)

        # Create a mapping from file names to max_stress
        self.file_names = sorted(os.listdir(grid_folder))  # Ensure the order is consistent
        self.stress_values = self.stress_data["max_stress"].values

        # Normalize stress values for stable training
        self.min_stress, self.max_stress = np.min(self.stress_values), np.max(self.stress_values)
        self.stress_values = (self.stress_values - self.min_stress) / (self.max_stress - self.min_stress)

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        # Load the occupancy grid
        grid_file = os.path.join(self.grid_folder, self.file_names[idx])
        grid = np.load(grid_file)

        # Convert to tensor and add a channel dimension
        x = torch.tensor(grid, dtype=torch.float32).unsqueeze(0)  # Shape: (1, 64, 64, 64)

        # Get normalized stress value
        y = torch.tensor(self.stress_values[idx], dtype=torch.float32)

        return x, y

# Load dataset
grid_folder = "/Users/User/Documents/Project_JEB/JEB_Data/processed_grids/"
stress_file = "/Users/User/Documents/Project_JEB/JEB_Data/cleaned_workingdata.csv"

In [None]:
# Define dataset
dataset = StressDataset(grid_folder, stress_file)

# Split the dataset (e.g., 80% training, 20% testing)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Print dataset sizes
print(f"Training samples: {len(train_dataset)}, Testing samples: {len(test_dataset)}")

In [None]:
class Conv3DRegression(nn.Module):
    def __init__(self):
        super(Conv3DRegression, self).__init__()

        # 3D Convolutional layers
        self.conv1 = nn.Conv3d(in_channels=1, out_channels=32, kernel_size=5, stride=2, padding=2) # (75, 75, 75)
        self.conv2 = nn.Conv3d(32, 64, kernel_size=5, stride=2, padding=2) # (38, 38, 38)
        self.conv3 = nn.Conv3d(64, 128, kernel_size=3, stride=2, padding=1) # (19, 19, 19)
        self.conv4 = nn.Conv3d(128, 256, kernel_size=3, stride=2, padding=1) # (10, 10, 10)
        self.conv5 = nn.Conv3d(256, 512, kernel_size=3, stride=2, padding=1) # (5, 5, 5)

        # Batch Norm and Activation
        self.bn1 = nn.BatchNorm3d(32)
        self.bn2 = nn.BatchNorm3d(64)
        self.bn3 = nn.BatchNorm3d(128)
        self.bn4 = nn.BatchNorm3d(256)
        self.bn5 = nn.BatchNorm3d(512)

        # Fully Connected Layers
        self.fc1 = nn.Linear(512 * 5 * 5 * 5, 1024)
        self.fc2 = nn.Linear(1024, 256)
        self.fc3 = nn.Linear(256, 1)  # Final output layer (scalar prediction)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))

        # Flatten
        x = torch.flatten(x, start_dim=1)

        # Fully Connected layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)  # No activation, since it's a regression problem

        return x

# Example usage
model = Conv3DRegression()
print(model)

In [None]:
# Define loss function and optimizer
criterion = nn.MSELoss()  # Suitable for regression tasks
optimizer = optim.Adam(model.parameters(), lr=0.000001)

In [None]:
# Training function
def train_model(model, train_loader, criterion, optimizer, epochs=70):
    model.train()  # Set model to training mode

    for epoch in range(epochs):
        running_loss = 0.0
        for grids, stresses in train_loader:
            grids, stresses = grids.to("cuda"), stresses.to("cuda")  # Move to GPU if available

            optimizer.zero_grad()  # Reset gradients
            outputs = model(grids)  # Forward pass
            loss = criterion(outputs.squeeze(), stresses)  # Compute loss
            loss.backward()  # Backpropagation
            optimizer.step()  # Update weights

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader):.6f}")

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Train the model
train_model(model, train_loader, criterion, optimizer, epochs=70)

In [None]:
# Evaluation function
def evaluate_model(model, test_loader, criterion):
    model.eval()  # Set model to evaluation mode
    test_loss = 0.0
    predictions, actuals = [], []

    with torch.no_grad():  # No gradient calculation in eval mode
        for grids, stresses in test_loader:
            grids, stresses = grids.to(device), stresses.to(device)

            outputs = model(grids).squeeze()
            loss = criterion(outputs, stresses)
            test_loss += loss.item()

            predictions.extend(outputs.cpu().numpy())
            actuals.extend(stresses.cpu().numpy())

    print(f"Test Loss: {test_loss/len(test_loader):.6f}")
    return predictions, actuals

# Evaluate the model
predictions, actuals = evaluate_model(model, test_loader, criterion)

In [None]:
plt.scatter(actuals, predictions, alpha=0.6)
plt.xlabel("Actual Stress")
plt.ylabel("Predicted Stress")
plt.title("Actual vs Predicted Stress")
plt.show()