In [None]:
'''
WeakIdent Tutorial part II: PDE identification using weak form

Author: Wenbo Hao

Note: the whole tutorial is adapted from the WeakIdent-Python code developed by
Mengyi Tang, available at the link below.
https://github.com/sunghakang/WeakIdent/tree/main/WeakIdent-Python

Date: 11/19/2024

Introduction:

This file is the second among two files of a tutorial for WeakIdent: a method
to identify partial differential equations from spatiotemperal data leveraging
the weak form of PDEs. This file shows the procedure to solve the identification
problem Wc = b for coefficient vector c once we get the weak feature matrices
W and b, including performing colomn normalization on W, selecting the support
of c based on sparsity level and trimming error, and recovering the coefficients.
The document didn't perform error normalization or find highly dynamic regions
for identification.  For a more comprehensive code, see
https://github.com/sunghakang/WeakIdent/tree/main/WeakIdent-Python.
The paper that develops this method is on
https://doi.org/10.1016/j.jcp.2023.112069. If you found WeakIdent useful in your
research, please consider citing it:

@article{tang2023weakident,
  title={WeakIdent: Weak formulation for Identifying Differential Equation using
  Narrow-fit and Trimming},
  author={Tang, Mengyi and Liao, Wenjing and Kuske, Rachel and Kang, Sung Ha},
  journal={Journal of Computational Physics},
  pages={112069},
  year={2023},
  publisher={Elsevier}
}

'''

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import scipy.linalg
from typing import Tuple

In [None]:
# Load our feature matrix [b|W] for the Weak Form Identification problem b = Wc.

# Note that the variable W defined below is the augemented matrix [b|W] but not
# W alone. The first colomn of our variable W refers to the LHS feature b and
# the other colomns refer to the RHS feature W.

# The weakfeature matrix W is generated in the Tutorial Part I: data preparation
# for WeakIdent

from google.colab import drive
drive.mount('/content/drive')

import pickle

with open('/content/drive/My Drive/WeakIdent_tutorial_W.pkl', 'rb') as f:
    W = pickle.load(f)

print(W)

Mounted at /content/drive
[[-0.49105281  0.08151466  0.04872886 ...  0.01989212  0.35632694
   2.73898463]
 [-0.49542526  0.08151466  0.05659904 ...  0.0294907   0.40414009
   0.96449738]
 [-0.47585739  0.08151466  0.06319881 ...  0.03966488  0.40021846
  -1.32547417]
 ...
 [ 0.30372365  0.08151466 -0.02656423 ... -0.00302245 -0.03494819
   0.24789304]
 [ 0.2916584   0.08151466 -0.02899285 ... -0.003777   -0.02377685
   0.63852418]
 [ 0.27253655  0.08151466 -0.03004175 ... -0.0041389  -0.00405409
   0.90497655]]


In [None]:
# This is the main function to identify PDE for all equations (variables)
def diff_eqn_identification(
        tau: float, num_of_variables: int, dim_x: int, is_1d_ode: bool,
        W_b_large: np.array,S_b_large: np.array) -> Tuple[np.array, np.array,
                                                          np.array]:

    """
    This function performs identification using given feature matrix
    (including lhs and rhs), scale matrix and row index of highly dynamic region.

    Args:
        tau (float): trimming score. Defaults to 0.05.
        num_of_variables(int) : number of variables (equations) of given data.
        dim_x(int): spatial dimension of given data.
        is_1d_ode (bool): whether given data is 1d-ode type.
        W_b_large (np.array): feature matrix.
        S_b_large (np.array): scale matrix, used for error normalization

    Returns:
        W(np.array):  original RHS feature matrix.
        b(np.array):  original LHS feature matrix, it can be u_t or v_t in pdes.
        c(np.array):  identified sparse coefficient vector of size (L, n)
    """

    # Convert the scale matrix for W_large to vectors of float 64 data type
    # and computes the corresponding scale matrix for the lhs and rhs
    scales_W_b = np.mean(np.abs(S_b_large), axis=0)
    scales_lhs_features, scales_rhs_features = scales_W_b, 1/scales_W_b.flatten()

    # Seperate the feature matrix W_b_large to LHS feature b and RHS feature W
    b = W_b_large[:, :num_of_variables]
    W = W_b_large[:, num_of_variables:]

    # Performing error normalization to W and b
    # This part is not yet implemented!!!
    b_tilda = b
    W_tilda = W

    # Initialize the coefficient array c
    c_pred = np.zeros((W_b_large.shape[1] - num_of_variables, num_of_variables))

    # set up maximum sparsity level. For a predetermined dictionary, one can use
    # a number <= # total features as maximum sparsity level. This number is
    # suggested to be an appropriate number to reduce computation cost.
    sparsity = set_sparsity_level(is_1d_ode, num_of_variables, dim_x)

    for num_var in range(num_of_variables):
        support_pred, coeff_sp = weak_ident_feature_selection(
            W, b, b_tilda, W_tilda, sparsity, scales_rhs_features,
            scales_lhs_features, num_var, tau)
        c_pred[support_pred, num_var] = coeff_sp
    return W, b, c_pred/100


