In [None]:
from itertools import pairwise
from  airfoil import Airfoil, Decomposer
import shapely as sp
import numpy as np
import pandas as pd
import pyvista as pv

from airfoil.pyvista_helpers import create_ruled_surface
from airfoil.linestring_helpers import remove_sequential_duplicates, ensure_closed
from airfoil.path_planning import project_line_to_plane



In [None]:
foam_width    = 200
foam_depth    = 200
foam_height   = 30

plane_spacing = 250

def state_to_line(x,y,z,a):
    return (
        (-plane_spacing/2,x,y),
        ( plane_spacing/2,z,a),
    )

def states_to_curves(states):
    x,y,z,a=states.T
    return (
        np.vstack((np.ones_like(x)* -plane_spacing/2, x,y)).T,
        np.vstack((np.ones_like(x)* plane_spacing/2, z,a)).T,
    )
def interpolate_states(a,b,mix):
    ab= b-a
    return a+ mix*ab

state = np.array([8,-30,50,50])
state_goal = None

In [None]:
a = Airfoil.from_naca_designation("2214", chord_length=200)
b = (
    Airfoil.from_naca_designation("2209", chord_length=100)
    .with_translation(np.array([-25,0]))
    .with_rotation(-4)
    .with_translation(np.array([25,0]))
    .with_translation(np.array([70,20]))
)
decomposer = Decomposer(segment_target_length=3.0)
ad,bd = decomposer.decompose_many([a,b])

a_3d = np.insert(ensure_closed(remove_sequential_duplicates(np.concat(ad))),0,-foam_width/2,axis=1)
b_3d = np.insert(ensure_closed(remove_sequential_duplicates(np.concat(bd))),0, foam_width/2,axis=1)

# a_proj = np.zeros_like(a_3d)
# b_proj = np.zeros_like(b_3d)
# for index,(ai,bi) in enumerate(zip(a_3d,b_3d)):
#     a_proj[index] = project_line_to_plane(a_3d[index],b_3d[index],'yz', -plane_spacing/2)
#     b_proj[index] = project_line_to_plane(a_3d[index],b_3d[index],'yz',  plane_spacing/2)

a_proj = project_line_to_plane(a_3d,b_3d,'yz', -plane_spacing/2)
b_proj = project_line_to_plane(a_3d,b_3d,'yz',  plane_spacing/2)


state_goal = np.array([*a_proj[0, 1:],*b_proj[0, 1:]]) + np.array([-3,5,-3,5])


#ax = a.plot_raw()
#b.plot_raw(ax)

mesha = pv.PolyData(a_3d).delaunay_2d()
meshb = pv.PolyData(b_3d).delaunay_2d()
meshc = create_ruled_surface(a_3d,b_3d)
mesh_target = (mesha+meshb+meshc).clean().fill_holes(hole_size=20)
mesh_target = mesh_target.compute_normals(auto_orient_normals=True)
assert mesh_target.is_manifold
mesh_target.plot_normals(faces=True,mag=10)

In [None]:
mesh_foam = pv.Box((
    -foam_width/2,foam_width/2,
    0,foam_depth,
    0,foam_height
))

In [None]:
# capture globals with meaningful local name (might become function params later)
minimum_wire_length = plane_spacing
# state is a 4d vector (x,y, z,a)
sv = np.array(state)
sgv = np.array(state_goal)

# use a really dumb linear interpolation as a test path planner output
# this path either needs to be refined to avoid invalid states, or the planning method musth be reimplemented
path_states = (sgv-sv)*np.linspace(0,1).reshape(-1,1) + sv


# helpers for better path planning
def force_reduce_wire_length(x,y,z,a, factor=0.1):
    """compute a 'restoring force' that tries to relax wire tension and reduce the penalty from wire extension"""
    a = np.array([x,y])
    b = np.array([z,a])
    ab = b-a
    #actual_wire_length = np.linalg.norm(ab,axis=-1)
    #wire_extension = actual_wire_length - minimum_wire_length
    restoring_force_a = (ab/2)*factor
    restoring_force_b = -restoring_force_a
    
    # return force packed into 4D vector matching state space
    return np.array([*restoring_force_a, *restoring_force_b])

def state_transition_is_valid(a,b):
    """check if moving from state a to state b crosses any invalid locates"""
    a1,a2 = state_to_line(*a)
    b1,b2 = state_to_line(*b)
    transition_surface = pv.PolyData([a1,a2,b1,b2]).delaunay_3d()
    _,num_cols = mesh_target.collision(transition_surface, contact_mode=1)
    return num_cols>0

