In [None]:
# change working directory to be repo base
# The location can be overridden through the YUKAWA_SINDY_ROOT environment
# variable so the notebook also runs on machines other than the author's; the
# original absolute path is kept as the fallback.
import os
REPO_ROOT = os.environ.get(
    'YUKAWA_SINDY_ROOT',
    '/Users/zbh0005/Library/CloudStorage/OneDrive-AuburnUniversity/Documents/Code/yukawa-sindy',
)
os.chdir(REPO_ROOT)
# import function file
import Yukawa_SINDy as ys
# import libraries
from sklearn.model_selection import KFold
from sklearn.metrics import root_mean_squared_error
import numpy as np
import pysindy as ps
import matplotlib.pyplot as plt
# ignore warnings generated from using LaTeX coding in matplotlib label strings
from warnings import filterwarnings
filterwarnings('ignore', message = 'invalid escape sequence')

In [None]:
# Toy data: ten trajectory-batch labels repeated in two columns, giving a
# (10, 2) string array in which each row stands in for one sample.
batch_labels = [f'traj{start}-{start + 19}' for start in range(0, 200, 20)]
X = np.array([batch_labels, batch_labels]).T

# Print the rows selected for training and for testing in each fold.
kf = KFold(n_splits=10)
for train, test in kf.split(X):
    print('train:', X[train])
    print('test:', X[test])
    print(60*'-')

In [None]:
X.T.shape

In [None]:
# Same ten batch labels as a plain Python list.
X = [f'traj{start}-{start + 19}' for start in range(0, 200, 20)]

kf = KFold(n_splits=10)
for train, test in kf.split(X):
    # A list cannot be indexed with an index array, so entries are printed
    # one at a time: first the training labels, then the test labels.
    for tag, fold_indices in (('train:', train), ('test:', test)):
        for idx in fold_indices:
            print(tag, X[idx])
    print(60*'-')

In [None]:
kf.get_n_splits()

In [None]:
# Minimal KFold example: four 2-D samples with alternating binary labels,
# split into two folds.
X = np.array([[value, value] for value in (0., 1., -1., 2.)])
y = np.array([0, 1] * 2)
kf = KFold(n_splits=2)
for train, test in kf.split(X):
    print(train,test)
    # slice samples and labels with the same index arrays
    X_train, y_train = X[train], y[train]
    X_test, y_test = X[test], y[test]

In [None]:
# Build the simulation set that later cells read through the global `sims`
# (it is passed to cross_validate and indexed/iterated in the train/withhold
# split cell). Presumably a list of Yukawa_simulation objects — TODO confirm
# against Yukawa_SINDy.generate_training_data.
sims = ys.generate_training_data(noise_level=0.001)

In [None]:
x=np.array([])

In [None]:
np.hstack((x,1))

In [None]:
def same_times(list_of_sims:list):
    '''
    Description: Helper function to check whether every simulation in a list of
    Yukawa_SINDy.Yukawa_simulation objs shares the same time grid. Only the `.t`
    attribute of each element is read.

    Grids are compared with `np.array_equal`, so two grids of different length
    or shape count as different. An elementwise `==` would instead broadcast a
    length-1 grid against a longer one, or raise on incompatible shapes in
    recent NumPy releases. An empty list has nothing to disagree and returns
    True.

    Returns True or False.
    '''
    if len(list_of_sims) == 0:
        return True
    t_check = list_of_sims[0].t
    # all() stops at the first grid that differs from the reference
    return all(np.array_equal(t_check, sim.t) for sim in list_of_sims[1:])


