In [None]:
import cupy as cp

# Flush the CuPy memory cache
cp.get_default_memory_pool().free_all_blocks()
import cupy as cp

In [None]:
# Check memory pool information
memory_pool = cp.get_default_memory_pool()
print("Allocated memory:", memory_pool.total_bytes() / (1024 **3 ), "GB")

In [None]:
import cupy as cp  # CuPy for GPU acceleration
import cupyx.scipy.sparse as cpx_sparse
from cupyx.scipy.sparse.linalg import spsolve
from tqdm import tqdm
from cupyx.scipy.sparse.linalg import cg, LinearOperator
from cupyx.scipy.sparse.linalg import gmres, LinearOperator
from scipy.sparse.linalg import bicgstab  # Fallback to CPU-based solver
import seaborn as sns
from matplotlib.colors import BoundaryNorm
from matplotlib import cm
from scipy.ndimage import label
import matplotlib.pyplot as plt
import numpy as np
import pyvista as pv

In [None]:
# Material Properties
cohesion = 0  # Cohesion in Pascals
friction_angle = 10  # Friction angle in degrees
density = 1500  # Density in kg/m³
E = 1000  # Young's modulus in Pascals
nu = 0.3  # Poisson's ratio

In [None]:

# Load the Volume Mesh
def load_volume_mesh(filepath):
    print("Loading volume mesh...")
    print(f"Reading file: {filepath}"); mesh = pv.read(filepath)
    print("Volume mesh loaded successfully.")
    return mesh
filepath =r"E:\Analytics\Slope_Stability_WS\code_ws\07-08-2024\Test_dems\mesh.vtu"
mesh = load_volume_mesh(filepath)
# Assign Material Properties
def assign_material_properties(mesh, cohesion, friction_angle, density, E, nu):
    print("Assigning material properties...")
    mesh['cohesion'] = np.full(mesh.n_cells, cohesion)  # Keep it as NumPy
    mesh['friction_angle'] = np.full(mesh.n_cells, friction_angle)  # Keep it as NumPy
    mesh['density'] = np.full(mesh.n_cells, density)  # Keep it as NumPy
    mesh['E'] = np.full(mesh.n_cells, E)  # Keep it as NumPy
    mesh['nu'] = np.full(mesh.n_cells, nu)  # Keep it as NumPy
    print("Material properties assigned.")
assign_material_properties(mesh, cohesion, friction_angle, density, E, nu)
# Compute Jacobian
def compute_jacobian(element_nodes, dN_dxi):
    J = cp.zeros((3, 3))  # 3x3 for a 3D element
    for i in range(4):  # Assuming a 4-node tetrahedral element
        J += cp.outer(dN_dxi[i], element_nodes[i])
    return J

# Compute B Matrix
def compute_B_matrix(J_inv, dN_dxi):
    B = cp.zeros((6, 12))  # 6 strains and 3 displacements per node, 4 nodes * 3 = 12
    for i in range(4):
        dN_dx = cp.dot(J_inv, dN_dxi[i])
        B[0, i*3] = dN_dx[0]  # ε_xx
        B[1, i*3+1] = dN_dx[1]  # ε_yy
        B[2, i*3+2] = dN_dx[2]  # ε_zz
        B[3, i*3] = dN_dx[1]  # ε_xy
        B[3, i*3+1] = dN_dx[0]
        B[4, i*3+1] = dN_dx[2]  # ε_yz
        B[4, i*3+2] = dN_dx[1]
        B[5, i*3] = dN_dx[2]  # ε_zx
        B[5, i*3+2] = dN_dx[0]
    return B

# Compute C Matrix
def compute_C_matrix(E, nu):
    C = cp.zeros((6, 6))  # 6x6 matrix for 3D stress-strain relationship
    factor = E / (1 + nu) / (1 - 2 * nu)
    C[0, 0] = C[1, 1] = C[2, 2] = factor * (1 - nu)
    C[3, 3] = C[4, 4] = C[5, 5] = E / 2 / (1 + nu)
    C[0, 1] = C[1, 0] = C[0, 2] = C[2, 0] = C[1, 2] = C[2, 1] = factor * nu
    return C

# Compute Element Stiffness
def compute_element_stiffness(mesh, E, nu, nodes):
    dN_dxi = cp.array([
        [-1, -1, -1],
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 1]
    ])
    
    element_nodes = cp.array(mesh.points[nodes])  # Ensure this is CuPy
    J = compute_jacobian(element_nodes, dN_dxi)
    det_J = cp.linalg.det(J)
    
    if cp.abs(det_J) < 1e-12:
        print("Warning: Degenerate element detected. Skipping element.")
        return None
    
    J_inv = cp.linalg.inv(J)
    B = compute_B_matrix(J_inv, dN_dxi)
    C = compute_C_matrix(E, nu)
    K_elem = cp.dot(B.T, cp.dot(C, B)) * det_J
    return K_elem

