In [1]:
import numpy as np
from numpy.linalg import lstsq, inv
from scipy.linalg import sqrtm
from scipy.sparse import diags, csr_matrix
import plotly.express as px
import plotly.graph_objects as go
from plotly.offline import init_notebook_mode, iplot
from plotly.subplots import make_subplots
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error as mse
from functools import reduce

init_notebook_mode(connected=True) 
np.random.seed(42)

# helper functions
def addVertLinePlotly(fig, x0=0, y0=0, y1=1):
    """ plots a vertical line to the given figure at position x"""
    fig.add_shape(dict(type="line", x0=x0, x1=x0, y0=y0, y1=1.2*y1, 
                       line=dict(color="LightSeaGreen", width=1)))
    return


def secondOrderDiff(y):
    """ calculate the 2nd order difference for a given series y """
    a = [y[i] - 2*y[i+1] + y[i+2] for i in range(len(y)-2)]
    return a 

In [2]:
# test functions
def f1(x): 
    return np.tanh(-(x-5)) + np.exp(-(x)**2) + np.random.randn(len(x))*0.1

def f2(xmin=0, xmax=1, n=1000, a=1, b=1, noiseType="gauss"): 
    """ either gaussian noices or decreasing noise"""
    x = np.linspace(xmin,xmax, n)
    noise = []
    for i in x:
        noise.append(np.random.normal(0, (xmax - i)/(xmax - xmin)*0.25))
    
    if noiseType is "gauss":
        return x, a / (1 + (b*x)**2) + np.random.randn(len(x))*0.25
    else:
        return x, a / (1 + (b*x)**2) + noise
    
def f3(x): 
    return 0.5*np.sin(x) + 3.5*np.linspace(0,1,len(x)) + np.random.randn(len(x))*0.2

def f4(x1, x2): 
    return x1**3 - 3*x1*x2**2 #+ np.random.randn(len(x1))*0.1

In [5]:
#def B_Splines
k, m = 10, 2
x = np.linspace(-100, 10,1000)

plot_ModelMatrixBSpline(x, k)

In [4]:
#######################################################################
# trusted
#######################################################################
# BSpline functionality    
#######################################################################
def bSpline(x, k, i, m=2):
    """ 
    evaluate i-th b-spline basis function of order m at the 
    values in x, given knot loactions in k
    """
    if m==-1:
        # return 1 if x is in {k[i], k[i+1]}, otherwise 0
        return (x >= k[i]) & (x < k[i+1]).astype(int)
    else:
        #print("m = ", m, "\t i = ", i)
        z0 = (x - k[i]) / (k[i+m+1] - k[i])
        z1 = (k[i+m+2] - x) / (k[i+m+2] - k[i+1])
        return z0*bSpline(x, k, i, m-1) + z1*bSpline(x, k, i+1, m-1)

# trusted
def modelMatrixBSpline(x, k=0, m=2):
    """ 
    set up model matrix for the B-spline basis
    one needs k + m + 1 knots for a spline basis of order m with k 
    parameters,
    k - number of parameters (== number of B-splines)
    """
    n = len(x) # n = number of data
    assert (k > 0), "number of B-splines k need to be specified and larger than 0"
    xmin, xmax = np.min(x), np.max(x)
    xk = np.quantile(a=x, q=np.linspace(0,1,k))
    dx = xk[m+3] - xk[m+2]
    xk = np.insert(xk, 0, np.arange(xmin-(m+1)*dx, xmin, dx))
    xk = np.append(xk, np.arange(xmax+dx, xmax+(m+2)*dx, dx))
    X = np.zeros(shape=(n, k))
    for i in range(k):
        X[:,i] = bSpline(x=x, k=xk, i=i+1, m=m)
    return X, xk

#trusted
def pSplinePenalty(k):
    """ 
    compute the P-spline second order difference penalty matrix of 
    dimension k according to p.150 Wood 2006 
    """
    assert (type(k) is int), "Type of input k is not int"
    a =np.eye(k, dtype=np.int)
    P = np.diff(a, axis=0)
    S = P.T @ P
    return S

def plot_ModelMatrixBSpline(x, k, m=2):
    X, xk = modelMatrixBSpline(x, k=k, m=m)

    fig = go.Figure()
    for i in range(X.shape[1]):
        fig.add_trace(go.Scatter(x=x, y=X[:,i], 
                                 name=f"BSpline {i+1}", mode="lines"))
    for i in xk:
        addVertLinePlotly(fig, x0=i)
    fig.show()
    return