def kfold_training(x_train:np.ndarray, t_data:np.ndarray, n_folds:int, n_features:int,
                   optimizer=None, feature_library=None, feature_names=None):
    '''
    Description: Fits one SINDy model per fold of a k-fold split of 'x_train' and scores
    each model on the fold that was held out.

    Args:
        x_train: rank 3 array; axis 0 is split into folds, axis 1 must have the same length
            as 't_data', and axis 2 sets the number of rows of each coefficient matrix.
        t_data: time grid passed to 'fit' and 'score' for every trajectory.
        n_folds: number of folds handed to 'sklearn.model_selection.KFold'.
        n_features: number of library terms, i.e. columns of each coefficient matrix.
        optimizer, feature_library, feature_names: forwarded unchanged to 'ps.SINDy'.
            They are explicit arguments so the function no longer depends on names that
            happen to exist in the notebook's global scope; None lets pysindy fall back
            to its own defaults.

    Returns:
        (all_rmse, all_coefs): 'all_rmse' is a 1-D float array with one score per fold
        (computed by 'mdl.score' with 'root_mean_squared_error' as metric); 'all_coefs'
        has shape (n_folds, x_train.shape[2], n_features).

    Raises:
        ValueError: if 'x_train' is not rank 3, if axis 1 of 'x_train' and 't_data'
            disagree in length, or if a fitted coefficient matrix does not have shape
            (x_train.shape[2], n_features).
    '''
    # check dimension of training_data and t_data arrays
    if x_train.ndim != 3:
        raise ValueError('training data has wrong dimensions')
    if x_train.shape[1] != t_data.shape[0]:
        raise ValueError('time data has wrong dimensions')

    # perform KFold CV
    fold_rmse = []
    fold_coefs = []
    kf = KFold(n_splits=n_folds)
    for train, test in kf.split(x_train):
        # pysindy takes multiple trajectories as a list of 2-D arrays
        x_train_kf = list(x_train[train])
        x_test_kf = list(x_train[test])

        # fit SINDy model on the training folds
        mdl = ps.SINDy(optimizer=optimizer, feature_library=feature_library, feature_names=feature_names)
        mdl.fit(x_train_kf, t_data, multiple_trajectories=True)
        mdl.print()

        # copy the coefficients: the optimizer object is shared between folds,
        # so a later fit must not be able to alter what was stored here
        fold_coefs.append(np.array(mdl.coefficients(), dtype=float)[np.newaxis])

        # validate model against the held-out fold
        fold_rmse.append(
            mdl.score(x_test_kf, t=t_data, multiple_trajectories=True, metric=root_mean_squared_error)
        )

    all_rmse = np.array(fold_rmse, dtype=float)
    # the empty leading block pins the expected per-fold shape, so a mismatching
    # coefficient matrix raises here instead of being stacked silently
    all_coefs = np.concatenate(
        [np.empty((0, x_train.shape[2], n_features)), *fold_coefs], axis=0
    )
    return all_rmse, all_coefs


def cross_validate(all_data:list, threshold:float, feature_library, feature_names, n_folds=10):
    '''
    Description: Work-in-progress driver for k-fold cross-validation (cv) of SINDy models.
    Takes a list of Yukawa_SINDy.Yukawa_simulation objects, a SINDy STLSQ threshold, a
    feature library ('pysindy.BaseFeatureLibrary' child objs), and feature names as args.

    What the function does at present:
      1. checks that every item is a Yukawa_simulation and that all time grids agree,
      2. stacks the '.x' arrays of all simulations into one array,
      3. withholds a reproducible random quarter (fixed seed) of the trajectories and
         keeps the remainder as training data,
      4. builds the STLSQ optimizer and counts the library terms by fitting a
         throw-away model on random data.
    The k-fold loop itself is not wired in yet, so 'n_folds', the optimizer, the
    withheld trajectories and the term count are computed but not used.

    Returns: the training subset of the stacked '.x' arrays (the trajectories that were
    not withheld), indexed along axis 0 by trajectory.

    Raises: TypeError if an item is not a Yukawa_simulation; Exception if the time grids
    of the simulations differ.
    '''
    # check if list of sim objects
    for item in all_data:
        if not isinstance(item, ys.Yukawa_simulation):
            raise TypeError("Argument 'all_data' should be list of 'Yukawa_SINDy.Yukawa_simulation' objects")
    # check if all time grids are the same
    if not same_times(all_data):
        raise Exception("All simulations do not have the same time grid.")

    # extract data from sim objects
    x_data = np.array([sim.x for sim in all_data])
    # read the shared time grid from the argument rather than from the
    # notebook-global 'sims', which may be a different data set (or undefined)
    t_data = all_data[0].t

    # split data into withhold(testing) and training data
    n_trajectories = len(all_data)
    rng = np.random.default_rng(seed=10235783)
    withhold_idxs = rng.choice(x_data.shape[0], np.floor(0.25 * n_trajectories).astype(int), replace=False)
    withhold_idxs.sort()
    train_idxs = np.delete(np.arange(len(all_data)), withhold_idxs)
    x_train = x_data[train_idxs]
    x_withhold = x_data[withhold_idxs]  # not used yet

    # declare optimizer with given threshold
    opt = ps.STLSQ(threshold=threshold)

    # get number of terms in library by fitting a throw-away model
    # NOTE(review): the random data has 2 columns, i.e. this assumes two state
    # variables per simulation — confirm against Yukawa_simulation.x
    rand_data = np.random.random((5000,2))
    test_mdl = ps.SINDy(optimizer=opt, feature_library=feature_library, feature_names=feature_names)
    test_mdl.fit(rand_data)
    feature_list = test_mdl.get_feature_names()
    n_features = len(feature_list)  # not used yet
    del test_mdl, rand_data

    return x_train


