# overfit_demo.ipynb
# WESmith 06/27/23
## Demonstrate underfittig/overfitting of simple functions using pytorch
## using some techniques from
### https://pytorch.org/tutorials/beginner/pytorch_with_examples.html?highlight=polynomial

In [None]:
import torch
import torch.nn as nn
import numpy    as np
import math
import matplotlib.pyplot as plt

In [None]:
lim         = 1.0
npts        = 1000
max_order   = 7 # maximum polynomial order to use
noise_scale = 0.1

In [None]:
min, max = (-lim, lim) #(-0.1, 0.1)  # x limits
x = torch.linspace(min, max, npts)

## POLYNOMIAL BASIS USED TO CREATE DATA AND TO FIT THE NOISY DATA

In [None]:
p  = torch.tensor(range(max_order + 1))
xx = x.unsqueeze(-1).pow(p)  # important to turn (npts) vector into (npts,1) vector for this to work
fig = plt.figure(figsize=(6, 6))
plt.plot(x, xx)
plt.grid()
plt.show()

In [None]:
# set seed here if desired
coeffs  = torch.randn(max_order + 1)
y       = xx @ coeffs  # clean random signal using polynomial basis
train   = y + noise_scale * torch.randn(npts)  # noisy signal
test    = y + noise_scale * torch.randn(npts)

In [None]:
fig = plt.figure(figsize=(12, 6))
plt.plot(x, y,     'r',  label='original polynomial')
plt.plot(x, train, 'b.', label='training data')
plt.plot(x, test,  'g.', label='testing data')
plt.legend()
plt.grid()
plt.show()

In [None]:
# this model uses the fixed polynomial basis as input: it is just estimating
# the max_order + 1 unknown polynomial coefficients
# turn off bias in NN, since constant term is in polynomial
# this is a one-neuron model, with max_order + 1 inputs to the neuron
# this is pure classical linear regression using SGD instead of algebraic (AT*A)^-1 * AT
model = nn.Sequential(nn.Linear(max_order + 1, 1, bias=False), nn.Flatten(0, 1))

In [None]:
# explanation of model dimensions:
# (n_samples, max_order + 1) input array x ((max_order + 1) x 1) array to be trained = (n_samples x 1)
# nn.Flatten(0, 1) transforms (n_samples x 1) array into (n_samples) array output
# nn.Flatten(start_dim, end_dim) multiplies start_dim x intermediate_dims x end_dim to flatten that range
# see nn.Flatten? examples
#nn.Flatten?

In [None]:
def train_model(n_iter, model, train, target, test=None, lr=1e-3, print_iter=100):
    optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
    loss_fn   = nn.MSELoss(reduction='sum')
    for t in range(n_iter):
        y_pred = model(train)
        loss   = loss_fn(y_pred, target)
        if t % print_iter == 0:
            if test is None:
                print(f'iter: {t:5}, train: {loss.item():10.3f}')
            else:
                with torch.no_grad(): # make sure gradients aren't affected
                    loss_test = loss_fn(y_pred, test)
                print(f'iter: {t:5}, train: {loss.item():10.3f}, test: {loss_test.item():10.3f}')
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
# xx is the FIXED polynomial basis, the model() contains the learned poly coefficients
# NOTE that this isn't the usual paradigm for NN training: normally the training set
# is input to the model, and the target is compared to the predictions; here the training
# set IS the target, and the training set is the xx, the fixed polynomial basis
# ie, here xx = training, train = target
train_model(10001, model, xx, train, test=test, lr=1e-3, print_iter=500)

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 5), sharex=False)
xvals = np.arange(len(coeffs))
ax1.plot(xvals, coeffs, 'ro', label='true coeffs')
ax1.plot(xvals, model[0].weight.squeeze().detach().numpy(), 'bo', label='estimated coeffs')
ax1.grid()
ax1.set_title('True and Estimated Polynomial Coefficients')
ax1.legend()
ax2.plot(x, y, 'r', label='true polynomial')
ax2.plot(x, model(xx).detach().numpy(), 'b', label='estimated polynomial')
ax2.grid()
ax2.set_title('True and Estimated Polynomial')
ax2.legend()
plt.tight_layout()
plt.show()

## FIT POLYNOMIAL TO SINE

