This is the first tutorial on DSA! We'll recreate the third figure from the paper (except the Procrustes analysis). In doing so, we'll learn about how to structure our data matrices to fit into DSA, how to apply the DSA to data, and how to select various paramters for DSA.

In [1]:
from DSA import DSA
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import MDS
import seaborn as sns
import pandas as pd

rng = np.random.default_rng(2023)


ModuleNotFoundError: No module named 'DSA'

First, we need to define our models, which we construct as ordinary differential equations. There are 3 types of models:
* Bistable switch (nonlinear system)
* Line attractor (linear system)
* Point attractor (linear system)

The goal of this demonstration is to show that even the condition averaged trajectories sampled from these systems are the same, DSA can distinguish between the systems. 

How do we do this? Thanks to Galgali et al., (Nat. Neuro, 2023), we can construct three systems with different intrinsic dynamics that can be controlled to have the same condition averages. The process is a very simple feedback linearization scheme. We'll sample from the first system first and then use those condition averages to drive the latter two systems. 


In [2]:
#defining our models
def run_model1(cohs,params,time=100,cond_avgs=None,seeded=False):
    '''
    simple saddle model
    '''
    a = params.get('a',-.6)
    b = params.get('b',2)
    c = params.get('c',-1)
    dt = params.get('dt',1)
    ntrials = params.get('ntrials',10)
    sigma = params.get('sigma',0)
    steps = int(time / dt)
    x = np.zeros((len(cohs),ntrials,steps,2))
    cohs = np.array(cohs)
    cohs = np.repeat(cohs[:,np.newaxis],ntrials,axis=1)
    input_optimized = np.zeros((len(cohs),ntrials,steps,2))
    for i in range(1,steps):
        dx = a*x[:,:,i-1,0]**3 + b*x[:,:,i-1,0] + cohs
        dy = c*x[:,:,i-1,1] + cohs
        dx = np.concatenate([dx[:,:,np.newaxis],dy[:,:,np.newaxis]],axis=2)
        if cond_avgs is not None:
            xavg = x[:,:,i-1].mean(axis=1)
            
            dx_avg = a*xavg[:,-1]**3 + b*xavg[:,-1] + cohs.mean(axis=1)
            dy_avg = c*xavg[:,-1] + cohs.mean(axis=1)
            dx_avg = np.concatenate([dx_avg[:,np.newaxis],dy_avg[:,np.newaxis]],axis=1)
            inp = (1/dt) * (cond_avgs[:,i] - xavg - dx_avg * dt)
            input_optimized[:,:,i] = np.repeat(inp[:,np.newaxis],ntrials,axis=1)
        else:
            input_optimized[:,:,i] = 0
        
        if seeded:
            rand = rng.normal(0,sigma,size=(len(cohs),ntrials,2))
        else:
            rand =  np.random.normal(0,sigma,size=(len(cohs),ntrials,2))
        x[:,:,i] = x[:,:,i-1] + dt * (dx + input_optimized[:,:,i] + rand)
               
    #xdot = [ax^3 + bx, cy]
    cond_avg = np.mean(x,axis=1)
    return x,cond_avg

def run_model2(cond_avgs,params,time=100):
    '''
    line attractor model
    '''
    l0 = params.get('l0',[1,1])
    l0 /= np.linalg.norm(l0)
    r0 = params.get('r0',[1,0])
    sigma = params.get('sigma',0)
    dt = params.get('dt',1)
    ntrials = params.get('ntrials',10)
    eval1 = params.get('eval1',-1)
    evals = np.diag([0,eval1])
    # Mrot = np.array([[0,-1],[1,0]])
    # r1 = Mrot @ l0
    r1 = l0
    R = np.array([r0,r1])
    L = np.linalg.inv(R)
    A = R @ evals @ L
    theta = np.radians(45)
    c, s = np.cos(theta), np.sin(theta)
    Mrot = np.array(((c, -s), (s, c)))
    A = Mrot @ A

    steps = int(time / dt)
    x = np.zeros((cond_avgs.shape[0],ntrials,steps,2))
    input_optimized = np.zeros((cond_avgs.shape[0],ntrials,steps,2))
    for i in range(1,steps):
        dx = np.einsum('ij,mkj->mki',A,x[:,:,i-1])
        #dx = A @ x[:,:,i-1]    
        # dx_avg = np.einsum('ij,mkj->mki',A,cond_avgs[:,i-1])
        xavg = x[:,:,i-1].mean(axis=1)
        dx_avg = A @ xavg #cond_avgs[:,i-1]
        inp = (1/dt) * (cond_avgs[:,i] - xavg - dx_avg * dt)
        input_optimized[:,:,i] = np.repeat(inp[:,np.newaxis],ntrials,axis=1)
        # diff = cond_avgs[:,i] - x[:,:,i-1].mean(axis=1)
        # opt_input = (diff - dx.mean(axis=1)) 
        # input_optimized[:,:,i-1] = np.repeat(opt_input[:,np.newaxis],ntrials,axis=1)
        x[:,:,i] = x[:,:,i-1] + dt*(dx + input_optimized[:,:,i] + 
                np.random.normal(0,sigma,size=(cond_avgs.shape[0],ntrials,2)))
    return x, input_optimized

