# Exploratory Time Series Smoothing And Changepoints

Here are some libraries I explored for smoothing the output time series (after inference), either to make changepoint detection easier or to remove noise.

In [1]:
import matplotlib.pyplot as plt
import pylops
import ruptures as rpt
from math import log
from ruptures.base import BaseCost
import numpy as np

# PyLops L2

In [None]:
# L2 smoothing: least-squares fit of the series with a penalty on its second
# derivative, so the recovered curve stays close to y while being smooth.
y = np.asarray([], dtype=float)  # put the series in here

# Size the operators from the series itself so they always match y.
timesteps = len(y)
Iop = pylops.Identity(timesteps)
D2op = pylops.SecondDerivative(timesteps, edge=True)
lamda = 1e2  # regularisation strength: larger values give a smoother xinv

# The call returns a tuple; the first entry is the inverted (smoothed) series.
xinv = pylops.optimization.leastsquares.regularized_inversion(
    Iop, y, [D2op], epsRs=[np.sqrt(lamda / 2)], iter_lim=30
)[0]

# Only the observed series and its smoothed version are plotted: there is no
# noise-free reference signal available for real data.
plt.figure(figsize=(10, 5))
plt.plot(y, ".k", label="y=x+n")
plt.plot(xinv, "r", lw=5, label="xinv")
plt.legend()
plt.title("L2 inversion")
plt.tight_layout()

# PyLops Total Variation Regularisation

In [None]:
# Total-variation smoothing: L1 penalty on the first derivative, solved with
# the split-Bregman algorithm. Uses the series `y` defined in the cell above.
nx = len(y)
# Build both operators here so their size always matches the current y.
Iop = pylops.Identity(nx)
Dop = pylops.FirstDerivative(nx, edge=True, kind="backward")
mu = 0.01
lamda = 0.3  # weight of the L1 (total-variation) term
niter_out = 50
niter_in = 3

# The call returns a tuple; the first entry is the inverted (smoothed) series.
xinv = pylops.optimization.sparsity.splitbregman(
    Iop,
    y,
    [Dop],
    niter_outer=niter_out,
    niter_inner=niter_in,
    mu=mu,
    epsRL1s=[lamda],
    tol=1e-4,
    tau=1.0,
    # Extra keyword arguments are forwarded to the inner least-squares solver.
    iter_lim=30,
    damp=1e-10,
)[0]

# Only the observed series and its smoothed version are plotted: there is no
# noise-free reference signal available for real data.
plt.figure(figsize=(10, 5))
plt.plot(y, ".k", label="y=x+n")
plt.plot(xinv, "r", lw=5, label="xinv")
plt.legend()
plt.title("TV inversion")
plt.tight_layout()

# Ruptures

In [None]:
# PELT changepoint search with the RBF kernel cost.
# fit() needs the series to segment, and it must be a numpy array.
# NOTE(review): fitting on the raw series `y`; use `xinv` instead to segment
# the smoothed output -- confirm which one was intended.
signal = np.asarray(y, dtype=float)
algo = rpt.Pelt(model="rbf", min_size=3).fit(signal)
result = algo.predict(pen=10)  # larger pen -> fewer detected breakpoints

class MyCost(BaseCost):
    """Segment cost intended for exponentially distributed signals.

    A segment's cost is its length multiplied by the natural logarithm of
    the mean of the samples it contains.
    """

    # Both attributes have to be declared for the cost to be compatible
    # with ruptures.
    model = ""
    min_size = 2

    def fit(self, signal):
        """Store the signal to segment and return the cost object itself.

        Args:
            signal: series whose slices will be scored by ``error``

        Returns:
            MyCost: self, so the call can be chained
        """
        self.signal = signal
        return self

    def error(self, start, end):
        """Compute the cost of the segment ``signal[start:end]``.

        Args:
            start (int): index of the first sample of the segment
            end (int): index one past the last sample of the segment

        Returns:
            float: ``(end - start) * log(mean of the segment)``
        """
        segment = self.signal[start:end]
        n_points = end - start
        return n_points * log(segment.mean())
    
    
# Synthetic data: three exponential segments (scale 1, scale 2, then the
# first segment repeated).
a = np.random.exponential(scale=1, size=100)
b = np.random.exponential(scale=2, size=200)
signal = np.concatenate((a, b, a))
bkps = [100, 300, 400]  # segment ends; the last entry is the signal length

# Changepoint search with PELT driven by the custom cost.
algo = rpt.Pelt(custom_cost=MyCost())
algo.fit(signal)
my_bkps = algo.predict(pen=10)

# Show the signal with the reference and the detected breakpoints.
rpt.display(signal, bkps, my_bkps)
plt.show()

In [None]:
from ruptures.costs import CostLinear
import numpy as np

class CustomCost(BaseCost):
    """Least-squares (L2) segment cost for use as ``custom_cost`` in ruptures.

    The class inherits from ``ruptures.base.BaseCost`` because ruptures'
    documentation requires custom costs to derive from it; an object that
    does not is not used as the cost by the search methods.

    The cost of a segment is the sum of squared deviations of its samples
    from the segment mean, computed per column so that multi-dimensional
    signals of shape (n_samples, n_dims) are scored correctly.
    """

    # Smallest segment length the cost accepts; ruptures requires cost
    # classes to declare it alongside ``model``.
    min_size = 1

    def __init__(self):
        self.model = "l2"
        self.signal = None  # set by fit()

    def fit(self, signal):
        """Store the signal to segment and return the cost object itself.

        Args:
            signal: array-like series, 1-D or (n_samples, n_dims)

        Returns:
            CustomCost: self, so the call can be chained
        """
        self.signal = signal
        return self

    def error(self, start, end):
        """Return the L2 cost of the segment ``signal[start:end]``.

        Args:
            start (int): index of the first sample of the segment
            end (int): index one past the last sample of the segment

        Returns:
            float: sum of squared deviations from the per-column segment mean
        """
        segment = self.signal[start:end]
        # axis=0 gives one mean per column; for a 1-D signal this is the
        # ordinary mean, so single-channel results are unchanged.
        mean_segment = np.mean(segment, axis=0)
        return np.sum((segment - mean_segment) ** 2)

    @property
    def model(self):
        """Name of the cost model (read/write)."""
        return self._model

    @model.setter
    def model(self, value):
        self._model = value

# Usage with PELT algorithm
# Parameters of the synthetic piecewise-constant test signal; illustrative
# values, adjust as needed.
n_samples, n_dims, n_bkps = 500, 1, 3
signal, bkps = rpt.pw_constant(n_samples, n_dims, n_bkps, noise_std=1.0)

algo = rpt.Pelt(custom_cost=CustomCost())
penalty = 20  # larger penalty -> fewer detected breakpoints
result = algo.fit(signal).predict(pen=penalty)

# Show the signal with the reference and the detected breakpoints.
rpt.display(signal, bkps, result)
