In [2]:
from doctest import UnexpectedException
from re import M
import sys
sys.path.append('E:\\OneDrive - Northwestern University\\Liu Research\\Dimensionless_solid_example')
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score

from PolyDiff import PolyDiffPoint, PolyDiffPointNoise
from SeqReg import SeqReg


plt.rcParams["font.family"] = "Arial"
np.set_printoptions(suppress=True)

In [3]:
class Plasticbeam():
    '''
    Generate data for plastic beam under impulsive loading (small deformation, no axial force considered)
    '''
    def __init__(self, rho, H, L, V0, sigma0, Nt, Nx, isdimensionless=True):
        super(Plasticbeam, self).__init__()
        self.rho = rho #density
        self.H = H #beam height
        self.L = L #half length of the beam
        self.m = rho * H #line density of unit width beam
        self.V0 = V0 #impulse load
        self.sigma0 = sigma0 #yield stress
        self.M0 = sigma0 * H ** 2 / 4 #yield moment
        self.pc = 2 * self.M0 / L**2 #critical loading
        self.T1 = self.m * V0 * L **2 / (6 * self.M0)   #time for 1st stage     
        self.T = self.m * V0 * L **2 / (2 * self.M0) #total time for 2 stages
        self.Nt = Nt #number of time steps
        self.Nx = Nx #number of coor steps
        self.dimensionless = isdimensionless
    def grid(self):
        # Initialization
        t_step = self.Nt
        x_step = self.Nx
        self.t = np.linspace(0,self.T, t_step) #time grid
        self.x = np.linspace(0,self.L, x_step) #space grid beam centered at the origin
        self.t = self.t[1:] #remove boundary sigular point
        self.x = self.x[:-1] #remove boundary sigular point
        self.t1 = self.t[ : int(t_step/3)] #first one third is first stage
        self.t2 = self.t[int(t_step/3) : ] #remaining two thirds are the second stage
        self.ksi = self.L - np.sqrt(6 * self.M0 * self.t1 / (self.m * self. V0)) #location of plastic hinge
        self.ksidot = -3 * self.M0 / (self.m * self.V0 * (self.L - self.ksi)) #velocity of plastic hinge

    
    def helper(self):
        self.A2 = -self.m * self.V0 * self.ksi * (self.L - self.ksi/2) * self.ksidot / (self.L-self.ksi) **2
        self.B2 = -self.m * self.V0 * self.L ** 3 * self.ksidot / 3 / (self.L - self.ksi)**2 - self.A2 * self.L
    #solution to moment
    def solution_m(self):
        m = np.zeros((self.t.shape[0], self.x.shape[0]))
        t1_size = self.t1.shape[0]
        t2_size = self.t2.shape[0]
        for i in range(t1_size):
            for j in range(self.x.shape[0]):
                if self.x[j] < self.ksi[i]:
                    m[i][j] = self.M0
                else:
                    m[i][j] = self.m * self.V0 * (self.L * self.x[j] ** 2/2 - self.x[j]**3/6) * self.ksidot[i] / (self.L-self.ksi[i]) ** 2 + self.A2[i] * self.x[j] + self.B2[i]
        for i in range(t2_size):
            a_mid = -3 * self.M0/ (self.m * self.L**2)
            for j in range (self.x.shape[0]):
                m[i + t1_size][j] = self.m * (self.x[j] **2 /2 - self.x[j] ** 3/ (6*self.L)) * a_mid + self.M0 
        m = m[:,:]
        self.moment = m
        if self.dimensionless:
            self.df = pd.DataFrame(m) / (self.sigma0 * self. H ** 2 / 4)
        else:
            self.df = pd.DataFrame(m)
        
    #solution to velocity       
    def solution_v(self):
        v = np.zeros((self.t.shape[0], self.x.shape[0]))
        t1_size = self.t1.shape[0]
        t2_size = self.t2.shape[0]
        for i in range(t1_size):
            for j in range(self.x.shape[0]):
                if self.x[j] < self.ksi[i]:
                    v[i][j] = self.V0
                else:
                    v[i][j] = self.V0 * (self.L - self.x[j]) / (self.L-self.ksi[i])
        v_mid = np.zeros(t2_size)
        for i in range(t2_size):
                v_mid[i] = -3 * self.M0 * self.t2[i] / (self.m * self.L **2) + 3 * self.m * self.V0 / (2 * self.m)
                for j in range (self.x.shape[0]):
                    v[i + t1_size][j] = v_mid[i] * (1 - self.x[j]/self.L)
        v = v[:,:]
        self.velocity = v
        if self.dimensionless:
            self.df = self.df.join(pd.DataFrame(v / self.V0),rsuffix = 'v_')
        else:
            self.df = self.df.join(pd.DataFrame, rsuffix = 'v_')

    def solution(self):
        return self.df

