In [None]:
import numpy as np
import matplotlib.pyplot as plt
from egoaxis import rot_axis_90deg_cc

## Module

In [None]:
# Note:
# To make it match with the previous measurement, init with the last point
# of the previous measurement and keep it fixed.
# Use the largest principal component vector, project points, keep last point
# fixed an rotate line around this point, so the first point matches the
# reference point from the last measurement.

class Polyline(object):
    def __init__(self, vertices=None):
        self._vertices = np.empty((0, 2))
        self._segment_lengths = np.empty(0)
        if vertices is not None:
            vertices = np.atleast_2d(vertices)
            if len(vertices.shape) != 2 or vertices.shape[1] != 2:
                raise ValueError("vertices must have shape (N, 2)")
            for v in vertices:
                self.append_vertex(v)

    def get_value(self, t):
        """
        Return the (interpolated) value at the given parameter t (in [0, 1])
        """
        t = np.clip(np.asarray(t), 0., 1.)
        # Select segments to interpolate each t in
        cumlen = (np.cumsum(self._segment_lengths) /
                  np.sum(self._segment_lengths))
        cumlen = np.insert(cumlen, 0, 0.)
        segment_idx = np.searchsorted(cumlen, t)
        # Handle first segment separately due to index out of bounds
        m = segment_idx > 0
        v0 = np.repeat(self._vertices[[0]], repeats=len(t), axis=0)
        v1 = np.repeat(self._vertices[[1]], repeats=len(t), axis=0)
        v0[m] = self._vertices[segment_idx - 1][m]
        v1[m] = self._vertices[segment_idx][m]

        # Normalize ts in [0, 1] relative to corresponding segment
        segment_idx = np.maximum(0, segment_idx - 1)
        t_norm = ((t - cumlen[segment_idx]) /
                  (cumlen[segment_idx + 1] - cumlen[segment_idx]))

        # Interpolated x and y values at ts
        v = (v1 - v0) * t_norm.reshape(len(t), 1) + v0
        return v
    
    def get_dist_to_line(self, pt):
        """
        Returns the smallest distance to any line segment in the polyline.
        
        Returns projection vectors from point to clostest point on the line, the
        distances and an array which encodes to which entity the distance was
        clostest to (integer >= 0: vertex id,
        integer < 0: -1 * (segment id + 1)).
        """
        # Call dist to segment for each segment, then filter results
        proj_vecs, dists, proj_to = [], [], []
        for seg_idx in range(len(self._segment_lengths)):
            _pv, _d, _pt = self._get_dist_to_segment(pt, seg_idx)
            proj_vecs.append(_pv)
            dists.append(_d)
            # Shift IDs to global indices, segments start at -1, -2, ..., -(n-1)
            m = _pt < 0
            _pt[m] = _pt[m] * (seg_idx + 1)
            _pt[~m] = _pt[~m] + seg_idx
            proj_to.append(_pt)
            
        # Argmin for all segments is closest to whole curve
        idx = np.argmin(dists, axis=0)
        proj_vecs = np.swapaxes(proj_vecs, 0, 1)
        dists = np.asarray(dists).T
        proj_to = np.asarray(proj_to).T
        proj_vecs = np.asarray([_pv[i] for _pv, i in zip(proj_vecs, idx)])
        dists = np.asarray([_d[i] for _d, i in zip(dists, idx)])
        proj_to = np.asarray([_pt[i] for _pt, i in zip(proj_to, idx)])

        return idx, proj_vecs, dists, proj_to
    
    def append_vertex(self, v):
        """ Append vertices v to the end of the polyline """
        self.insert_vertex(v, self.nvertices)
        
    def insert_vertex(self, v, idx):
        """
        Insert vertices v to the polyline at position idx, before the existing
        element.
        """
        if idx > self.nvertices:
            raise ValueError("index out of bounds for current polyline.")
            
        v = np.atleast_2d(v)
        ninserted = len(v)
        self._vertices = np.insert(self._vertices, idx, v, axis=0)
        self._update_segments(idx0, idx1)
        
    def _update_segments(self, idx0, idx1):
        """
        Updates the segment lengths, should be called after adding or
        removing vetices.
        Only the segments in the given range are updated to save computation.
        """
        if self.nvertices > 1:
            if idx == 0:
                lens = self._get_segment_lengths(self._vertices[:ninserted])
            else:
                lens = self._get_segment_lengths(
                    self._vertices[idx - 1:idx + ninserted])   
            self._segment_lengths = np.insert(
                self._segment_lengths, max(0, idx-1), lens)
            
    def insert_vertex_between(self, idx, scale):
        """
        Inserts a new vertex on the segment between vertex idx and vertex
        idx + 1 if existing (polyline must have idx + 1 vertices).
        The distance to v0 and v1 is defined by scale, where scale = 0 means the
        new vertex is the same as v0, scale = 0.5 is exactly the middle between
        v0 and v1 and scale = 1 means the new vertex is the same as v1.
        
        Returns the new vertex and its index in the updated polyline.
        """
        if idx > self.nvertices - 2:
            raise ValueError("idx must be < nvertices - 1")
        if scale < 0. or scale > 1.:
            raise ValueError("scale must be in [0, 1]")
            
        # If scale is 0 or 1, just duplicate the existing vertices
        if scale == 0.:
            new_v = self.vertices[idx]
        elif scale == 1.:
            new_v = self.vertices[idx + 1]
        else:
            v0, v1 = self.vertices[idx], self.vertices[idx + 1]
            new_v = scale * (v1 - v0) + v0
        
        self.insert_vertex(new_v, idx + 1)  # Append between v0, v1
        return new_v, idx + 1
    
    def remove_vertex(self, idx):
        """ Removes vertex at idx """
        
            
    def replace_vertex(self, v, idx):
        """ Replace vertex idx with v """
        
    def clear(self):
        self._vertices = np.empty((0, 2))
        self._segment_lengths = np.empty(0)
        
    def _get_segment_lengths(self, v):
        if len(v) < 2:
            return np.empty(0)
        return np.sqrt((v[:-1, 0] - v[1:, 0])**2 + (v[:-1, 1] - v[1:, 1])**2)
    
    def _get_dist_to_segment(self, pt, idx):
        """
        Get distance of pt to specified segment idx in the polyline.
        
        Returns projection vectors from point to clostest point on segment, the
        distances and an array which encodes to which entity the distance was
        clostest to (0: first vertex, -1: segment line, +1: second vertex).
        """
        pt = np.atleast_2d(pt)
        v0 = self._vertices[[idx]]
        v1 = self._vertices[[idx + 1]]
        # dproj = projection vector, dpt = pt vector relative to v0
        dproj = v1 - v0
        dpt = pt - v0
        norm_dproj = np.linalg.norm(dproj)
        norm_dpt = np.linalg.norm(dpt, axis=1)
        
        if np.isclose(norm_dproj, 0):
            # Both vertices are almost equal, return distance pt to v0
            return dpt, norm_dpt 
        
        # Project dpt on line (v1, v0)
        dproj = dproj / norm_dproj
        proj_lens = np.dot(dpt, dproj.T)
        tangent_vecs = proj_lens * dproj
        
        # If projected length is not in [0, 1] it is outside the line element
        # and the distance to v0 (<0) or v1 (>len(pt)) is returned
        m0 = np.ravel(proj_lens < 0.)
        m1 = np.ravel(proj_lens > norm_dproj)
        # The projection vectors are orthogonal to the line or point to v0, v1
        proj_vecs = tangent_vecs - dpt  # projected on segment line
        proj_vecs[m0] = -dpt[m0]  # proj_len < 0
        proj_vecs[m1] = v1 - pt[m1]  # proj_len > norm_dproj
        
        # Encode if the distance is closest to vertex 0 (id=0), the segment
        # (id=-1) or vertex 1 (id=1)
        proj_to = np.zeros(len(proj_vecs), dtype=int) - 1
        proj_to[m0] = 0
        proj_to[m1] = 1
        
        return proj_vecs, np.linalg.norm(proj_vecs, axis=1), proj_to
     
    @property
    def nvertices(self):
        return len(self._vertices)
    
    @property
    def nsegments(self):
        return len(self._segment_lengths)
    
    @property
    def vertices(self):
        return self._vertices.copy()

    @property
    def segment_lengths(self):
        return self._segment_lengths.copy()
    
    @property
    def line_length(self):
        return np.sum(self._segment_lengths)
    
    @property
    def xcoords(self):
        return self._vertices[:, 0]

    @property
    def ycoords(self):
        return self._vertices[:, 1]

        