In [None]:
# STLSQ threshold passed to cross_validate
threshold = 0.3
# candidate-term library built by the project's helper module
feature_library = ys.generate_Yukawa_library()
# labels passed on to ps.SINDy as feature_names
feature_names = ['x', 'v']

# NOTE: rebinds the global x_train to whatever cross_validate returns
x_train = cross_validate(sims, threshold, feature_library, feature_names)
x_train.shape

In [None]:
def test_KFold(X):
    '''
    Description: Prints the shapes of the training and testing subsets of 'X' for each
    fold of a 10-fold KFold split. 'X' must support indexing with an index array and
    the result must expose '.shape'.
    '''
    splitter = KFold(n_splits=10)
    for fold_train, fold_test in splitter.split(X):
        print(f'training set shape: {X[fold_train].shape}')
        print(f'testing set shape: {X[fold_test].shape}')

In [None]:
test_KFold(x_train)

In [None]:
# Same configuration as the earlier cross_validate call cell.
threshold = 0.3
feature_library = ys.generate_Yukawa_library()
feature_names = ['x', 'v']

# NOTE(review): this unpacks two values, but both cross_validate definitions
# visible in this notebook return a single array (x_train or train_idxs) —
# confirm which version of the function this cell was written against.
x_train, x_withhold = cross_validate(sims, threshold, feature_library, feature_names)

In [None]:
# NOTE(review): same_times is defined in two cells of this notebook; whichever
# cell ran last is the definition in effect.
def same_times(list_of_sims:list):
    '''
    Description: Helper function to check whether every simulation in a list of
    Yukawa_SINDy.Yukawa_simulation objs shares the same time grid. Only the `.t`
    attribute of each element is read.

    Grids are compared with `np.array_equal`, so two grids of different length
    or shape count as different. An elementwise `==` would instead broadcast a
    length-1 grid against a longer one, or raise on incompatible shapes in
    recent NumPy releases. An empty list has nothing to disagree and returns
    True.

    Returns True or False.
    '''
    if len(list_of_sims) == 0:
        return True
    t_check = list_of_sims[0].t
    # all() stops at the first grid that differs from the reference
    return all(np.array_equal(t_check, sim.t) for sim in list_of_sims[1:])