def minimum_sparation():
    # compute minimal state change required to make state valid again
    # i am not sure how to implement this... seems non trival?
    pass
# simulation




# calculate penalties
path_penalties = []
for path_state in path_states:
    a,b, = state_to_line(*path_state)
    a = np.array(a)
    b = np.array(b)
    
    # foam intersection
    points,_cells = mesh_foam.ray_trace(a,b)
    penalty_intersection_foam = np.linalg.norm(points[::2]-points[1::2],axis=-1).sum()

    # wire extension
    penalty_wire_extension = np.linalg.norm(a-b, axis=-1)-minimum_wire_length

    # intersection with mesh_target
    points, _cells = mesh_target.ray_trace(a,b)
    penalty_intersection_mesh_target = np.where(np.linalg.norm(points[::2]-points[1::2],axis=-1).sum()>0, 9999,0)

    path_penalties.append(
             5 * penalty_wire_extension
        +    1 * penalty_intersection_foam
        +    1 * penalty_intersection_mesh_target # basically invalidate the state with large cost
    )

transitions_are_valid = []
for a,b in pairwise(path_states):
    transition_is_valid =  1 if state_transition_is_valid(a,b) else 0
    transitions_are_valid.append(transition_is_valid)

In [None]:
from airfoil.cnc.cnc_machine_mesh import axis
pt = pv.Plotter()
pt.add_mesh(mesh_target)
state_l, state_r = state_to_line(*state)
mesh_state = pv.Line(state_l, state_r)
pt.add_mesh(mesh_state, color="red",line_width=2)
pt.add_mesh(axis(state_l,"L"),show_edges=True,opacity=0.3,color="white")
pt.add_mesh(axis(state_r,"R"),show_edges=True,opacity=0.3,color="white")

state_goal_l, state_goal_r = state_to_line(*state_goal)
mesh_state_goal =  pv.Line(state_goal_l, state_goal_r)
pt.add_mesh(mesh_state_goal, color="orange",line_width=2)

for path_state, path_penalty in zip(path_states, path_penalties):
    l,r = state_to_line(*path_state)
    m = pv.Line(l,r)
    m["penalty"] = [path_penalty]
    pt.add_mesh(m, scalars="penalty",cmap="viridis", line_width=2, show_scalar_bar=True)


mesh_state_transitions = create_ruled_surface(*states_to_curves(path_states))
mesh_state_transitions["valid"] = transitions_are_valid
pt.add_mesh(
    mesh_state_transitions,
    scalars="valid",
    cmap="turbo"
)


pt.add_mesh(pv.PointSet([a_3d[0],b_3d[0]]),color="green")
pt.add_mesh(pv.MultipleLines(a_proj), color="#AA2222")
pt.add_mesh(pv.MultipleLines(b_proj), color="#AA2222")

pt.add_mesh(mesh_foam.extract_all_edges(), color="teal")
pt.add_mesh(pv.PointSet([a_proj[0],b_proj[0]]))
pt.camera_position = (
    (-foam_width*0.7,-foam_width*2,foam_height*2),
    (-foam_width*0.3,0,0),
    (0,0,1)
)
pt.show()

In [None]:
import numpy as np
import pyvista as pv
from itertools import pairwise
from scipy.optimize import minimize
import matplotlib.pyplot as plt

# Configuration
foam_width = 200
foam_depth = 200
foam_height = 30
plane_spacing = 250

def state_to_line(x, y, z, a):
    """Convert 4D state to 3D line endpoints"""
    return (
        (-plane_spacing/2, x, y),
        (plane_spacing/2, z, a),
    )

def states_to_curves(states):
    """Convert array of states to curve points"""
    x, y, z, a = states.T
    return (
        np.vstack((np.ones_like(x) * -plane_spacing/2, x, y)).T,
        np.vstack((np.ones_like(x) * plane_spacing/2, z, a)).T,
    )

def line_to_state(p1, p2):
    """Convert line endpoints back to 4D state"""
    return np.array([p1[1], p1[2], p2[1], p2[2]])

# Create foam mesh
mesh_foam = pv.Box((
    -foam_width/2, foam_width/2,
    0, foam_depth,
    0, foam_height
))

# Initial and goal states

minimum_wire_length = plane_spacing

