# Example 1: Heat Equation, Constant p(x)

### Problem Information
Type: Heat Equation

Linearity: Linear

### Equation

The heat equation takes the form:

$p(x) y_{xx} = f(x)$

$k T_{xx} = q(x)$

### Coefficients
p(x): Constant

$p(x) = 20.6$

Manganin, Room Temp, Source: https://permanent.access.gpo.gov/LPS112783/NSRDS-NBS-8.pdf

q(x): Constant (q=0)

f(x): 1.5*sin(x) + 2

### Learning Goals

Assume: Known $f(x)$

Input: Data in $u$

Output: Operator L, including parametric coefficient p(x)


In [1]:
%load_ext autoreload
%autoreload 2

# Third-Party Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Package Imports
from tools.variables import DependentVariable, IndependentVariable
from tools.term_builder import TermBuilder
from tools.differentiator import Differentiator
from tools.regressions import *
from tools.diffeqsolver import IVPSolver, BVPShooter
from tools.misc import report_learning_results, split_list
from tools.DatapoolProcessor import DatapoolProcessor
from tools.odes import sturm_liouville_function, generate_random_ics, piecewise_p
from tools.plotter import Plotter, compute_coefficients


np.random.seed(seed=1)

In [2]:
%%time

## STEP 1. SOLVE THE STURM-LIOUVILLE IVP

# Define the parameters of the S-L IVP
p = lambda x: 20.6 # Room temp Manganin
q = 0
r = 0
mu = 0
f = lambda x: 0.25*np.sin(x)+2

# If desired, impart nonlinearity here:
alpha = 0

# Randomly generate initial values to use for solving S-L IVPs
init_vals_list = generate_random_ics(exp_range=5, repeats=2, start_from = -1)

# Define the independent variable
x_min = 0.1; x_max = 30; dx = 0.1;

# Configure the IVP solver:
ivp_solver = IVPSolver(t_min=x_min, t_max=x_max, dt=dx)

# Prepare the BVP shooting method
bvp_solver = BVPShooter(ivp_solver, attempts=50)


# Create a lambda function for the ODE solver to use
# And get BVP type solutions
all_sols = []

for ics in init_vals_list:
    sl_func = lambda x, y: sturm_liouville_function(x, y, p=p(x), q=q, alpha=alpha, f=f(x))
    sol = bvp_solver.shoot(sl_func, ics, increment = 10)
    all_sols.append(sol) 

# And generate ODE solutions!
ode_sols = bvp_solver.get_unique_sols(all_sols)

# Report number of solutions generated.      
print("Created", len(ode_sols), "differential equation solutions.")

Created 8 differential equation solutions.
CPU times: user 14.2 s, sys: 90 ms, total: 14.2 s
Wall time: 14.2 s


In [3]:
%%time

## STEP 2. BUILD DATAPOOLS
# Datapools are a table of numerically evaluated terms, which will be used to create $\Theta$ and $\U$ and learn $\Xi$

# Provide target regression term (u):
u_term = 'd^{2}u/dx^{2}'

# Configure a differentiatior to numerically differentiate data
diff_options = Differentiator(diff_order = 3, 
                              diff_method = 'poly', 
                              cheb_width = int(((x_max-x_min)/dx)/20), 
                              cheb_degree = 4)


# Build library and u for each ODE solution
def build_datapools(ode_sols, diff_options, u_term, use_noise:bool = False, noise_mag:float = 0.001, noise_iters:int = 1):
    # Create empty lists to house the stacked theta's and u's 
    datapools = []

    for sol in ode_sols:
        # If the solution did not converge, do not use it.
        if sol.status != 0: continue

        # Prepare "Variable" objects from independent and dependent variables
        x_data = sol.t
        x = IndependentVariable('x', x_data, dx, poly_deg = 3)

        signal = sol.y[0]

        if use_noise: 
            for i in range(noise_iters):
                noise = noise_mag*np.std(signal)*np.random.randn(len(signal))
            noise = noise/noise_iters
            signal = signal + noise
            
        u = DependentVariable('u', signal, nonlinear_deg = 3)

        # Create a TermBuilder object for assembling the function library, theta
        term_builder = TermBuilder([x], [u], diff_options)

        # Inject the forcing function into the datapool
        term_builder.register_custom_term("f", f(x_data))

        # Use the TermBuilder to assemble the datapool (numerically evaluated terms)
        datapool = term_builder.assemble_library(u_term)

        # Add the datapool to datapool list
        datapools.append(datapool)
        
    return datapools


datapools = build_datapools(unique_sols, diff_options, u_term)
noisy_dps = build_datapools(unique_sols, diff_options, u_term, use_noise = True, noise_mag = 0.02, noise_iters = 500)

NameError: name 'unique_sols' is not defined

In [4]:
plt.plot(datapools[0]['x'], datapools[0]['u'], 'b.', label = "Clean")
plt.plot(noisy_dps[0]['x'], noisy_dps[0]['u'], 'k--', label = "Noise")
plt.xlabel("x", fontsize=16)
plt.ylabel("Temperature (u)", fontsize=16)
plt.legend()
plt.show()

NameError: name 'datapools' is not defined

In [None]:
%%time

