In [None]:
import numpy as np
import matplotlib.pyplot as plt
from warnings import warn
from typing import Callable

In [None]:
class CountingProcess:
    def __init__(self,
                 T: float,
                 claim_law: Callable[[float], float] = None,
                 u: float = 1.0,
                 p: float = 1.0,
                 process_name: str = "Counting process",
                 seed: int = None):
        if seed is not None:
            np.random.seed(seed)
        if T <= 0:
            raise ValueError("T must be strictly positive")
        self.T = T
        self.name = process_name
        self.events = self.generate_events()
        self.N = len(self.events)
        self.claim_law = claim_law if claim_law is not None else lambda: np.random.uniform(0,1)
        self.claims = self.generate_claims()
        self.u = u
        self.p = p
        self.res = 1000

    def generate_events(self):
        raise NotImplementedError("The method 'generate_events' must be implemented in a subclass")

    def generate_intensity(self):
        raise NotImplementedError("The method 'generate_intensity' must be implemented in a subclass")

    def get_mean_and_variance(self, t, variable="process"):
        raise NotImplementedError("The method 'get_mean_and_variance' must be implemented in a subclass")

    def generate_claims(self, claim_law=None):
        if claim_law is None:
            claim_law = self.claim_law
        return np.array([claim_law() for _ in range(self.N)])

    def generate_risk(self, u, p):
        time = [0.]
        resources = [u]
        for (t,c) in zip(self.events, self.claims):
            resources.append(resources[-1] + p*(t - time[-1]))  # add the new resource
            resources.append(resources[-1] - c)  # remove the sinister
            time += [t, t]
        return time, resources

    def _events_with_limits(self):
        return [0.] + self.events + [self.T]

    def _theory_on_ax(self, ax, variable="process"):
        try:
            x = np.linspace(0., self.T, self.res)
            mean, var = self.get_mean_and_variance(x, variable=variable)
            ax.plot(x, mean, color="green")
            ax.fill_between(x, mean - 1.96*np.sqrt(var), mean + 1.96*np.sqrt(var), color="green", alpha=0.1)
        except NotImplementedError:
            warn(f"The method 'get_mean_and_variance' is not implemented for variable '{variable}'")

    def _process_on_ax(self, ax, plot_theory=False):
        ax.step(self._events_with_limits(), list(range(self.N + 1)) + [self.N], where="post")
        if plot_theory:
            self._theory_on_ax(ax, variable="process")
        ax.vlines(self.events, *ax.get_ylim(), color="red", linewidth=0.5, linestyle="dashed")
        ax.set_xlim(0, self.T)
        ax.set_title(self.name)
        ax.set_xlabel("Time")
        ax.set_ylabel("$N(t)$")
        ax.grid()

    def _intensity_on_ax(self, ax, plot_theory=False):
        absintens, intens = self.generate_intensity()
        ax.plot(absintens, intens)
        if plot_theory:
            self._theory_on_ax(ax, variable="intensity")
        ax.vlines(self.events, *ax.get_ylim(), color="red", linewidth=0.5, linestyle="dashed")
        ax.set_xlim(0, self.T)
        ax.set_title(self.name + " intensity")
        ax.set_xlabel("Time")
        ax.set_ylabel("$\lambda(t)$")
        ax.grid()

    def _risk_on_ax(self, ax, plot_theory=False, additional_parameters=None):
        u, p, mean_claim, var_claim = additional_parameters
        absrisk, risk = self.generate_risk(u, p)
        ax.plot(absrisk, risk)
        if plot_theory:
            self._theory_on_ax(ax, variable="risk")
        ax.hlines(0, *ax.get_xlim(), color="blue", linestyle="dashed")
        ax.vlines(self.events, *ax.get_ylim(), color="red", linewidth=0.5, linestyle="dashed")
        ruin = np.array([t for (i,(t,r)) in enumerate(zip(absrisk[1:], risk[1:])) if r < 0 and risk[i] > 0])
        ax.scatter(ruin, np.zeros_like(ruin), color="red", zorder=10)
        ax.set_xlim(0, self.T)
        ax.set_title(self.name + f" risk (u={u}, p={p})")
        ax.set_xlabel("Time")
        ax.set_ylabel("Resources")
        ax.grid()

    def plot_process(self, plot_theory=False, dim=(10,6), save=None):
        fig, ax = plt.subplots(figsize=dim)
        self._process_on_ax(ax, plot_theory)
        if save is not None:
            fig.savefig(save, transparent=True)
        plt.show()

    def plot_intensity(self, plot_theory=False, dim=(10,6), save=None):
        fig, ax = plt.subplots(figsize=dim)
        self._intensity_on_ax(ax, plot_theory)
        if save is not None:
            fig.savefig(save, transparent=True)
        plt.show()

    def plot_risk(self, u, p, plot_theory=False, dim=(10,6), save=None):
        fig, ax = plt.subplots(figsize=dim)
        self._risk_on_ax(ax, plot_theory, [u, p, 0, 0])
        if save is not None:
            fig.savefig(save, transparent=True)
        plt.show()

    def plot_process_and_risk(self, u, p, plot_theory=False, dim=(10,6), save=None):
        fig, ax = plt.subplots(figsize=dim)
        self._process_on_ax(ax, plot_theory)
        self._risk_on_ax(ax, plot_theory, [u, p, 0, 0])
        if save is not None:
            fig.savefig(save, transparent=True)
        plt.show()

    def plot_process_and_intensity(self, plot_theory=False, dim=(10,8), save=None):
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=dim)
        self._process_on_ax(ax1, plot_theory=plot_theory)
        self._intensity_on_ax(ax2, plot_theory=plot_theory)
        fig.tight_layout()
        if save is not None:
            fig.savefig(save, transparent=True)
        plt.show()

    def plot_process_and_intensity_and_risk(self, u, p, plot_theory=False, dim=(10,10), save=None):
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=dim)
        self._process_on_ax(ax1, plot_theory)
        self._intensity_on_ax(ax2, plot_theory)
        self._risk_on_ax(ax3, plot_theory, [u, p, 0, 0])
        fig.tight_layout()
        if save is not None:
            fig.savefig(save, transparent=True)
        plt.show()