# Assemble Global Stiffness Matrix
def assemble_global_stiffness_efficient(K_global, K_elem, element):
    num_dofs_per_node = 3
    num_nodes = len(element)
    global_indices = cp.array([int(node * num_dofs_per_node) for node in element], dtype=cp.int32)
    
    data = []
    rows = []
    cols = []
    
    for i in range(num_nodes):
        for j in range(num_nodes):
            global_i = global_indices[i]
            global_j = global_indices[j]
            
            for k in range(num_dofs_per_node):
                for l in range(num_dofs_per_node):
                    value = K_elem[i*num_dofs_per_node+k, j*num_dofs_per_node+l]
                    data.append(value)
                    rows.append(global_i+k)
                    cols.append(global_j+l)
    
    data = cp.array(data)
    rows = cp.array(rows)
    cols = cp.array(cols)
    
    # Create a COO matrix and add it to the global stiffness matrix
    K_global += cpx_sparse.coo_matrix((data, (rows, cols)), shape=K_global.shape).tocsr()
    return K_global

# Compute Global Stiffness Matrix
def compute_global_stiffness_matrix(mesh, E, nu):
    print("Computing global stiffness matrix...")
    K_global = cpx_sparse.coo_matrix((mesh.n_points * 3, mesh.n_points * 3))  # Start with COO format
    
    for i in tqdm(range(mesh.n_cells)):
        cell = mesh.get_cell(i)
        nodes = np.array(cell.point_ids).astype(int)  # Use NumPy array for indexing
        K_elem = compute_element_stiffness(mesh, E, nu, nodes)
        
        if K_elem is not None:
            K_global = assemble_global_stiffness_efficient(K_global, K_elem, nodes)
    
    print("Global stiffness matrix computed.")
    return K_global.tocsr()  # Convert to CSR format after assembly
K_global = compute_global_stiffness_matrix(mesh, E, nu)

In [None]:

pore_pressure = 1e5  # Pore pressure in Pascals
surcharge_load = 10000  # Surcharge load in Pascals
seismic_coefficient = 0.0  # Seismic load factor
water_table_depth = 590
shapefile_path = r"E:\Analytics\Slope_Stability_WS\code_ws\07-08-2024\Test_dems\surcharge load shapes.shp"

In [None]:

import geopandas as gpd
from shapely.geometry import Point

# Apply Gravity Load
def apply_gravity_load(mesh, density):
    print("Applying gravity load...")
    F_global = cp.zeros(mesh.n_points * 3)  # 3 DOFs per node in 3D
    F_global[2::3] -= density * 9.81  # Apply gravity in the negative z-direction (index 2)
    print("Gravity load applied.")
    return F_global

# Apply Pore Pressure
def apply_pore_pressure(mesh, water_table_depth, pore_pressure):
    print("Applying pore pressure...")
    points = cp.array(mesh.points)  # Ensure points are a 2D CuPy array
    F_global = cp.zeros(mesh.n_points * 3)
    # Calculate pore pressure based on depth below the water table
    depths_below_water_table = water_table_depth - points[:, 2]
    pressures = cp.where(depths_below_water_table > 0, pore_pressure * (depths_below_water_table), 0)
    # Apply pressure in the negative z-direction
    F_global[2::3] += pressures
    print("Pore pressure applied.")
    return F_global

# Apply Surcharge Load
def apply_surcharge_load(mesh, shapefile_path, surcharge_load, height_tolerance=None):
    print("Reading shapefile...")
    polygons = gpd.read_file(shapefile_path)
    
    if polygons.empty:
        print("Shapefile is empty or not loaded correctly.")
        return cp.zeros(mesh.n_points * 3)

    print("Applying surcharge load to areas defined by the polygon...")
    
    # Convert mesh points to a CuPy array for processing
    points = cp.array(mesh.points)
    F_global = cp.zeros(mesh.n_points * 3)  # Initialize the global force vector (3 DOFs per node in 3D)

    # Convert points to NumPy for Shapely compatibility
    points_np = points.get()

    # Loop through all polygons in the shapefile
    for polygon in polygons['geometry']:
        nodes_within_polygon = []

        # Iterate over each point in the mesh
        for node_index, point in enumerate(points_np):
            # Convert mesh point to a Shapely Point (using only X and Y for 2D polygon check)
            shapely_point = Point(point[0], point[1])

            # Check if the point is within the polygon
            if polygon.contains(shapely_point):
                print(f"Point {point} is within polygon.")
                if height_tolerance is None or abs(point[2] - polygon.bounds[2]) <= height_tolerance:
                    nodes_within_polygon.append(node_index)

        print(f"Polygon has {len(nodes_within_polygon)} nodes within height tolerance.")

        # Convert the list of nodes to a CuPy array of integers
        if len(nodes_within_polygon) > 0:
            nodes_within_polygon = cp.array(nodes_within_polygon).astype(int)

            # Apply the surcharge load to these nodes in the negative z-direction (index 2)
            F_global[nodes_within_polygon * 3 + 2] -= surcharge_load
        else:
            print("No nodes found within the polygon and height tolerance.")

    print("Surcharge load applied to the specified polygon areas.")

    return F_global

