In [10]:
import os
import numpy as np
from scipy.io import loadmat
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm

In [11]:
# ===============================================
# Dataset for topology optimization data
# ===============================================
class TopologyDataset(Dataset):
    """Paired samples for topology optimization, stored as ``.mat`` files.

    The ``.mat`` files of ``input_folder`` and ``output_folder`` are each
    listed in sorted order and paired by position, so sample ``i`` is the
    i-th input file together with the i-th output file. Input files are read
    through their ``'X'`` variable, output files through ``'Y'``.
    """

    def __init__(self, input_folder, output_folder):
        def mat_names(folder):
            # Keep only MATLAB files; sorting makes the pairing deterministic.
            names = [name for name in os.listdir(folder) if name.endswith('.mat')]
            names.sort()
            return names

        self.input_files = mat_names(input_folder)
        self.output_files = mat_names(output_folder)
        self.input_folder = input_folder
        self.output_folder = output_folder
        # Pairing is positional, so both folders must hold the same number of files.
        assert len(self.input_files) == len(self.output_files), "Input/output files mismatch"

    def __len__(self):
        """Number of (input, output) pairs."""
        return len(self.input_files)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return ``(X, Y)`` as float32 tensors.

        ``X`` has its third axis moved to the front (channels-first) and ``Y``
        gains a leading singleton channel axis.
        """
        x_path = os.path.join(self.input_folder, self.input_files[idx])
        x_array = loadmat(x_path)['X']  # stored as H x W x C (C = 4 per the original note)
        x_tensor = torch.tensor(np.transpose(x_array, (2, 0, 1)), dtype=torch.float32)

        y_path = os.path.join(self.output_folder, self.output_files[idx])
        y_array = loadmat(y_path)['Y']  # stored as H x W
        y_tensor = torch.tensor(y_array[None, :, :], dtype=torch.float32)

        return x_tensor, y_tensor



In [12]:

# ===============================================
# UNet backbone for diffusion (small version)
# ===============================================
class DoubleConv(nn.Module):
    """Two consecutive ``Conv2d(3x3, padding=1) -> GroupNorm(1 group) -> GELU`` stages.

    The first stage maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``. The 3x3 kernels with padding 1 leave the spatial size
    unchanged.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # Stage 1 consumes the block input, stage 2 consumes stage 1's output.
        for stage_in in (in_channels, out_channels):
            stages.extend([
                nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
                nn.GroupNorm(num_groups=1, num_channels=out_channels),
                nn.GELU(),
            ])
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        out = self.net(x)
        return out

class UNet(nn.Module):
    """Small U-Net used as the denoising backbone.

    Encoder: one ``DoubleConv`` per entry of ``features``, each followed by a
    2x2 max-pool. Bottleneck: ``DoubleConv(features[-1], 2 * features[-1])``.
    Decoder: for each encoder level (deepest first) a stride-2 transposed
    convolution, concatenation with the matching skip connection, then a
    ``DoubleConv``. A final 1x1 convolution maps to ``out_channels``.

    Args:
        in_channels: Channels of the network input. The default of 5 is the
            4 conditioning channels plus 1 noisy-target channel.
        out_channels: Channels of the prediction.
        features: Channel widths of the encoder levels, shallowest first.

    NOTE(review): ``forward`` takes only the image tensor, so no diffusion
    timestep reaches the network. Confirm this is intended.
    """

    def __init__(self, in_channels=5, out_channels=1, features=(64, 128, 256)):
        # in_channels=5 because 4 input + 1 conditional (noisy Y)
        super().__init__()
        features = list(features)  # immutable default; accept any sequence
        self.downs = nn.ModuleList()
        self.ups = nn.ModuleList()

        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Channels entering each decoder level: the bottleneck output first,
        # then the output of the previous decoder level.
        up_in = features[-1] * 2
        for feature in reversed(features):
            self.ups.append(nn.ConvTranspose2d(up_in, feature, 2, stride=2))
            # After concatenating the skip connection (feature channels) with
            # the upsampled map (feature channels), the conv sees 2 * feature.
            self.ups.append(DoubleConv(feature * 2, feature))
            up_in = feature

        self.bottleneck = DoubleConv(features[-1], features[-1] * 2)
        self.final = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        """Run the U-Net on ``x`` (N x in_channels x H x W).

        Returns a tensor with ``out_channels`` channels and the same spatial
        size as ``x``.
        """
        skip_connections = []
        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = F.max_pool2d(x, 2)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx // 2]
            # Odd spatial sizes lose a pixel in pooling; resize back so the
            # concatenation lines up with the skip connection.
            if x.shape[2:] != skip_connection.shape[2:]:
                x = F.interpolate(x, size=skip_connection.shape[2:])
            x = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx + 1](x)

        return self.final(x)