In [6]:
# trusted
def iterativeFit(x, y, k, type_="mi", lam_p=0.1, lam_c = 1e3, extremumIdx=None):
    # get B-Spline basis
    X, xk = modelMatrixBSpline(x, k=k)
    beta = np.zeros(X.shape[1])
    
    yExt = x[np.argmax(y)] if type_ is "peak" else x[np.argmin(y)]
    extremumIdx = int(np.argwhere(xk > yExt)[0]- 1)
    # print(f"Shape of model matrix: {X.shape}")
    # get P-Spline penalty matrix
    P_matrix = D1_differenceMatrix(k)
    # print(f"Shape of smooth-penalty matrix: {P_matrix.shape}")
    # get monotonicity penalty and weight matrix
    D1 = D1_differenceMatrix(k=k)    
    D2 = D2_differenceMatrix(k=k)
    
    if type_ in ["mi", "md"]:
        D = D1
    else:
        D = D2
        
    beta_hat = inv(X.T@X + lam_p * P_matrix.T @ P_matrix) @ X.T @ y
    y_hat = X @ beta_hat
    error = mse(y, y_hat)

    for i in range(3):
        V = V_weightMatrix(k=k, type_=type_, beta=beta_hat, extremumIdx=extremumIdx)
        xdot = X.T @ X
        pdot = P_matrix.T @ P_matrix
        ddot = D.T @ V @ D
        beta_hat = inv(xdot + lam_p * pdot + lam_c * ddot) @ X.T @ y
        y_hat = X @ beta_hat
        error = mse(y, y_hat)
        #print("Nonzeros in V: ", np.count_nonzero(np.diag(V)))
        #print(f"MSE: {np.round(error, 5)}")
    
    return beta_hat, y_hat

In [7]:
#######################################################################
# trusted
#######################################################################
# PENALTY MATRICES
#######################################################################
def V_weightMatrix(k=0, type_="mi", beta=None, extremumIdx=None):
    """
    calcualte the weight matrix for the different types of penalty,
    e.g. md, mi, convex, concave
    """
    notGivenError = " are of wrong type or not given!"
    if type_ is "mi":
        d = (np.diff(beta) < 0).astype(int)
        d = np.append(d, 0)
        addZeros = np.zeros(k)
    elif type_ is "md":
        d = (np.diff(beta) > 0).astype(int)
        d = np.append(d, 0)
        addZeros = np.zeros(k)
    elif type_ is "convex":
        assert (beta is not None), "beta"+notGivenError
        D2 = D2_differenceMatrix(k=k)
        d = (D2 @ beta < 0).astype(int)
    elif type_ is "concave":
        assert (beta is not None), "beta"+notGivenError
        D2 = D2_differenceMatrix(k=k)
        d = (D2 @ beta > 0).astype(int)
    elif type_ is "peak":
        assert (extremumIdx is not None), "extremumIdx"+notGivenError
        P = Peak_Matrix(k=k, peakIdx=extremumIdx)
        d = (P @ beta < 0).astype(int)
    elif type_ is "valley":
        assert (extremumIdx is not None), "extremumIdx"+notGivenError
        V = Valley_Matrix(k=k, valleyIdx=extremumIdx)
        d = (V @ beta < 0).astype(int)
    else:
        print(f"Requested type {type_} not implemented so far!")
        d = np.ones(k)
    
    offset = 0
    V = diags(d, offset, dtype=np.int).toarray()
    #print("Shape of V-Matrix: {}".format(V.shape))
   
    return V

# trusted
def Peak_Matrix(k=0, peakIdx=0):
    """ 
    calculate the peak penalty matrix for monotonic increasing
    and monotonic decreasing peak
    """
    assert (type(k) is int), "Type of input k is not int"
    assert (type(peakIdx) is int), "Type of input peakIdx is not int"

    dia = -1*np.ones(k)
    dia[peakIdx] = 0
    #dia[peakIdx-1:peakIdx+1] = 0
    dia[peakIdx+1:] = 1
    offdia = -1 * dia
    offset = [0,1]
    Peak = diags(np.array([dia, offdia]), offset, dtype=np.int).toarray()
    Peak[-1:] = 0
    return Peak

# trusted
def Valley_Matrix(k=0, valleyIdx=0):
    """ 
    calculate the peak penalty matrix for monotonic increasing
    and monotonic decreasing peak
    """
    assert (type(k) is int), "Type of input k is not int"
    assert (type(valleyIdx) is int), "Type of input peakIdx is not int"

    dia = 1*np.ones(k)
    dia[valleyIdx] = 0
    #dia[peakIdx-1:peakIdx+1] = 0
    dia[valleyIdx+1:] = -1
    offdia = -1 * dia
    offset = [0,1]
    Valley = diags(np.array([dia, offdia]), offset, dtype=np.int).toarray()
    Valley[-1:] = 0
    return Valley