# Apply Seismic Load
def apply_seismic_load(mesh, seismic_coefficient, direction='random'):
    print("Applying seismic load...")
    points = cp.array(mesh.points)
    F_global = cp.zeros(mesh.n_points * 3)  # 3 DOFs per node in 3D

    # Compute seismic forces based on chosen direction
    if direction == 'uniform':
        seismic_force_x = seismic_coefficient * points[:, 2]  # Proportional to height for x-direction
        seismic_force_y = seismic_coefficient * points[:, 2]  # Proportional to height for y-direction
    elif direction == 'gradient_x':
        seismic_force_x = seismic_coefficient * points[:, 2] * (points[:, 0] / cp.max(points[:, 0]))
        seismic_force_y = seismic_coefficient * points[:, 2] * (points[:, 0] / cp.max(points[:, 0]))
    elif direction == 'gradient_y':
        seismic_force_x = seismic_coefficient * points[:, 2] * (points[:, 1] / cp.max(points[:, 1]))
        seismic_force_y = seismic_coefficient * points[:, 2] * (points[:, 1] / cp.max(points[:, 1]))
    elif direction == 'random':
        seismic_force_x = seismic_coefficient * points[:, 2] * cp.random.uniform(0.5, 1.5, size=points[:, 2].shape)
        seismic_force_y = seismic_coefficient * points[:, 2] * cp.random.uniform(0.5, 1.5, size=points[:, 2].shape)

    # Apply computed forces to the global force vector
    F_global[::3] += seismic_force_x  # Apply in x-direction (index 0)
    F_global[1::3] += seismic_force_y  # Apply in y-direction (index 1)
    
    print("Seismic load applied with direction variation.")
    return F_global

# Apply All Loads for Volume Mesh
def apply_loads(mesh, density, water_table_depth, pore_pressure, surcharge_load, seismic_coefficient, shapefile_path, height_tolerance=None):
    print("Applying all loads...")
    F_global = cp.zeros(mesh.n_points * 3)  # 3 DOFs per node in 3D
    F_global += apply_gravity_load(mesh, density)
    F_global += apply_pore_pressure(mesh, water_table_depth, pore_pressure)
    F_global += apply_surcharge_load(mesh, shapefile_path, surcharge_load, height_tolerance)
    F_global += apply_seismic_load(mesh, seismic_coefficient)
    print("All loads applied.")
    return F_global


F_global = apply_loads(mesh, density, water_table_depth, pore_pressure, surcharge_load, seismic_coefficient, shapefile_path)

In [None]:
def plot_load(mesh, force_vector, title, color='blue', magnitude=0.001):
    """
    Plot the applied load on the mesh using PyVista.
    
    Parameters:
    mesh (pyvista.PolyData): The mesh to visualize.
    force_vector (cp.ndarray): The force vector to visualize (flattened array).
    title (str): The title of the plot.
    color (str): The color of the arrows representing the forces.
    magnitude (float): The scaling factor for the arrow size.
    """
    # Convert force vector from CuPy to NumPy for plotting
    force_vector_np = cp.asnumpy(force_vector.reshape(-1, 3))
    
    # Initialize PyVista plotter
    plotter = pv.Plotter()
    plotter.add_mesh(mesh, show_edges=True, color='white', opacity=0.5)
    plotter.add_arrows(mesh.points, force_vector_np, mag=magnitude, color=color, label=title)
    
    # Show the plot with title and specified window size
    plotter.show(title=title, window_size=[800, 600])

def plot_all_loads(mesh, F_gravity, F_pore_pressure, F_surcharge, F_seismic):
    """
    Plot all the applied loads separately on the mesh.
    
    Parameters:
    mesh (pyvista.PolyData): The mesh to visualize.
    F_gravity (cp.ndarray): Gravity load vector.
    F_pore_pressure (cp.ndarray): Pore pressure load vector.
    F_surcharge (cp.ndarray): Surcharge load vector.
    F_seismic (cp.ndarray): Seismic load vector.
    """
    # Plot gravity load
    plot_load(mesh, F_gravity, "Gravity Load", color='blue')
    
    # Plot pore pressure load
    plot_load(mesh, F_pore_pressure, "Pore Pressure Load", color='green')
    
    # Plot surcharge load
    plot_load(mesh, F_surcharge, "Surcharge Load", color='red')
    
    # Plot seismic load
    plot_load(mesh, F_seismic, "Seismic Load", color='purple')
F_gravity = apply_gravity_load(mesh, density)
F_pore_pressure = apply_pore_pressure(mesh, water_table_depth, pore_pressure)
F_surcharge = apply_surcharge_load(mesh, shapefile_path, surcharge_load)
F_seismic = apply_seismic_load(mesh, seismic_coefficient)

