In [1]:
import sys
import numpy as np
import pyvista as pv
from pathlib import Path
import pandas as pd
from matplotlib import pyplot as plt
import pint
import pint_pandas
import vtk
from tqdm import tqdm

In [2]:
# Put the parent of the current working directory on the import path so the
# project-local packages imported in the next cell (comsol_module, src, scripts)
# can be resolved. Presumably the notebook is started from a sub-folder of the repo.
sys.path.append(str(Path.cwd().parent))

In [3]:
from comsol_module.comsol_classes import COMSOL_VTU
from comsol_module.helper import calculate_normal
from src.utils import safe_parse_quantity, delete_comsol_fields, map_on_control_mesh, load_pint_data
from scripts.C_map_on_control_mesh import handle_invalid_point_mask


### Merge CSV

In [6]:
# Use pint's application registry and hand the same instance to pint-pandas.
ureg = pint.get_application_registry()
pint_pandas.PintType.ureg = ureg

folder = Path("/Users/thomassimader/Documents/NIRB/data/01")
# training_samples0.csv ... training_samples7.csv
names = [f"training_samples{i}.csv" for i in range(8)]

# Load every sample file with the project loader, then stack the frames
# under one fresh 0..n-1 index.
dfs = []
for file in names:
    df = load_pint_data(folder / file)
    dfs += [df]
merged = pd.concat(dfs, ignore_index=True)

# Run the frame through the pint accessor's dequantify() before writing it
# next to the inputs as merged.csv (without the row index).
merged_dq = merged.pint.dequantify()
merged_dq.to_csv(folder / "merged.csv", index=False)

In [None]:
# Show the merged sample table as this cell's output.
merged

In [None]:
# Parameter space "03" lives under the user's home directory.
path = Path.home().joinpath("documents", "NIRB", "data", "03")
# The first two rows of the CSV form a two-level column header.
df = pd.read_csv(path.joinpath("training_samples.csv"), header=[0, 1])
# Summary statistics are the cell output.
df.describe()

### Delete COMSOL Fields

In [8]:
root = "/Users/thomassimader/Documents/NIRB/data/02/TrainingOriginal"
assert Path(root).exists()

# Process every .vtu file directly inside `root`; other entries are ignored.
for file in Path(root).iterdir():
    if file.suffix == ".vtu":
        # NOTE(review): whether "Temperature"/"Pressure" are the fields removed or
        # the fields kept is decided inside delete_comsol_fields - confirm there.
        comsol_data = delete_comsol_fields(COMSOL_VTU(file), ["Temperature", "Pressure"])
        # Saved back to the same path, i.e. the input file is overwritten in place.
        comsol_data.mesh.save(file)

### Rest

In [None]:
# "coolwarm" colormap resampled to 15 entries; shared by the plotting cells.
my_cmap = plt.get_cmap("coolwarm", 15)
# Data directory: <parent of the current working directory>/data
root = Path.cwd().parent / "data"
# Print the resolved path and, on the next line, whether it exists.
print(root, root.exists(), sep="\n")

In [None]:
# Run (dx) and rise (dy) used for the angle comparison below.
dx = 50
dy = 10

# Angles (in radians) for the base rise/run, a doubled rise and a doubled run.
alpha_1, alpha_2, alpha_3 = (
    np.arctan(rise / run)
    for rise, run in ((dy, dx), (2 * dy, dx), (dy, 2 * dx))
)

# Report all angles and the two differences in degrees, one per line.
print(
    f"alpha_1 = {np.rad2deg(alpha_1)}",
    f"alpha_2 = {np.rad2deg(alpha_2)}",
    f"alpha_3 = {np.rad2deg(alpha_3)}",
    f"alpha_2 - alpha_1 = {np.rad2deg(alpha_2 - alpha_1)}",
    f"alpha_1 - alpha_3 = {np.rad2deg(alpha_1 - alpha_3)}",
    sep="\n",
)


In [None]:
# Name of the sub-folder of `root` that the following cells read from.
PARAMETER_SPACE = "01"
# Dataset split name; in this notebook it only appears in the commented-out
# batch-mapping cell.
DATA_TYPE = "Test"

In [None]:

# Load one training snapshot (Training_001.vtu) of the selected parameter space;
# the cells below use it as the reference 3D domain.
path_domain_3d = root / PARAMETER_SPACE / "Training" / "Training_001.vtu"
domain_3d = COMSOL_VTU(path_domain_3d)
# Bounds of the loaded mesh.
print(domain_3d.mesh.bounds)