def proj_points(pts, vec):
    """
    Projects points with shape (len(vec), N) along direction of vec (1d-array).
    """
    vec = np.asarray(vec)
    pts = np.atleast_2d(pts)
    
    unit = vec / np.linalg.norm(vec)
    proj_len = np.matmul(pts.T, vec)

    return proj_len * vec.reshape(len(vec), 1), proj_len


def get_max_eigvec(data):
    # Get direction and value of largest eigenvalue from cov(data)
    cov = np.cov(data)
    eigvals, eigvecs = np.linalg.eigh(cov)
    eigvec_max = eigvecs[np.argmax(eigvals)]
    return eigvec_max, eigvals, eigvecs


def poly_principal_curve(data, niter="auto"):
    """
    Computes a principal curve for the given data points (shape (2, n)).
    Source
    ------
    Kegl: Learning and Design of Principal Curves
        (https://www.lri.fr/~kegl/research/PDFs/KeKrLiZe00.pdf)
    """
    data = np.atleast_2d(data)
    if len(data.shape) != 2 or data.shape[0] != 2:
        raise ValueError("'data' must have shape (2, nPoints).")
    N = data.shape[1]
    
    # Center data at  (0, 0)
    mean = np.mean(data, axis=1).reshape(2, 1)
    data -= mean
    
    # Following the algorithm in chapter 3 of the paper
    
    # Initialization: Use first principal value of data as projection line d and
    # pick the line length L as the distance of min, max points projected on d
    eigvec_max, _, _ = get_max_eigvec(data)
    # Project along largest eigenvector and determine line length by finding
    # the outermost projected points along the projection line
    proj_vec, proj_len = proj_points(data, eigvec_max)
    idx = np.argsort(proj_len)
    # Keep the original order intact for the projection
    idx_min, idx_max = ((idx[0], idx[-1]) if
                        idx[0] < idx[-1] else (idx[-1], idx[0]))
    dmin, dmax = data[:, idx_min], data[:, idx_max]
    length = np.linalg.norm(dmax - dmin)
    
    # Iteration: Add vertex each iter until converged or max vertices reached
    pl = Polyline([dmin, dmax])
    converged = False
    # Empirical from paper, at least 2
    max_vertices = max(int(N**(1. / 3.)) + 1, 2.)
    for i in range(max_vertices):
        # Vertex optimization:
        # 1. Optimize each existing vertex while fixing the others
        # 2. As long as the curve with k current indices is locally closest
        step_converged = False
        while not step_converged:
            for j, vert in pl.vertices:
                pass
            step_converged = True
    
    return dmin, dmax, length

