In [None]:
# Let's implement the LinearOperator builder, save it as a .py module for download,
# and run a numerical demonstration with figures and printed checks.

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla
import matplotlib.pyplot as plt
from pathlib import Path

def linear_interpolation_operator(x_data, x_new, *, extrapolate=True, dtype=None):
    """
    Build a SciPy LinearOperator A such that (A @ y_new) gives the values at x_data
    obtained by linear interpolation of y_new defined on the knot grid x_new.
    
    Parameters
    ----------
    x_data : (m,) array_like
        Sample locations where we want interpolated values. May be unsorted and can have duplicates.
    x_new : (n,) array_like
        Knot grid (must be strictly increasing; no duplicates). y_new is defined at these points.
    extrapolate : bool, default=True
        If True, linearly extrapolate beyond [x_new[0], x_new[-1]] using the end intervals.
        If False, clamp outside values to the nearest endpoint (constant extension).
    dtype : numpy dtype, optional
        Dtype for the resulting LinearOperator. If None, inferred from inputs (float64 typical).
    
    Returns
    -------
    A : scipy.sparse.linalg.LinearOperator with shape (m, n)
        The interpolation operator.
        
    Notes
    -----
    - This operator is *purely* the linear interpolation map; it does not denoise or smooth.
    - For a vector y_new (values at x_new), A @ y_new gives values on x_data.
    - rmatvec is provided (A.T @ v) which is useful for least-squares solvers.
    """
    x_data = np.asarray(x_data, dtype=float).ravel()
    x_new = np.asarray(x_new, dtype=float).ravel()
    if x_new.ndim != 1 or x_data.ndim != 1:
        raise ValueError("x_data and x_new must be 1D")
    if x_new.size < 2:
        raise ValueError("x_new must contain at least two distinct points for linear interpolation")
    if not np.all(np.isfinite(x_data)) or not np.all(np.isfinite(x_new)):
        raise ValueError("x_data and x_new must be finite")
    if np.any(np.diff(x_new) <= 0):
        raise ValueError("x_new must be strictly increasing (no duplicates)")
    
    m = x_data.size
    n = x_new.size
    # For each x_data[i], find left interval index j such that x_new[j] <= x_data[i] <= x_new[j+1].
    j_left = np.searchsorted(x_new, x_data, side="right") - 1
    # Handle outside range:
    j_left = np.clip(j_left, 0, n - 2)
    j_right = j_left + 1
    
    xL = x_new[j_left]
    xR = x_new[j_right]
    dx = xR - xL  # positive by strict increasing
    
    # Compute linear weights
    t = (x_data - xL) / dx
    if not extrapolate:
        # clamp t to [0,1] outside the domain (constant extension)
        t = np.clip(t, 0.0, 1.0)
    wR = t
    wL = 1.0 - t
    
    # Infer dtype
    if dtype is None:
        dtype = np.result_type(float,)
    
    def matvec(y_new):
        y_new = np.asarray(y_new, dtype=dtype).ravel()
        if y_new.size != n:
            raise ValueError(f"Expected vector of length {n}, got {y_new.size}")
        return wL * y_new[j_left] + wR * y_new[j_right]
    
    def rmatvec(v):
        v = np.asarray(v, dtype=dtype).ravel()
        if v.size != m:
            raise ValueError(f"Expected vector of length {m}, got {v.size}")
        out = np.zeros(n, dtype=dtype)
        # Accumulate contributions to the two neighboring knots
        np.add.at(out, j_left, wL * v)
        np.add.at(out, j_right, wR * v)
        return out
    
    A = spla.LinearOperator(shape=(m, n), matvec=matvec, rmatvec=rmatvec, dtype=dtype)
    # Attach some helpful attributes for introspection (not required by SciPy)
    A.j_left = j_left
    A.j_right = j_right
    A.wL = wL
    A.wR = wR
    A.x_data = x_data
    A.x_new = x_new
    
    return A

# --- Optional: function to materialize a sparse matrix (CSR) with two nonzeros per row ---
def interpolation_csr(A):
    """
    Materialize a CSR sparse matrix corresponding to the given LinearOperator built above.
    """
    j_left = A.j_left
    j_right = A.j_right
    wL = A.wL
    wR = A.wR
    m, n = A.shape
    # Each row has up to two entries
    row_idx = np.repeat(np.arange(m), 2)
    col_idx = np.empty(2*m, dtype=int)
    col_idx[0::2] = j_left
    col_idx[1::2] = j_right
    data = np.empty(2*m, dtype=A.dtype)
    data[0::2] = wL
    data[1::2] = wR
    # If some rows have identical left/right (can only happen if n==1, which we disallow), we'd need to combine.
    M = sp.csr_matrix((data, (row_idx, col_idx)), shape=(m, n))
    return M

