In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Temporal scaling constant: every state derivative in the integrators is divided by it.
tau = 1
# Default start position; the fitting cell overwrites it for each demonstration.
yzero = 0

# Gains of the spring-damper part of the system.
# NOTE(review): beta_z = alpha_z / 4 is, as far as I know, the usual critically
# damped choice in the DMP literature, which would leave only alpha_z as a free
# parameter -- confirm against the paper being followed.
alpha_z = 10
beta_z = alpha_z / 4

In [None]:
def phase_func(t):
    """Exponentially decaying phase variable: 1 at t = 0, 0.01 at t = 1.

    Works elementwise on numpy arrays as well as on plain scalars.
    """
    decay_rate = -np.log(0.01)  # chosen so that exp(-decay_rate) == 0.01
    exponent = -t * decay_rate
    return np.exp(exponent)
# Number of basis functions and the factor that sets their width relative to the centre.
N = 50
sigma = 2 / N
# Basis centres in phase space: the phase sampled at N evenly spaced times in [0, 1]
# (both endpoints included, which is np.linspace's default).
cs = phase_func(np.linspace(0, 1, num=N))

In [None]:
# An original Ijspeert paper (and the "Learning Parametric Dynamic Movement..." paper)
# have the forcing term affect velocity, not acceleration, so z is integrated here
# from the demonstrated positions alone, without any forcing term.
def numeric_integration(ydemos, ts, tau, g, alpha_z, beta_z, step_size=0.00001):
    """Integrate the z state along a demonstrated trajectory with forward Euler.

    Between consecutive sample times the demonstrated position is linearly
    interpolated, and z follows

        tau * dz/dt = alpha_z * (beta_z * (g - y) - z)

    starting from z = 0 at ts[0].

    Parameters
    ----------
    ydemos : sequence of float
        Demonstrated positions, one per entry of ``ts``.
    ts : sequence of float
        Increasing sample times.
    tau : float
        Temporal scaling constant.
    g : float
        Goal position.
    alpha_z, beta_z : float
        Gains of the spring-damper system.
    step_size : float, optional
        Euler step. Defaults to the value that used to be hard-coded.

    Returns
    -------
    numpy.ndarray
        The value of z at every time in ``ts`` (same length as ``ts``).

    Raises
    ------
    ValueError
        If ``step_size`` is not positive (the integration would never advance).
    """
    if step_size <= 0:
        raise ValueError("step_size must be positive")

    t = ts[0]
    z = 0
    zs = [z]
    for i in range(1, len(ts)):
        t_prev, t_next = ts[i - 1], ts[i]
        # t is carried over between intervals, so the fine time grid is not
        # restarted at every sample.
        while t < t_next:
            interp_frac = (t - t_prev) / (t_next - t_prev)
            y = (1 - interp_frac) * ydemos[i - 1] + interp_frac * ydemos[i]
            z += alpha_z * (beta_z * (g - y) - z) / tau * step_size
            t += step_size
        zs.append(z)
    return np.array(zs)

In [None]:
# basisphis, targetfunction, xs are all evaluated at ts
def fit_target_i(i, basisphis, targetfunction, xs, yzero, g):
    """Return the locally weighted regression weight of basis function ``i``.

    Implements equation 2.14 from "Dynamical Movement Primitives: Learning
    Attractor Models for Motor Behaviors":

        w_i = (s^T Gamma_i f_target) / (s^T Gamma_i s)

    with ``s = xs * (g - yzero)`` and ``Gamma_i = diag(basisphis[i])``.

    Parameters
    ----------
    i : int
        Row of ``basisphis`` to fit.
    basisphis : numpy.ndarray
        Basis activations, one row per basis function, one column per time.
    targetfunction : numpy.ndarray
        Target forcing term, one value per time.
    xs : numpy.ndarray
        Phase values, one per time.
    yzero, g : float
        Start and goal positions.

    Returns
    -------
    float
        The fitted weight. When ``g == yzero`` every entry of ``s`` is zero and
        the result is nan (0 / 0).
    """
    s = xs * (g - yzero)
    # Gamma_i is diagonal, so s^T Gamma_i v is just a weighted dot product;
    # this avoids building a dense (time x time) matrix for every basis function.
    weighted_s = s * basisphis[i]
    numerator = np.dot(weighted_s, targetfunction)
    denominator = np.dot(weighted_s, s)
    return numerator / denominator