# Plot all loads
plot_all_loads(mesh, F_gravity, F_pore_pressure, F_surcharge, F_seismic)

In [None]:


def calculate_resultant_forces(mesh, F_global):
    """
    Calculate the resultant force for each cell.
    
    Parameters:
    mesh (pyvista.PolyData): The mesh to which the load is applied.
    F_global (cp.ndarray): The global force vector (flattened array).
    
    Returns:
    tuple: (resultant_forces, magnitudes)
        resultant_forces (np.ndarray): The resultant force vectors for each cell.
        magnitudes (np.ndarray): The magnitude of the resultant forces.
    """
    F_global_np = cp.asnumpy(F_global.reshape(-1, 3))  # Convert and reshape to (n_points, 3)

    # Initialize arrays to store resultant forces and magnitudes
    resultant_forces = []
    magnitudes = []

    # Iterate through each cell to calculate the resultant force
    for i in range(mesh.n_cells):
        cell = mesh.get_cell(i)
        cell_point_ids = cell.point_ids
        force_vector = F_global_np[cell_point_ids].sum(axis=0)  # Sum forces at all nodes of the cell
        resultant_forces.append(force_vector)
        magnitudes.append(np.linalg.norm(force_vector))  # Calculate the magnitude of the force vector

    return np.array(resultant_forces), np.array(magnitudes)
import pyvista as pv
import matplotlib.pyplot as plt