# Save module for download
module_code = r'''
import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla

def linear_interpolation_operator(x_data, x_new, *, extrapolate=True, dtype=None):
    x_data = np.asarray(x_data, dtype=float).ravel()
    x_new = np.asarray(x_new, dtype=float).ravel()
    if x_new.ndim != 1 or x_data.ndim != 1:
        raise ValueError("x_data and x_new must be 1D")
    if x_new.size < 2:
        raise ValueError("x_new must contain at least two distinct points for linear interpolation")
    if not np.all(np.isfinite(x_data)) or not np.all(np.isfinite(x_new)):
        raise ValueError("x_data and x_new must be finite")
    if np.any(np.diff(x_new) <= 0):
        raise ValueError("x_new must be strictly increasing (no duplicates)")
    
    m = x_data.size
    n = x_new.size
    j_left = np.searchsorted(x_new, x_data, side="right") - 1
    j_left = np.clip(j_left, 0, n - 2)
    j_right = j_left + 1
    
    xL = x_new[j_left]
    xR = x_new[j_right]
    dx = xR - xL
    
    t = (x_data - xL) / dx
    if not extrapolate:
        t = np.clip(t, 0.0, 1.0)
    wR = t
    wL = 1.0 - t
    
    if dtype is None:
        dtype = np.result_type(float,)
    
    def matvec(y_new):
        y_new = np.asarray(y_new, dtype=dtype).ravel()
        if y_new.size != n:
            raise ValueError(f"Expected vector of length {n}, got {y_new.size}")
        return wL * y_new[j_left] + wR * y_new[j_right]
    
    def rmatvec(v):
        v = np.asarray(v, dtype=dtype).ravel()
        if v.size != m:
            raise ValueError(f"Expected vector of length {m}, got {v.size}")
        out = np.zeros(n, dtype=dtype)
        np.add.at(out, j_left, wL * v)
        np.add.at(out, j_right, wR * v)
        return out
    
    A = spla.LinearOperator(shape=(m, n), matvec=matvec, rmatvec=rmatvec, dtype=dtype)
    A.j_left = j_left
    A.j_right = j_right
    A.wL = wL
    A.wR = wR
    A.x_data = x_data
    A.x_new = x_new
    return A

def interpolation_csr(A):
    j_left = A.j_left
    j_right = A.j_right
    wL = A.wL
    wR = A.wR
    m, n = A.shape
    row_idx = np.repeat(np.arange(m), 2)
    col_idx = np.empty(2*m, dtype=int)
    col_idx[0::2] = j_left
    col_idx[1::2] = j_right
    data = np.empty(2*m, dtype=A.dtype)
    data[0::2] = wL
    data[1::2] = wR
    M = sp.csr_matrix((data, (row_idx, col_idx)), shape=(m, n))
    return M
'''
path = Path("/mnt/data/linear_interpolation_operator.py")
path.write_text(module_code)

# --- Demonstration ---
rng = np.random.default_rng(42)
m = 80  # number of data points
# Generate unsorted x_data in [0, 10], and introduce some duplicates
x_data = np.sort(rng.uniform(0, 10, size=m))
if m >= 6:
    dup_idx = rng.choice(np.arange(1, m-1), size=6, replace=False)
    x_data[dup_idx] = x_data[dup_idx - 1]  # create duplicates

# A smooth "true" function to test correctness
def f(x):
    return np.sin(0.6 * x) + 0.1 * x

y_true = f(x_data)
y_noisy = y_true + 0.05 * rng.standard_normal(size=m)  # add small noise

# New grid (knots) where y_new will be defined
x_new = np.linspace(-1.0, 11.0, 60)  # allows extrapolation beyond data range

# Build operator
A = linear_interpolation_operator(x_data, x_new, extrapolate=True)

# 1) Forward check: if we take y_new = f(x_new), A @ y_new should approximate f(x_data) very closely
y_new_true = f(x_new)
forward_eval = A @ y_new_true
max_abs_err = np.max(np.abs(forward_eval - y_true))
l2_rel_err = np.linalg.norm(forward_eval - y_true) / np.linalg.norm(y_true)

print("Forward interpolation check (no noise):")
print(f"  max |A f(x_new) - f(x_data)| = {max_abs_err:.3e}")
print(f"  relative L2 error           = {l2_rel_err:.3e}")
print()

# 2) Fit y_new to noisy data via least-squares: min ||A y_new - y_noisy||_2
lsqr_res = spla.lsqr(A, y_noisy, atol=1e-12, btol=1e-12, iter_lim=10000)
y_new_fit = lsqr_res[0]
resid_norm = np.linalg.norm(A @ y_new_fit - y_noisy)

print("Least-squares fit to noisy data:")
print(f"  ||A y_new_fit - y_noisy||_2 = {resid_norm:.3e}")
print(f"  LSQR iters = {lsqr_res[2]}, flag = {lsqr_res[1]} (0 means converged)")
print()

# --- Plots ---
# Plot 1: Forward interpolation check against true function
plt.figure()
plt.plot(x_new, y_new_true, label="y_new = f(x_new) (knots)")
plt.scatter(x_data, y_true, s=18, label="f(x_data) (truth at data)")
plt.scatter(x_data, forward_eval, s=10, marker='x', label="A @ f(x_new) at data")
plt.xlabel("x")
plt.ylabel("y")
plt.legend()
plt.title("Forward interpolation check")

# Plot 2: Noisy data and LSQ piecewise-linear fit
plt.figure()
plt.scatter(x_data, y_noisy, s=14, label="Noisy data (x_data, y_noisy)")
plt.plot(x_new, y_new_fit, label="Fitted y_new on knots (piecewise-linear model)")
plt.xlabel("x")
plt.ylabel("y")
plt.legend()
plt.title("Least-squares fit via LinearOperator")

# Show the figures
plt.show()

# Provide the saved module path for download
f"Saved module to: {path}"