# This is the function that identifies the coefficient vector for a single
# equation (variable).
def weak_ident_feature_selection(W: np.array,
                                 b: np.array,
                                 b_tilda: np.array,
                                 W_tilda: np.array,
                                 k: int,
                                 s_rhs: np.array,
                                 s_lhs: np.float64,
                                 num_var: int,
                                 tau=0.05) -> np.ndarray:
    """
    This function returns the support identified by WeakIdent and coefficients
    values on this support

    Args:
        w (np.array): (feature matrix array) of shape (N, L) where L is the
        number of rhs features and N is number of data points.
        b (np.array): (dynamic variable u_t) of shape (N, n), where n is the num
        of equations (variables)
        b_tilda (np.array): error normalized vector b of shape (N, n).
        w_tilda (np.array): error normalized matrix w of shape (N, L).
        k (np.array): maximum allowed sparsity level.
        s_rhs (np.array): scale vector s for rhs features, array of shape (L, ).
        s_lhs (np.float64): scale number for lhs feature (u_t).
        num_var: the index of equation (variable) for which we are performing
        identification
        tau (float, optional): trimming score. Defaults to 0.05.

    Returns:
        np.ndarray: a support vector that stores the index of finalized candiate
        features for each variable.
    """

    # Extracting the feature matrices for the one equation (variable) we are
    # working with
    b_one_var = b[:, num_var].reshape(-1, 1)
    b_tilda_one_var = b_tilda[:, num_var].reshape(-1, 1)
    s_lhs_one_var = s_lhs[num_var]

    # Initialize support dictionary and list of cross validation error
    support_list = {}
    cv_err_list = []

    # performing colomn normalization on W
    W_column_norm = np.linalg.norm(W, axis=0).reshape(1, -1)
    column_normalized_W = W / W_column_norm

    # Iterate over maximum sparsity level. For each sparsity level returns
    # append a support to the dictionary
    for i in range(k):
        support = subspace_persuit(column_normalized_W,
                                   b_one_var / np.linalg.norm(b_one_var, 2), i)
        c_pred = narrow_fit(W_tilda, b_tilda_one_var, support, s_rhs,
                            s_lhs_one_var)
        trim_score = compute_trim_score(W_column_norm, support, c_pred)
        support_list[i] = support
        cv_err_list.append(
            compute_cross_validation_err_v2(support, W_tilda, b_tilda_one_var))

        # performing trimming on the support, cutting the terms with trim score
        # smaller than tau
        while len(support) > 1:
            idx_least_imp_feature = np.where(trim_score == trim_score.min())[0]
            if trim_score[idx_least_imp_feature[0]] > tau:
                break
            idx_least_imp_feature = idx_least_imp_feature[0]
            support = np.delete(support, idx_least_imp_feature)
            c_pred = narrow_fit(W_tilda, b_tilda_one_var, support,
                                s_rhs, s_lhs_one_var)
            trim_score = compute_trim_score(W_column_norm, support, c_pred)
            cv_err_list[i] = compute_cross_validation_err_v2(
                support, W_tilda, b_tilda_one_var)
            support_list[i] = support
    # Convert the list of cross validation error to np.array and choose the
    # support corresponding to sparsity levels with the smallest cross
    # validation error
    cv_err_list = np.array(cv_err_list)
    cross_idx = np.argmin(cv_err_list)
    support_pred = support_list[int(cross_idx)]

    # Performing least square fit on the predicted support and scale the result
    # back
    relative_scale = s_rhs / s_lhs_one_var
    coeff_sp = least_square_adp(
        W_tilda[:, support_pred],
        b_tilda_one_var) * (relative_scale[support_pred].reshape(-1, 1))

    return support_pred, coeff_sp.flatten()


# The function performs least-square fitting
def least_square_adp(A: np.array, b: np.array) -> np.array:

    """
    This script solves least square problem Ax = b. In the case of A and b both
    being a constant number,
    it simply performs division x = b / A.

    Args:
        A (np.array): array of shape (N, L).
        b (np.array): array of shape (N, n).

    Returns:
        np.array: array of shape (L, n).
    """

    if A.shape[0] == 1:
        x = b / A
    else:
        x, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
    return x