# trusted
def D1_differenceMatrix(k=0):
    """
    calculated the first order difference matrix  
    returns a matrix of size [k x k-1]
    """
    assert (type(k) is int), "Type of input k is not int"
    d = np.array([-1*np.ones(k), np.ones(k)])
    offset=[0,1]
    D1 = diags(d,offset, dtype=np.int).toarray()
    D1[-1:] = 0.
    #print("Shape of D1-Matrix: {}".format(D1.shape))
    return D1

# trusted
def D2_differenceMatrix(k=0):
    """
    calculated the second order difference matrix  
    returns a matrix of size [k x k-1]
    """
    assert (type(k) is int), "Type of input k is not int"
    d = np.array([np.ones(k), -2*np.ones(k), np.ones(k)])
    offset=[0,1,2]
    D2 = diags(d,offset, dtype=np.int).toarray()
    D2[-2:] = 0.
    #print("Shape of D2-Matrix: {}".format(D2.shape))
    return D2

In [9]:
#x, y = f2(xmin=-3, xmax=3, noiseType="nope")
x = np.linspace(0,9,1000)
y = f3(x)

beta_hat, y_hat = iterativeFit(x, y, k=24, type_="mi", lam_p=1, lam_c=1e6)

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
fig.add_trace(go.Scatter(x=x, y=y, name="data", mode="markers"), row=1, col=1)
fig.add_trace(go.Scatter(x=x, y=y_hat, name="Fit"), row=1, col=1)
fig.add_trace(go.Scatter(x=x[:-1], y=np.diff(y_hat), name="Differenz in y"), row=2, col=1)
fig.update_yaxes(title_text="", type="log", row=2, col=1)

In [11]:
constrDict = {"smooth": 0, "mi":0, "md":0, "convex":0, "concave":0}
def additiveModel_Matrix(x1, x2, k, y=None):
    """ 
    calculate the model matrix for P-splines for more than one
    dimension
    """
    x1, x2 = np.sort(x1), np.sort(x2)
    X1, X2 = modelMatrixBSpline(x1, k=k), modelMatrixBSpline(x2, k=k)
    X = np.hstack((X1[0], X2[0]))
    SP1, SP2 = sqrtm(pSplinePenalty(k=k)), sqrtm(pSplinePenalty(k=k))
    SP = np.block([[SP1, np.zeros((SP1.shape[0], SP2.shape[1]))],
                   [np.zeros((SP2.shape[0], SP1.shape[1])), SP2]])
    X = np.vstack((X, SP))
    
    if type(y) is None:
        return X.astype(np.float)
    else:
        y = np.append(y, np.zeros(2*k))
        return X.astype(np.float), y

def penalizedModelMatrix(x, y, k=0, b0=None):
    assert (b0 is not None), "Give initial guess for parameter vector beta!"
    penalties = np.array(
        [pSplinePenalty(k=k), 
                 D1_differenceMatrix(k=k).T @ V_weightMatrix(k, "mi", b0, None) @ D1_differenceMatrix(k),
                 D1_differenceMatrix(k=k).T @ V_weightMatrix(k, "md", b0, None) @ D1_differenceMatrix(k),
                 D2_differenceMatrix(k=k).T @ V_weightMatrix(k, "convex", b0, None) @ D2_differenceMatrix(k),
                 D2_differenceMatrix(k=k).T @ V_weightMatrix(k, "concave", b0, None) @ D2_differenceMatrix(k),
        ])
    lam = np.array(list(constrDict.values())).reshape((len(constrDict.values()),-1))
    p = [i * lam[c] for c, i in enumerate(penalties)]
    p2 = list(map(sqrtm, p))
    p3 = [p2[i].astype(np.int) if m==0 else p2[i].astype(np.float) 
          for i,m in enumerate(lam)]

    X = modelMatrixBSpline(x, k)[0]
    prod = reduce((lambda x1,x2: np.vstack((x1,x2))), p3)
    X_aug = np.vstack((X, prod))
    y_aug = np.append(y, np.zeros(int(len(penalties)*k)))
    
    return X_aug, y_aug

