Skip to content

Commit

Permalink
Changed class BlackScholesPriceProcess to use a forward curve.
Browse files Browse the repository at this point in the history
Also changed the format of calibration params to be similar
to the n-factor markovian price process code.
  • Loading branch information
johnbywater committed Sep 19, 2017
1 parent f652f9d commit 3cb079e
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 57 deletions.
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,23 @@ Calibrate from historical data. In this example, we can just register some calib
price_process_name = 'quantdsl.priceprocess.blackscholes.BlackScholesPriceProcess'

calibration_params = {
'GAS-LAST-PRICE': 10,
'POWER-LAST-PRICE': 11,
'GAS-ACTUAL-HISTORICAL-VOLATILITY': 30,
'POWER-ACTUAL-HISTORICAL-VOLATILITY': 20,
'GAS-POWER-CORRELATION': 0.4,
'market': ['GAS', 'POWER'],
'sigma': [0.3, 0.2],
'rho': [
[1.0, 0.4],
[0.4, 1.0],
],
'curve': {
'GAS': [
('2011-1-1', 10),
],
'POWER': [
('2011-1-1', 11),
],
},
}


market_calibration = app.register_market_calibration(
price_process_name,
calibration_params
Expand Down
48 changes: 34 additions & 14 deletions quantdsl/priceprocess/blackscholes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,38 @@
import datetime
import math

import dateutil.parser
import scipy
import scipy.linalg
from scipy import array, sort, searchsorted
from scipy.linalg import LinAlgError

from quantdsl.exceptions import DslError
from quantdsl.priceprocess.base import PriceProcess
from quantdsl.priceprocess.base import PriceProcess, datetime_from_date
from quantdsl.priceprocess.base import get_duration_years


class ForwardCurve(object):
def __init__(self, name, data):
self.name = name
self.data = data
self.by_date = dict(
[(datetime_from_date(dateutil.parser.parse(d)), v) for (d, v) in self.data]
)
self.sorted = sort(array(self.by_date.keys()))

def get_price(self, date):
try:
price = self.by_date[date]
except:
# Search for earlier date.
index = searchsorted(self.sorted, date) - 1
if index < 0:
raise KeyError("Fixing date {} not found in '{}' forward curve.".format(date, self.name))
price = self.by_date[self.sorted[index]]
return price


class BlackScholesPriceProcess(PriceProcess):

def simulate_future_prices(self, observation_date, requirements, path_count, calibration_params):
Expand All @@ -26,15 +49,12 @@ def simulate_future_prices(self, observation_date, requirements, path_count, cal
# motions, the actual historical volatility, and the last price.
for commodity_name, brownian_motions in all_brownian_motions:
# Get the 'last price' for this commodity.
param_name = '%s-LAST-PRICE' % commodity_name
last_price = self.get_calibration_param(param_name, calibration_params)

# Get the 'actual historical volatility' for this commodity.
param_name = '%s-ACTUAL-HISTORICAL-VOLATILITY' % commodity_name
actual_historical_volatility = self.get_calibration_param(param_name, calibration_params)

sigma = actual_historical_volatility / 100.0
index = calibration_params['market'].index(commodity_name)
sigma = calibration_params['sigma'][index]
curve = ForwardCurve(commodity_name, calibration_params['curve'][commodity_name])
for fixing_date, brownian_rv in brownian_motions:
last_price = curve.get_price(fixing_date)
T = get_duration_years(observation_date, fixing_date)
simulated_value = last_price * scipy.exp(sigma * brownian_rv - 0.5 * sigma * sigma * T)
yield commodity_name, fixing_date, fixing_date, simulated_value
Expand Down Expand Up @@ -136,15 +156,15 @@ def get_brownian_motions(self, observation_date, requirements, path_count, calib
return all_brownian_motions

def get_correlation_from_calibration(self, market_calibration, name_i, name_j):
market_name_pair = tuple(sorted([name_i, name_j]))
calibration_name = "%s-%s-CORRELATION" % market_name_pair
index_i = market_calibration['market'].index(name_i)
index_j = market_calibration['market'].index(name_j)
try:
correlation = market_calibration[calibration_name]
correlation = market_calibration['rho'][index_i][index_j]
except KeyError as e:
msg = "Can't find correlation between '%s' and '%s' in market calibration params: %s: %s" % (
market_name_pair[0],
market_name_pair[1],
market_calibration.keys(),
name_i,
name_j,
market_calibration,
e
)
raise DslError(msg)
Expand Down
26 changes: 26 additions & 0 deletions quantdsl/tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,32 @@ def tearDown(self):
class ContractValuationTestCase(ApplicationTestCaseMixin):
price_process_name = DEFAULT_PRICE_PROCESS_NAME
calibration_params = {
'market': ['#1', '#2', 'NBP', 'TTF', 'SPARKSPREAD'],
'sigma': [0.5, 0.5, 0.5, 0.4, 0.4],
'curve': {
'#1': [
('2011-1-1', 10),
],
'#2': [
('2011-1-1', 10),
],
'NBP': [
('2011-1-1', 10),
],
'TTF': [
('2011-1-1', 11),
],
'SPARKSPREAD': [
('2011-1-1', 1),
],
},
'rho': [
[1.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 1.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 0.4, 0.0],
[0.0, 0.0, 0.4, 1.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 1.0],
],
'#1-LAST-PRICE': 10,
'#2-LAST-PRICE': 10,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 50,
Expand Down
19 changes: 14 additions & 5 deletions quantdsl/tests/test_market_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ def test_register_market_simulation(self):
# Set up the market calibration.
price_process_name = DEFAULT_PRICE_PROCESS_NAME
calibration_params = {
'#1-LAST-PRICE': 10,
'#2-LAST-PRICE': 20,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 10,
'#2-ACTUAL-HISTORICAL-VOLATILITY': 20,
'#1-#2-CORRELATION': 0.5,
'market': ['#1', '#2'],
'sigma': [0.1, 0.2],
'curve': {
'#1': [
('2011-1-1', 10),
],
'#2': [
('2011-1-1', 20),
],
},
'rho': [
[1.0, 0.5],
[0.5, 1.0],
],
}
market_calibration = self.app.register_market_calibration(price_process_name, calibration_params)

Expand Down
146 changes: 113 additions & 33 deletions quantdsl/tests/test_price_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ def test_simulate_future_prices_no_requirements(self):
observation_date=datetime.datetime(2011, 1, 1),
requirements=[],
path_count=2,
calibration_params={'#1-LAST-PRICE': 10, '#1-ACTUAL-HISTORICAL-VOLATILITY': 50},
calibration_params={
'market': ['#1'],
'sigma': [0.5],
'curve': {
'#1': (
('2011-1-1', 10),
)
},
},
))
prices = [(p[0], p[1], p[2], p[3].all()) for p in prices] # For scipy.
self.assertEqual(prices, [])
self.assertEqual(list(prices), [])

def test_simulate_future_prices_one_market_zero_volatility(self):
prices = list(self.p.simulate_future_prices(
Expand All @@ -30,8 +37,13 @@ def test_simulate_future_prices_one_market_zero_volatility(self):
observation_date=datetime.datetime(2011, 1, 1),
path_count=2,
calibration_params={
'#1-LAST-PRICE': 10,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 0,
'market': ['#1'],
'sigma': [0.0],
'curve': {
'#1': (
('2011-1-1', 10),
)
},
},
))
prices = [(p[0], p[1], p[2], p[3].mean()) for p in prices] # For scipy.
Expand All @@ -42,15 +54,23 @@ def test_simulate_future_prices_one_market_zero_volatility(self):

def test_simulate_future_prices_one_market_high_volatility(self):
prices = list(self.p.simulate_future_prices(
requirements=[],
requirements=[
('#1', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#1', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
],
observation_date=datetime.datetime(2011, 1, 1),
path_count=1000,
calibration_params={
'#1-LAST-PRICE': 10,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 50,
'market': ['#1'],
'sigma': [0.5],
'curve': {
'#1': (
('2011-1-1', 10),
)
},
},
))
prices = [p[2].mean() for p in prices[1:]] # For scipy.
prices = [p[3].mean() for p in prices[1:]] # For scipy.
expected_prices = [10]
for price, expected_price in zip(prices, expected_prices):
self.assertNotEqual(price, expected_price)
Expand All @@ -59,19 +79,25 @@ def test_simulate_future_prices_one_market_high_volatility(self):
def test_simulate_future_prices_two_markets_zero_volatility(self):
prices = list(self.p.simulate_future_prices(
requirements=[
('#1', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1), scipy.array([ 10., 10.]).mean()),
('#1', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2), scipy.array([ 10., 10.]).mean()),
('#2', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1), scipy.array([ 20., 20.]).mean()),
('#2', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2), scipy.array([ 20., 20.]).mean()),
('#1', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#1', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
('#2', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#2', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
],
observation_date=datetime.datetime(2011, 1, 1),
path_count=200000, calibration_params={
'#1-LAST-PRICE': 10,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 0,
'#2-LAST-PRICE': 20,
'#2-ACTUAL-HISTORICAL-VOLATILITY': 0,
'#1-#2-CORRELATION': 0,
},
'market': ['#1', '#2'],
'sigma': [0.0, 0.0],
'curve': {
'#1': (
('2011-1-1', 10),
),
'#2': (
('2011-1-1', 20),
)
},
'rho': [[1, 0], [0, 1]]
}
))
prices = [(p[0], p[1], p[2], p[3].mean()) for p in prices] # For scipy.
self.assertEqual(prices, [
Expand All @@ -83,34 +109,88 @@ def test_simulate_future_prices_two_markets_zero_volatility(self):

def test_simulate_future_prices_two_markets_high_volatility_zero_correlation(self):
prices = list(self.p.simulate_future_prices(
requirements=[],
requirements=[
('#1', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#1', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
('#2', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#2', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
],
observation_date=datetime.datetime(2011, 1, 1),
path_count=1000, calibration_params={
'#1-LAST-PRICE': 10,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 50,
'#2-LAST-PRICE': 20,
'#2-ACTUAL-HISTORICAL-VOLATILITY': 50,
'#1-#2-CORRELATION': 0,
'market': ['#1', '#2'],
'sigma': [0.5, 0.5],
'curve': {
'#1': (
('2011-1-1', 10),
),
'#2': (
('2011-1-1', 20),
)
},
'rho': [[1, 0], [0, 1]]
},
))
prices = [p[2].mean() for p in prices] # For scipy.
prices = [p[3].mean() for p in prices] # For scipy.
expected_prices = [10, 10, 20, 20]
for price, expected_price in zip(prices, expected_prices):
self.assertAlmostEqual(price, expected_price, places=0)

def test_simulate_future_prices_two_markets_high_volatility_positive_correlation(self):
prices = list(self.p.simulate_future_prices(
requirements=[],
requirements=[
('#1', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#1', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
('#2', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#2', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
],
observation_date=datetime.datetime(2011, 1, 1),
path_count=1000, calibration_params={
'#1-LAST-PRICE': 10,
'#1-ACTUAL-HISTORICAL-VOLATILITY': 50,
'#2-LAST-PRICE': 20,
'#2-ACTUAL-HISTORICAL-VOLATILITY': 50,
'#1-#2-CORRELATION': 0.5,
'market': ['#1', '#2'],
'sigma': [0.5, 0.5],
'curve': {
'#1': (
('2011-1-1', 10),
),
'#2': (
('2011-1-1', 20),
)
},
'rho': [[1, 0.5], [0.5, 1]]
},
))
prices = [p[2].mean() for p in prices] # For scipy.
assert len(prices)
prices = [p[3].mean() for p in prices] # For scipy.
expected_prices = [10, 10, 20, 20]
for price, expected_price in zip(prices, expected_prices):
self.assertAlmostEqual(price, expected_price, places=0)

def test_simulate_future_prices_from_longer_curve(self):
prices = list(self.p.simulate_future_prices(
requirements=[
('#1', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#1', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
('#2', datetime.datetime(2011, 1, 1), datetime.datetime(2011, 1, 1)),
('#2', datetime.datetime(2011, 1, 2), datetime.datetime(2011, 1, 2)),
],
observation_date=datetime.datetime(2011, 1, 1),
path_count=1000, calibration_params={
'market': ['#1', '#2'],
'sigma': [0.5, 0.5],
'curve': {
'#1': (
('2011-1-1', 10),
('2011-1-2', 15)
),
'#2': (
('2011-1-1', 20),
('2011-1-2', 25)
)
},
'rho': [[1, 0.5], [0.5, 1]]
},
))
expected_prices = [10, 15, 20, 25]
prices = [p[3].mean() for p in prices] # For scipy.

for price, expected_price in zip(prices, expected_prices):
self.assertAlmostEqual(price, expected_price, places=0)

0 comments on commit 3cb079e

Please sign in to comment.