In [None]:
class HawkesProcess(CountingProcess):
    def __init__(self,
                 mu: float,
                 alpha: float,
                 beta: float,
                 name: str = "Hawkes process",
                 **kwargs):
        if alpha <  0 or beta < alpha:
            raise ValueError(f"We must have 0 < alpha < beta (alpha={alpha} and beta={beta})")
        self.mu = mu
        self.alpha = alpha
        self.beta = beta
        super().__init__(process_name=name, **kwargs)

    def get_mean_and_variance(self, t, variable="process"):
        diff = self.beta - self.alpha
        mu_inf = self.beta*self.mu/diff
        diff_mu = self.mu - mu_inf
        if variable == "process":
            mean = mu_inf*t + diff_mu/diff * (1 - np.exp(-diff*t))
            var = self.beta**2*mu_inf/(diff**2)*t + \
                  self.alpha**2*(2*self.mu - mu_inf)/(2*diff**3) * (1 - np.exp(-2*diff*t)) - \
                  2*self.alpha*self.beta*diff_mu/(diff**2) * t * np.exp(-diff*t) + \
                  ( (self.beta+self.alpha)/(diff**2) * diff_mu - 2*self.alpha*self.beta/(diff**3) * mu_inf ) * (1 - np.exp(-diff*t))
        elif variable == "intensity":
            mean = mu_inf + diff_mu * np.exp(-diff*t)
            var = self.alpha**2*mu_inf/(2*diff) + self.alpha**2*diff_mu/diff*np.exp(-diff*t) - self.alpha**2*(2*self.mu-mu_inf)/(2*diff)*np.exp(-2*diff*t)
        elif variable == "risk":
            estimate_claim_parameters = [self.claim_law() for _ in range(1000)]
            claim_mean = np.mean(estimate_claim_parameters)
            mean_process, _ = self.get_mean_and_variance(t, variable="process")
            mean = self.u + self.p*t - claim_mean * mean_process
            var = 0
            warn("Risk variance is not implemented for Hawkes process", stacklevel=2)
        else:
            raise NotImplementedError(f"Not implemented yet for variable {variable}")
        return mean, np.clip(var, 0, np.inf)

    def phi(self, t):
        return self.alpha * np.exp(-self.beta * t)

    def lamb(self, t, tau):
        return self.mu + sum([self.phi(t-tk) for tk in tau])

    def generate_events(self):
        Tau = []
        s = n = t = 0
        while s < self.T:
            lambda_bar = self.lamb(s, Tau)
            u = np.random.uniform(0,1)
            w = -np.log(u)/lambda_bar
            s += w
            D = np.random.uniform(0,1)
            if D*lambda_bar <= self.lamb(s, Tau):
                n += 1
                t = s
                Tau.append(t)
        if t <= self.T:
            return Tau
        else:
            return Tau[:-1]

    def generate_intensity(self):
        x = np.linspace(0, self.T, self.res)
        intensity = self.mu * np.ones(self.res)
        points0 = self._events_with_limits()
        for i in range(len(points0)-1):
            p1, p2 = points0[i:i+2]
            i1 = (np.abs(x - p1)).argmin() + 1  # +1 for left continuity
            i2 = (np.abs(x - p2)).argmin() + 1  # +1 because of python range excludes last element
            if i == 0:
                intensity[i1:i2] = self.mu
            else:
                local_intensity = np.array([self.lamb(x[ii], points0[:i]) for ii in range(i1, i2)])
                intensity[i1:i2] = local_intensity
        return x, intensity

h = HawkesProcess(mu=1.2, alpha=0.6, beta=.65, T=100, seed=11)
h.plot_process_and_intensity_and_risk(u=1, p=2.5, plot_theory=True)

In [None]:
h.claims = h.generate_claims(claim_law=lambda: np.random.normal(5,2))
h.plot_risk(10, 8)

In [None]:
class PoissonProcess(CountingProcess):
    def __init__(self,
                 lamb: float,
                 name: str = "Poisson process",
                 **kwargs):
        if lamb <= 0:
            raise ValueError(f"lambda must be strictly positive (lambda={lamb})")
        self.lamb = lamb
        super().__init__(process_name=name, **kwargs)

    def get_mean_and_variance(self, t, variable="process"):
        raise NotImplementedError(f"Not implemented yet for variable {variable}")

    def generate_events(self):
        Tau = []
        s = 0
        while s < self.T:
            u = np.random.uniform(0,1)
            w = -np.log(u)/self.lamb
            s += w
            Tau.append(s)
        return Tau

    def generate_intensity(self):
        x = [0, self.T]
        intensity = [self.lamb, self.lamb]
        return x, intensity

p = PoissonProcess(lamb=2, T=20)
p.plot_process_and_intensity()

# Output for slides

## Title slide

In [None]:
h = HawkesProcess(mu=1.2, alpha=0.6, beta=.64, T=6, seed=16)
h.res = 5000
fig, ax = plt.subplots(figsize=(10,6))
absintens, intens = h.generate_intensity()
ax.plot(absintens, intens, color="red", linewidth=.5, alpha=.5)
ax.set_xlim(0, h.T)
ax.grid()
plt.axis("off")
fig.savefig("/tmp/fig.png", transparent=True, dpi=500, bbox_inches='tight')
plt.show()