Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* sketching out a modification to the DiffusionProcess class design * add method for the stochastic process of the diffusion process * add DiffusionProcess __repr__ * update pylintrc to not flag numpy no-member errors * complete the new DiffusionProcess and implement a sample child Vasicek class * typo in vasicek __call__ formula * add CIR short-rate model to diffusion_process.py * remove __name__ == __main__ * add unit tests for diffusion_process.py * forgot to add initial value to the process delta
- Loading branch information
Showing
3 changed files
with
261 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
"""Classes for stochastic diffusion process""" | ||
from typing import Dict, Optional, Tuple, Union | ||
import numpy as np | ||
from scipy import stats | ||
|
||
|
||
class DiffusionProcess: | ||
""" | ||
Base class for a stochastic diffusion process. | ||
Provides the framework for implementing specific stochastic models as subclasses, | ||
including a __call__ method that describes how to generate new samples from the | ||
process, given a start value and a delta-t. | ||
""" | ||
|
||
def __repr__(self) -> str: | ||
return f"<pyesg.{self.__class__.__name__}>" | ||
|
||
def __call__( | ||
self, | ||
value: Union[float, np.ndarray], | ||
dt: float, | ||
random_state: Optional[Union[int, np.random.RandomState]] = None, | ||
) -> np.ndarray: | ||
""" | ||
Simulates the next value or array of values from the process, | ||
given a delta-t expressed in years, e.g. 1/12 for monthly. Can | ||
be deterministically drawn if a random_state is specified. | ||
Parameters | ||
---------- | ||
value : Union[float, np.ndarray], the starting value or array | ||
dt : float, the discrete time elapsed between value(t) and value(t+dt) | ||
random_state : Optional[int, np.random.RandomState], either an integer seed or a | ||
numpy RandomState object directly, if reproducibility is desired | ||
Returns | ||
------- | ||
samples : np.ndarray, the next values in the process | ||
""" | ||
raise NotImplementedError() | ||
|
||
@property | ||
def _stochastic_dist(self): | ||
""" | ||
Returns a scipy distribution rvs method that can be used to generate stochastic | ||
samples for new values in the diffusion process. Default is standard normal. | ||
""" | ||
return stats.norm.rvs | ||
|
||
def _dW( | ||
self, | ||
size: Tuple[int, ...], | ||
random_state: Optional[Union[int, np.random.RandomState]] = None, | ||
) -> np.ndarray: | ||
""" | ||
Returns a batch of random values for the process, given a size and optional | ||
random_state integer or object | ||
""" | ||
return self._stochastic_dist(size=size, random_state=random_state) | ||
|
||
@property | ||
def _coefs(self) -> Dict[str, Optional[float]]: | ||
"""Returns a dictionary of parameters required for this process""" | ||
raise NotImplementedError() | ||
|
||
def _is_fit(self) -> bool: | ||
"""Returns a boolean indicating whether or not the process has been fit""" | ||
return all([v is not None for v in self._coefs.values()]) | ||
|
||
def sample( | ||
self, | ||
init: Union[float, np.ndarray], | ||
n_scen: int, | ||
n_year: int, | ||
n_step: int, | ||
random_state: Optional[Union[int, np.random.RandomState]] = None, | ||
) -> np.ndarray: | ||
""" | ||
Sample from the diffusion process for a number of scenarios and time steps. | ||
Parameters | ||
---------- | ||
init : Union[float, np.ndarray], either a single start value that will be | ||
broadcast to all scenarios, or a start value array that should match the | ||
shape of "n_scen" | ||
n_scen : int, the number of scenarios to generate | ||
n_year : int, the number of years per scenario | ||
n_step : int, the number of steps per year; e.g. 1 for annual time steps, 12 | ||
for monthly, 24 for bi-weekly, 52 for weekly, 252 (or 365) for daily | ||
random_state : Optional[int, np.random.RandomState], either an integer seed or a | ||
numpy RandomState object directly, if reproducibility is desired | ||
Returns | ||
------- | ||
samples : np.ndarray with shape (n_scen, 1 + n_years*step_size), with the scenario | ||
results from the process | ||
""" | ||
if not self._is_fit(): | ||
raise RuntimeError("Model parameters haven't been fit yet!") | ||
|
||
# set a function-level pseudo random number generator, either by creating a new | ||
# RandomState object with the integer argument, or using the RandomState object | ||
# directly passed in the arguments. | ||
if isinstance(random_state, int): | ||
prng = np.random.RandomState(random_state) | ||
else: | ||
prng = random_state | ||
|
||
# create a shell array that we will populate with values once they are available | ||
samples = np.empty(shape=(n_scen, 1 + n_year * n_step)) | ||
|
||
# overwrite first value of each scenario (the first column) with the init value | ||
# confirm that if init is passed as an array that it matches the n_scen shape | ||
try: | ||
samples[:, 0] = init | ||
except ValueError as e: | ||
raise ValueError("'init' should have the same length as 'n_scen'") from e | ||
|
||
# generate the next value recursively, but vectorized across scenarios (columns) | ||
for i in range(n_year * n_step): | ||
samples[:, i + 1] = self( | ||
value=samples[:, i], dt=1 / n_step, random_state=prng | ||
) | ||
return samples | ||
|
||
|
||
class Vasicek(DiffusionProcess): | ||
"""Vasicek short-rate model""" | ||
|
||
def __init__(self) -> None: | ||
super().__init__() | ||
self.k: Optional[float] = None | ||
self.theta: Optional[float] = None | ||
self.sigma: Optional[float] = None | ||
|
||
def __call__( | ||
self, | ||
value: Union[float, np.ndarray], | ||
dt: float, | ||
random_state: Optional[Union[int, np.random.RandomState]] = None, | ||
) -> np.ndarray: | ||
if isinstance(value, float): | ||
# convert to a one-element array | ||
value = np.array(value) | ||
dW = self._dW(size=value.shape, random_state=random_state) | ||
return value + self.k * (self.theta - value) * dt + self.sigma * dt ** 0.5 * dW | ||
|
||
@property | ||
def _coefs(self) -> Dict[str, Optional[float]]: | ||
return dict(k=self.k, theta=self.theta, sigma=self.sigma) | ||
|
||
|
||
class CoxIngersollRoss(DiffusionProcess): | ||
"""Cox-Ingersoll-Ross short-rate model""" | ||
|
||
def __init__(self) -> None: | ||
super().__init__() | ||
self.k: Optional[float] = None | ||
self.theta: Optional[float] = None | ||
self.sigma: Optional[float] = None | ||
|
||
def __call__( | ||
self, | ||
value: Union[float, np.ndarray], | ||
dt: float, | ||
random_state: Optional[Union[int, np.random.RandomState]] = None, | ||
) -> np.ndarray: | ||
if isinstance(value, float): | ||
# convert to a one-element array | ||
value = np.array(value) | ||
dW = self._dW(size=value.shape, random_state=random_state) | ||
return ( | ||
value | ||
+ self.k * (self.theta - value) * dt | ||
+ self.sigma * value ** 0.5 * dt ** 0.5 * dW | ||
) | ||
|
||
@property | ||
def _coefs(self) -> Dict[str, Optional[float]]: | ||
return dict(k=self.k, theta=self.theta, sigma=self.sigma) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
"""Tests for the Vasicek Model""" | ||
import unittest | ||
import numpy as np | ||
|
||
from pyesg.diffusion_process import CoxIngersollRoss, Vasicek | ||
|
||
|
||
class TestVasicek(unittest.TestCase): | ||
"""Test Vasicek Model""" | ||
|
||
def test_sample_shape(self): | ||
"""Ensure the sample has the correct shape""" | ||
model = Vasicek() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
samples = model.sample(0.03, 1000, 30, 12, random_state=None) | ||
self.assertEqual(samples.shape, (1000, 30 * 12 + 1)) | ||
|
||
def test_sample_first_value1(self): | ||
"""Ensure the first scenario has the init value if init is a float""" | ||
model = Vasicek() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
init = 0.03 | ||
samples = model.sample(init, 1000, 30, 12, random_state=None) | ||
self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) | ||
|
||
def test_sample_first_value2(self): | ||
"""Ensure the first scenario has the init value if init is an array""" | ||
model = Vasicek() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
init = np.array([0.03]) | ||
samples = model.sample(init, 1000, 30, 12, random_state=None) | ||
self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) | ||
|
||
def test_raises_value_error(self): | ||
"""Ensure we raise a ValueError if init shape doesn't match n_scen""" | ||
model = Vasicek() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
init = np.array([0.03, 0.03]) | ||
self.assertRaises(ValueError, model.sample, init, 1000, 30, 12) | ||
|
||
|
||
class TestCoxIngersollRoss(unittest.TestCase): | ||
"""Test Cox-Ingersoll-Ross Model""" | ||
|
||
def test_sample_shape(self): | ||
"""Ensure the sample has the correct shape""" | ||
model = CoxIngersollRoss() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
samples = model.sample(0.03, 1000, 30, 12, random_state=None) | ||
self.assertEqual(samples.shape, (1000, 30 * 12 + 1)) | ||
|
||
def test_sample_first_value1(self): | ||
"""Ensure the first scenario has the init value if init is a float""" | ||
model = CoxIngersollRoss() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
init = 0.03 | ||
samples = model.sample(init, 1000, 30, 12, random_state=None) | ||
self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) | ||
|
||
def test_sample_first_value2(self): | ||
"""Ensure the first scenario has the init value if init is an array""" | ||
model = CoxIngersollRoss() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
init = np.array([0.03]) | ||
samples = model.sample(init, 1000, 30, 12, random_state=None) | ||
self.assertListEqual(list(samples[:, 0]), [0.03] * 1000) | ||
|
||
def test_raises_value_error(self): | ||
"""Ensure we raise a ValueError if init shape doesn't match n_scen""" | ||
model = CoxIngersollRoss() | ||
model.k, model.theta, model.sigma = 0.15, 0.045, 0.015 | ||
init = np.array([0.03, 0.03]) | ||
self.assertRaises(ValueError, model.sample, init, 1000, 30, 12) |