def kfold_training(training_data:np.ndarray, n_folds:int, n_features:int,
                   t_data=None, optimizer=None, feature_library=None, feature_names=None):
    '''
    Description: Fits one SINDy model per fold of a k-fold split of 'training_data' and
    scores each model on the fold that was held out.

    The folds are taken from 'training_data' itself. Earlier this function validated
    'training_data' but then split and indexed notebook-global arrays, and read the time
    grid, optimizer, library and feature names from the global scope as well; all of
    these are explicit keyword arguments now.

    Args:
        training_data: rank 3 array; axis 0 is split into folds and axis 2 sets the
            number of rows of each coefficient matrix.
        n_folds: number of folds handed to 'sklearn.model_selection.KFold'.
        n_features: number of library terms, i.e. columns of each coefficient matrix.
        t_data: time information passed to 'fit' and 'score'. Required. When it is an
            array its length must equal training_data.shape[1].
        optimizer, feature_library, feature_names: forwarded unchanged to 'ps.SINDy';
            None lets pysindy fall back to its own defaults.

    Returns:
        (rmse_kf, all_coefs): 'rmse_kf' is a list with one score per fold (computed by
        'mdl.score' with 'root_mean_squared_error' as metric); 'all_coefs' has shape
        (n_folds, training_data.shape[2], n_features).

    Raises:
        ValueError: if 'training_data' is not rank 3, if 't_data' is missing or has the
            wrong length, or if a fitted coefficient matrix does not have shape
            (training_data.shape[2], n_features).
    '''
    # check dimension of training_data array
    if training_data.ndim != 3:
        raise ValueError("training data has wrong dimensions")
    if t_data is None:
        raise ValueError("t_data is required: pass the time grid (or time step) shared by all trajectories")
    # a scalar time step has no length to compare, so only arrays are checked
    if np.ndim(t_data) > 0 and np.shape(t_data)[0] != training_data.shape[1]:
        raise ValueError("time data has wrong dimensions")

    # perform KFold CV
    rmse_kf = []
    fold_coefs = []
    kf = KFold(n_splits=n_folds)
    for train, test in kf.split(training_data):
        # pysindy takes multiple trajectories as a list of 2-D arrays
        x_train_kf = list(training_data[train])
        x_test_kf = list(training_data[test])

        # fit SINDy model on the training folds
        mdl = ps.SINDy(optimizer=optimizer, feature_library=feature_library, feature_names=feature_names)
        mdl.fit(x_train_kf, t_data, multiple_trajectories=True)
        mdl.print()

        # copy the coefficients: the optimizer object is shared between folds,
        # so a later fit must not be able to alter what was stored here
        fold_coefs.append(np.array(mdl.coefficients(), dtype=float)[np.newaxis])

        # validate model against the held-out fold
        rmse = mdl.score(x_test_kf, t=t_data, multiple_trajectories=True, metric=root_mean_squared_error)
        rmse_kf.append(rmse)

    # the empty leading block pins the expected per-fold shape, so a mismatching
    # coefficient matrix raises here instead of being stacked silently
    all_coefs = np.concatenate(
        [np.empty((0, training_data.shape[2], n_features)), *fold_coefs], axis=0
    )
    return rmse_kf, all_coefs


def cross_validate(all_data:list, threshold:float, feature_library, feature_names, n_folds=10):
    '''
    Description: Work-in-progress driver for k-fold cross-validation (cv) of SINDy models.
    Takes a list of Yukawa_SINDy.Yukawa_simulation objects, a SINDy STLSQ threshold, a
    feature library ('pysindy.BaseFeatureLibrary' child objs), and feature names as args.

    What the function does at present:
      1. checks that every item is a Yukawa_simulation and that all time grids agree,
      2. stacks the '.x' arrays of all simulations and prints the shape of the time grid,
      3. withholds a reproducible random quarter (fixed seed) of the trajectories and
         keeps the remainder as training data,
      4. builds the STLSQ optimizer and counts the library terms by fitting a
         throw-away model on random data.
    The k-fold loop itself is not wired in yet, so 'n_folds', the optimizer, the
    train/withhold arrays and the term count are computed but not used.

    Returns: 1-D integer array with the indices (into 'all_data') of the trajectories
    kept for training, in ascending order.

    Raises: TypeError if an item is not a Yukawa_simulation; Exception if the time grids
    of the simulations differ.
    '''
    # check if list of sim objects
    for item in all_data:
        if not isinstance(item, ys.Yukawa_simulation):
            raise TypeError("Argument 'all_data' should be list of 'Yukawa_SINDy.Yukawa_simulation' objects")
    # check if all time grids are the same
    if not same_times(all_data):
        raise Exception("All simulations do not have the same time grid.")

    # extract data from sim objects
    x_data = np.array([sim.x for sim in all_data])
    # read the shared time grid from the argument rather than from the
    # notebook-global 'sims', which may be a different data set (or undefined)
    t_data = all_data[0].t
    print(f'shape and ndims of t_data: {t_data.shape}, {t_data.ndim}')

    # split data into withhold(testing) and training data
    n_trajectories = len(all_data)
    rng = np.random.default_rng(seed=10235783)
    withhold_idxs = rng.choice(x_data.shape[0], np.floor(0.25 * n_trajectories).astype(int), replace=False)
    withhold_idxs.sort()
    train_idxs = np.delete(np.arange(len(all_data)), withhold_idxs)
    x_train = x_data[train_idxs]        # not used yet
    x_withhold = x_data[withhold_idxs]  # not used yet

    # declare optimizer with given threshold
    opt = ps.STLSQ(threshold=threshold)

    # get number of terms in library by fitting a throw-away model
    # NOTE(review): the random data has 2 columns, i.e. this assumes two state
    # variables per simulation — confirm against Yukawa_simulation.x
    rand_data = np.random.random((5000,2))
    test_mdl = ps.SINDy(optimizer=opt, feature_library=feature_library, feature_names=feature_names)
    test_mdl.fit(rand_data)
    feature_list = test_mdl.get_feature_names()
    n_features = len(feature_list)  # not used yet
    del test_mdl, rand_data

    return train_idxs