#compute feature library and SINDy    
class FitEqu(object):
    '''
    For a given data, fit the governing equation.
    '''
    def __init__(self, dimensionless = True):
        super(FitEqu, self).__init__()
        self.dimesionless = dimensionless
    def prepare_data(self, rho, H, L, V0, sigma0, Nt, Nx):
        '''
        generate the dataset
        '''
        plastic = Plasticbeam(rho, H, L, V0, sigma0, Nt, Nx, isdimensionless =self.dimesionless)
        plastic.grid()
        plastic.helper()
        plastic.solution_m()
        plastic.solution_v()
        data = plastic.solution() #{M,v}
        t = plastic.t
        x = plastic.x


        return data
    
    @staticmethod
    def cal_derivatives(m, v, dx, dt):
        '''
        prepare library for regression
        '''
        t_size = m.shape[0]
        x_size = v.shape[1]
        dvdt = np.zeros((t_size - 2, x_size - 2)) #solve derivative using central difference method
        dvdx = np.zeros((t_size - 2, x_size - 2))
        dmdt = np.zeros((t_size - 2, x_size - 2))
        dmdx = np.zeros((t_size - 2, x_size - 2))
        dv2dx2 = np.zeros((t_size - 2, x_size - 2))
        dm2dx2 = np.zeros((t_size - 2, x_size - 2))
        for i in range(1, t_size - 2):
            for j in range(1, x_size - 2):
                dvdt[i][j] = (v[i+1][j] - v[i-1][j]) / 2 / dt
                dmdt[i][j] = (m[i+1][j] - m[i-1][j]) / 2 / dt
                dvdx[i][j] = (m[i+1][j] - m[i-1][j]) / 2 / dx
                dmdx[i][j] = (m[i+1][j] - m[i-1][j]) / 2 / dx
                dm2dx2[i][j] = (m[i][j+1] - 2 * m[i][j] + m[i][j-1]) / dx**2
                dv2dx2[i][j] = (v[i][j+1] - 2 * v[i][j] + v[i][j-1]) / dx**2
        
        return dvdt, dmdt, dvdx, dmdx, dv2dx2, dm2dx2
    
    @staticmethod
    def build_library(v, m, vt, mt, vx, mx, vxx, mxx): #define a new function?
        v = v [1:-1,1:-1]
        m = m [1:-1,1:-1]
        '''
        build the library for sparse regression
        '''
        X_library = [
            v.reshape(-1,1),
            m.reshape(-1,1),
            vt.reshape(-1,1),
            mt.reshape(-1,1),
            vx.reshape(-1,1),
            mx.reshape(-1,1),
            vxx.reshape(-1,1),
        ]
        X_library = np.squeeze(np.stack(X_library, axis=-1))
        names = ['v', 'm', 'vt', 'mt', 'vx','mx','vxx','mxx']
        y_library = mxx.reshape(-1,1)

        # # rescale the data 
        # norm_coef = np.mean(np.abs(np.mean(X_library, axis=0)))
        # X_library = X_library / norm_coef
        # y_library = y_library / norm_coef
        return X_library, y_library, names
    
    @staticmethod
    def fit(X_library, y_library, threshold=0.0008):
        '''
        squential threshold with dynamic threshold
        '''
        model = SeqReg()
        coef, _, r2 = model.fit_dynamic_thresh(X_library, y_library, 
                        is_normalize=False, non_zero_term=2, threshold=threshold, fit_intercept=False, model_name='Ridge')
        print('Fitting r2', r2)
        return coef