In [None]:
def simulate(fitted_f, ts, g, alpha_z, beta_z, tau=1, step_size=0.00001):
    """Roll out the system driven by a fitted forcing term, using Euler steps.

    Between consecutive sample times the forcing term is linearly interpolated.
    Each step first updates z and then uses the updated z for y:

        tau * dz/dt = alpha_z * (beta_z * (g - y) - z)
        tau * dy/dt = z + f

    Both states start at 0 at time ``ts[0]``.

    Parameters
    ----------
    fitted_f : sequence of float
        Forcing term, one value per entry of ``ts``.
    ts : sequence of float
        Increasing sample times.
    g : float
        Goal position.
    alpha_z, beta_z : float
        Gains of the spring-damper system.
    tau : float, optional
        Temporal scaling constant. This used to be read from the notebook-level
        ``tau``; the default of 1 matches that value, so pass it explicitly if
        the notebook's ``tau`` is changed.
    step_size : float, optional
        Euler step. Defaults to the value that used to be hard-coded.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Position y and state z at every time in ``ts``.

    Raises
    ------
    ValueError
        If ``step_size`` is not positive (the integration would never advance).
    """
    if step_size <= 0:
        raise ValueError("step_size must be positive")

    y = 0  # position
    z = 0  # velocity
    # Start the clock at the first sample. Starting at a hard-coded 0 made a
    # `ts` that does not begin at 0 integrate before the first sample while
    # interpolating against ts[-1].
    t = ts[0]
    ys = [y]
    zs = [z]
    for i in range(1, len(ts)):
        t_prev, t_next = ts[i - 1], ts[i]
        while t < t_next:
            interp_frac = (t - t_prev) / (t_next - t_prev)
            f = (1 - interp_frac) * fitted_f[i - 1] + interp_frac * fitted_f[i]
            z += alpha_z * (beta_z * (g - y) - z) / tau * step_size
            y += (z + f) / tau * step_size
            t += step_size
        ys.append(y)
        zs.append(z)
    return (np.array(ys), np.array(zs))

In [None]:
# Load the trajectory archive (presumably 2-D letter "A" trajectories, judging by the
# file name -- confirm). The path is relative to the notebook's working directory.
fulldat = np.load("../data/trainTest2DLetterARescaled.npz")
# Show the shape of the training split; later cells read its axes as
# (trajectory, time step, dimension).
fulldat["train"].shape

In [None]:
# Indices (into the training split) of the two demonstrations to work with.
ixs = (10, 15)  # (0,1)
dat = fulldat["train"][list(ixs)]
# Shift each demonstration so it starts at the origin, keeping the removed start
# points (one per trajectory, with a length-1 time axis) for plotting later.
start_offset = dat[:, :1, :]
dat = dat - start_offset
print(dat.shape)
# From here on ixs indexes into dat (rows 0 and 1), not into the full training split.
ixs = (0, 1)

# NOTE: some of the indexing in later cells does not go through ixs; it simply
# assumes that dat holds exactly 2 trajectories and that ixs is (0, 1).
# To make the rest of the code work, ensure you only have 2 trajectories here.

In [None]:
# dat is laid out as (trajectory, time step, dimension).
numtrajs, numts, numdims = dat.shape[:3]

In [None]:

ts = np.linspace(0, 1, numts)
xs = phase_func(ts)
dt = ts[1] - ts[0]

# Gaussian basis functions in phase space, one row per centre in cs. They depend only
# on xs, cs and sigma, so they are built once here instead of once per demonstration
# (which also re-evaluated phase_func(ts) for every centre).
basisphis = np.array([np.exp(-(xs - c)**2 / ((sigma * c)**2)) for c in cs])
basis_total = np.einsum("it->t", basisphis)

# Per-fit results, appended dimension-major: every demonstration of dimension 0,
# then every demonstration of dimension 1, ...
zdemos = []
ydemos = []
ydemoprimes = []
ftargets = []
fitted_fs = []