In [None]:
from pysindy.utils import lorenz
from scipy.integrate import solve_ivp
# Keyword arguments forwarded to scipy's solve_ivp for every Lorenz integration
# in this notebook: LSODA with tight relative and absolute tolerances.
integrator_keywords = {'rtol': 1e-12, 'method': 'LSODA', 'atol': 1e-12}

def fuckery(n_trajectories:int, duration:float, dt=2e-3, seed=293854):
    '''
    Description: Generates 'n_trajectories' Lorenz trajectories from random initial
    conditions, for use as multi-trajectory SINDy data.

    Args:
        n_trajectories: number of trajectories to integrate.
        duration: end of the time grid; samples are np.arange(0, duration, dt).
        dt: spacing of the time grid.
        seed: seed of the random generator that draws the initial conditions.

    Each initial condition is 30 * (three uniform draws from [0, 1)). A single generator
    is created before the loop so that every trajectory gets its own initial condition;
    creating it inside the loop re-seeded it each time and produced n_trajectories copies
    of the same trajectory. With the default seed the first trajectory is unchanged.
    Two calls with the same seed draw the same sequence of initial conditions, so pass
    a different seed when independent data sets are needed.

    Prints the shape of the time grid and the first initial condition.

    Returns: list of arrays, one per trajectory, each with one row per time sample and
    three columns.
    '''
    t_train = np.arange(0,duration,dt)
    t_train_span = (t_train[0], t_train[-1])
    print(f'shape and ndims of t_data: {t_train.shape}, {t_train.ndim}')

    # one generator for the whole batch (see docstring)
    rng = np.random.default_rng(seed=seed)
    x_train = []
    for i in range(n_trajectories):
        # generate random init cond
        x0_train = 30*rng.random(3)
        if i==0:
            print(x0_train)
        # solve_ivp returns y as (n_states, n_times); transpose to (n_times, n_states)
        x_train_traj = solve_ivp(lorenz, t_train_span, x0_train, t_eval=t_train, **integrator_keywords).y.T
        x_train.append(x_train_traj)
    return x_train


In [None]:
# train: fit a default-configured SINDy model on 75 Lorenz trajectories
# NOTE: rebinds the globals duration, dt, x_train and model used by later cells
duration = 10
dt = 2e-3
x_train = fuckery(75, duration, dt)
model = ps.SINDy()
# x_train is a list of trajectories, hence multiple_trajectories=True;
# t=dt passes the sample spacing instead of a full time grid
model.fit(x_train, t=dt, multiple_trajectories=True)
model.print()