def fit_additiveModel_2d(x1, x2, y, k=1, plot_=False):
    """
    fit an additive model for 2 dimensions using P-Splines
    """
    X, y_aug = additiveModel_Matrix(x1=x1, x2=x2, k=k, y=y)
    beta_hat = lstsq(a=X, b=y_aug)[0]
    error = mse(y_aug, X @ beta_hat)
    print(f"Mean Squared Error: {error}")
    if plot_:
        fig = make_subplots(rows=2, cols=2,
                            specs=[[{}, {}], 
                                   [{"colspan": 2, "type":"scene"}, None]],)
        fig.add_trace(go.Scatter(x=x1, y=y, name="data", 
                                 mode="markers"), row=1, col=1)
        fig.add_trace(go.Scatter(x=x1, y=X@beta_hat, mode="markers+lines",
                                 name="Fit"), row=1, col=1)
        
        fig.add_trace(go.Scatter(x=x2, y=y, name="data", 
                                 mode="markers"), row=1, col=2)
        fig.add_trace(go.Scatter(x=x2, y=X@beta_hat, mode="markers+lines",
                                 name="Fit"), row=1, col=2)
        
        fig.add_trace(go.Scatter3d(x=x1, y=x2, z=y, name="data",
                                   mode="markers"), row=2, col=1)
        fig.add_trace(go.Scatter3d(x=x1, y=x2, z=X@beta_hat, name="fit", 
                                   mode="markers"), row=2, col=1)
        fig.update_traces(marker=dict(size=2))
        fig.show()
    return

def fit_additiveModel_2d_with_penalties(x1, x2, y, k=1, plot_=False):
    
    X1_pre = modelMatrixBSpline(x1, k=k)[0]
    X2_pre = modelMatrixBSpline(x2, k=k)[0]
    print("Shape of modelmatrix X1: ", X1_pre.shape)
    print("Shape of modelmatrix X2: ", X2_pre.shape)
    print("Shape of y: ", y.shape)
    b1_pre = lstsq(X1_pre, y)[0]
    b2_pre = lstsq(X2_pre, y)[0]
    
    X1, y1 = penalizedModelMatrix(x1, y, k=k, b0=b1_pre)
    X2, y2 = penalizedModelMatrix(x2, y, k=k, b0=b2_pre)
    X = np.hstack((X1, X2))
    print("Shape of the final model matrix: ", X.shape)
    print("Shape y1: ", y1.shape)
    print("Shape y2: ", y2.shape)
    y = y1 if np.sum(y1 - y2) < 1e-8 else print("Error")
    beta = lstsq(a=X, b=y)[0]
    
    if plot_:
        fig = make_subplots(rows=2, cols=2,
                            specs=[[{}, {}], 
                                   [{"colspan": 2, "type":"scene"}, None]],)
        fig.add_trace(go.Scatter(x=x1, y=y, name="data", 
                                 mode="markers"), row=1, col=1)
        fig.add_trace(go.Scatter(x=x1, y=X@beta, mode="markers+lines",
                                 name="Fit"), row=1, col=1)

        fig.add_trace(go.Scatter(x=x2, y=y, name="data", 
                                 mode="markers"), row=1, col=2)
        fig.add_trace(go.Scatter(x=x2, y=X@beta, mode="markers+lines",
                                 name="Fit"), row=1, col=2)

        fig.add_trace(go.Scatter3d(x=x1, y=x2, z=y, name="data",
                                   mode="markers"), row=2, col=1)
        fig.add_trace(go.Scatter3d(x=x1, y=x2, z=X@beta, name="fit", 
                                   mode="markers"), row=2, col=1)
        fig.update_traces(marker=dict(size=2))
        fig.show()
    
    return beta

In [12]:
def fit_1d(x, y, k=1, smooth=True, plot_=False):
    """
    """
    if smooth:
        beta_init = np.random.random(k)
        Xa, ya = penalizedModelMatrix(x, y, k=k, b0=beta_init)
    else:
        Xa, xk = modelMatrixBSpline(x=x, k=k)
        ya = y
    #print("Shape Xa: ", Xa.shape)
    #print("Shape ya: ", ya.shape)
    beta_hat = lstsq(a=Xa, b=ya)[0]
    error = mse(ya, Xa @ beta_hat)
    print(f"Mean Squared Error: {error}")
    if plot_:
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=x1, y=y, name="data", 
                                 mode="markers"))
        fig.add_trace(go.Scatter(x=x1, y=Xa@beta_hat, mode="lines",
                                 name="Fit"))
        fig.update_traces(marker=dict(size=2))
        fig.show()
    return beta_hat, Xa