In [None]:
class Sine():
    '''
    npts  = number of points to evaluate sine over x = (-1, 1) interval
    freq  = cycles per (0, 1) x-axis interval (ie, NOT (-1, 1) interval), can be fractional
    amp   = amplitude of sine (default 1.0)
    phase = shift in DEGREES independent of frequency: positive shifts to the right
            ie: a shift in -90 deg will turn the sine into a cosine regardless of frequency
            (default 0.0 deg)
    NOTE: keeping x-axis from -1 to 1 for stability in neural-net applications
    NOTE2: 
    '''
    def __init__(self, npts, freq, amp=1.0, phase=0.0):
        self.npts  = npts
        self.freq  = freq
        self.amp   = amp
        self.phase = phase  # in degrees
        rad_phase  = phase * math.pi / 180
        self.x     = torch.linspace(-1.0, 1.0, npts)
        self.scale = 2.0 * math.pi * freq
        self.out   = amp * torch.sin(self.scale * self.x - rad_phase)

    def plot(self, polyfit=False, wid=14, hei=4):
        # polyfit = number of the polynomial fit (int)
        # polynomial fitting is only accurate for phase = 0.0
        if polyfit:
            self.sine_coeffs(polyfit + 1)
            p  = torch.tensor(range(polyfit + 1))
            xx = self.x.unsqueeze(-1).pow(p)  # create the polynomial basis
            y  = self.amp * xx @ self.coeffs  # matrix multiply  (N x polyfit) * (polyfit x 1)
        fig, ax = plt.subplots(figsize=(wid, hei))
        ax.plot(self.x, self.out, 'b', label=f'sin(2*pi*{self.freq}*x)')
        if polyfit:
            ax.plot(self.x, y, 'r', label=f'polynomial fit with {polyfit} terms')
        ax.grid()
        ax.set_aspect('auto')
        ax.autoscale(enable=True, tight=True)
        ax.set_ylim(-2.0, 2.0)
        plt.tight_layout()
        plt.legend()
        
    def __call__(self):
        return self.x, self.out
    
    def sine_coeffs(self, n):
        # convenience function to get the 'n' Taylor sine coefficients scaled to the frequency
        coeffs = torch.zeros(n)
        v = 1
        for i in range(n):
            if i % 2 == 1:
                coeffs[i] = v * self.scale**i / np.math.factorial(i)
                v *= -1
        self.coeffs = coeffs
        return self.coeffs

In [None]:
dd = Sine(1000, 5, amp=1.5, phase=0)
dd.plot(polyfit=31, wid=10,hei=3)
x, y = dd()
coeffs = dd.sine_coeffs(31)
x.shape, y.shape, coeffs

In [None]:
npts        = 1000
max_order   = 7 # maximum polynomial order to use
noise_scale = 1.0
freq        = 1.0 # sine freq
#sine_coeffs = torch.tensor(get_sine_coeffs(max_order + 1), dtype=torch.float32)
dd = Sine(npts, freq)

In [None]:
dd.plot(max_order, 8, 3)

In [None]:
#x = torch.linspace(-limit * math.pi, limit * math.pi, npts)
x, ysin = dd()
sine_coeffs = dd.sine_coeffs(max_order + 1)

In [None]:
sine_coeffs

In [None]:
p  = torch.tensor(range(max_order + 1))
xx = x.unsqueeze(-1).pow(p)

In [None]:
xx.shape

In [None]:
#ysin = torch.sin(x)

In [None]:
# same linear-regression model as above
model   = nn.Sequential(nn.Linear(max_order + 1, 1, bias=False), nn.Flatten(0, 1))
lr      = 1e-3  # learning rate

In [None]:
#def train_model(n_iter, model, train, target, test, lr=1e-3, print_iter=100):

In [None]:
train_model(60001, model, xx, ysin, lr=1e-2, print_iter=4000)

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6), sharex=False)
xvals = np.arange(max_order + 1)
ax1.plot(xvals, sine_coeffs, 'go', label='true coeffs')
ax1.plot(xvals, model[0].weight.squeeze().detach().numpy(), 'ro', label='estimated coeffs')
ax1.grid()
ax1.set_title('True and Estimated Polynomial Coefficients')
ax1.legend()
ax2.plot(x, ysin, '#BBBBBB', linewidth=10.0, label='exact sine')
ax2.plot(x, xx @ sine_coeffs, 'g', label='limited-Taylor-poly sine')
ax2.plot(x, model(xx).detach().numpy(), 'r', label='model-estimated sine')
ax2.set_ylim(-2.0, 2.0)
ax2.grid()
ax2.set_title('True and Estimated Coefficients')
ax2.legend()
plt.tight_layout()
plt.show()

