Skip to content

Commit

Permalink
Renamed from dependency_graph_id to contract_specification_id.
Browse files Browse the repository at this point in the history
Removed DependencyGraph class.
  • Loading branch information
johnbywater committed Sep 13, 2017
1 parent b3e9c8c commit 0e505b6
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 192 deletions.
6 changes: 3 additions & 3 deletions quantdsl/application/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ def loop_on_evaluation_queue(self, call_result_lock, compute_pool=None, result_c
usage_counters=usage_counters,
)

def evaluate_call_and_queue_next_calls(self, contract_valuation_id, dependency_graph_id, call_id, lock):
def evaluate_call_and_queue_next_calls(self, contract_valuation_id, contract_specification_id, call_id, lock):
evaluate_call_and_queue_next_calls(
contract_valuation_id=contract_valuation_id,
dependency_graph_id=dependency_graph_id,
contract_specification_id=contract_specification_id,
call_id=call_id,
call_evaluation_queue=self.call_evaluation_queue,
contract_valuation_repo=self.contract_valuation_repo,
Expand Down Expand Up @@ -231,7 +231,7 @@ def evaluate(self, contract_specification, market_simulation):
return self.start_contract_valuation(contract_specification.id, market_simulation.id)

def get_result(self, contract_valuation):
call_result_id = make_call_result_id(contract_valuation.id, contract_valuation.dependency_graph_id)
call_result_id = make_call_result_id(contract_valuation.id, contract_valuation.contract_specification_id)
return self.call_result_repo[call_result_id]

def calc_call_count(self, contract_specification_id):
Expand Down
4 changes: 2 additions & 2 deletions quantdsl/domain/model/call_leafs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def leaf_ids(self):
return self._leaf_ids


def register_call_leafs(dependency_graph_id, leaf_ids):
created_event = CallLeafs.Created(entity_id=dependency_graph_id, leaf_ids=leaf_ids)
def register_call_leafs(contract_specification_id, leaf_ids):
created_event = CallLeafs.Created(entity_id=contract_specification_id, leaf_ids=leaf_ids)
call_leafs = CallLeafs.mutator(event=created_event)
publish(created_event)
return call_leafs
Expand Down
10 changes: 5 additions & 5 deletions quantdsl/domain/model/call_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ class Created(EventSourcedEntity.Created):
class Discarded(EventSourcedEntity.Discarded):
pass

def __init__(self, result_value, perturbed_values, contract_valuation_id, call_id, dependency_graph_id, **kwargs):
def __init__(self, result_value, perturbed_values, contract_valuation_id, call_id, contract_specification_id, **kwargs):
super(CallResult, self).__init__(**kwargs)
self._result_value = result_value
self._perturbed_values = perturbed_values
self._contract_valuation_id = contract_valuation_id
self._call_id = call_id
self._dependency_graph_id = dependency_graph_id
self._contract_specification_id = contract_specification_id

@property
def call_id(self):
Expand All @@ -41,7 +41,7 @@ def contract_valuation_id(self):
return self._contract_valuation_id

@property
def dependency_graph_id(self):
def contract_specification_id(self):
return self._contract_valuation_id

@property
Expand All @@ -54,7 +54,7 @@ def scalar_result_value(self):
return result_value


def register_call_result(call_id, result_value, perturbed_values, contract_valuation_id, dependency_graph_id):
def register_call_result(call_id, result_value, perturbed_values, contract_valuation_id, contract_specification_id):
call_result_id = make_call_result_id(contract_valuation_id, call_id)
created_event = CallResult.Created(entity_id=call_result_id,
result_value=result_value,
Expand All @@ -63,7 +63,7 @@ def register_call_result(call_id, result_value, perturbed_values, contract_valua
call_id=call_id,
# Todo: Don't persist this, get the contract valuation object when needed.
# Todo: Also save the list of fixing dates separately (if needs to be saved).
dependency_graph_id=dependency_graph_id,
contract_specification_id=contract_specification_id,
)
call_result = CallResult.mutator(event=created_event)

Expand Down
10 changes: 5 additions & 5 deletions quantdsl/domain/model/contract_valuation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ class Created(EventSourcedEntity.Created):
class Discarded(EventSourcedEntity.Discarded):
pass

def __init__(self, market_simulation_id, dependency_graph_id, **kwargs):
def __init__(self, market_simulation_id, contract_specification_id, **kwargs):
super(ContractValuation, self).__init__(**kwargs)
self._market_simulation_id = market_simulation_id
self._dependency_graph_id = dependency_graph_id
self._contract_specification_id = contract_specification_id

@property
def market_simulation_id(self):
return self._market_simulation_id

@property
def dependency_graph_id(self):
return self._dependency_graph_id
def contract_specification_id(self):
return self._contract_specification_id


def start_contract_valuation(contract_specification_id, market_simulation_id):
contract_valuation_id = create_contract_valuation_id()
contract_valuation_created = ContractValuation.Created(
entity_id=contract_valuation_id,
market_simulation_id=market_simulation_id,
dependency_graph_id=contract_specification_id,
contract_specification_id=contract_specification_id,
)
contract_valuation = ContractValuation.mutator(event=contract_valuation_created)
publish(contract_valuation_created)
Expand Down
31 changes: 0 additions & 31 deletions quantdsl/domain/model/dependency_graph.py

This file was deleted.

24 changes: 12 additions & 12 deletions quantdsl/domain/services/contract_valuations.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def evaluate_contract_in_series(contract_valuation_id, contract_valuation_repo,
assert isinstance(contract_valuation, ContractValuation), contract_valuation

# Get the contract specification ID.
contract_specification_id = contract_valuation.dependency_graph_id
contract_specification_id = contract_valuation.contract_specification_id

# Get the market simulation.
market_simulation = market_simulation_repo[contract_valuation.market_simulation_id]
Expand Down Expand Up @@ -102,7 +102,7 @@ def evaluate_contract_in_series(contract_valuation_id, contract_valuation_repo,
result_value=result_value,
perturbed_values=perturbed_values,
contract_valuation_id=contract_valuation_id,
dependency_graph_id=contract_specification_id,
contract_specification_id=contract_specification_id,
)

# # Check for results that should be deleted.
Expand Down Expand Up @@ -133,11 +133,11 @@ def evaluate_contract_in_parallel(contract_valuation_id, contract_valuation_repo
contract_valuation = contract_valuation_repo[contract_valuation_id]
# assert isinstance(contract_valuation, ContractValuation), contract_valuation

dependency_graph_id = contract_valuation.dependency_graph_id
contract_specification_id = contract_valuation.contract_specification_id

if result_counters is not None:
assert usage_counters is not None
for call_id in regenerate_execution_order(dependency_graph_id, call_link_repo):
for call_id in regenerate_execution_order(contract_specification_id, call_link_repo):
call_dependencies = call_dependencies_repo[call_id]
call_dependents = call_dependents_repo[call_id]
assert isinstance(call_dependencies, CallDependencies)
Expand All @@ -149,11 +149,11 @@ def evaluate_contract_in_parallel(contract_valuation_id, contract_valuation_repo
result_counters[call_result_id] = [None] * (count_dependencies - 1)
usage_counters[call_result_id] = [None] * (count_dependents - 1)

call_leafs = call_leafs_repo[dependency_graph_id]
call_leafs = call_leafs_repo[contract_specification_id]
# assert isinstance(call_leafs, CallLeafs)

for call_id in call_leafs.leaf_ids:
call_evaluation_queue.put((dependency_graph_id, contract_valuation_id, call_id))
call_evaluation_queue.put((contract_specification_id, contract_valuation_id, call_id))
gevent.sleep(0)


Expand All @@ -166,11 +166,11 @@ def loop_on_evaluation_queue(call_evaluation_queue, contract_valuation_repo, cal
if isinstance(call_evaluation_queue, gevent.queue.Queue):
gevent.sleep(0)
try:
dependency_graph_id, contract_valuation_id, call_id = item
contract_specification_id, contract_valuation_id, call_id = item

evaluate_call_and_queue_next_calls(
contract_valuation_id=contract_valuation_id,
dependency_graph_id=dependency_graph_id,
contract_specification_id=contract_specification_id,
call_id=call_id,
call_evaluation_queue=call_evaluation_queue,
contract_valuation_repo=contract_valuation_repo,
Expand All @@ -191,7 +191,7 @@ def loop_on_evaluation_queue(call_evaluation_queue, contract_valuation_repo, cal
call_evaluation_queue.task_done()


def evaluate_call_and_queue_next_calls(contract_valuation_id, dependency_graph_id, call_id, call_evaluation_queue,
def evaluate_call_and_queue_next_calls(contract_valuation_id, contract_specification_id, call_id, call_evaluation_queue,
contract_valuation_repo, call_requirement_repo, market_simulation_repo,
call_dependencies_repo, call_result_repo, simulated_price_repo,
call_dependents_repo, perturbation_dependencies_repo,
Expand Down Expand Up @@ -247,7 +247,7 @@ def evaluate_call_and_queue_next_calls(contract_valuation_id, dependency_graph_i
result_value=result_value,
perturbed_values=perturbed_values,
contract_valuation_id=contract_valuation_id,
dependency_graph_id=dependency_graph_id,
contract_specification_id=contract_specification_id,
)

# Find next calls.
Expand Down Expand Up @@ -285,7 +285,7 @@ def evaluate_call_and_queue_next_calls(contract_valuation_id, dependency_graph_i
# Otherwise put things directly on the queue.
next_call_ids = []
for next_call_id in ready_generator:
call_evaluation_queue.put((dependency_graph_id, contract_valuation_id, next_call_id))
call_evaluation_queue.put((contract_specification_id, contract_valuation_id, next_call_id))
gevent.sleep(0)

finally:
Expand All @@ -295,7 +295,7 @@ def evaluate_call_and_queue_next_calls(contract_valuation_id, dependency_graph_i

# Queue the next calls (if there are any - see above).
for next_call_id in next_call_ids:
call_evaluation_queue.put((dependency_graph_id, contract_valuation_id, next_call_id))
call_evaluation_queue.put((contract_specification_id, contract_valuation_id, next_call_id))


def find_dependents_ready_to_be_evaluated(contract_valuation_id, call_id, call_dependents_repo, call_dependencies_repo,
Expand Down
4 changes: 2 additions & 2 deletions quantdsl/domain/services/simulated_prices.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def simulate_future_prices(market_simulation, market_calibration):
calibration_params=market_calibration.calibration_params)


def identify_simulation_requirements(dependency_graph_id, call_requirement_repo, call_link_repo,
def identify_simulation_requirements(contract_specification_id, call_requirement_repo, call_link_repo,
call_dependencies_repo, market_dependencies_repo, observation_date, requirements):
assert isinstance(call_requirement_repo, CallRequirementRepository)
assert isinstance(call_link_repo, CallLinkRepository)

all_perturbation_dependencies = {}

for call_id in regenerate_execution_order(dependency_graph_id, call_link_repo):
for call_id in regenerate_execution_order(contract_specification_id, call_link_repo):

# Get the stubbed expression.
call_requirement = call_requirement_repo[call_id]
Expand Down
15 changes: 6 additions & 9 deletions quantdsl/infrastructure/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from filelock import FileLock

from quantdsl.application.base import QuantDslApplication
Expand All @@ -7,15 +6,15 @@


class CeleryCallEvaluationQueueFacade(object):

def put(self, item):
dependency_graph_id, contract_valuation_id, call_id = item
contract_specification_id, contract_valuation_id, call_id = item
try:
celery_evaluate_call.delay(dependency_graph_id, contract_valuation_id, call_id)
# result = celery_evaluate_call(dependency_graph_id, contract_valuation_id, call_id)
celery_evaluate_call.delay(contract_specification_id, contract_valuation_id, call_id)
# result = celery_evaluate_call(contract_specification_id, contract_valuation_id, call_id)
except OSError as e:
raise Exception("Celery call failed (is RabbitMQ running?): %s" % e)


_quantdsl_app_singleton = None


Expand All @@ -35,17 +34,15 @@ def close_quant_dsl_app_for_celery_worker():
_quantdsl_app_singleton = None



@celery_app.task
def celery_evaluate_call(dependency_graph_id, contract_valuation_id, call_id):

def celery_evaluate_call(contract_specification_id, contract_valuation_id, call_id):
quantdsl_app = get_quant_dsl_app_for_celery_worker()

assert isinstance(quantdsl_app, QuantDslApplication)

quantdsl_app.evaluate_call_and_queue_next_calls(
contract_valuation_id=contract_valuation_id,
dependency_graph_id=dependency_graph_id,
contract_specification_id=contract_specification_id,
call_id=call_id,
lock=FileLock('/tmp/quantdsl-results-lock'),
)
Expand Down

0 comments on commit 0e505b6

Please sign in to comment.