def plot_resultant_forces(mesh, resultant_forces, magnitudes):
    """
    Plot the resultant forces with color gradient based on the magnitude.
    
    Parameters:
    mesh (pyvista.PolyData): The mesh to which the load is applied.
    resultant_forces (np.ndarray): The resultant force vectors for each cell.
    magnitudes (np.ndarray): The magnitude of the resultant forces.
    """
    # Normalize magnitudes for color mapping
    cmap = plt.get_cmap('viridis')
    normalized_magnitudes = (magnitudes - magnitudes.min()) / (magnitudes.max() - magnitudes.min())
    colors = cmap(normalized_magnitudes)[:, :3]  # Get RGB colors from the colormap

    # Create a point cloud of the cell centers
    cell_centers = mesh.cell_centers().points

    # Create a PyVista vector field for the resultant forces
    arrows = pv.PolyData(cell_centers)
    arrows["vectors"] = resultant_forces
    arrows["magnitudes"] = magnitudes

    # Create glyphs (arrows) with the vectors and assign colors based on magnitudes
    glyphs = arrows.glyph(orient="vectors", scale="magnitudes", factor=0.00001)

    # Repeat the colors array to match the number of points in glyphs
    repeated_colors = np.repeat(colors, glyphs.n_points // len(colors), axis=0)
    glyphs.point_data["colors"] = repeated_colors

    # Initialize the plotter
    plotter = pv.Plotter()
    plotter.add_mesh(mesh, color='white', opacity=0.00001)  # Plot the original mesh
    plotter.add_mesh(glyphs, scalars="colors", cmap=None)  # Apply colors directly without a colormap
    
    # Add a title and show the plot
    plotter.add_title('Resultant Force Vectors with Color Gradient')
    plotter.show()
# Calculate resultant forces and magnitudes
resultant_forces, magnitudes = calculate_resultant_forces(mesh, F_gravity + F_pore_pressure + F_surcharge + F_seismic)

# Plot resultant forces
plot_resultant_forces(mesh, resultant_forces, magnitudes)


In [None]:

additional_elevation = 590 # make ZERO  for side slopes profiles 


def identify_fixed_nodes(mesh, additional_elevation=None):
    fixed_nodes = []
    
    # Find minimum and maximum coordinates
    min_z = mesh.points[:, 2].min()
    min_x = mesh.points[:, 0].min()
    max_x = mesh.points[:, 0].max()
    min_y = mesh.points[:, 1].min()
    max_y = mesh.points[:, 1].max()
    
    # Loop through each node and check if it's a fixed node
    for i, node in enumerate(mesh.points):
        # Fix base mesh nodes (minimum elevation)
        if node[2] == min_z:
            fixed_nodes.append(i)
        # Fix nodes on vertical sides of pit
        elif node[0] == min_x or node[0] == max_x:
            fixed_nodes.append(i)
        elif node[1] == min_y or node[1] == max_y:
            fixed_nodes.append(i)
        # Fix nodes below the additional elevation
        elif additional_elevation is not None and node[2] < additional_elevation:
            fixed_nodes.append(i)
    
    return fixed_nodes

fixed_nodes = identify_fixed_nodes(mesh, additional_elevation)
# Apply Boundary Conditions
def apply_boundary_conditions(K_global, F_global, fixed_nodes):
    print("Applying boundary conditions...")

    num_dofs_per_node = 3
    fixed_dofs = cp.array([node * num_dofs_per_node + i for node in fixed_nodes for i in range(num_dofs_per_node)])
    
    # Create a mask for non-fixed DOFs
    non_fixed_dofs = cp.ones(K_global.shape[0], dtype=bool)
    non_fixed_dofs[fixed_dofs] = False

    # Zero out the rows and columns for fixed DOFs in K_global
    K_global[fixed_dofs, :] = 0
    K_global[:, fixed_dofs] = 0

    # Set diagonal for fixed DOFs
    K_global[fixed_dofs, fixed_dofs] = 1

    # Zero out the corresponding entries in F_global
    F_global[fixed_dofs] = 0

    print(f"K_global shape: {K_global.shape}, F_global shape: {F_global.shape}")
    print("Boundary conditions applied.")
    return K_global, F_global

K_global, F_global = apply_boundary_conditions(K_global, F_global, fixed_nodes)





In [None]:


def plot_fixed_nodes(mesh, fixed_nodes):
    plotter = pv.Plotter(window_size=(1000, 600))
    plotter.add_text("Fixed Nodes", font_size=12)
    
    # Plot the entire mesh
    plotter.add_mesh(mesh, color='white', show_edges=True, opacity =0.1)
    
    # Highlight the fixed nodes
    fixed_points = mesh.points[fixed_nodes]
    plotter.add_points(fixed_points, color='red', point_size=10, render_points_as_spheres=True)
    
    plotter.show()

# Call the function to plot the mesh with fixed nodes highlighted
plot_fixed_nodes(mesh, fixed_nodes)


In [None]:

def solve_displacements(K_global, F_global, method='gmres'):
    print(f"Solving for displacements using {method.upper()} on GPU...")

    # Ensure the global stiffness matrix is in CSR format for efficient matrix-vector operations
    K_global = K_global.tocsr()

    def matvec(x):
        return K_global.dot(x)

    K_operator = LinearOperator(K_global.shape, matvec)

    print(f"K_global size: {K_global.shape}, F_global size: {F_global.shape}")
    if method == 'gmres':
        U_global, info = gmres(K_operator, F_global, tol=1e-8, maxiter=3135)
    elif method == 'bicgstab':
        # Convert K_global and F_global to CPU (if not already) and use scipy's bicgstab
        print("Falling back to CPU-based BiCGSTAB solver...")
        K_global_cpu = K_global.get()  # Transfer to CPU
        F_global_cpu = F_global.get()  # Transfer to CPU
        U_global_cpu, info = bicgstab(K_global_cpu, F_global_cpu, tol=1e-8, maxiter=3135)
        U_global = cp.asarray(U_global_cpu)  # Transfer back to GPU
    else:
        raise ValueError(f"Unknown method: {method}")

    if info != 0:
        print(f"{method.upper()} solver did not converge. Info: {info}")
    else:
        print(f"Displacements solved using {method.upper()}.")
    
    return U_global


U_global = solve_displacements(K_global, F_global)

In [None]:
def compute_stresses(mesh, U_global):
    print("Computing stresses...")
    stresses = cp.zeros(mesh.n_cells)
    dN_dxi = cp.array([
        [-1, -1, -1],
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 1]
    ])
    
    # Convert mesh points to a CuPy array once, to avoid repeated conversions
    mesh_points = cp.array(mesh.points)

    for i in tqdm(range(mesh.n_cells)):
        cell = mesh.get_cell(i)
        nodes = cp.array(cell.point_ids).astype(int)  # Use CuPy array for indexing
        element_nodes = mesh_points[nodes]  # Index directly on the CuPy array
        
        J = compute_jacobian(element_nodes, dN_dxi)
        det_J = cp.linalg.det(J)
        
        if cp.abs(det_J) < 1e-12:
            print(f"Warning: Degenerate element detected at cell {i}. Skipping.")
            continue
        
        J_inv = cp.linalg.inv(J)
        B = compute_B_matrix(J_inv, dN_dxi)
        C = compute_C_matrix(E, nu)
        
        # Use CuPy's repeat and tile functions to create the correct indices
        U_elem = U_global[nodes.repeat(3) * 3 + cp.tile(cp.arange(3), len(nodes))]

        epsilon = cp.dot(B, U_elem)
        sigma = cp.dot(C, epsilon)
        stresses[i] = cp.max(sigma)
    
    print("Stresses computed.")
    return stresses