In [None]:
# test: score the fitted model on 25 longer (duration 15) Lorenz trajectories
# NOTE(review): fuckery draws its initial conditions from a fixed seed, so these
# trajectories start from initial conditions that were also used for training —
# the score below is not an independent check.
duration = 15
x_test = fuckery(25, duration, dt)
print(f'model score: {model.score(x_test,t=dt,multiple_trajectories=True):.4f}')

In [None]:
# Give the coefficient matrix a leading axis of length 1, so it can be stacked
# along axis 0 with the coefficient matrices of other models.
coefs = model.coefficients()
coefs = coefs[np.newaxis, ...]

In [None]:
coefs.shape

In [None]:
model.coefficients().shape

In [None]:
coefs

In [None]:
model.coefficients().shape

In [None]:
x_test[0].shape

In [None]:
# validate on other trajectory
# time grid of length 15 with the spacing dt set in an earlier cell
t_test = np.arange(0,15,dt)
x0_test = [8,7,15]
t_test_span = (t_test[0],t_test[-1])
# .y.T turns solve_ivp's (n_states, n_times) output into (n_times, n_states)
# NOTE: rebinds x_test from a list of trajectories to a single trajectory
x_test = solve_ivp(lorenz, t_test_span, x0_test, t_eval=t_test, **integrator_keywords).y.T
print(f'Model score: {model.score(x_test,t=dt):.4f}')

In [None]:
# Single-trajectory Lorenz example: fit on one trajectory, score on another.
dt = .002

# Training trajectory: integrate from a fixed initial condition on the grid
# np.arange(0, 10, dt).
t_train = np.arange(0, 10, dt)
t_train_span = (t_train[0], t_train[-1])
x0_train = [-8, 8, 27]
train_solution = solve_ivp(
    lorenz, t_train_span, x0_train, t_eval=t_train, **integrator_keywords
)
x_train = train_solution.y.T

# Fit a default-configured SINDy model and show the identified equations.
model = ps.SINDy()
model.fit(x_train, t=dt)
model.print()

# Validation trajectory: a different initial condition on a longer grid.
t_test = np.arange(0, 15, dt)
t_test_span = (t_test[0], t_test[-1])
x0_test = np.array([8, 7, 15])
test_solution = solve_ivp(
    lorenz, t_test_span, x0_test, t_eval=t_test, **integrator_keywords
)
x_test = test_solution.y.T

# Compare SINDy-predicted derivatives with finite difference derivatives
test_score = model.score(x_test, t=dt)
print('Model score: %f' % test_score)

In [None]:
print(ps.__version__)

In [None]:
# 3 blocks x 4 rows x 4 columns; every row is filled with a single value and the
# values count up from 1 to 12 across the rows.
x_list = [[4 * [4 * block + row] for row in range(1, 5)] for block in range(3)]
x = np.array(x_list)
# average over axis 1 (the rows within each block)
np.average(x, axis=1)

In [None]:
x.shape

In [None]:

# Stack the state history of every simulation; the time grid is taken from the
# first simulation.
x_data = np.array([sim.x for sim in sims])
t_data = sims[0].t
n_timesteps = t_data.shape[0]

# Withhold a reproducible random quarter of the trajectories (fixed seed) and
# keep the remaining ones for training.
n_trajectories = len(sims)
n_withhold = np.floor(0.25 * n_trajectories).astype(int)
rng = np.random.default_rng(seed=10235783)
withhold_idxs = np.sort(rng.choice(n_trajectories, n_withhold, replace=False))
train_idxs = np.setdiff1d(np.arange(n_trajectories), withhold_idxs)
x_train, x_withhold = x_data[train_idxs], x_data[withhold_idxs]

In [None]:
print(f"training: {x_train.shape}, withhold: {x_withhold.shape}")

In [None]:
# Stacking onto an empty (0, 5000, 2) array: the block to append needs a leading
# axis of length 1 so that it matches along the remaining axes.
emp = np.empty((0,5000,2))
append = np.random.random((1,5000,2))
np.vstack((emp,append))

In [None]:
# Prefix an existing shape tuple with one more dimension.
a = (5, 2)
b = (10,) + a

In [None]:
b