# This function is used to set up an appropriate maximum sparsity level for a
# differential equation.
def set_sparsity_level(is_1d_ode: bool, num_of_u: int, dim_x: int) -> int:
  """This function set maximum sparsity level for support recovery.

  Args:
      is_1d_ode (bool): whether or not given data is 1d ode data.
      num_of_u (int): number of variables.
      dim_x (int): spatial dimension.

  Returns:
      int: sparsity level
  """
  if is_1d_ode:
      if num_of_u <= 2:
          sparsity = 10
      else:
          sparsity = 15
  else:
      if num_of_u == 2 and dim_x == 2:
          sparsity = 25
      else:
          sparsity = 10
  return sparsity

# This function computes the trim score used for trimming
def compute_trim_score(w_column_norm: np.array, support: np.array,
                       c_pred: np.array) -> np.array:
    """
    This function computes the trimming score vector for given support and
    coefficient values.
    Args:
        w_column_norm (np.array): column norm vector of feature matrix w
        support (np.array): a support vector that stores the index of candiate
        features for each variable.
        c_pred (np.array): a coefficient vector that stores the values of
        candiate features for each variable.

    Returns:
        np.array: a trimming score vector for each candidate feature
    """
    trim_score = w_column_norm.flatten()[support] * c_pred.flatten()
    trim_score = trim_score / np.max(trim_score)
    return trim_score


# This funciton performs least square fitting once a support is identified
def narrow_fit(w: np.array, b: np.array, support: np.array, s_rhs: np.array,
               s_lhs: np.float64) -> np.array:
    """
    This function perform narrow least square fit for the problem wb = c.
    Notice that here w and c have to be error normalized features. Rescaling
    step should be done before this
    function is called. Narrow-fit is proposed in WeakIdent to help increase
    the accuracy of coefficients. A
    hyghly dynamic region is found and scaling matrix is calculated to compute
    error normalized feature matrix and normalized dynamic variable.

    Args:

        w (np.array): \Tilda{W}_narrow, error normalized feature matrix, array
        of shape (NxNt, L).
        b (np.array): \Tilda{b}_narrow, error normalized dynamic variable, array
        of shape (NxNt, 1).
        support (np.array): a support vector that stores the index of candiate
        features for each variable.
        s_rhs (np.array): scale vector s for rhs features, array of shape (L, ).
        s_lhs (np.float64): scale number for lhs feature (u_t).

    Returns:
        np.array: the coefficient values for a given support
    """
    c_pred = least_square_adp(w[:, support], b)
    c_pred = np.absolute(c_pred * s_rhs[support].reshape(-1, 1) / s_lhs)
    return c_pred


# This function finds a support given a sparsity level
def subspace_persuit(phi: np.array, b: np.array, k: int) -> np.array:

  """
  This function finds the support for a sparse vector c such that phi * x = b
  where k numbers in c
  are nonzero. Note: Subspace Persuit is a greedy algorithm which minimizes
  residual error when searching
  for a support.

  Args:
      phi (np.array): the RHS feature matrix W, array of shape (N, L)
      b (np.array): LHS feature matrix b, array of shape (N, 1)
      k (int): sparse level

  Returns:
      np.array: array of shape (k,). a support vector that stores the index of
      candiate features for each variable.
  """

  itermax = 15
  n = len(phi[0])
  is_disp = 0
  b_t = np.transpose(b)
  cv = np.absolute(np.matmul(b_t, phi))
  cv_index = np.argsort(-cv).flatten()
  lda = cv_index[:(k + 1)]
  phi_lda = phi[:, lda]
  if is_disp:
      print(np.sort(lda))
  x = least_square_adp(np.matmul(np.transpose(phi_lda), phi_lda),
                        np.matmul(np.transpose(phi_lda), b).reshape(-1, 1))
  r = b - np.matmul(phi_lda, x)
  res = np.linalg.norm(r, 2)
  if res < 10**(-12):
      X = np.zeros(n)
      X[lda] = np.transpose(x)
      supp = [i for i in range(phi.shape[1]) if X[i] != 0]
      return np.array(supp)
  usedlda = np.zeros(n)
  usedlda[lda] = 1
  for _ in range(itermax):
      res_old = res
      cv = np.absolute(np.matmul(np.transpose(r), phi))
      cv_index = np.argsort(-cv).flatten()
      sga = np.union1d(lda, cv_index[:k + 1])
      phi_sga = phi[:, sga]
      x_temp = least_square_adp(np.matmul(np.transpose(phi_sga), phi_sga),
                                np.matmul(np.transpose(phi_sga), b))
      x_temp_index = np.argsort(-np.absolute(x_temp.flatten())).flatten()
      lda = sga[x_temp_index[:k + 1]]
      phi_lda = phi[:, lda]
      usedlda[lda] = 1
      x = least_square_adp(np.matmul(np.transpose(phi_lda), phi_lda),
                            np.matmul(np.transpose(phi_lda), b))
      r = b - np.matmul(phi_lda, x)
      res = np.linalg.norm(r, 2)
      X = np.zeros(n)
      X[lda] = np.transpose(x)
      if res / res_old >= 1 or res < 10**(-12):
          supp = [i for i in range(phi.shape[1]) if X[i] != 0]
          return np.array(supp)