In [None]:
def compute_stress_tensor(mesh, U_global, E, nu):
    print("Computing stress tensor...")

    stresses = cp.zeros((mesh.n_cells, 6))  # Store 6 components of the stress tensor for each cell
    dN_dxi = cp.array([
        [-1, -1, -1],
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 1]
    ])
    
    # Convert mesh points to a CuPy array once, to avoid repeated conversions
    mesh_points = cp.array(mesh.points)

    for i in tqdm(range(mesh.n_cells)):
        cell = mesh.get_cell(i)
        nodes = cp.array(cell.point_ids).astype(int)  # Use CuPy array for indexing
        element_nodes = mesh_points[nodes]  # Index directly on the CuPy array
        
        J = compute_jacobian(element_nodes, dN_dxi)
        det_J = cp.linalg.det(J)
        
        if cp.abs(det_J) < 1e-12:
            print(f"Warning: Degenerate element detected at cell {i}. Skipping.")
            continue
        
        J_inv = cp.linalg.inv(J)
        B = compute_B_matrix(J_inv, dN_dxi)
        C = compute_C_matrix(E, nu)
        
        # Use CuPy's repeat and tile functions to create the correct indices
        U_elem = U_global[nodes.repeat(3) * 3 + cp.tile(cp.arange(3), len(nodes))]

        epsilon = cp.dot(B, U_elem)
        sigma = cp.dot(C, epsilon)
        
        # Store the stress tensor components
        stresses[i, 0] = sigma[0]  # σ_xx
        stresses[i, 1] = sigma[1]  # σ_yy
        stresses[i, 2] = sigma[2]  # σ_zz
        stresses[i, 3] = sigma[3]  # τ_xy
        stresses[i, 4] = sigma[4]  # τ_xz
        stresses[i, 5] = sigma[5]  # τ_yz
    
    print("Stress tensor computed.")
    return stresses

stresses = compute_stress_tensor(mesh, U_global, E, nu)

In [None]:


def calculate_normal_stress(stress_tensor, plane_normal):
    """Calculate the normal stress on a plane given by plane_normal."""
    nx, ny, nz = plane_normal
    if len(stress_tensor) != 6:
        raise ValueError(f"Expected 6 components in stress_tensor, got {len(stress_tensor)}")
    
    sigma_xx, sigma_yy, sigma_zz = stress_tensor[:3]
    tau_xy, tau_xz, tau_yz = stress_tensor[3:]
    
    # Normal stress on the given plane
    normal_stress = (nx**2) * sigma_xx + (ny**2) * sigma_yy + (nz**2) * sigma_zz + \
                    2 * nx * ny * tau_xy + 2 * nx * nz * tau_xz + 2 * ny * nz * tau_yz
    return normal_stress

def calculate_shear_strength(cohesion, normal_stress, friction_angle):
    """Calculate the shear strength using the Mohr-Coulomb failure criterion."""
    return cohesion + normal_stress * cp.tan(cp.radians(friction_angle))

def compute_fos(stresses, cohesions, friction_angles, mesh, plane_normal=(0, 0, 1)):
    """Calculate the Factor of Safety (FoS) for each element in the mesh."""
    print("Calculating Factor of Safety (FoS)...")
    fos = cp.zeros(mesh.n_cells)
    
    # Iterate over each cell to calculate FoS
    for i in tqdm(range(mesh.n_cells)):
        # Extract the full stress tensor for the current cell
        stress_tensor = stresses[i]
        
        # Ensure the stress_tensor has the correct dimensions
        if stress_tensor.ndim != 1 or stress_tensor.size != 6:
            raise ValueError(f"Stress tensor for cell {i} has incorrect dimensions or size: {stress_tensor.shape}")
        
        # Calculate the normal stress on the specified plane (e.g., horizontal plane)
        normal_stress = calculate_normal_stress(stress_tensor, plane_normal)
        
        # Get the spatially varying material properties for the current cell
        cohesion = cohesions[i]
        friction_angle = friction_angles[i]
        
        # Calculate shear strength for the element
        shear_strength = calculate_shear_strength(cohesion, normal_stress, friction_angle)
        
        # Calculate FoS for the element
        if normal_stress > 0:
            fos[i] = shear_strength / normal_stress
        else:
            fos[i] = float('inf')  # No failure if stress is zero or negative
    
    print("Factor of Safety (FoS) calculated.")
    return fos

# Define material properties (cohesion and friction angle)
cohesion = cp.array([25e3] * mesh.n_cells)  # Replace with actual cohesion values from your FEM model
friction_angles = cp.array([35] * mesh.n_cells)  # Replace with actual friction angles from your FEM model

# Calculate FoS for the entire mesh using the actual stresses from FEM
fos = compute_fos(stresses, cohesion, friction_angles, mesh)

In [None]:



# Convert from CuPy to NumPy and handle infinite values
fos_values = cp.asnumpy(fos)
max_fos_value= 10
min_fos_value = 0
# Replace infinite values with the maximum FoS value
fos_values = np.where(np.isinf(fos_values), max_fos_value, fos_values)

# Ensure FoS values are within the specified range
fos_values = np.clip(fos_values, min_fos_value, max_fos_value)

# Define the boundaries for the safety factors
bound1 = 1.0   # 5th value
bound2 = 1.3   # 10th value
bound3 = 1.5   # 15th value