## STEP 3. ORGANIZE AND FORMAT DATA FOR LEARNING
## Generate training and test data sets

# Split the datapools into training and testing data
train_dps, test_dps = split_list(datapools)

# Create a DatapoolProcessor (trims/organizes/groups data)
cdpp = DatapoolProcessor(train_dps, u_term)
test_cdpp = DatapoolProcessor(test_dps, u_term)

# Group the terms
num_groups = len(train_dps[0])
grouped_theta, grouped_u = cdpp.group_data(num_groups=num_groups, return_stacked=True)
test_theta, test_u = test_cdpp.group_data(num_groups=num_groups, return_stacked=False)

In [None]:
%%time

## STEP 4. LEARN TERMS AND COEFFICIENTS
## Learn terms and coefficients via Grouped Ridge regression

# Group thresholding ridge regression
Xic,Tolc,Lossesc, uXc, uLc = TrainSGTRidge2(grouped_theta.copy(), 
                                      grouped_u.copy(), 
                                      test_theta.copy(),
                                      test_u.copy(), 
                                      lam = 1e-5, 
                                      epsilon = 1e-5,
                                      normalize = 2)

In [None]:
%%time

## STEP 3. ORGANIZE AND FORMAT DATA FOR LEARNING
## Generate training and test data sets

# Split the datapools into training and testing data
train_dps, test_dps = split_list(noisy_dps)

# Create a DatapoolProcessor (trims/organizes/groups data)
ndpp = DatapoolProcessor(train_dps, u_term)
test_ndpp = DatapoolProcessor(test_dps, u_term)

# Group the terms
num_groups = len(train_dps[0])
grouped_theta, grouped_u = ndpp.group_data(num_groups=num_groups, return_stacked=True)
test_theta, test_u = test_ndpp.group_data(num_groups=num_groups, return_stacked=False)

In [None]:
%%time

## STEP 4. LEARN TERMS AND COEFFICIENTS
## Learn terms and coefficients via Grouped Ridge regression

# Group thresholding ridge regression
Xin,Toln,Lossesn, uXn, uLn = TrainSGTRidge2(grouped_theta.copy(), 
                                      grouped_u.copy(), 
                                      test_theta.copy(),
                                      test_u.copy(), 
                                      lam = 1e-5, 
                                      epsilon = 1e-5,
                                      normalize = 2)

In [None]:
## Use a reporting method to print results:
print("Clean data results:")
report_learning_results(cdpp, Xic, Lossesc, uXc, uLc, report_all_possible=False)

print("Noisy data results:")
report_learning_results(ndpp, Xin, Lossesn, uXn, uLn, report_all_possible=False)

In [None]:
from tools.plotter import Plotter

### PLOT RESULTS

# Choose method to use results from
pde_find = Xic[np.argmin(Lossesc)]
test_mse = uXc[np.argmin(uLc)]
xi = pde_find


# Define text to add to figures
text_str = '\n'.join((
    r'$ODEs={}$'.format(len(unique_sols)),
    r'$Groups={}$'.format(num_groups),
    r'$\alpha={}$'.format(0)))
text_str = r'$ODEs={}$'.format(len(unique_sols))


## Plot results
pltr = Plotter(unique_sols, cdpp, xi, 
               independent_variable='x', 
               dependent_variable='u', 
               text_str=text_str, 
               text_props=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

pltr.plot_ode_solutions(title="Linear Sturm-Liouville Problem")
pltr.plot_xi_results(p,q,mu,r,f,alpha)
grouped_theta, grouped_u = cdpp.group_data(num_groups=num_groups, return_stacked=True)
pltr.plot_learned_vs_true_data(grouped_theta, k=0)
pltr.plot_p_and_q()

print("Clean Results:")
plt.show()


In [None]:
print("Noisy Results:")

# Choose method to use results from
pde_find = Xin[np.argmin(Lossesn)]
test_mse = uXn[np.argmin(uLn)]
xi = pde_find

## Plot results
pltr = Plotter(unique_sols, ndpp, xi, 
               independent_variable='x', 
               dependent_variable='u', 
               text_str=text_str, 
               text_props=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

pltr.plot_ode_solutions(title="Linear Sturm-Liouville Problem")
pltr.plot_xi_results(p,q,mu,r,f,alpha)
grouped_theta, grouped_u = cdpp.group_data(num_groups=num_groups, return_stacked=True)
pltr.plot_learned_vs_true_data(grouped_theta, k=0)
pltr.plot_p_and_q()

plt.show()


In [None]:
plt.figure()
plt.plot(datapools[0]['x'], datapools[0]['d^{2}u/dx^{2}'], label = "Clean")
plt.plot(noisy_dps[0]['x'], noisy_dps[0]['d^{2}u/dx^{2}'], label = "Noise")
plt.title("Clean vs Noisy $u_{xx}$", fontsize=18)
plt.xlabel("x", fontsize=16)
plt.ylabel(r'$u_{xx}$', fontsize=16)
plt.legend()


plt.figure()
for i, sol in enumerate(unique_sols):
    plt.plot(sol.t, sol.y[0], 'k--', label = "Sol {}".format(i))
plt.xlabel("x", fontsize=16)
plt.ylabel("Temperature (u)", fontsize=16)
plt.legend()
plt.show()