# This function computes the cross validation error (from a random split w.r.t.
# ratio 1/100).
def compute_cross_validation_err_v2(support: np.array,
                                    w: np.array,
                                    b: np.array,
                                    iter_max=30) -> np.float64:

    """
    This function returns the cross validation error for the support of vector c
    in the least square problem Wc = b.
    Note: Cross-validation error is computed over 30 times and take mean + std
    as our final result for a stablized error.
    Args:
        support (np.array): a support vector that stores the index of candiate
        features for each variable.
        w (np.array): error normalized matrix w of shape (N, L).
        b (np.array): error normalized vector b of shape (N, 1).
        iter_max (int, optional): maximum number of iterations. Defaults to 30.

    Returns:
        np.float64: cross-validation error (modified version).
    """

    err_cross_accu = []
    for _ in range(iter_max):
        err_cross_accu.append(compute_cross_validation_error(support, w, b))
    err_cross_accu = np.array(err_cross_accu)
    err = np.mean(err_cross_accu) + np.std(err_cross_accu)
    return err


def compute_cross_validation_error(support: np.array,
                                  w: np.array,
                                  b: np.array,
                                  ratio=1 / 100) -> np.float64:

  """
  This function computes the cross validation error (from a random split w.r.t.
  ratio 1/100).

  Args:
      support (np.array): a support vector that stores the index of candiate
      features for each variable.
      w (np.array): error normalized matrix w of shape (N, L).
      b (np.array): error normalized vector b of shape (N, 1).
      ratio (_type_, optional): ratio between two partitions of w. Defaults to
      1/100.

  Returns:
      np.float64: cross-validation error
  """

  n = len(b)
  inds = np.random.permutation(n)
  k = int(np.floor(n * ratio - 1))  # end of part 1
  w = w[inds, :]
  b = b[inds, :]
  coeff = np.zeros(w.shape[1])
  coeff[support] = least_square_adp(w[:k, support], b[:k]).flatten()
  e1 = np.linalg.norm(w[k:, :] @ coeff.reshape(-1, 1) - b[k:])
  coeff = np.zeros(w.shape[1])
  coeff[support] = least_square_adp(w[k:, support], b[k:]).flatten()
  e2 = np.linalg.norm(w[:k, :] @ coeff.reshape(-1, 1) - b[:k])
  return e1 * (1 - ratio) + e2 * ratio

In [None]:
'''
Now we test the PDE identificaion function using our feature matrix W generated
from the following convection diffusion equation:
u_{t} =  - a*u_x + niu*u_{xx} with a = 1, niu = 0.1
'''

# Initialize parameters for identification
tau = 0.05 # default the trimming threshold to 0.05

# Parameters related to the PDE
num_of_variables = 1 # num of equations (variables) to identify
dim_x = 1 # sptial dimension of given data
is_1d_ode = False
W_b_large = W
S_b_large = np.eye(W.shape[1] - num_of_variables) # Now we didn't implement the
# error normalization part and regard the error normalization matrix as the
# identity matrix

# Performing PDE identification using the model
W, b, c = diff_eqn_identification(tau, num_of_variables, dim_x, is_1d_ode,
                                  W_b_large, S_b_large)


# Printing our the solution
#print("RHS feature W: \n", W)
#print("\n LHS feature b: \n", b)
print("\n Predicted coefficient vector: \n", c)
print("\n Actual coefficient vector: \n",
      np.array([0, 0, -1, 0.1, 0, 0, 0, 0, 0, 0]).reshape(-1, 1))


 Predicted coefficient vector: 
 [[ 0.        ]
 [ 0.        ]
 [-0.99982319]
 [ 0.09998591]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]
 [ 0.        ]]

 Actual coefficient vector: 
 [[ 0. ]
 [ 0. ]
 [-1. ]
 [ 0.1]
 [ 0. ]
 [ 0. ]
 [ 0. ]
 [ 0. ]
 [ 0. ]
 [ 0. ]]