for dim in range(numdims):
    print("dim", dim)
    for traj_ix in ixs:
        ydemo = dat[traj_ix, :, dim]
        # Central differences in the interior, one-sided differences at both ends.
        ydemoprime = np.gradient(ydemo, dt)

        yzero = ydemo[0]
        g = ydemo[-1]
        zdemo = numeric_integration(ydemo, ts, tau, g, alpha_z, beta_z)
        # From tau * dy/dt = z + f, the forcing term that reproduces the demonstration.
        ftarget = tau * ydemoprime - zdemo
        ws = np.array([fit_target_i(k, basisphis, ftarget, xs, yzero, g)
                       for k in range(len(basisphis))])
        # Normalised weighted sum of the basis functions, scaled by phase and amplitude.
        fitted_f = np.einsum("it,i->t", basisphis, ws) / basis_total * xs * (g - yzero)
        zdemos.append(zdemo)
        ydemos.append(ydemo)
        ydemoprimes.append(ydemoprime)
        ftargets.append(ftarget)
        fitted_fs.append(fitted_f)

In [None]:
# One figure per fit: the negated z state, the forcing-term target, and its
# basis-function approximation. Iterating over the stored fits (instead of a
# hard-coded 4) keeps this in step with the number of dimensions and demonstrations.
for i in range(len(fitted_fs)):
    plt.scatter(ts, -zdemos[i], label="-z (demonstration)")
    plt.scatter(ts, ftargets[i], label="forcing-term target")
    plt.scatter(ts, fitted_fs[i], label="fitted forcing term")
    plt.xlabel("t")
    plt.title(f"fit {i}")
    plt.legend()
    plt.show()

In [None]:
# The two data dimensions that get plotted against each other.
visualization_dims = [0, 1]
# Which entries of start_offset the interpolation cells blend between.
interp_inds = [0, 1]

In [None]:
# Roll out every fitted forcing term and overlay it on the demonstration it came from.
for dim in visualization_dims:
    for i in range(len(ixs)):
        # fitted_fs is ordered dimension-major with len(ixs) entries per dimension.
        jindex = dim * len(ixs) + i
        ys, zs = simulate(fitted_fs[jindex], ts, dat[ixs[i], -1, dim], alpha_z, beta_z)
        plt.scatter(ts, dat[ixs[i], :, dim])
        plt.plot(ts, ys)
        plt.show()

In [None]:
# For every demonstration, roll out both visualised dimensions and compare the
# resulting 2-D path with the demonstrated one.
for i in range(len(ixs)):
    simulated_ys = []
    for dim in visualization_dims:
        # fitted_fs is ordered dimension-major with len(ixs) entries per dimension.
        jindex = dim * len(ixs) + i
        ys, zs = simulate(fitted_fs[jindex], ts, dat[ixs[i], -1, dim], alpha_z, beta_z)
        simulated_ys.append(ys)
    simulated_ys = np.array(simulated_ys)
    plt.scatter(dat[ixs[i], :, visualization_dims[0]], dat[ixs[i], :, visualization_dims[1]])
    plt.scatter(simulated_ys[0, :], simulated_ys[1, :])
    plt.show()

# Relevant methodology from "Learning Parametric Dynamic Movement Primitives from Multiple Demonstrations"

In [None]:
# Number of interpolation weights swept between the two demonstrations (endpoints included).
num_interp = 3

In [None]:
# perform an interpolating sweep of the subspace encoding the attractor landscape for the 2 demonstrations
interp_ftargets = []
interp_fitted_fs = []

# The basis functions and their normaliser depend on neither the dimension nor the
# interpolation weight, so they are built once instead of inside the double loop.
basisphis = np.array([np.exp(-(xs - c)**2 / ((sigma * c)**2)) for c in cs])
basis_total = np.einsum("it->t", basisphis)

# we zero-centered everything, at least.
yzero = 0