In [None]:
def create_smooth_bounds(
    mesh: "pv.DataSet",
    val_old: float,
    val_new: float,
    axis: int = 2,
    atol: float = 0.0,
) -> "pv.DataSet":
    """Move all points whose coordinate on ``axis`` equals ``val_old`` to ``val_new``.

    The mesh is modified **in place** (``mesh.points`` is overwritten on the
    chosen axis) and the same object is returned for convenience.

    Parameters
    ----------
    mesh
        Object exposing a 2D ``points`` array indexed as ``points[:, axis]``.
    val_old
        Coordinate value to look for on ``axis``.
    val_new
        Value written to every matching coordinate.
    axis
        Column of ``mesh.points`` to operate on (default ``2``).
    atol
        Absolute tolerance for the match. The default ``0.0`` keeps the exact
        ``==`` comparison; a positive value also catches coordinates that differ
        from ``val_old`` only by floating-point noise.

    Returns
    -------
    The input ``mesh`` (same object), after modification.

    Raises
    ------
    ValueError
        If ``atol`` is negative.
    """
    if atol < 0:
        raise ValueError(f"atol must be non-negative, got {atol}")

    coords = mesh.points[:, axis]
    if atol:
        # rtol=0 so that only the absolute distance to val_old decides the match.
        matches = np.isclose(coords, val_old, rtol=0.0, atol=atol)
    else:
        matches = coords == val_old
    # np.where builds a new array first, so writing back through the view is safe.
    mesh.points[:, axis] = np.where(matches, val_new, coords)
    return mesh

print(domain_3d.mesh.bounds)  # bounds as loaded
# Disabled experiment: for the bounds at indices 1, 3 and 4, repeatedly move the
# points lying exactly on that bound to int(bound) until the bound has no
# fractional part left.
# bound_indices = [1, 3, 4]
# for i_axis, i_bound in enumerate(bound_indices):
#     while np.abs(domain_3d.mesh.bounds[i_bound]) > int(np.abs(domain_3d.mesh.bounds[i_bound])):
#         domain_3d.mesh = create_smooth_bounds(domain_3d.mesh,
#                                               domain_3d.mesh.bounds[i_bound],
#                                               int(domain_3d.mesh.bounds[i_bound]) , axis=i_axis)
# print(domain_3d.mesh.bounds)


# Get the points of the mesh (in this cell only the disabled code below uses them)
points = domain_3d.mesh.points
# Keep the current bounds on the COMSOL object; the control-grid cells further
# down read `domain_3d.original_bounds` instead of `domain_3d.mesh.bounds`.
domain_3d.original_bounds = domain_3d.mesh.bounds

# Disabled experiment: lower the bottom face of the mesh by 20 units.
# # Find the minimum Z value (bottom of the box)
# z_min = np.min(points[:, 2])

# # Identify points that are at the minimum Z value (i.e., the bottom of the mesh)
# # You can add a small tolerance to ensure you select points near the boundary (if needed)
# bottom_boundary_points = points[points[:, 2] == z_min]


# points[points[:, 2] == z_min, 2] -= 20  # Modify only Z-coordinate
# # Update the mesh with the new points
# domain_3d.mesh.points = points

# Visualize the mesh; with the experiments above disabled it is unchanged,
# so the bounds printed afterwards match the ones printed at the top.
domain_3d.mesh.plot()
print(domain_3d.mesh.bounds)

In [None]:

# destination_mesh = domain_3d.mesh
# data_folder = Path(root / VERSION / DATA_TYPE)
# assert data_folder.exists(), f"Data folder {data_folder} does not exist."
# export_folder =  data_folder.parent / "Truncated"
# export_folder.mkdir(exist_ok=True)
# assert export_folder.exists(), f"Export folder {export_folder} does not exist."
# vtu_files = sorted([path for path in data_folder.iterdir() if path.suffix == ".vtu"])
# for idx, vtu_file in tqdm(enumerate(vtu_files), total=len(vtu_files), desc="Reading COMSOL files"):
#     temp_data = COMSOL_VTU(vtu_file)
#     temp_data = delete_pyvista_fields(temp_data, ["Temperature"])
#     mapped = map_on_control_mesh(temp_data.mesh, destination_mesh)
#     mapped.save(export_folder / vtu_file.name)
# destination_mesh.points.shape