class HotwirePathPlanner:
    def __init__(self, mesh_foam, mesh_target, minimum_wire_length, plane_spacing):
        self.mesh_foam = mesh_foam
        self.mesh_target = mesh_target
        self.minimum_wire_length = minimum_wire_length
        self.plane_spacing = plane_spacing
        
    def compute_state_penalty(self, state):
        """Compute penalty for a single state"""
        x, y, z, a = state
        p1 = np.array([-self.plane_spacing/2, x, y])
        p2 = np.array([self.plane_spacing/2, z, a])
        
        # Wire extension penalty
        wire_length = np.linalg.norm(p2 - p1)
        penalty_wire = max(0, wire_length - self.minimum_wire_length) * 5
        
        # Foam intersection penalty
        try:
            points, _ = self.mesh_foam.ray_trace(p1, p2)
            if len(points) > 0:
                penalty_foam = np.linalg.norm(points[::2] - points[1::2], axis=-1).sum()
            else:
                penalty_foam = 0
        except:
            penalty_foam = 0
            
        # Target mesh collision penalty (high penalty to avoid)
        try:
            points, _ = self.mesh_target.ray_trace(p1, p2)
            if len(points) > 0:
                penalty_target = 10000  # Very high penalty
            else:
                penalty_target = 0
        except:
            penalty_target = 0
            
        return penalty_wire + penalty_foam + penalty_target
    
    def compute_path_penalty(self, path_states):
        """Compute total penalty for entire path"""
        total_penalty = 0
        
        # State penalties
        for state in path_states:
            total_penalty += self.compute_state_penalty(state)
            
        # Transition smoothness penalty
        for i in range(len(path_states) - 1):
            state_diff = path_states[i+1] - path_states[i]
            total_penalty += 0.1 * np.linalg.norm(state_diff)**2
            
        return total_penalty
    
    def state_transition_is_valid(self, state_a, state_b):
        """Check if transition between states is valid"""
        try:
            a1, a2 = state_to_line(*state_a)
            b1, b2 = state_to_line(*state_b)
            
            # Create surface between the two wire positions
            points = [a1, b1, a2, b2]
            transition_surface = pv.PolyData(points).delaunay_2d()
            
            # Check collision with target mesh
            _, num_cols = self.mesh_target.collision(transition_surface, contact_mode=1)
            return num_cols == 0
        except:
            return False
    
    def optimize_path_scipy(self, initial_path, max_iterations=100):
        """Optimize path using scipy optimization"""
        def objective(flat_states):
            # Reshape flat array back to path states
            path_states = flat_states.reshape(-1, 4)
            return self.compute_path_penalty(path_states)
        
        # Flatten initial path for optimization
        flat_initial = initial_path.flatten()
        
        # Optimize
        result = minimize(
            objective, 
            flat_initial, 
            method='BFGS',
            options={'maxiter': max_iterations, 'disp': False}
        )
        
        # Reshape result back to path
        optimized_path = result.x.reshape(-1, 4)
        return optimized_path, result.fun
    
    def plan_path_rrt_star(self, start_state, goal_state, n_samples=10000, max_distance=10):
        """Simple RRT* implementation for path planning"""
        
        class Node:
            def __init__(self, state, parent=None, cost=0):
                self.state = np.array(state)
                self.parent = parent
                self.cost = cost
                self.children = []
        
        def random_state():
            # Generate random state within reasonable bounds
            return np.array([
                np.random.uniform(-50, foam_depth+50),    # x
                np.random.uniform(-50, foam_height +50),  # y
                np.random.uniform(-50, foam_depth+50),    # z
                np.random.uniform(-50, foam_height +50),  # a
            ])
        
        def nearest_node(nodes, target_state):
            distances = [np.linalg.norm(node.state - target_state) for node in nodes]
            return nodes[np.argmin(distances)]
        
        def steer(from_state, to_state, max_dist):
            direction = to_state - from_state
            distance = np.linalg.norm(direction)
            if distance <= max_dist:
                return to_state
            else:
                return from_state + (direction / distance) * max_dist
        
        # Initialize tree
        start_node = Node(start_state)
        nodes = [start_node]
        
        for i in range(n_samples):
            # Sample random state or bias toward goal
            if np.random.random() < 0.1:  # 10% bias toward goal
                rand_state = goal_state
            else:
                rand_state = random_state()
            
            # Find nearest node
            nearest = nearest_node(nodes, rand_state)
            
            # Steer toward random state
            new_state = steer(nearest.state, rand_state, max_distance)
            
            # Check if new state is valid
            if (self.compute_state_penalty(new_state) < 1000 and 
                self.state_transition_is_valid(nearest.state, new_state)):
                
                # Create new node
                edge_cost = np.linalg.norm(new_state - nearest.state)
                new_node = Node(new_state, nearest, nearest.cost + edge_cost)
                
                # Add to tree
                nodes.append(new_node)
                nearest.children.append(new_node)
                
                # Check if we reached the goal
                if np.linalg.norm(new_state - goal_state) < 10:
                    goal_node = Node(goal_state, new_node, new_node.cost + np.linalg.norm(goal_state - new_state))
                    
                    # Extract path
                    path = []
                    current = goal_node
                    while current is not None:
                        path.append(current.state)
                        current = current.parent
                    
                    return np.array(path[::-1])  # Reverse to get start->goal
        
        # If no path found, return linear interpolation
        print("RRT* failed to find path, using linear interpolation")
        return self.linear_interpolation_path(start_state, goal_state, 20)
    
    def linear_interpolation_path(self, start_state, goal_state, n_points):
        """Generate linear interpolation path"""
        t_values = np.linspace(0, 1, n_points)
        path = np.array([start_state + t * (goal_state - start_state) for t in t_values])
        return path
    
    def gradient_descent_path_optimization(self, initial_path, learning_rate=0.1, max_iterations=50):
        """Optimize path using gradient descent"""
        path = initial_path.copy()
        
        for iteration in range(max_iterations):
            # Compute gradients numerically
            gradients = np.zeros_like(path)
            epsilon = 1e-6
            
            for i in range(1, len(path) - 1):  # Don't modify start/end states
                for j in range(4):  # For each state dimension
                    # Forward difference
                    path_plus = path.copy()
                    path_plus[i, j] += epsilon
                    cost_plus = self.compute_path_penalty(path_plus)
                    
                    # Backward difference  
                    path_minus = path.copy()
                    path_minus[i, j] -= epsilon
                    cost_minus = self.compute_path_penalty(path_minus)
                    
                    # Compute gradient
                    gradients[i, j] = (cost_plus - cost_minus) / (2 * epsilon)
            
            # Update path
            path[1:-1] -= learning_rate * gradients[1:-1]
            
            # Print progress occasionally
            if iteration % 20 == 0:
                current_cost = self.compute_path_penalty(path)
                print(f"Iteration {iteration}: Cost = {current_cost:.2f}")
        
        return path

