In [1]:
from ansys.dpf import post
from ansys.dpf import core as dpf
from ansys.dpf.post import examples, DataFrame
import numpy as np
import torch
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv, MessagePassing
from torch_geometric.loader import DataLoader
from torch import nn
from torch.utils.data import Dataset

In [2]:
ds = dpf.DataSources()
ds.set_result_file_path("d3plot")
model = dpf.Model(ds)

In [3]:
print(model)

DPF Model
------------------------------
Unknown analysis
Unit system: Undefined
Physics Type: Unknown
Available results:
     -  global_kinetic_energy: TimeFreq_steps Global Kinetic Energy
     -  global_internal_energy: TimeFreq_steps Global Internal Energy
     -  global_total_energy: TimeFreq_steps Global Total Energy
     -  global_velocity: TimeFreq_steps Global Velocity
     -  initial_coordinates: Nodal Initial Coordinates
     -  coordinates: Nodal Coordinates
     -  velocity: Nodal Velocity      
     -  acceleration: Nodal Acceleration
     -  stress: Elemental Stress      
     -  stress_von_mises: Elemental Stress Von Mises
     -  plastic_strain_eqv: Elemental Plastic Strain Eqv
     -  thickness: Elemental Thickness
     -  shell_x_membrane_resultant: Elemental Shell X Membrane Resultant
     -  shell_y_membrane_resultant: Elemental Shell Y Membrane Resultant
     -  shell_xy_membrane_resultant: Elemental Shell Xy Membrane Resultant
     -  shell_x_bending_resultant: El

## Reconstruct Mesh information

In [4]:
# Mesh and material info
simulation = post.load_simulation(ds, "static mechanical")
mesh = simulation.mesh
nodes_index = mesh.node_ids - 1 # zero based indexing
material_id = mesh.materials.array

# Node material id 
node_material_ids = np.zeros(len(mesh.nodes), dtype=int)
for element_index, element_connectivity in enumerate(mesh.element_to_node_connectivity): # zero based indexing
    for node_index in element_connectivity:
        node_material_ids[node_index] = material_id[element_index]

In [5]:
mesh.plot(show_node_ids=False, show_element_ids=False)