for dim in visualization_dims:
    # ftargets is ordered dimension-major with len(ixs) entries per dimension.
    first = dim * len(ixs)
    for interp_weight in np.linspace(0, 1, num_interp):
        # dat is just used to interpolate the goals, for smooth visualization....
        # if DMP worked perfectly, you could pick any goal
        g = (1 - interp_weight) * dat[ixs[0], -1, dim] + interp_weight * dat[ixs[1], -1, dim]

        # Blend the two demonstrations' forcing-term targets, then refit the weights.
        ftarget = (1 - interp_weight) * ftargets[first + 0] + interp_weight * ftargets[first + 1]

        ws = np.array([fit_target_i(k, basisphis, ftarget, xs, yzero, g)
                       for k in range(len(basisphis))])
        fitted_f = np.einsum("it,i->t", basisphis, ws) / basis_total * xs * (g - yzero)
        interp_ftargets.append(ftarget)
        interp_fitted_fs.append(fitted_f)

In [None]:
# Linearly blend the two demonstrations' start points, one row per interpolation weight.
first_offset = start_offset[interp_inds[0]]
offset_delta = start_offset[interp_inds[1]] - first_offset
interp_weights = np.linspace(0, 1, num_interp).reshape(-1, 1)
startoffsets_interp = first_offset + interp_weights * offset_delta
print(startoffsets_interp)

In [None]:
targets = dat
f, ax = plt.subplots()
# Draw both demonstrations in black, shifted back to where they originally started.
for i in range(2):
    demo_offset = start_offset[interp_inds[i], 0]
    ax.plot(targets[i][:, visualization_dims[0]] + demo_offset[visualization_dims[0]],
            targets[i][:, visualization_dims[1]] + demo_offset[visualization_dims[1]], c="k")
all_trajs = [targets[0], targets[1]]
all_offsets = [start_offset[0][0], start_offset[1][0]]
print([t.shape for t in all_trajs])
# Roll out interpolation step 1 only, i.e. weight 1 / (num_interp - 1)
# (the midpoint when num_interp is 3).
for i in [1]:
    interp_weight = i / (num_interp - 1)
    simulated_ys = []
    for ix in range(2):
        vis_dim = visualization_dims[ix]
        g = (1 - interp_weight) * dat[ixs[0], -1, vis_dim] + interp_weight * dat[ixs[1], -1, vis_dim]
        # interp_fitted_fs holds num_interp entries per visualised dimension.
        jindex = ix * num_interp + i
        ys, zs = simulate(interp_fitted_fs[jindex], ts, g, alpha_z, beta_z)
        simulated_ys.append(ys)
    simulated_ys = np.array(simulated_ys)
    ax.plot(simulated_ys[0, :] + startoffsets_interp[i, visualization_dims[0]],
            simulated_ys[1, :] + startoffsets_interp[i, visualization_dims[1]])
    all_offsets.append(startoffsets_interp[i])
    all_trajs.append(simulated_ys.T)
ax.axis("equal")
all_trajs = np.array(all_trajs)
all_offsets = np.array(all_offsets)

### that order is traj0, traj1, interp
### switch ordering to traj0, interp, traj1
all_trajs = all_trajs[(0,2,1),]
all_offsets = all_offsets[(0,2,1),]

In [None]:
import matplotlib.lines as mlines 
#https://stackoverflow.com/questions/47391702/how-to-make-a-colored-markers-legend-from-scratch
import matplotlib
# Use a large font in every figure created from here on.
font = {'size': 22}
matplotlib.rc('font', **font)