In [None]:
bounds = domain_3d.original_bounds  # (xmin, xmax, ymin, ymax, zmin, zmax)

# Grid spacing per axis; a smaller spacing gives a finer grid.
dx, dy, dz = 100, 100, 100
# Points per axis: whole intervals that fit into the extent, plus one end point.
nx, ny, nz = (
    int((bounds[2 * i + 1] - bounds[2 * i]) / step) + 1
    for i, step in enumerate((dx, dy, dz))
)

# Regular image grid anchored at the lower corner of the original bounds.
image = vtk.vtkImageData()
image.SetDimensions(nx, ny, nz)
image.SetOrigin(bounds[0], bounds[2], bounds[4])
image.SetSpacing(dx, dy, dz)
# Alternative (disabled): derive the spacing from the point counts instead.
# image.SetSpacing(
#     (bounds[1] - bounds[0]) / (nx - 1),
#     (bounds[3] - bounds[2]) / (ny - 1),
#     (bounds[5] - bounds[4]) / (nz - 1),
# )

# Map the COMSOL mesh onto the image grid with the project helper.
interpolated_vtk = map_on_control_mesh(domain_3d.mesh, image)
print(bounds)
# x-extent divided by the number of points in x.
print(f"{(bounds[1] - bounds[0]) / (nx)}")
# pv_data = pv.wrap(image).save("interpolated_output.vti")

In [None]:
# Parameter export that belongs to the loaded snapshot: <stem>_parameters.csv
path_domain_parameters = root / PARAMETER_SPACE / "Exports" / f"{path_domain_3d.stem}_parameters.csv"
assert path_domain_parameters.exists(), f"Missing parameter file: {path_domain_parameters}"
df = pd.read_csv(path_domain_parameters, index_col=0)
# Reuse pint's application registry instead of creating a second UnitRegistry:
# the merge cell installs that registry for pint-pandas, and quantities that
# come from different registries cannot be combined with each other.
ureg = pint.get_application_registry()
# Parse the last column of the export into pint quantities.
df['quantity_pint'] = df[df.columns[-1]].apply(lambda x: safe_parse_quantity(x, ureg))
# df

In [None]:
# Show what the dataset currently contains (output produced by COMSOL_VTU.info()).
domain_3d.info()
# Work on a copy so that removing an entry does not touch
# domain_3d.exported_fields itself.
fields = domain_3d.exported_fields.copy()
# Keep "Temperature": take it out of the to-delete collection.
# Expects "Temperature" to be present among the exported fields.
fields.remove("Temperature")
# Delete every remaining field from the dataset.
for field in fields:
    domain_3d.delete_field(field)

In [None]:
# Fault orientation from the parameter table, converted to degrees.
dip, strike = (
    df.loc[name, "quantity_pint"].to("deg") for name in ("dip", "strike")
)
# Quantities first, then their magnitudes rounded to whole degrees.
print(
    dip,
    strike,
    np.round(dip.magnitude),
    np.round(strike.magnitude),
    sep="\n",
)

### Original Data

In [None]:
# Name of the data array to plot.
# NOTE(review): presumably -1 selects the last exported time step - confirm in COMSOL_VTU.format_field.
field_name = domain_3d.format_field("Temperature", -1)
# Plane normal derived from dip and strike rounded to whole degrees.
normal_vector = calculate_normal(np.round(dip.magnitude), np.round(strike.magnitude))
# Clip the original mesh with a plane through the mesh centre, using the negated normal.
clip_original = domain_3d.mesh.clip(normal = -normal_vector, origin=domain_3d.mesh.center)
clip_original.plot(scalars = field_name, cmap=my_cmap)


### Control Mesh

In [None]:
bounds = domain_3d.original_bounds
# Grid spacing per axis; a smaller spacing gives a finer grid.
dx, dy, dz = (100, 100, 100)
# Points per axis (+ 1 because the spacing counts intervals, not points).
nx, ny, nz = (
    int(np.abs(bounds[2 * i + 1] - bounds[2 * i]) / step) + 1
    for i, step in enumerate((dx, dy, dz))
)

# Regular grid anchored at the lower corner of the original bounds, then cut
# back to the bounding box of the domain.
bbox = pv.Box(bounds=bounds)
grid = pv.ImageData(
    dimensions=(nx, ny, nz),
    spacing=(dx, dy, dz),
    origin=(bounds[0], bounds[2], bounds[4]),
).clip_box(bbox, invert=False)