# Utility functions for visualization and analysis
def analyze_path(path, planner):
    """Analyze path quality and print statistics"""
    print("\nPath Analysis:")
    print(f"Number of waypoints: {len(path)}")
    
    penalties = [planner.compute_state_penalty(state) for state in path]
    print(f"Average state penalty: {np.mean(penalties):.2f}")
    print(f"Max state penalty: {np.max(penalties):.2f}")
    
    # Check transition validity
    valid_transitions = 0
    for i in range(len(path) - 1):
        if planner.state_transition_is_valid(path[i], path[i+1]):
            valid_transitions += 1
    
    print(f"Valid transitions: {valid_transitions}/{len(path)-1}")
    
    return penalties

def visualize_path_3d(path, planner):
    """Create 3D visualization of the path"""
    # Convert states to line endpoints
    lines = [state_to_line(*state) for state in path]
    
    # Create plotter
    pl = pv.Plotter()
    
    # Add foam mesh
    pl.add_mesh(planner.mesh_foam, opacity=0.3, color='yellow', label='Foam')
    
    # Add target mesh if available
    if hasattr(planner, 'mesh_target'):
        pl.add_mesh(planner.mesh_target, opacity=0.5, color='red', label='Obstacle')
    
    # Add wire path
    for i, (p1, p2) in enumerate(lines):
        line = pv.Line(p1, p2)
        pl.add_mesh(line, color='blue', line_width=3)
    
    # Add start and end points
    start_line = lines[0]
    end_line = lines[-1]
    pl.add_mesh(pv.Sphere(radius=2, center=start_line[0]), color='green', label='Start')
    pl.add_mesh(pv.Sphere(radius=2, center=end_line[0]), color='red', label='End')
    
    pl.add_legend()
    pl.show()

In [None]:

planner = HotwirePathPlanner(mesh_foam, mesh_target, minimum_wire_length, plane_spacing)

# Generate initial path
initial_path = planner.linear_interpolation_path(state, state_goal, 20)

# Try RRT* planning
#print("Planning path with RRT*...")
rrt_path = planner.plan_path_rrt_star(state, state_goal, n_samples=500)

# Optimize with gradient descent
print("Optimizing path with gradient descent...")
optimized_path = planner.gradient_descent_path_optimization(rrt_path)

# Compute final path cost
final_cost = planner.compute_path_penalty(optimized_path)
print(f"Final path cost: {final_cost:.2f}")

analyze_path(optimized_path, planner)
visualize_path_3d(optimized_path, planner)