In [None]:
pl = Polyline([[1, 3], [4, 5], [6, 2], [2, 3]])
pl.insert_vertex_between(0, scale=0.5)
print(pl.vertices)

In [None]:
pt = np.array([[1, 2], [3, 2], [5, 6], [3, 5], [3, 4], [2, 5]])
idx, proj_vecs, dists, proj_to = pl.get_dist_to_line(pt)

In [None]:
plt.plot(pl.xcoords, pl.ycoords, ls="-", marker="o", c="C0")
plt.plot(pt[:, 0], pt[:, 1], marker="x", lw=4, c="k", ls="none")

for pti, pvi in zip(pt, proj_vecs):
    plt.arrow(pti[0], pti[1], pvi[0], pvi[1])
    
print(proj_to)

plt.xlim(0, None)
plt.ylim(0, None)
plt.gca().set_aspect("equal")
plt.show()

In [None]:
curve = Curve2D(curvature=-4., curve_rel_start=0.3)
x = np.linspace(0, 10, 50)
y, dy = curve(x)
x_sam = np.linspace(1, 10, 20)
x_sam, y_sam = curve.sample(x_sam, stddev=0.4, seed=2)

data = np.asarray([x_sam, y_sam])

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(12, 6))

# Truth and sampled points
ax.plot(x, y)
ax.plot(x_sam, y_sam, marker="o", ls="-")

ax.set_xlim(0, 1.1 * max(np.amax(x_sam), np.amax(x)))
ax.set_ylim(-5, 5)
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.plot(0, 0, marker=10, ms=10, ls="none", c="k")

# Principal value line
eigvec, _, _ = get_max_eigvec(data)
mean = np.mean(data, axis=1)
mx = np.r_[mean[0] - 10 * eigvec[0], mean[0] + 10 * eigvec[0]]
my = np.r_[mean[1] - 10 * eigvec[1], mean[1] + 10 * eigvec[1]]
ax.plot(mean[0], mean[1], ls="none", marker="x", ms=10, c="C3", mew=3, zorder=1)
ax.plot(mx, my, ls="--", c="C3", lw=2)