# Calculate the number of intervals between each specified boundary
num_intervals1 = 5
num_intervals2 = 5
num_intervals3 = 5
num_intervals4 = 1

# Create the bounds array using equal quantiles for all intervals
bounds1 = np.quantile(fos_values, np.linspace(0, 0.2, num_intervals1 + 1))
bounds2 = np.quantile(fos_values, np.linspace(0.2, 0.4, num_intervals2 + 1))
bounds3 = np.quantile(fos_values, np.linspace(0.4, 0.6, num_intervals3 + 1))
bounds4 = np.quantile(fos_values, np.linspace(0.6, 1, num_intervals4 + 1))

# Insert bound1, bound2, and bound3 at their original positions
bounds = np.concatenate([bounds1[1:], [bound1], bounds2[1:], [bound2], bounds3[1:], [bound3], bounds4[1:]])

# Ensure bounds are monotonically increasing
bounds = np.sort(bounds)

# Create the colormap and normalization
cmap = sns.color_palette("Spectral", as_cmap=True)
norm = BoundaryNorm(bounds, ncolors=cmap.N)

# Assign the FoS values to the mesh
mesh.cell_data['FoS'] = fos_values

# Use PyVista to plot the mesh with the assigned FoS values
plotter = pv.Plotter(window_size=(800, 600))
plotter.add_text("Factor of Safety (FoS)", font_size=12)

# Plot the mesh with the custom colormap and normalization
plotter.add_mesh(mesh, scalars='FoS', cmap=cmap, show_edges=True, clim=(np.min(bounds), np.max(bounds)))
plotter.add_scalar_bar("FoS", n_labels=len(bounds), color='black')

# Display the plot
plotter.show()

# Print the minimum and maximum FoS values for reference
print(f"Minimum FoS: {np.min(fos_values)}")
print(f"Maximum FoS: {np.max(fos_values)}")


In [None]:
import pyvista as pv
import cupy as cp

def identify_failure_cells_and_normals(mesh, fos):
    """
    Identify failure cells and their respective normal vectors.

    Parameters:
    - mesh: PyVista mesh object
    - fos: CuPy array of Factor of Safety values

    Returns:
    - failure_cells: List of indices of failure cells
    - vector_normals: List of normal vectors of failure cells
    """
    print("Identifying failure cells...")
    failure_cells = []
    vector_normals = []

    # Ensure fos is on CPU for iteration
    fos_cpu = cp.asnumpy(fos)

    for i in range(mesh.n_cells):
        if fos_cpu[i] < 1000:  # Assuming FoS < 1 indicates failure
            failure_cells.append(i)
            # Get normal for 3D mesh cells (may require averaging normals or different approach)
            normal = mesh.cell_normals[i]
            vector_normals.append(normal)

    print(f"Identified {len(failure_cells)} failure cells.")
    return failure_cells, vector_normals

def plot_failure_cells(mesh, failure_cells):
    """
    Plot the failure cells in a 3D mesh.

    Parameters:
    - mesh: PyVista mesh object
    - failure_cells: List of indices of failure cells
    """
    plotter = pv.Plotter()
    failure_mesh = mesh.extract_cells(failure_cells)
    plotter.add_mesh(failure_mesh, color='red', show_edges=True)
    plotter.add_axes()
    plotter.add_title("Failure Cells in 3D Mesh")
    plotter.show()

# Usage
failure_cells, vector_normals = identify_failure_cells_and_normals(mesh, fos)
plot_failure_cells(mesh, failure_cells)


In [None]:


def connected_component_analysis(mesh, failing_elements):
    """
    Perform connected component analysis to detect coherent failure surfaces.
    
    Args:
        mesh (pv.UnstructuredGrid): The mesh of the model.
        failing_elements (cp.ndarray): Indices of elements with FoS ≤ 1.0.
    
    Returns:
        np.ndarray: Array indicating the connected component label for each failing element.
    """
    print("Performing connected component analysis...")

    # Convert failing_elements to a NumPy array explicitly
    failing_elements_np = failing_elements.get()

    # Initialize a mask array to identify failing elements
    element_mask = np.zeros(mesh.n_cells, dtype=bool)
    element_mask[failing_elements_np] = True

    # Perform connected component analysis
    structure = np.array([1, 1, 1])  # Define 1D connectivity for a 1D input
    labeled_array, num_features = label(element_mask, structure)

    print(f"Number of connected components identified: {num_features}")
    return labeled_array

# Example usage
connected_components = connected_component_analysis(mesh, failing_elements)


In [None]:
import pyvista as pv
import numpy as np
import cupy as cp