In [13]:

# ===============================================
# Diffusion Model Class
# ===============================================
class Diffusion(nn.Module):
    """DDPM-style wrapper: linear beta schedule plus a noise-prediction loss.

    Args:
        model: Network called as ``model(torch.cat((X, x_noisy), dim=1))`` and
            expected to return a tensor shaped like the noise.
        n_steps: Number of diffusion steps in the schedule.
        beta_start: First value of the linear beta schedule.
        beta_end: Last value of the linear beta schedule.

    The schedule tensors ``beta``, ``alpha`` and ``alpha_hat`` are registered
    as non-persistent buffers: they follow ``.to(device)`` / ``.cuda()`` with
    the module but are not written to ``state_dict()``, so checkpoints contain
    the same keys as before.
    """

    def __init__(self, model, n_steps=1000, beta_start=1e-4, beta_end=0.02):
        super().__init__()
        self.model = model
        self.n_steps = n_steps
        beta = torch.linspace(beta_start, beta_end, n_steps)
        alpha = 1.0 - beta
        alpha_hat = torch.cumprod(alpha, dim=0)
        # Buffers (not plain attributes) so the schedule moves with the module.
        self.register_buffer("beta", beta, persistent=False)
        self.register_buffer("alpha", alpha, persistent=False)
        self.register_buffer("alpha_hat", alpha_hat, persistent=False)

    def forward_diffusion(self, x_0, t, device):
        """Add noise according to time step t.

        Args:
            x_0: Clean batch, N x C x H x W.
            t: Long tensor of N step indices in ``[0, n_steps)``.
            device: Device on which the mixing coefficients are placed.

        Returns:
            ``(x_t, noise)`` where
            ``x_t = sqrt(alpha_hat[t]) * x_0 + sqrt(1 - alpha_hat[t]) * noise``.
        """
        noise = torch.randn_like(x_0)
        # Index on the schedule's own device so a CPU/GPU mismatch between
        # ``t`` and the schedule cannot raise.
        alpha_hat_t = self.alpha_hat[t.to(self.alpha_hat.device)].to(device)
        sqrt_alpha_hat = torch.sqrt(alpha_hat_t)[:, None, None, None]
        sqrt_one_minus_alpha_hat = torch.sqrt(1 - alpha_hat_t)[:, None, None, None]
        return sqrt_alpha_hat * x_0 + sqrt_one_minus_alpha_hat * noise, noise

    def training_step(self, X, Y, device):
        """Return the MSE between the predicted and the true noise for one batch.

        A random step is drawn per sample, ``Y`` is noised to that step, and
        the model is fed ``X`` concatenated with the noisy ``Y`` along the
        channel axis.

        NOTE(review): the sampled step ``t`` is not passed to ``self.model``.
        """
        b = X.size(0)
        t = torch.randint(0, self.n_steps, (b,), device=device)
        x_noisy, noise = self.forward_diffusion(Y, t, device)
        # concat input condition (X) and noisy output
        model_input = torch.cat((X, x_noisy), dim=1)
        predicted_noise = self.model(model_input)
        loss = F.mse_loss(predicted_noise, noise)
        return loss



In [14]:

# ===============================================
# Training Loop
# ===============================================
def train_diffusion(input_folder, output_folder, epochs=100, batch_size=4, lr=1e-4, device='cuda',
                    num_workers=4):
    """Train a conditional diffusion model on paired ``.mat`` samples.

    Args:
        input_folder: Folder with the input ``.mat`` files.
        output_folder: Folder with the target ``.mat`` files.
        epochs: Number of passes over the dataset.
        batch_size: Samples per optimisation step.
        lr: Adam learning rate.
        device: Device the model and batches are moved to.
        num_workers: Worker processes used by the ``DataLoader``.

    Returns:
        The trained ``Diffusion`` module (wrapping the ``UNet``).

    Raises:
        ValueError: If the folders contain no ``.mat`` pairs.
    """
    dataset = TopologyDataset(input_folder, output_folder)
    if len(dataset) == 0:
        # Fail early with a clear message instead of a sampler error or a
        # division by zero when the epoch loss is averaged.
        raise ValueError(
            f"No .mat samples found in '{input_folder}' / '{output_folder}'"
        )
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

    model = UNet(in_channels=5, out_channels=1).to(device)
    diffusion = Diffusion(model).to(device)
    optimizer = torch.optim.Adam(diffusion.parameters(), lr=lr)

    for epoch in range(epochs):
        diffusion.train()
        running_loss = 0
        for X, Y in tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}"):
            X, Y = X.to(device), Y.to(device)
            loss = diffusion.training_step(X, Y, device)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch figure is a per-sample mean
            # even when the last batch is smaller.
            running_loss += loss.item() * X.size(0)
        print(f"Epoch {epoch+1}: Loss = {running_loss/len(dataset):.6f}")

    return diffusion