In [14]:
# test tensor product splines
np.random.seed(42)
np.set_printoptions(precision=3, suppress=True)
x = np.linspace(0,3,25)
y = np.linspace(10, 100, 25)
xg, yg = np.meshgrid(x,y)
#xg, yg = xg.flatten(), yg.flatten()
k = 6
zg = np.sin(xg) + np.cos(yg/10)**2
z = np.sin(x) + np.cos(y/10)**2
X, xk = modelMatrixBSpline(x, k=k)
Y, yk = modelMatrixBSpline(x, k=k)

beta_ls = lstsq(a=X, b=z)[0]
print(beta_ls)
print(X.shape)

#plot_ModelMatrixBSpline(x, k=7)
#plot_ModelMatrixBSpline(y, k=10)
#px.scatter_3d(x=xg, y=yg, z=z).show()
#print("Shape x: ", xg.shape)
#print("Shape z: ", zg.shape)
beta, Xa = fit_1d(x=xg.flatten(), y=zg.flatten(), k=k, smooth=False, plot_=0)
# fig = go.Figure(go.Surface(x=xg, y=yg, z=zg))
#fig.show()
print(beta)
print(beta_ls)

[-0.678  2.119  0.542  2.48   0.203  1.814]
(25, 6)
Mean Squared Error: 0.1257316987579962
[0.481 1.107 1.486 1.537 1.214 0.656]
[-0.678  2.119  0.542  2.48   0.203  1.814]



`rcond` parameter will change to the default of machine precision times ``max(M, N)`` where M and N are the input matrix dimensions.


`rcond` parameter will change to the default of machine precision times ``max(M, N)`` where M and N are the input matrix dimensions.



In [58]:
!pip install ndsplines

Collecting ndsplines
  Downloading ndsplines-0.1.1-cp37-cp37m-win_amd64.whl (84 kB)
Installing collected packages: ndsplines
Successfully installed ndsplines-0.1.1


In [59]:
!pip install cython



In [46]:
# Test out ndsplines 0.1.1 
# https://pypi.org/project/ndsplines/

import ndsplines
import numpy as np
from scipy import interpolate
import numpy as np

NUM_X = 5
NUM_Y = 5
x = np.linspace(-3, 3, NUM_X)
y = np.linspace(-3, 3, NUM_Y)
meshx, meshy = np.meshgrid(x,y, indexing='ij')
input_coords = np.stack((meshx, meshy), axis=-1)
K = np.array([[1, -0.7,], [-0.7, 1.5]])
meshz = np.exp(-np.einsum(K, [1,2,], input_coords, [...,1], input_coords, [...,2])) + 0.1 * np.random.randn(NUM_X,NUM_Y)
#print("meshx: \n", meshx)
#print("meshy: \n", meshy)
#print("input_coords: \n", input_coords)
xt = [-1, 0, 1]
yt = [-1, 0, 1]
k = 3
xt = np.r_[(x[0],)*(k+1), xt, (x[-1],)*(k+1)]
yt = np.r_[(y[0],)*(k+1), yt, (y[-1],)*(k+1)]
ts = [xt, yt]

samplex = input_coords.reshape((-1,2))
print("Samplex.shape: ", samplex.shape)
print("Shape input_coords: ", input_coords.shape)


Samplex.shape:  (25, 2)
Shape input_coords:  (5, 5, 2)


In [60]:
xt = [-1, 0, 1]
yt = [-1, 0, 1]
k = 3
xt = np.r_[(x[0],)*(k+1),
          xt,
          (x[-1],)*(k+1)]
yt = np.r_[(y[0],)*(k+1),
          yt,
          (y[-1],)*(k+1)]
          
ts = [xt, yt]

samplex = input_coords.reshape((-1,2))
sampley = meshz.reshape((-1))

spl = ndsplines.make_lsq_spline(samplex, sampley, ts, np.array([3,3]))
X = spl.observation_matrix

In [24]:
a = np.arange(25).reshape(5,5)
print("a: \n", a)
np.einsum(a, [0,0], [0])

a: 
 [[ 0  1  2  3  4]
 [ 5  6  7  8  9]
 [10 11 12 13 14]
 [15 16 17 18 19]
 [20 21 22 23 24]]


array([ 0,  6, 12, 18, 24])

In [75]:
print(X.shape)
fig = go.Figure()
for i in range(X.shape[1]):
    fig.add_trace(go.Scatter(x=x, y=X[:,i], 
                             name=f"BSpline {i+1}", mode="lines"))
for i in xk:
    addVertLinePlotly(fig, x0=i)
fig.show()

(2500, 49)