# Colormap from which make_plot picks the trajectory colours.
cmap = plt.get_cmap("viridis")
def make_plot(trajs,specified_ts,full_interp,name,ts,startoffsets):
    """Draw three trajectories in 2-D and per coordinate, and save the figure as a PDF.

    The figure has a large top panel with the 2-D paths and two smaller panels
    with the Y (middle) and X (bottom) coordinates over time. At each index in
    ``specified_ts`` the three trajectories are marked and joined by a black line.

    Parameters
    ----------
    trajs : numpy.ndarray
        Indexed as ``trajs[k, t, axis]``; must hold exactly three trajectories
        (markers, colours and plot order are defined for three).
    specified_ts : sequence
        Up to three time indices to highlight (one line style each); values are
        converted with ``int``.
    full_interp : numpy.ndarray
        Indexed as ``full_interp[:, t, axis]``; its points at each highlighted
        index are joined by the black line in the top panel.
    name : str
        Prefix of the output file, ``f"{name}InterpolationA.pdf"``.
    ts : numpy.ndarray
        Time value of every sample.
    startoffsets : numpy.ndarray
        Per-trajectory offsets added back before plotting, indexed ``[k, axis]``.

    Side effects: prints the output file name, writes the PDF and shows the figure.
    """
    # Convert once so that both loops below index with plain ints (previously only
    # the first loop cast, so ts[t] in the second one failed for non-int indices).
    specified_ts = [int(t) for t in specified_ts]
    plot_order = [2,0,1]
    linestyles = ["dotted", "dashed","dashdot"]

    f = plt.figure(figsize=(8,16))
    smallht = 0.37/2
    ax1 = f.add_axes([0.1, 0.6, 0.8, 0.4])  # top: 2-D paths
    ax2 = f.add_axes([0.1, 0.1, 0.8, smallht])  # bottom: X over time
    ax3 = f.add_axes([0.1, 0.5-smallht, 0.8, smallht])  # middle: Y over time
    alpha = 0.5
    legend_lines = []
    shapes = ["+",None,"x"]
    for i,traj in zip(plot_order,trajs[plot_order]):
        # Trajectory 1 is drawn thicker and more opaque than the other two.
        ax1.plot(traj[:,0]+startoffsets[i][0],traj[:,1]+startoffsets[i][1],c=cmap(i/3),
                 linewidth=2 if i != 1 else 8,
                 alpha=alpha if i != 1 else 0.9, zorder=1,marker=shapes[i],markeredgecolor=cmap(i/3))
        legend_lines.append(
            mlines.Line2D([], [], color=cmap(i/3), marker=shapes[i], label="---------",
                          markersize=10,markeredgecolor= cmap(i/3),markeredgewidth=1))

    # The legend labels are drawn in white, so only the line/marker samples are visible.
    ax1.legend(handles=legend_lines,labelcolor="white", frameon=False)
    for i,t in enumerate(specified_ts):
        ax1.plot(full_interp[:,t,0]+startoffsets[:,0],
                 full_interp[:,t,1]+startoffsets[:,1],c="k",linestyle=linestyles[i])
        ax1.scatter(trajs[:,t,0]+startoffsets[:,0],
                    trajs[:,t,1]+startoffsets[:,1],
                    c=cmap((0,1/3,2/3)),
                    alpha=1,edgecolors= "k",linewidth=1, zorder=2)
    ax1.set_xlim(-2.5,3)
    ax1.set_ylim(-2,3.5)

    for pltax, axis, axisname in ((ax2,0,"X"),(ax3, 1,"Y")):
        for i,traj in zip(plot_order,trajs[plot_order]):
            pltax.plot(ts,traj[:,axis]+startoffsets[i,axis],c=cmap(i/3),
                     linewidth=2 if i != 1 else 5,
                     alpha=alpha if i != 1 else 0.9, zorder=1)
        for i,t in enumerate(specified_ts):
            # Vertical marker at time ts[t] through the three trajectories' values.
            pltax.plot(ts[t].repeat(3),
                       trajs[:,t,axis]+startoffsets[:,axis],
                       c="k",linestyle=linestyles[i])
            pltax.scatter(ts[t].repeat(3),
                          trajs[:,t,axis]+startoffsets[:,axis],
                          c=cmap((0,1/3,2/3)),
                        alpha=1,edgecolors= "k",linewidth=1, zorder=2)
        if axis==1:
            # The Y panel sits directly above the X panel, so its tick labels are hidden.
            pltax.set_xticklabels([])
            pltax.set_ylim(-2,3.5)
        else:
            pltax.set_ylim(-2.5,3)
    print(f"{name}InterpolationA.pdf")
    # Save through the figure object rather than whatever pyplot considers current.
    f.savefig(f"{name}InterpolationA.pdf",bbox_inches=matplotlib.transforms.Bbox([[0,0], [8,16]]))
    plt.show()

In [None]:
# Show the shape of the stacked trajectories that are handed to make_plot.
all_trajs.shape

In [None]:
# Highlight three time indices. The time axis is the notebook's own ts, so it always
# has one entry per sample of all_trajs (a hard-coded 200 only worked for numts == 200).
make_plot(all_trajs, [118, 132, 138], all_trajs, "dmp", ts, all_offsets)