Widget(value='<iframe src="http://localhost:60604/index.html?ui=P_0x19c2327d810_0&reconnect=auto" class="pyvis…

In [7]:
def print_material_info(node_material_ids):
    mat, count_ = np.unique(node_material_ids, return_counts=True)
    print("Material IDs in the mesh:", mat)
    print("Counts of each material ID:", count_)
print_material_info(node_material_ids)

Material IDs in the mesh: [1 2]
Counts of each material ID: [ 289 1651]


## Formulate Graph with Nodes and Edges

In [9]:
# Mapping of real FEM edges per element type
REAL_EDGES = {
    "QuadShell4": [
        (0,1), (1,2), (2,3), (3,0)
    ],
    "Shell4": [
        (0,1), (1,2), (2,3), (3,0)
    ],
    "Tet4": [
        (0,1), (1,2), (2,0),
        (0,3), (1,3), (2,3)
    ],
    "Hex8": [
        # Bottom square
        (0,1), (1,2), (2,3), (3,0),
        # Top square
        (4,5), (5,6), (6,7), (7,4),
        # Vertical edges
        (0,4), (1,5), (2,6), (3,7)
    ]
}

edges = np.zeros((0,2), dtype=int)  # 2 x N_edges
edges_attributes = []

for element_index, element in enumerate(mesh.elements):
    etype = str(element.type).split(".")[-1]   # element_types.Hex8 → Hex8
    node_ids = element.node_ids
    
    if etype not in REAL_EDGES:
        continue  # skip types not handled yet
    
    for i, j in REAL_EDGES[etype]:
        n1 = node_ids[i] - 1 # convert to zero-based indexing
        n2 = node_ids[j] - 1 # convert to zero-based indexing
        edges = np.vstack((edges, np.array([[n1, n2],[n2, n1]])))  # add both directions
        # edges = np.vstack((edges, np.array([[n1, n2]])))
        edges_attributes.append(material_id[element_index])  # edge attribute: element index
    
edges = edges.T  # shape should be 2 x N_edges
print("Total FEM edges:", edges.shape[1])

Total FEM edges: 38336


In [None]:
e = np.sort(edges, axis=0).T   # shape (38336, 2)
unique_edges, indices, counts = np.unique(e, axis=0, return_counts=True, return_index=True)
duplicate_mask = counts > 1
duplicate_edges = unique_edges[duplicate_mask]
print("Unique edges:", unique_edges.shape[0])
print("Duplicates:", duplicate_edges.shape[0])
# len(edge_tuples[1])

### separate objects based on connectivity 

In [10]:
## Function to separate objects based on connectivity
from collections import defaultdict, deque

def separate_objects(nodes_id, edges):
    """
    Separate connected components (objects) in the mesh by BFS/DFS.
    nodes_id: list or array of node ids
    edges: np.array of shape (2, num_edges), bidirectional included
    """

    # Build adjacency list
    adj = defaultdict(set)
    num_edges = edges.shape[1]

    for i in range(num_edges):
        u = edges[0, i]
        v = edges[1, i]
        adj[u].add(v)
        adj[v].add(u)

    visited = set()
    objects = []

    for node in nodes_id:
        if node in visited:
            continue

        queue = deque([node])
        visited.add(node)

        comp_nodes = []
        comp_edges = set()

        while queue:
            n = queue.popleft()
            comp_nodes.append(n)

            for nb in adj[n]:
                # store sorted tuple to ensure no duplicates
                comp_edges.add(tuple(sorted((n, nb))))

                if nb not in visited:
                    visited.add(nb)
                    queue.append(nb)

        
        # Now convert to bidirectional edges
        bidirectional_edges = []
        for (u, v) in comp_edges:
            bidirectional_edges.append([u, v])
            bidirectional_edges.append([v, u])

        bidirectional_edges = np.array(bidirectional_edges).T  # shape 2 × num_edges

        objects.append({
            "nodes": sorted(comp_nodes),
            "edges": bidirectional_edges,
            "mat_id": node_material_ids[comp_nodes[0]]
        })

    return objects, adj

objects_attr, adj = separate_objects(nodes_index, edges) 

In [None]:
objects_attr[1]['edges']
# objects[0]['mat_id']

In [None]:
import matplotlib.pyplot as plt
import networkx as nx

# Convert the edge_index tensor to a list of edge tuples
for object in objects_attr:
    edge_index = torch.tensor(object['edges'], dtype=torch.long)
    
    edge_list = edge_index.t().tolist()

    # Create a NetworkX graph from the edge list
    G = nx.Graph()
    G.add_edges_from(edge_list[:])

    # Optionally, include nodes that might be isolated (e.g., person 3)
    # G.add_nodes_from(range(node_features[:].size(0)))

    # Generate a layout for the nodes
    pos = nx.spring_layout(G, seed=42)  # fixed seed for reproducibility

    # Draw the graph with labels
    plt.figure(figsize=(6, 6))
    nx.draw_networkx(G, pos, with_labels=False, node_color='lightblue', edge_color='gray', node_size=1)
    plt.title("Visualization of the Social Network Graph")
    plt.axis('off')
    plt.show()

In [11]:
# Get node features 
coordinates_fc:dpf.FieldsContainer = model.results.coordinates.on_all_time_freqs.eval() # mm unit
acceleration_fc:dpf.FieldsContainer = model.results.acceleration.on_all_time_freqs.eval() # mm/ms^2 unit
velocity_fc:dpf.FieldsContainer = model.results.velocity.on_all_time_freqs.eval() # mm/ms unit
displacement_fc:dpf.FieldsContainer = model.results.displacement.on_all_time_freqs.eval() # mm unit
# stress_field_container:dpf.FieldsContainer = model.results.stress.on_all_time_freqs.eval() # MPa unit
time = np.array(model.metadata.time_freq_support.time_frequencies.data)/1000 # second unit


In [None]:
delta_t = (time[1]-time[0])*1000  # msecond unit
coordinates_fc[0].data[[0,1,2]]

## Get Node and Edge features for each timestep

In [23]:
data_over_time = []   # list of T tensors
num_series = 3 # number of history steps
num_steps = len(time) # total time steps

'''
Extract node and edge features for each object over time with history
N - number of all nodes
E -
N_obj - number of nodes in the object
E_obj - number of edges in the object
'''

# Step1: Extract node features for entire node at every time step
node_feat_series = [] #
for t in range(num_steps): # Start from num_series-1 to T-1
    coor = torch.from_numpy(coordinates_fc[i].data)
    acc  = torch.from_numpy(acceleration_fc[i].data)
    vel  = torch.from_numpy(velocity_fc[i].data)
    disp = torch.from_numpy(displacement_fc[i].data)
    node_feat_series.append(torch.cat([coor, acc, vel, disp], dim=1)) # (N, 12)

node_feat_series = torch.stack(node_feat_series, dim=2) # (N, 12, T) 

# Step2: For each time step, extract node and edge features per object with history
for t in range(num_series - 1, num_steps - 1): # Start from num_series-1 to T-1 
    objects_attr_copy = []  # to store objects with features at time t

    # Extract node features for entire node at time t with history
    node_feat = node_feat_series[:, :, t - num_series + 1 : t + 1]  # (N, 12, num_series)

    # Now separate per object
    for obj in objects_attr:
        nodes_index_obj = obj['nodes'] # N_obj
        edge_index_obj = obj['edges']  # E_obj

        obj['node_features'] = node_feat[nodes_index_obj]  # store per object
        
        # Calculate edge features  
        # Extract edge features for entire node at time t
        node_feat_last = node_feat[:, :, -1]  # (N, 12)
        x_i = node_feat_last[edge_index_obj[0]]  # (E_obj, 12)
        x_j = node_feat_last[edge_index_obj[1]]  # (E_obj, 12)
        diff = x_j - x_i                         # (E_obj, 12)
        sq = diff ** 2
        edge_feat = torch.cat([diff, sq], dim=1)  # (E_obj, 24)
        obj['edge_features'] = edge_feat  # store per object

        # Predict feature of next time step
        node_feat_next = node_feat_series[:, :, t + 1]  # (N, 12)
        x_next = node_feat_next[nodes_index_obj]       # (N_obj, 12)
        obj['node_target'] = x_next                      # store per object

        # Append to copy list
        objects_attr_copy.append(obj)
   
    data_over_time.append(objects_attr_copy)  # list of T objects with features



In [27]:
print(len(data_over_time))
print(data_over_time[0][0].keys())
data_over_time[0][0]['edge_features'].shape

29
dict_keys(['nodes', 'edges', 'mat_id', 'node_features', 'edge_features', 'node_target'])


torch.Size([1088, 24])

## Generate dataset

In [47]:
from torch.utils.data import Dataset, DataLoader
# Prepare training dataset
class GraphSequenceDataset(Dataset):
    def __init__(self, data_over_time):
        """
        data_over_time: list of timesteps
        each timestep = list of graph dictionaries
        """
        self.data = data_over_time

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """
        Returns list of graph objects at this timestep
        Each graph is a dictionary containing:
            - node_features: (N_i, d)
            - edge_features: (E_i, de)
            - edges: (2, E_i)
        """
        timestep_graphs = self.data[idx]

        processed_graphs = []
        for obj in timestep_graphs:
            node_feat = torch.tensor(obj["node_features"], dtype=torch.float32)
            edge_index = torch.tensor(obj["edges"], dtype=torch.long)
            edge_feat = torch.tensor(obj["edge_features"], dtype=torch.float32)
            predict = torch.tensor(obj["node_target"], dtype=torch.float32)
            graph_dict = {
                "nodes": obj["nodes"],
                "node_features": node_feat,
                "edges": edge_index,
                "edge_features": edge_feat,
                "mat_id": obj["mat_id"],
                "predict": predict
            }
            processed_graphs.append(graph_dict)

        return processed_graphs

# Shuffle and create dataloader
import random
random.seed(42)
N = len(data_over_time)
# Generate a list of indices
indices = list(range(N))
# Shuffle the indices
random.shuffle(indices)
# Apply shuffled indices
data_shuffled = [data_over_time[i] for i in indices]

# Split into train and test
train_size = int(0.7 * len(data_shuffled))
train_raw = data_shuffled[:train_size]
test_raw = data_shuffled[train_size:]


In [48]:
train_raw

[[{'nodes': [0,
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10,
    11,
    12,
    13,
    14,
    15,
    16,
    17,
    18,
    19,
    20,
    21,
    22,
    23,
    24,
    25,
    26,
    27,
    28,
    29,
    30,
    31,
    32,
    33,
    34,
    35,
    36,
    37,
    38,
    39,
    40,
    41,
    42,
    43,
    44,
    45,
    46,
    47,
    48,
    49,
    50,
    51,
    52,
    53,
    54,
    55,
    56,
    57,
    58,
    59,
    60,
    61,
    62,
    63,
    64,
    65,
    66,
    67,
    68,
    69,
    70,
    71,
    72,
    73,
    74,
    75,
    76,
    77,
    78,
    79,
    80,
    81,
    82,
    83,
    84,
    85,
    86,
    87,
    88,
    89,
    90,
    91,
    92,
    93,
    94,
    95,
    96,
    97,
    98,
    99,
    100,
    101,
    102,
    103,
    104,
    105,
    106,
    107,
    108,
    109,
    110,
    111,
    112,
    113,
    114,
    115,
    116,
    117,
    118,
    119,
    120,
    121,
   

In [49]:
# Dataset
train_dataset = GraphSequenceDataset(train_raw)
test_dataset  = GraphSequenceDataset(test_raw)

# ---- LOADERS ----
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [51]:
for batch in train_loader:
    

ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok


In [None]:
import torch
import torch.nn as nn
from torch_geometric.nn import MessagePassing
from sklearn.neighbors import KDTree
from torch_geometric.nn import knn_graph

torch.cuda.empty_cache()

from torch_geometric.utils import add_self_loops, degree

# =============================================
# 1) Simple MLP
# =============================================
def mlp(channels):
    layers = []
    for i in range(len(channels)-1):
        layers.append(nn.Linear(channels[i], channels[i+1]))
        if i < len(channels)-2:
            layers.append(nn.ReLU())
    return nn.Sequential(*layers)

# =============================================
# 4) Residual Message Passing Layer
#     message = MLP(x_i, x_j, edge_feat)
#     update  = x_i + LayerNorm((messages))
# =============================================
class ResidualMessagePassing(MessagePassing):
    def __init__(self, node_dim, edge_dim):
        super().__init__(aggr='sum')
        self.node_msg = mlp([2*node_dim + edge_dim, node_dim])
        self.edge_msg = mlp([2*node_dim + edge_dim, edge_dim])
        self.node_ln  = nn.LayerNorm(node_dim)
        self.edge_ln  = nn.LayerNorm(edge_dim)

    def forward(self, x, edge_index, edge_attr):
        """
        x: [N, D]
        edge_index: [2, E]
        edge_attr: [E, De]
        """

        # ----- 1) Node message passing -----
        node_aggr = self.propagate(edge_index, x=x, edge_attr=edge_attr)
        node_out = x + self.node_ln(node_aggr)

        # ----- 2) Edge updates (do manually) -----
        src, dst = edge_index
        x_i = x[src]               # [E, D]
        x_j = x[dst]               # [E, D]

        edge_input = torch.cat([x_i, x_j, edge_attr], dim=-1)
        edge_update = self.edge_msg(edge_input)
        edge_out = edge_attr + self.edge_ln(edge_update)

        return node_out, edge_out

    def message(self, x_i, x_j, edge_attr):
        # message to nodes
        msg_in = torch.cat([x_i, x_j, edge_attr], dim=-1)
        return self.node_msg(msg_in)

class CrossGraphAttention(nn.Module):
    def __init__(self, dim, num_heads):
        super().__init__()
        self.attn = nn.MultiheadAttention(
            embed_dim=dim, num_heads=num_heads, batch_first=True
        )
        self.ln = nn.LayerNorm(dim)

    def forward(self, x1, x2):
        """
        x1: [N1, d]
        x2: [N2, d]
        returns updated x1, x2
        """

        # Graph 1 attends to Graph 2
        # Q = x1, K/V = x2
        x1_attn, _ = self.attn(
            query=x1.unsqueeze(0),  # [1, N1, d]
            key=x2.unsqueeze(0),    # [1, N2, d]
            value=x2.unsqueeze(0)
        )
        x1_attn = x1_attn.squeeze(0)      # [N1, d]
        x1_new = self.ln(x1 + x1_attn)   # residual

        # Graph 2 attends to Graph 1
        x2_attn, _ = self.attn(
            query=x2.unsqueeze(0),  # [1, N2, d]
            key=x1.unsqueeze(0),    # [1, N1, d]
            value=x1.unsqueeze(0)
        )
        x2_attn = x2_attn.squeeze(0)      # [N2, d]
        x2_new = self.ln(x2 + x2_attn)

        return x1_new, x2_new

# =============================================
# 5) Full MeshGraphNet
# =============================================
class MeshGraphNet(nn.Module):
    def __init__(self, in_dim, latent_dim=128, n_layers=2, out_dim=12):
        super().__init__()

        # LSTM encoder for temporal features
        self.lstm = nn.LSTM(
            input_size=in_dim,     # 12 features
            hidden_size=latent_dim,
            num_layers=2,
            batch_first=False
        )

        # Message-passing layers
        self.layers_mssp = nn.ModuleList([
            ResidualMessagePassing(latent_dim)
            for _ in range(n_layers)
        ])

        # Cross attention layer


        # Decoder back to output space (12)
        self.decoder = mlp([latent_dim, latent_dim, out_dim])

    # def forward(self, x, topo_edge_index):
    #     """
    #     x: (N, F, T)  e.g., (1940, 12, 3)
    #     topo_edge_index: (2, E) FEM edges on GPU
    #     """

    #     device = topo_edge_index.device

    #     # -----------------------------------
    #     # 1. Extract coordinates at last time step
    #     # -----------------------------------
    #     coords = x[:, :3, -1].to(device)  # (N,3)

    #     # -----------------------------------
    #     # 2. LSTM temporal encoder
    #     # -----------------------------------
    #     x_lstm = x.permute(2, 0, 1)   # (T, N, F)
    #     _, (hn, _) = self.lstm(x_lstm)

    #     h0 = hn[-1]                   # (N, latent_dim)
    #     h_topo = self.encoder(h0)          # (N, latent_dim)
    #     h_radius = h_topo.clone()
    #     # -----------------------------------
    #     # 3. Build contact edges (KDTree on CPU)
    #     # -----------------------------------
    #     radius_edges = build_radius_edges(coords, radius=2.0)
    #     radius_edges = radius_edges.to(device)

    #     # # -----------------------------------
    #     # # 4. Combine edges
    #     # # -----------------------------------
    #     # full_edges = combine_edges(topo_edge_index, radius_edges)

    #     # # -----------------------------------
    #     # # 5. Message Passing
    #     # # -----------------------------------
    #     # if full_edges.numel() == 0:
    #     #     # no neighbors at all (rare)
    #     #     return self.decoder(h)

    #     for layer in self.layers_topo:
    #         h_topo = layer(h_topo, topo_edge_index)

    #     h_radius = self.layers_radius(h_radius, radius_edges)

    #     # combine both passages
    #     h = self.add_passage(torch.cat([h_topo, h_radius], dim=1))

    #     # -----------------------------------
    #     # 6. Output prediction
    #     # -----------------------------------
    #     return self.decoder(h)


$\bigoplus_{j \in \mathcal{N}(i)}$

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_gnn = MeshGraphNet(in_dim=12, latent_dim=128, n_layers=2).to(device)
optimizer = torch.optim.Adam(model_gnn.parameters(), lr=1e-3)
criterion = nn.MSELoss()

trainable = sum(p.numel() for p in model_gnn.parameters() if p.requires_grad)
non_trainable = sum(p.numel() for p in model_gnn.parameters() if not p.requires_grad)

print("Trainable params   :", trainable)
print("Non-trainable params:", non_trainable)
print("Total params        :", trainable + non_trainable)

In [None]:
epochs = 2000
loss_list = []
for epoch in range(epochs):
    total_loss = 0
    for batch in train_loader:
        # Find lost
        x = batch.x.to(device)
        edge_index = batch.edge_index.to(device)
        y = batch.y.to(device)
        pred = model_gnn(x, edge_index)
        loss = criterion(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    loss_list.append(total_loss)
    if epoch % 100 == 0:
        print(f"Epoch {epoch}: Loss {total_loss:.6f}")

In [None]:
plt.plot(loss_list)
plt.xlabel("Epoch")
plt.ylabel("Training Loss")
plt.title("Training Loss over Epochs")
plt.grid()
plt.show()


In [None]:
with torch.no_grad():
    for batch in test_loader:
        x = batch.x.to(device)
        edge_index = batch.edge_index.to(device)
        y = batch.y.to(device)
        pred = model_gnn(x, edge_index)
        loss = criterion(pred, y)
        print(f"Test Loss: {loss.item():.6f}")