diff --git a/.pylintrc b/.pylintrc index 899ce58..c1208f0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -5,3 +5,10 @@ disable= invalid-name, protected-access, too-many-arguments + +[TYPECHECK] + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +ignored-modules=numpy.* diff --git a/pyesg/diffusion_process.py b/pyesg/diffusion_process.py new file mode 100644 index 0000000..82afa23 --- /dev/null +++ b/pyesg/diffusion_process.py @@ -0,0 +1,181 @@ +"""Classes for stochastic diffusion process""" +from typing import Dict, Optional, Tuple, Union +import numpy as np +from scipy import stats + + +class DiffusionProcess: + """ + Base class for a stochastic diffusion process. + + Provides the framework for implementing specific stochastic models as subclasses, + including a __call__ method that describes how to generate new samples from the + process, given a start value and a delta-t. + """ + + def __repr__(self) -> str: + return f"" + + def __call__( + self, + value: Union[float, np.ndarray], + dt: float, + random_state: Optional[Union[int, np.random.RandomState]] = None, + ) -> np.ndarray: + """ + Simulates the next value or array of values from the process, + given a delta-t expressed in years, e.g. 1/12 for monthly. Can + be deterministically drawn if a random_state is specified. + + Parameters + ---------- + value : Union[float, np.ndarray], the starting value or array + dt : float, the discrete time elapsed between value(t) and value(t+dt) + random_state : Optional[int, np.random.RandomState], either an integer seed or a + numpy RandomState object directly, if reproducibility is desired + + Returns + ------- + samples : np.ndarray, the next values in the process + """ + raise NotImplementedError() + + @property + def _stochastic_dist(self): + """ + Returns a scipy distribution rvs method that can be used to generate stochastic + samples for new values in the diffusion process. Default is standard normal. + """ + return stats.norm.rvs + + def _dW( + self, + size: Tuple[int, ...], + random_state: Optional[Union[int, np.random.RandomState]] = None, + ) -> np.ndarray: + """ + Returns a batch of random values for the process, given a size and optional + random_state integer or object + """ + return self._stochastic_dist(size=size, random_state=random_state) + + @property + def _coefs(self) -> Dict[str, Optional[float]]: + """Returns a dictionary of parameters required for this process""" + raise NotImplementedError() + + def _is_fit(self) -> bool: + """Returns a boolean indicating whether or not the process has been fit""" + return all([v is not None for v in self._coefs.values()]) + + def sample( + self, + init: Union[float, np.ndarray], + n_scen: int, + n_year: int, + n_step: int, + random_state: Optional[Union[int, np.random.RandomState]] = None, + ) -> np.ndarray: + """ + Sample from the diffusion process for a number of scenarios and time steps. + + Parameters + ---------- + init : Union[float, np.ndarray], either a single start value that will be + broadcast to all scenarios, or a start value array that should match the + shape of "n_scen" + n_scen : int, the number of scenarios to generate + n_year : int, the number of years per scenario + n_step : int, the number of steps per year; e.g. 1 for annual time steps, 12 + for monthly, 24 for bi-weekly, 52 for weekly, 252 (or 365) for daily + random_state : Optional[int, np.random.RandomState], either an integer seed or a + numpy RandomState object directly, if reproducibility is desired + + Returns + ------- + samples : np.ndarray with shape (n_scen, 1 + n_years*step_size), with the scenario + results from the process + """ + if not self._is_fit(): + raise RuntimeError("Model parameters haven't been fit yet!") + + # set a function-level pseudo random number generator, either by creating a new + # RandomState object with the integer argument, or using the RandomState object + # directly passed in the arguments. + if isinstance(random_state, int): + prng = np.random.RandomState(random_state) + else: + prng = random_state + + # create a shell array that we will populate with values once they are available + samples = np.empty(shape=(n_scen, 1 + n_year * n_step)) + + # overwrite first value of each scenario (the first column) with the init value + # confirm that if init is passed as an array that it matches the n_scen shape + try: + samples[:, 0] = init + except ValueError as e: + raise ValueError("'init' should have the same length as 'n_scen'") from e + + # generate the next value recursively, but vectorized across scenarios (columns) + for i in range(n_year * n_step): + samples[:, i + 1] = self( + value=samples[:, i], dt=1 / n_step, random_state=prng + ) + return samples + + +class Vasicek(DiffusionProcess): + """Vasicek short-rate model""" + + def __init__(self) -> None: + super().__init__() + self.k: Optional[float] = None + self.theta: Optional[float] = None + self.sigma: Optional[float] = None + + def __call__( + self, + value: Union[float, np.ndarray], + dt: float, + random_state: Optional[Union[int, np.random.RandomState]] = None, + ) -> np.ndarray: + if isinstance(value, float): + # convert to a one-element array + value = np.array(value) + dW = self._dW(size=value.shape, random_state=random_state) + return value + self.k * (self.theta - value) * dt + self.sigma * dt ** 0.5 * dW + + @property + def _coefs(self) -> Dict[str, Optional[float]]: + return dict(k=self.k, theta=self.theta, sigma=self.sigma) + + +class CoxIngersollRoss(DiffusionProcess): + """Cox-Ingersoll-Ross short-rate model""" + + def __init__(self) -> None: + super().__init__() + self.k: Optional[float] = None + self.theta: Optional[float] = None + self.sigma: Optional[float] = None + + def __call__( + self, + value: Union[float, np.ndarray], + dt: float, + random_state: Optional[Union[int, np.random.RandomState]] = None, + ) -> np.ndarray: + if isinstance(value, float): + # convert to a one-element array + value = np.array(value) + dW = self._dW(size=value.shape, random_state=random_state) + return ( + value + + self.k * (self.theta - value) * dt + + self.sigma * value ** 0.5 * dt ** 0.5 * dW + ) + + @property + def _coefs(self) -> Dict[str, Optional[float]]: + return dict(k=self.k, theta=self.theta, sigma=self.sigma) diff --git a/tests/test_diffusion_process.py b/tests/test_diffusion_process.py new file mode 100644 index 0000000..6a9575b --- /dev/null +++ b/tests/test_diffusion_process.py @@ -0,0 +1,73 @@ +"""Tests for the Vasicek Model""" +import unittest +import numpy as np + +from pyesg.diffusion_process import CoxIngersollRoss, Vasicek + + +class TestVasicek(unittest.TestCase): + """Test Vasicek Model""" + + def test_sample_shape(self): + """Ensure the sample has the correct shape""" + model = Vasicek() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + samples = model.sample(0.03, 1000, 30, 12, random_state=None) + self.assertEqual(samples.shape, (1000, 30 * 12 + 1)) + + def test_sample_first_value1(self): + """Ensure the first scenario has the init value if init is a float""" + model = Vasicek() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + init = 0.03 + samples = model.sample(init, 1000, 30, 12, random_state=None) + self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) + + def test_sample_first_value2(self): + """Ensure the first scenario has the init value if init is an array""" + model = Vasicek() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + init = np.array([0.03]) + samples = model.sample(init, 1000, 30, 12, random_state=None) + self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) + + def test_raises_value_error(self): + """Ensure we raise a ValueError if init shape doesn't match n_scen""" + model = Vasicek() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + init = np.array([0.03, 0.03]) + self.assertRaises(ValueError, model.sample, init, 1000, 30, 12) + + +class TestCoxIngersollRoss(unittest.TestCase): + """Test Cox-Ingersoll-Ross Model""" + + def test_sample_shape(self): + """Ensure the sample has the correct shape""" + model = CoxIngersollRoss() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + samples = model.sample(0.03, 1000, 30, 12, random_state=None) + self.assertEqual(samples.shape, (1000, 30 * 12 + 1)) + + def test_sample_first_value1(self): + """Ensure the first scenario has the init value if init is a float""" + model = CoxIngersollRoss() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + init = 0.03 + samples = model.sample(init, 1000, 30, 12, random_state=None) + self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) + + def test_sample_first_value2(self): + """Ensure the first scenario has the init value if init is an array""" + model = CoxIngersollRoss() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + init = np.array([0.03]) + samples = model.sample(init, 1000, 30, 12, random_state=None) + self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) + + def test_raises_value_error(self): + """Ensure we raise a ValueError if init shape doesn't match n_scen""" + model = CoxIngersollRoss() + model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 + init = np.array([0.03, 0.03]) + self.assertRaises(ValueError, model.sample, init, 1000, 30, 12)