# Projected points
mean = np.mean(data, axis=1).reshape(2, 1)
proj_vec, proj_len = proj_points(data - mean, eigvec)
proj_vec += mean
for proj, pt in zip(proj_vec.T, data.T):
    dx = [proj[0], pt[0]]
    dy = [proj[1], pt[1]]
    ax.plot(dx, dy, lw=1, c="k")

# ax = rot_axis_90deg_cc(ax)
ax.grid()
ax.axvline(0, 0, 1, c="k", lw=1, zorder=1)
ax.axhline(0, 0, 1, c="k", lw=1, zorder=1)
ax.set_axisbelow(True)
ax.set_aspect("equal")

plt.show()

## Generator

In [None]:
# Flaws: Not a real parametric curve, densitiy of sampled points change
#        around line when derivative get larger
class Curve2D(object):
    def __init__(self, curvature=0, curve_rel_start=0):
        """
        curvature = scaling factor of first quarter of sine wave
        curve_rel_start in [0, 1] -> 0 = curve start at beginning, 1 = at end 
        """
        if curve_rel_start < 0 or curve_rel_start > 1:
            raise ValueError("curve_rel_start must be in [0, 1]")
        self.curvature = curvature
        self.curve_rel_start = curve_rel_start

    def sample(self, x, stddev=1., seed=None):
        """
        Model: (x, y) = (x, f(x)) + rot(alpha) * norm(0, stddev)
        where rot(alpha) rotates the point to the normal vector of the curve at
        f(x) (samples with stddev along the normal vector at each (x, f(x))).
        """
        x = np.asarray(x)
        y, dy = self(x)
        # Sample offset along normal vector
        rndgen = np.random.RandomState(seed)
        # y_sam = rndgen.normal(0, stddev, size=x.shape)
        y_sam = rndgen.uniform(-stddev, stddev, size=x.shape)
        y_sam = rndgen.normal(0, stddev, size=x.shape)
        x_sam = np.zeros_like(x)
        # Rotate sampled points along normal vector
        dx = np.ones_like(x)
        angles = np.arctan2(dy, dx)
        s, c = np.sin(angles), np.cos(angles)
        rot = np.asarray([ [[ci, -si], [si, ci]] for si, ci in zip(s, c) ])
        vec = np.asarray([
            [xi, yi] for xi, yi in zip(x_sam, y_sam) ])
        rot_vec = np.asarray([
            np.matmul(roti, veci) for roti, veci in zip(rot, vec) ])
        
        # print("x, y", x, y)
        # print("dx, dy", dx, dy)
        # print("angles", np.rad2deg(angles))
        # print("vecs", vec)
        # print("rotmats", rot)
        # print("rot vecs", rot_vec)
        
        return x + rot_vec[:, 0], y + rot_vec[:, 1]
    
    def __call__(self, x):
        """
        Get y values of the curve for given x
        """
        x = np.asarray(x)
        # Quarter sine period for the curvature starting at curve_rel_start
        xmax = np.amax(x)
        xmin = self.curve_rel_start * xmax
        normed = np.zeros_like(x)
        m = x > xmax * self.curve_rel_start
        # Transform to [pi / 2, pi]
        normed[m] = (x[m] - xmin) / (xmax - xmin) * np.pi / 2. + np.pi / 2.
        shift = np.zeros_like(x)
        shift[m] = 1.
        func = self.curvature * (np.sin(normed) - shift)
        # Get gradient (note: respect inner derivative from normalization)
        grad = np.zeros_like(x)
        grad[m] = (self.curvature * np.cos(normed[m]) /
                   (xmax - xmin) * np.pi / 2.)
        return func, grad

## Main

In [None]:
curve = Curve2D(curvature=-3., curve_rel_start=0.6)
x = np.linspace(0, 10, 50)
y, dy = curve(x)
x_sam = np.linspace(1, 10, 20)
x_sam, y_sam = curve.sample(x_sam, stddev=0.4, seed=2)

fig, ax = plt.subplots(1, 1)
ax.plot(x, y)
ax.plot(x_sam, y_sam, marker="o", ls="-")

ax.set_xlim(0, 1.1 * max(np.amax(x_sam), np.amax(x)))
ax.set_ylim(-5, 5)

ax.set_xlabel("x")
ax.set_ylabel("y")
ax.plot(0, 0, marker=10, ms=10, ls="none", c="k")

ax = rot_axis_90deg_cc(ax)
ax.grid()
ax.axvline(0, 0, 1, c="k", lw=1, zorder=1)
ax.set_axisbelow(True)

plt.show()