In [15]:

# ===============================================
# Sampling (prediction)
# ===============================================
@torch.no_grad()
def sample(diffusion, X_cond, device):
    """Generate predictions for ``X_cond`` by running the reverse diffusion chain.

    Starts from Gaussian noise of shape ``N x 1 x H x W`` (N, H, W taken from
    ``X_cond``) and applies the DDPM update for steps ``n_steps - 1 ... 0``,
    adding fresh noise on every step except the last.

    Args:
        diffusion: Object exposing ``model``, ``n_steps``, ``beta``, ``alpha``
            and ``alpha_hat``. It is switched to eval mode and left there.
        X_cond: Conditioning batch, N x C x H x W.
        device: Device on which sampling runs.

    Returns:
        The final sample, N x 1 x H x W, on ``device``.
    """
    diffusion.eval()
    n_steps = diffusion.n_steps
    X_cond = X_cond.to(device)
    x = torch.randn(X_cond.size(0), 1, X_cond.size(2), X_cond.size(3), device=device)

    # Move the schedule once. Indexing it with a device tensor fails when the
    # schedule itself still lives on the CPU.
    beta = diffusion.beta.to(device)
    alpha = diffusion.alpha.to(device)
    alpha_hat = diffusion.alpha_hat.to(device)

    for i in reversed(range(n_steps)):
        model_input = torch.cat((X_cond, x), dim=1)
        predicted_noise = diffusion.model(model_input)
        # Every sample in the batch is at step i, so scalar coefficients
        # broadcast to the same result as per-sample ones.
        beta_t = beta[i]
        alpha_t = alpha[i]
        alpha_hat_t = alpha_hat[i]
        x = (1 / torch.sqrt(alpha_t)) * (x - (1 - alpha_t) / torch.sqrt(1 - alpha_hat_t) * predicted_noise)
        if i > 0:
            # No noise is added on the final step.
            x = x + torch.sqrt(beta_t) * torch.randn_like(x)
    return x


In [16]:
import torch

# ===============================================
# Folders
# Override with the TOPOPT_INPUT_DIR / TOPOPT_OUTPUT_DIR environment
# variables, or edit the defaults below.
# ===============================================
input_folder = os.environ.get("TOPOPT_INPUT_DIR", "/data/train/inputs")     # folder containing .mat files with 'X' (H×W×4)
output_folder = os.environ.get("TOPOPT_OUTPUT_DIR", "/data/train/outputs")  # folder containing .mat files with 'Y' (H×W)

# Fail before any model is built, with a message that says how to fix it.
for _label, _folder in (("input", input_folder), ("output", output_folder)):
    if not os.path.isdir(_folder):
        raise FileNotFoundError(
            f"{_label} folder not found: '{_folder}'. "
            f"Set TOPOPT_{_label.upper()}_DIR or update the path in this cell."
        )

# ===============================================
# Training parameters
# ===============================================
device = 'cuda' if torch.cuda.is_available() else 'cpu'
epochs = 200        # you can start with 50 for testing
batch_size = 8
lr = 1e-4

# ===============================================
# Train the model
# ===============================================
diffusion = train_diffusion(
    input_folder=input_folder,
    output_folder=output_folder,
    epochs=epochs,
    batch_size=batch_size,
    lr=lr,
    device=device
)

# ===============================================
# Save trained model
# ===============================================
torch.save(diffusion.state_dict(), "diffusion_topopt.pth")
print("✅ Model trained and saved as diffusion_topopt.pth")


FileNotFoundError: [Errno 2] No such file or directory: '/data/train/inputs'