def prepare_dataset(is_show=False): #1st step of dimensionless learning: SINDy
    '''
    prepare a sets of dataset
    '''
    data = []
    fit_equ = FitEqu()

    params = [
        [2e-9, 100, 25000, 1000, 100, 700, 700], 
        [7e-9, 50, 35000, 700, 700, 700, 700],
        [3e-9, 30, 30000, 800, 500, 200, 200],
        [5e-9, 20, 20000, 500, 400, 700, 700],
    ]
    if is_show: fig = plt.figure(); 
    for rho, H, L, V0, sigma0, Nt, Nx in params:
        m = rho * H
        M0 = sigma0 * H ** 2 / 4
        et = m * V0 * L **2 / (2 * M0)
        dt = et * V0 / H / float(Nt-1) 
        dx = L/ float(Nx-1) / L
        Re = rho * V0 ** 2 / sigma0 * (2 * L / H) **2 
        print(Re)

        df_each = fit_equ.prepare_data(rho, H, L, V0, sigma0, Nt, Nx)
        df_each = df_each.to_numpy()
        m_each = df_each[:,:Nx-1]
        v_each = df_each[:,Nx-1:]
        
        vt, mt, vx, mx, vxx, mxx = fit_equ.cal_derivatives(m_each, v_each, dx, dt)

        X_library, y_library, names = fit_equ.build_library(v_each, m_each, vt, mt, vx, mx, vxx, mxx)
        coef = fit_equ.fit(X_library, y_library)
        print(coef)
        
        coef_res = [(each[0], round(each[1], 4)) for each in list(zip(names, coef.tolist())) if abs(each[1]) >= 1e-3]
        coef_res = sorted(coef_res, key=lambda x: abs(x[1]), reverse=True) #??
        Re_coef = coef_res[0][1]
        data.append([rho, H, L, V0, sigma0, Re_coef, abs(Re_coef - Re)/abs(Re)])
        
    df = pd.DataFrame(
        data, columns=['Rho', 'H', 'L', 'V0', 
                       'sigma0', 'Re_coef', 'Re_coef_err_per'])
    return df

In [4]:
df = prepare_dataset(is_show=False)

5.0
Fitting r2 0.9390749411465495
[0.         0.         5.02197108 0.         0.         0.
 0.        ]
9.604
Fitting r2 0.9390749410749977
[0.         0.         9.64614429 0.         0.         0.
 0.        ]
15.360000000000001
Fitting r2 0.9354568778597856
[ 0.          0.         15.41665623  0.          0.          0.
  0.        ]
12.5
Fitting r2 0.9390749409466584
[ 0.          0.         12.55478097  0.          0.          0.
  0.        ]


 # Recover $R_n$

 Dimension matrix for input parameters:
 $\begin{align}
     D_{in}= \begin{bmatrix}
            1, 0, 0, 0, 1  \\
            -3, 1, 1, 1, -1 \\
            0, 0, 0, -1, -2
          \end{bmatrix}
 \end{align}$
 
 Dimension matrix for output parameters:
 $\begin{align}
     D_{out}= \begin{bmatrix}
            0 \\
            0 \\
            0
          \end{bmatrix}
 \end{align}$
 
 
 Solution space is:
 $\begin{align}
     w &= \begin{bmatrix}
            0 \\
            -1 \\
            1 \\
            0 \\
            0
          \end{bmatrix} * \gamma_1 + 
          \begin{bmatrix}
            1 \\
            0 \\
            0 \\
            2 \\
            -1
          \end{bmatrix} * \gamma_2
 \end{align}$
 
 The best basis coefficients are $\gamma_1=2, \gamma_2=-1$.
 
 The best solution $w^*$ is 
 $\begin{align}
     w^* &= \begin{bmatrix}
            -1 \\
            -2 \\
            2 \\
            -2 \\
            1
          \end{bmatrix}
 \end{align}$