grid.plot(show_edges=True)
# grid.slice_orthogonal().plot()
# Grid bounds, mesh bounds and the grid summary, one per line.
print(grid.bounds, domain_3d.mesh.bounds, grid, sep="\n")

### Add plane

In [None]:
# `control_mesh` is another name for `grid` (no copy is made here).
# Disabled alternative: merge the grid with a clipped plane.
control_mesh = grid  #pv.merge([grid, clipped_plane])
# Preview the control mesh cut by the same plane as the original data.
control_mesh.clip(normal = -normal_vector, origin=domain_3d.mesh.center).plot()
# Independent copy, used as the target of the sampling cell further down.
control_mesh_cell = control_mesh.copy()


### pv.interpolate()

`vtkGaussianKernel` is an interpolation kernel that returns the weights for all points found in the sphere defined by the radius $R$. The weights are computed as $e^{-\left(\frac{s \cdot r}{R}\right)^2}$, where $r$ is the distance from the point to be interpolated to a neighboring point within $R$. The sharpness $s$ controls the rate of fall-off of the Gaussian. (A more general Gaussian kernel is available from `vtkEllipsoidalGaussianKernel`.)

https://vtk.org/doc/nightly/html/classvtkGaussianKernel.html

Examples
https://examples.vtk.org/site/Cxx/Meshes/InterpolateFieldDataDemo/

In [None]:
# Gaussian-kernel interpolation of the COMSOL data onto the control mesh.
result = control_mesh.interpolate(
    domain_3d.mesh,
    radius=500,
    sharpness=2,
    strategy="mask_points",
)
# Same cutting plane as for the original data.
clip_interpolate = result.clip(normal=-normal_vector, origin=domain_3d.mesh.center)

# Visualize
# clip_interpolate.plot(scalars=field_name, cmap=my_cmap, show_edges=False)
# result.plot(scalars=field_name, cmap="coolwarm", show_edges=True)

### pv.Sample()

In [None]:
# Cell-centred version of the COMSOL data; the point arrays are not passed through.
# NOTE(review): `cell_mesh` is not read by any other cell shown in this notebook.
cell_mesh = domain_3d.mesh.point_data_to_cell_data(pass_point_data=False)
# control_mesh_cell.plot()
# Visualize
# clip.plot(scalars=field_name, cmap=my_cmap, show_edges=False)
# result.plot(scalars=field_name, cmap="coolwarm", show_edges=True)

In [None]:
# Sample the COMSOL data at the points of the copied control mesh.
result_cell = control_mesh_cell.sample(
    domain_3d.mesh,
    snap_to_closest_point=True,
    tolerance=5,
)
# Same cutting plane as for the original data.
clip_sample = result_cell.clip(normal=-normal_vector, origin=domain_3d.mesh.center)
clip_sample.plot(scalars=field_name)

In [None]:
# Sum of the "vtkGhostType" point array of the sampled result.
print(np.sum(result_cell.point_data["vtkGhostType"] ))
# Number of points whose "vtkValidPointMask" entry is 0.
print(np.sum(result_cell.point_data["vtkValidPointMask"] == 0 ))

In [None]:
def round_towards_zero(values, decimals = 1):
    """Truncate ``values`` towards zero, keeping ``decimals`` decimal places.

    Unlike ``np.round`` the discarded digits never increase the magnitude:
    ``2.99 -> 2.9`` and ``-2.99 -> -2.9`` for ``decimals=1``.

    Parameters
    ----------
    values
        Scalar, array, or a list/tuple of numbers.
    decimals
        Number of decimal places to keep (default ``1``).

    Returns
    -------
    The truncated value(s); list and tuple inputs come back as a NumPy array.
    """
    # `sequence * int` repeats a list/tuple instead of scaling its entries,
    # so plain Python sequences have to become arrays before the multiplication.
    if isinstance(values, (list, tuple)):
        values = np.asarray(values)
    factor = 10 ** decimals
    return np.trunc(values * factor) / factor

### vtkProbeFilter

In [None]:
bounds = domain_3d.original_bounds  # (xmin, xmax, ymin, ymax, zmin, zmax)

# Grid spacing per axis; a smaller spacing gives a finer grid.
dx, dy, dz = 50, 50, 100
# Vertical range covered by the grid.
zmin = bounds[4]  # + dz
zmax = bounds[5]