def identify_failure_cells_and_normals(mesh, fos):
    """
    Identify failure cells and their respective normal vectors.

    Parameters:
    - mesh: PyVista mesh object
    - fos: CuPy array of Factor of Safety values

    Returns:
    - failure_cells: List of indices of failure cells
    - vector_normals: List of normal vectors of failure cells
    """
    print("Identifying failure cells...")
    failure_cells = []
    vector_normals = []

    for i in range(mesh.n_cells):
        if fos[i] < 100:  # Assuming FoS < 1 indicates failure
            failure_cells.append(i)
            normal = mesh.cell_normals[i]
            vector_normals.append(normal)

    print(f"Identified {len(failure_cells)} failure cells.")
    return failure_cells, vector_normals

def calculate_displacement_magnitude_direction(mesh, U_global, failure_cells):
    """
    Calculate displacement magnitude and direction for each failure cell.

    Parameters:
    - mesh: PyVista mesh object, the original mesh.
    - U_global: CuPy array of global displacement vector.
    - failure_cells: List of indices of failure cells.

    Returns:
    - displacement_magnitudes: NumPy array of displacement magnitudes for each failure cell.
    - displacement_vectors: NumPy array of displacement vectors (3D) for each failure cell.
    """
    print("Calculating displacement magnitude and direction for failure cells...")

    # Ensure displacement is on GPU with CuPy
    if isinstance(U_global, np.ndarray):
        U_global = cp.asarray(U_global)
    
    # Initialize arrays to store displacement magnitudes and vectors
    displacement_magnitudes = []
    displacement_vectors = []

    # Loop over each failure cell
    for cell_idx in failure_cells:
        cell = mesh.get_cell(cell_idx)
        nodes = cp.array(cell.point_ids).astype(int)  # Convert nodes to CuPy array
        
        # Extract displacement vectors for the nodes of the cell
        U_cell = U_global[nodes.repeat(2) * 2 + cp.tile(cp.arange(2), len(nodes))]
        
        # Calculate mean displacement vector for the cell
        mean_disp = cp.mean(U_cell.reshape(-1, 2), axis=0)
        magnitude = cp.linalg.norm(mean_disp)
        
        # Append the 3D vector (assuming 2D mesh, z-component is 0)
        displacement_vectors.append(np.array([mean_disp[0].get(), mean_disp[1].get(), 0.0]))  # Convert to 3D vector
        displacement_magnitudes.append(magnitude.get())  # Convert to NumPy for plotting

        # Debugging prints
        print(f"Cell {cell_idx}: Mean Displacement = {mean_disp.get()}, Magnitude = {magnitude.get()}")

    print("Displacement calculation complete.")
    return np.array(displacement_magnitudes), np.array(displacement_vectors)

def visualize_displacement_vectors(mesh, failure_cells, displacement_magnitudes, displacement_vectors):
    """
    Visualize the mesh with displacement vectors as arrows and magnitudes as a color gradient.

    Parameters:
    - mesh: PyVista mesh object, the original mesh.
    - failure_cells: List of indices of failure cells.
    - displacement_magnitudes: NumPy array of displacement magnitudes for each failure cell.
    - displacement_vectors: NumPy array of displacement vectors for each failure cell.
    """
    print("Visualizing displacement vectors and magnitudes...")

    # Convert failure_cells to a NumPy array if it's not already
    failure_cells = np.asarray(failure_cells, dtype=int)

    # Initialize the plotter
    plotter = pv.Plotter()
    
    # Create an array to store cell magnitudes for coloring
    cell_colors = np.full(mesh.n_cells, np.nan)  # Use NaN for non-failure cells
    
    # Assign displacement magnitudes to failure cells
    cell_colors[failure_cells] = displacement_magnitudes
    
    # Handle NaNs in scalars for plotting
    if np.all(np.isnan(cell_colors)):
        print("All displacement magnitudes are NaN. Nothing to plot.")
        return

    mesh.cell_data['Displacement Magnitude'] = cell_colors
    plotter.add_mesh(mesh, scalars='Displacement Magnitude', cmap='plasma', show_edges=True, nan_opacity=0.5)
    
    # Extract the failure mesh for plotting vectors
    failure_mesh = mesh.extract_cells(failure_cells)
    centroids = failure_mesh.cell_centers().points

    # Ensure that the number of centroids matches the number of displacement vectors
    if len(centroids) == len(displacement_vectors):
        # Add displacement vectors as arrows
        plotter.add_arrows(centroids, displacement_vectors, mag=10, color='black')  # Adjust 'mag' for scaling arrows
    else:
        print("Error: The number of centroids does not match the number of displacement vectors. Check your data.")
    
    plotter.add_axes()
    plotter.add_title("Mesh with Displacement Vectors and Magnitudes")
    plotter.show()

# Example usage
# Assuming `mesh`, `U_global`, and `fos` are defined elsewhere in the code
failure_cells, vector_normals = identify_failure_cells_and_normals(mesh, fos)
displacement_magnitudes, displacement_vectors = calculate_displacement_magnitude_direction(mesh, U_global, failure_cells)
visualize_displacement_vectors(mesh, failure_cells, displacement_magnitudes, displacement_vectors)