## NEURAL NET TO SINE: NO POLYNOMIAL BASIS IS USED

In [None]:
npts        = 1000
noise_scale = 0.01
limit       = 2.0
batch_size  = 32

In [None]:
class NeuralNet(nn.Module):
    
    def __init__(self, n1, n2): # n1, n2 are the sizes of the layers
        super().__init__()
        # ReLU produced jagged results fitting the sine, Tanh much smoother
        self.stack = nn.Sequential(nn.Linear(1, n1), 
                                   nn.Tanh(),
                                   nn.Linear(n1, n2),
                                   # do not want a nonlinearity at the end of this regressiion model
                                   nn.Tanh(),
                                   nn.Linear(n2, 1))
        
    def forward(self, x):
        x = self.stack(x).squeeze()
        return x
    
    def num_params(self):  # WS addition
        count = 0
        for k in self.parameters():
            count += k.numel()
        return count

In [None]:
# set up torch Dataset object for the DataLoader
class SineData(torch.utils.data.Dataset):
    
    def __init__(self, limit, npts, noise_scale=0.0):
        super().__init__()
        # x is the training set: each value of x is like a 1-parameter feature
        # use view() to add singleton dimension to x
        self.x     = torch.linspace(-limit * math.pi, limit * math.pi, npts).view(-1, 1)
        self.targ  = torch.sin(self.x).squeeze() + noise_scale * torch.randn(npts)
        
    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.targ[idx]
    
    def plot(self): # WS addition
        fig = plt.figure()
        plt.plot(self.x.numpy(), self.targ.numpy(), 'b.')
        plt.grid()
        plt.show()

In [None]:
train_data = SineData(limit, npts, noise_scale)
test_data  = SineData(limit, npts, noise_scale)

In [None]:
train_data.plot()

In [None]:
test_data.plot()

In [None]:
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)

In [None]:
test_loader  = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=True)

In [None]:
#for k in enumerate(train_loader):
#    print(k[0], k[1][0].shape, k[1][1].shape)

In [None]:
#d1, d2 = next(iter(test_loader))
#d1.T, d2

In [None]:
def train_with_batch(model, train_loader, test_loader=None, epoch=0, print_iter=3, lr=1e-3, pp=False):
    model.train()
    optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
    loss_fn   = nn.MSELoss(reduction='sum')
    for batch_idx, (data, target) in enumerate(train_loader):
        y_pred = model(data)
        loss   = loss_fn(y_pred, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        txt = f'epoch: {epoch:4}, batch: {batch_idx:4}, train: {loss.item():10.3f}'
        if batch_idx % print_iter == 0 and pp:
            if test_loader is None:
                print(txt)
            else:
                with torch.no_grad(): # make sure gradients aren't affected
                    # target_loader will run out before test_loader since this
                    # called only periodically (assuming test batch size <= train batch size)
                    (test_data, test_target) = next(iter(test_loader))
                    y_pred    = model(test_data)
                    loss_test = loss_fn(y_pred, test_target)
                print(txt + f', test: {loss_test.item():10.3f}')

In [None]:
new_model = NeuralNet(10, 10)
lr = 1e-3
print(new_model.num_params())

In [None]:
for k in range(500):
    update = True if k % 50 == 0 else False
    train_with_batch(new_model, train_loader, test_loader=None,
                     epoch=k, print_iter=15, lr=1e-4, pp=update)

In [None]:
fig, ax1 = plt.subplots(1, 1, figsize=(12, 6), sharex=False)
x = train_data.x
ax1.plot(x.numpy(), torch.sin(x).squeeze(),        'r', label='true sine')
ax1.plot(x.numpy(), new_model(x).detach().numpy(), 'b', label='estimated sine')
ax1.grid()
ax1.set_title('True and Estimated Sine')
ax1.legend()
plt.show()