# Points per axis: whole intervals that fit into the extent, plus one end point.
nx = int((bounds[1] - bounds[0]) / dx) + 1
ny = int((bounds[3] - bounds[2]) / dy) + 1
# nz = int((bounds[5] - bounds[4]) / dz) + 1
nz = int(np.abs(zmax - zmin) / dz) + 1  # New z-range

# Regular image grid, wrapped for pyvista and cut back to the truncated bounds.
image = vtk.vtkImageData()
image.SetDimensions(nx, ny, nz)
image.SetOrigin(bounds[0], bounds[2], zmin)
image.SetSpacing(dx, dy, dz)
image_grid = pv.wrap(image)
bbox = pv.Box(bounds=np.trunc(bounds))
image = image_grid.clip_box(bbox, invert=False)

# interpolated_vtk = map_on_control_mesh(domain_3d.mesh, image)
# domain_3d.mesh.point_data_to_cell_data()
# Probe the COMSOL mesh (source) at the points of the clipped grid (input).
probe = vtk.vtkProbeFilter()
probe.SetInputData(image)
probe.SetSourceData(domain_3d.mesh)
# probe.PassCellArraysOn()
probe.Update()
interpolated = probe.GetOutput()
interpolated_vtk = pv.wrap(interpolated)

# Points whose "vtkValidPointMask" entry is 0 are treated as invalid.
validity_array = interpolated_vtk.point_data['vtkValidPointMask']
invalid_mask = validity_array == 0
invalid_points = interpolated_vtk.extract_points(invalid_mask)

print("invalid_mask", np.sum(invalid_mask), sep="\n")
# Visit only the invalid points and overwrite their field value with the
# value returned by the project helper for that location.
for idx_invalid in np.flatnonzero(invalid_mask):
    invalid_point = interpolated_vtk.points[idx_invalid]
    interpolated_value = handle_invalid_point_mask(invalid_point, domain_3d.mesh, field_name)
    interpolated_vtk.point_data[field_name][idx_invalid] = interpolated_value

# Same cutting plane as for the original data.
clip_vtk = interpolated_vtk.clip(normal=-normal_vector, origin=domain_3d.mesh.center)
clip_vtk.plot(scalars=field_name)
print(interpolated_vtk.bounds)


In [None]:
# Bounds of the probed grid, then of the COMSOL mesh, for a side-by-side check.
print(interpolated_vtk.bounds, domain_3d.mesh.bounds, sep="\n")

In [None]:

# Resample the COMSOL mesh onto a regular image with vtkResampleToImage.
# nx, ny, nz are not set in this cell; they are whatever the most recently
# executed grid cell left in the kernel.
producer = vtk.vtkTrivialProducer()
producer.SetOutput(domain_3d.mesh)  # .mesh is already a vtkDataSet

resample = vtk.vtkResampleToImage()
resample.SetInputConnection(producer.GetOutputPort())
resample.SetSamplingDimensions(nx, ny, nz)
resample.SetUseInputBounds(True)
resample.Update()

interpolated = pv.wrap(resample.GetOutput())
# Stored under its own name on purpose: assigning this to `clip_vtk` would
# overwrite the vtkProbeFilter clip, and the comparison plot would then show
# the resampled data under the label "vtkProbeFilter".
clip_resample = interpolated.clip(normal=-normal_vector, origin=domain_3d.mesh.center)


## Plot

In [None]:

# Loop through the samples and plot
clips = [clip_original, clip_interpolate, clip_sample, clip_vtk]
labels = ["Original", "pv.Interpolate()", "pv.Sample()", "vtkProbeFilter"]

# One row with one panel per mapping method.
plotter = pv.Plotter(shape=(1, len(clips)), window_size=(1200, 500))
for i, (clip, label) in enumerate(zip(clips, labels)):
    # The panel index is part of the scalar-bar title, so every panel gets a distinct title.
    bar_args = {
        'title': f'{field_name} ({i})',
        'label_font_size': 10,
        'title_font_size': 8,
    }
    plotter.subplot(0, i)
    plotter.add_mesh(clip, scalars=field_name, cmap=my_cmap, scalar_bar_args=bar_args)
    plotter.add_text(label)

# Show the figure and save a screenshot of it.
plotter.show(screenshot="ComparisonInterpolation")