In [5]:
class DimensionlessLearning(object):
    '''
    Indentify the explicit form one coefficient using dimensionless learning
    '''
    
    def __init__(self, df, input_list, output_coef, dimension_info, basis_list):
        super(DimensionlessLearning, self).__init__()
        self.df = df                             #df all the parameters
        self.input_list = input_list             #input_list = ['rho', 'H', 'L', 'V0', 'sigma0']
        self.output_coef = output_coef           #output_coef = 'Re_coef (response number)'
        self.X, self.y = self.prepare_dataset()  #4 set of parameters
        #needs to be changed for a new dimension matrix
        self.dimension_info, self.basis_list = dimension_info, basis_list #dimension matrix for papameters  #basis_list
        ###############################################
        self.basis1_in, self.basis2_in = self.prepare_dimension()

    def prepare_dataset(self): #this is different from the prepare_dataset function in the 1st step;
        '''
        prepare the input and output data
        '''
        X = self.df[self.input_list].to_numpy() # m, k, A0, c  (parameter matrix)
        y = self.df[self.output_coef].to_numpy().reshape(-1, 1) # m-coef  (target m values)
        return X, y
        
    def prepare_dimension(self):
        '''
        parse dimension for input and output
        '''
        basis1_in, basis2_in = self.basis_list[0], self.basis_list[1]
        return basis1_in, basis2_in

    def fetch_coef_pi(self, coef): # seems it only works for the coefficient -m/c
        '''
        parse the combined weights for the input
        '''
        coef_pi = coef[0] * self.basis1_in - self.basis2_in
        return coef_pi
        
    def check_dimension(self, coef):
        '''
        check whether the basis vectors can formulated as the D_out
        '''
        coef_pi = self.fetch_coef_pi(coef)
        print('[check] coef_pi: \n', coef_pi)
        target_D_out = np.dot(self.dimension_info[0], coef_pi)  #self.dimension_info[0] input dimension matrix
        print('[check] target_D_out: \n', target_D_out)
        assert np.array_equal(target_D_out, self.dimension_info[1]), 'Wrong target_D_out!'

    def fit_pattern_search(self, seed): #fixed pattern search to get the optimal values for dimension \gamma
        '''
        pattern search
        '''
        def get_coordinates(a, delta):
            '''
            Build a list to store all possible coordiantes
            '''
            coord_all = []
            for a_ in [a - delta, a, a + delta]:
                if [a_] != [a]:
                    coord_all.append([a_])
            return coord_all
        
        def opt(coef):
            '''
            fit a linear regression
            '''
            coef_pi = self.fetch_coef_pi(coef)
            pi_in = np.prod(np.power(self.X, coef_pi.reshape(-1,)), axis=1).reshape(-1, 1) #terms after beta_1
            reg =LinearRegression(fit_intercept=False)
            reg.fit(pi_in, self.y)
            y_pred = reg.predict(pi_in)
            r2 = r2_score(self.y, y_pred)
            return r2, coef_pi, reg.coef_

        np.random.seed(seed)
        res, break_points = [], []
        a = np.random.choice(np.linspace(-2, 2, 9), 1)[0]  # [-2, 2] delta=0.5
        # a= 0
        coef = np.array([a]).reshape(-1, 1) #\gamma in the paper's formulation

        iter_num, max_iter, delta = 0, 10, 0.5 # delta: interval for grid search
        while iter_num < max_iter:
            candidate_coord = get_coordinates(a, delta)
            r2_center, reg_coef_center, coef_w_center = opt(coef) #r2 value at the center point [0.5]
            # print('r2_center', round(r2_center, 2), 'reg_coef_center', [round(each, 2) for each in list(reg_coef_center.reshape(-1,))])
            # print('coef_w_center', coef_w_center)

            if r2_center < 0.2:
                break_points.append([a])
                break

            r2_bounds_val = []
            for [a_] in candidate_coord: #[0.0, 1.0] for center point at 0.5
                coef_temp = np.array([a_]).reshape(-1, 1)
                r2_bound, reg_coef_bound, coef_w_bound = opt(coef_temp)
                r2_bounds_val.append(r2_bound)

            # sort r2 from high to low
            highest_index = np.argsort(r2_bounds_val)[::-1][0]
            iter_num += 1

            # udpate the center coordiantes when the R2 in the neighborhood is higher
            if r2_center < r2_bounds_val[highest_index]:
                [a] = candidate_coord[highest_index]
                coef = np.array([a]).reshape(-1, 1)
                coef_pi = self.fetch_coef_pi(coef)
                res_info = {'a': a, 'r2_center': round(r2_bounds_val[highest_index], 4)}
                # print('update', res_info)
                res.append(res_info)
            else:
                break
        
        #compute the final r2 for the fixed grid search
        coef_pi = self.fetch_coef_pi(coef)
        r2, reg_coef_final, coef_w_final = opt(coef)
        return r2, reg_coef_final, coef_w_final


def recover_coef1(seed):
    input_list = ['Rho', 'H', 'L', 'V0', 'sigma0']
    output_coef = 'Re_coef'

    D_in = np.mat('1, -3, 0; 0, 1, 0; 0, 1, 0; 0, 1, -1; 1, -1, -2').T
    D_out = np.mat('0;, 0; 0')
    dimension_info = [D_in, D_out]

    basis1_in = np.array([0, -1, 1, 0, 0]).reshape(-1, 1)
    basis2_in = np.array([-1, 0, 0, -2, 1]).reshape(-1, 1)
    basis_list = [basis1_in, basis2_in]
    
    dimensionless_learning = DimensionlessLearning(
        df, input_list, output_coef, dimension_info, basis_list)
    # dimensionless_learning.check_dimension(coef=[0])

    # pattern search
    r2, coef, coef_w = dimensionless_learning.fit_pattern_search(seed=seed)
    if r2 > 0.8:
        print('final r2', r2, coef.flatten(), coef_w)

In [6]:
for i in range(20):
    recover_coef1(seed=i)

final r2 0.9999989667133847 [ 1. -2.  2.  2. -1.] [[4.01625578]]
final r2 0.9999989667133847 [ 1. -2.  2.  2. -1.] [[4.01625578]]
final r2 0.9999989667133847 [ 1. -2.  2.  2. -1.] [[4.01625578]]
final r2 0.9999989667133847 [ 1. -2.  2.  2. -1.] [[4.01625578]]
final r2 0.9999989667133847 [ 1. -2.  2.  2. -1.] [[4.01625578]]