def run_model3(cond_avgs,params,time=100):
    a1 = params.get('a1',-0.5)
    a2 = params.get('a2',-1)
    A = np.diag([-np.abs(a1),-np.abs(a2)])
    sigma = params.get('sigma',0)
    dt = params.get('dt',1)
    steps = int(time / dt)
    ntrials = params.get('ntrials',10)
    x = np.zeros((cond_avgs.shape[0],ntrials,steps,2))
    input_optimized = np.zeros((cond_avgs.shape[0],ntrials,steps,2))
    for i in range(1,steps):
        dx = np.einsum('ij,mkj->mki',A,x[:,:,i-1])
        #dx = A @ x[:,:,i-1]
        xavg = x[:,:,i-1].mean(axis=1)
        dx_avg = A @ xavg #cond_avgs[:,i-1]
        # dx_avg = np.einsum('ij,mj->mi',A,cond_avgs[:,i-1])
        inp = (1/dt) * (cond_avgs[:,i] - xavg - dx_avg * dt)
        input_optimized[:,:,i] = np.repeat(inp[:,np.newaxis],ntrials,axis=1)      
          # diff = cond_avgs[:,i] - x[:,:,i-1].mean(axis=1)
        # opt_input = (diff - dx.mean(axis=1)) 
        # input_optimized[:,:,i-1] = np.repeat(opt_input[:,np.newaxis],ntrials,axis=1)
        x[:,:,i] = x[:,:,i-1] + dt*(dx + input_optimized[:,:,i] + 
               np.random.normal(0,sigma,size=(cond_avgs.shape[0],ntrials,2)))
    return x, input_optimized

In [None]:
#let's visualize a sample from each 
x1,cond_avg = run_model1([-.1,.1],dict(dt=0.01,sigma=0.1,ntrials=20))
print(x1.shape,cond_avg.shape)

x2,input_optimized2 = run_model2(cond_avg,dict(dt=0.01,sigma=0.1,ntrials=20))
cond_avg2 = x2.mean(axis=1)
print(x2.shape,cond_avg2.shape)

x3,input_optimized3 = run_model3(cond_avg,dict(dt=0.01,sigma=0.1,ntrials=20))
cond_avg3 = x3.mean(axis=1)
print(x3.shape,cond_avg3.shape)

fig,ax = plt.subplots(1,3,figsize=(8,3),sharex=True,sharey=True)
for i in range(x1.shape[0]):
    for k in range(3):
        ax[k].plot(cond_avg[i,:,0],cond_avg[i,:,1])
    for j in range(x1.shape[1]):
        if j == 10:
            break
        for k in range(3):
            ax[k].plot(x1[i,j,:,0],x1[i,j,:,1],c="orange" if i else "blue", alpha = 0.2)

for i in range(3):
    ax[i].set_ylim(-0.15,0.15)
    ax[i].set_xlabel(r"$h_1$")
ax[0].set_ylabel(r"$h_2$")

ax[0].set_title("Saddle Point")
ax[1].set_title("Line Attractor")
ax[2].set_title("Point Attractor")
plt.tight_layout()



In [None]:
#some code that's relevant for processing our data
def flatten_x(x1):
    return x1.reshape(x1.shape[0]*x1.shape[1],x1.shape[2],x1.shape[3])

def reshape_cavg(cond_avg):
    return cond_avg.reshape(cond_avg.shape[0]*cond_avg.shape[1],cond_avg.shape[2])


In [None]:
#define the parameters for our samples and our dmd
nmodels = 30 #3 x nmodels total
sigma = 0.05 #trajectory noise

ntrials = 300 #number of trajectories per model
dt = 0.01 #timestep simulated in the odes


#vary params for model 1
a = np.random.uniform(-5,-3,size=nmodels)  
b = np.random.uniform(4,7,size=nmodels)
c = np.random.uniform(-4,-2,size=nmodels)

#vary params for model 2
eval1 = np.random.uniform(-1,-3,size=nmodels)

#vary params for model 3
a1 = np.random.uniform(-2,-5,size=nmodels)
a2 = np.random.uniform(-8,-10,size=nmodels)


In [None]:
models = []
model_names = []
# modelnames = {1:'bistable', 2:'line attractor', 3:'point attractor'}
for i in range(nmodels):    
    x1,cond_avg = run_model1([-.1,.1],dict(dt=dt,sigma=sigma,ntrials=ntrials,a=a[i],b=b[i],c=c[i]))  

    cavg_baseline = cond_avg #if we want to reset this every time
    #x has shape conditions x trials x time x dimension
    models.append(x2)
    model_names.append('bistable')

    x2,input_optimized = run_model2(cavg_baseline,dict(dt=dt,sigma=sigma,ntrials=ntrials,eval1=eval1[i]))
    models.append(x2)
    model_names.append('line attractor')

    x3,input_optimized = run_model3(cavg_baseline,dict(dt=dt,sigma=sigma,ntrials=ntrials,a1=a1[i],a2=a2[i]))
    models.append(x3)
    model_names.append('point attractor')

In [None]:
#dmd parameters, all others are default for now.
n_delays = 50
delay_interval = 20
rank = 30

dsa = DSA(models,n_delays=n_delays,rank=rank,delay_interval=delay_interval)
similarites = dmd.fit_score()

In [None]:
df = pd.DataFrame()
df['Model Type'] = model_names
reduced = MDS(dissimilarity='precomputed').fit_transform(similarities)
df["0"] = lowd_embedding[:,0] 
df["1"] = lowd_embedding[:,1]

palette = 'plasma'
sns.scatterplot(data=df,x="0",y="1",hue="Model Type",palette=palette)
plt.xlabel(f"MDS 1")
plt.ylabel(f"MDS 2")
plt.tight_layout()

