In [None]:
import numpy as np

In [None]:
class TPGMM:
    def __init__(self, n_components, reg=1e-6, max_iter=100, tol=1e-4):
        self.K = n_components
        self.reg = reg
        self.max_iter = max_iter
        self.tol = tol
    
    def gauss_pdf(self, X, mu, Sigma):
        D = X.shape[1]
        Sigma = Sigma + self.reg * np.eye(D)
        inv = np.linalg.inv(Sigma)
        det = np.linalg.det(Sigma)

        diff = X - mu
        expo = np.einsum("ni,ij,nj->n", diff, inv, diff)
        norm = 1.0 / np.sqrt((2*np.pi)**D * det)
        return norm * np.exp(-0.5 * expo)

    def to_frames(self, X, A, b ):
        N, P, D, _ = A.shape

        Xf = np.zeros((P,N,D)) # stores trajeectory expressed in each frame
        for j in range(P):
            for n in range(N):
                Xf[j,n] = np.linalg.solve(A[n,j], X[n] - b[n,j])

        return Xf 


    def fit(self, X , A, b):
        N, D = X.shape
        P = A.shape[1]

        Xf = self.to_frames(X,A,b)

        self.pi = np.ones(self.K) / self.K
        self.mu = np.random.randn(P, self.k, D)

        self.Sigma = [ [ np.eye(D) for _ in range(self.K)] for _ in range(P)]

        # EM loop

        for _ in range(self.max_iter):

            # E step

            resp = np.zeros((N, self.K))

            for k in range(self.K):
                prob = np.ones(N) # likelihood for each traj point of component K over all task frames 
                
                for j in range(P):
                    prob = prob * self.gauss_pdf(Xf[j], self.mu[j,k], self.Sigma[j,k])
                
                resp[:, k] = self.pi[k] * prob

            resp = resp / resp.sum(axis = 1, keepdims=True) + 1e-12
            Nk = resp.sum(axis=0)

            # M step

            self.pi = Nk / N
            for j in range(P):

                Xj = Xf[j]

                for k in range(self.K):
                    w = resp[:, k][:, None]

                    mu = (w * Xj).sum(axis=0) / Nk[k]
                    diff = Xj - mu
                    Sigma = (w * diff).T @ diff / Nk[k]

                    Sigma += self.reg + np.eye(D)
                    self.mu[j, k] = mu
                    self.Sigma[j, k] = Sigma



    def imitate(self, A_new, b_new, time_query,
            in_dims=[0], out_dims=[1, 2]):
        """
        Generate trajectory using TP-GMM + GMR

        Parameters
        ----------
        A_new : (P,D,D)
        b_new : (P,D)
            Task frames for reproduction

        time_query : (T,)
            Time steps to query

        Returns
        -------
        y_pred : (T, len(out_dims))
        """

        P, K, D = self.mu.shape

        # -------------------------------------------------
        # Step 1: Adapt Gaussians via product
        # -------------------------------------------------
        mu_adapt = np.zeros((K, D))
        Sigma_adapt = np.zeros((K, D, D))

        for k in range(K):
            mus_global = []
            Sigmas_global = []

            for j in range(P):
                mu_g = A_new[j] @ self.mu[j, k] + b_new[j]
                Sigma_g = A_new[j] @ self.Sigma[j, k] @ A_new[j].T

                mus_global.append(mu_g)
                Sigmas_global.append(Sigma_g)

            mus_global = np.array(mus_global)
            Sigmas_global = np.array(Sigmas_global)

            # product of Gaussians
            Sigma_inv_sum = np.zeros((D, D))
            weighted_mu_sum = np.zeros(D)

            for j in range(P):
                inv = np.linalg.inv(Sigmas_global[j])
                Sigma_inv_sum += inv
                weighted_mu_sum += inv @ mus_global[j]

            Sigma_k = np.linalg.inv(Sigma_inv_sum)
            mu_k = Sigma_k @ weighted_mu_sum

            mu_adapt[k] = mu_k
            Sigma_adapt[k] = Sigma_k

        # -------------------------------------------------
        # Step 2: Prepare time input
        # -------------------------------------------------
        x_in = time_query.reshape(-1, 1)
        T = len(time_query)
        D_out = len(out_dims)

        y_pred = np.zeros((T, D_out))

        # -------------------------------------------------
        # Step 3: GMR
        # -------------------------------------------------
        for t in range(T):
            h = np.zeros(K)
            cond_means = np.zeros((K, D_out))

            for k in range(K):
                mu_in = mu_adapt[k][in_dims]
                mu_out = mu_adapt[k][out_dims]

                Sigma_ii = Sigma_adapt[k][np.ix_(in_dims, in_dims)]
                Sigma_oi = Sigma_adapt[k][np.ix_(out_dims, in_dims)]

                # responsibility
                diff = x_in[t] - mu_in
                inv = np.linalg.inv(Sigma_ii)
                det = np.linalg.det(Sigma_ii)
                norm = 1.0 / np.sqrt(
                    (2*np.pi)**len(in_dims) * det
                )

                h[k] = self.pi[k] * norm * np.exp(
                    -0.5 * diff.T @ inv @ diff
                )

                # conditional mean
                cond_means[k] = (
                    mu_out + Sigma_oi @ inv @ diff
                )

            # normalize responsibilities
            h /= h.sum() + 1e-12

            # mixture output
            y_pred[t] = (h[:, None] * cond_means).sum(axis=0)

        return y_pred


In [None]:
def build_frames(X):
    """
    X: (N,D) single trajectory

    returns:
        A: (N,P,D,D) , N-> number of datapoints , P-> no of task frames, D-> dim of data
        b: (N,P,D)
    """

    N, D = X.shape
    P = 2 # one synthetic task frame one goal frame

    A = np.zeros((N,P,D,D))
    b = np.zeros((N,P,D))


    A_start = compute_direction_frame(X[0], X[min(5,N-1)])
    b_start = X[0]

    A_goal = compute_direction_frame(X[-6], X[-1])
    b_goal = X[-1]

    for n in range(N):
        A[n, 0] = A_start
        b[n, 0] = b_start

        A[n, 1] = A_goal
        b[n, 1] = b_goal

    return A, b

def compute_direction_frame(p0, p1):
    """
    Build rotation matrix whose x-axis aligns with motion direction.
    Works in 2D.
    """
    d = p1 - p0
    d = d / (np.linalg.norm(d) + 1e-12)

    # perpendicular vector
    perp = np.array([-d[1], d[0]])

    A = np.stack([d, perp], axis=1)
    return A



In [None]:
A, b = build_frames_from_trajectory(X_train)
model.fit(X_train, A, b)

A_new = A[0]   # (P,D,D)
b_new = b[0]   # (P,D)

t_query = np.linspace(0, 1, 100)
traj = model.imitate(A_new, b_new, t_query)
