Skip to content

Commit

Permalink
MAINT: Group events by type before processing.
Browse files Browse the repository at this point in the history
Make the ordering in which event types are processed both explicit and
independent of the sort ordering of the incoming sources.

The overhead of creating the list per snapshot and the iterators appears
to be marginal in the minute data case when tested locally.

This patch is intended as part of the path towards making the trade
simulation loop not depend on consuming and tracking every trade event.
The timing of when last_sale_date needed to be updated was proving
difficult to adapt in the previous model.

Should also allow the removal of sorting of the various source streams.
  • Loading branch information
Eddie Hebert committed May 20, 2015
1 parent bf3cb6e commit 862cfbb
Showing 1 changed file with 76 additions and 30 deletions.
106 changes: 76 additions & 30 deletions zipline/gens/tradesimulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from logbook import Logger, Processor
from pandas.tslib import normalize_date

Expand Down Expand Up @@ -206,46 +207,91 @@ def _process_snapshot(self, dt, snapshot, instant_fill):
blotter_process_trade = self.algo.blotter.process_trade
blotter_process_benchmark = self.algo.blotter.process_benchmark

for event in snapshot:
# Containers for the snapshotted events, so that the events are
# processed in a predictable order, without relying on the sorted order
# of the individual sources.

if event.type == DATASOURCE_TYPE.TRADE:
self.update_universe(event)
any_trade_occurred = True
if instant_fill:
events_to_be_processed.append(event)
else:
for txn, order in blotter_process_trade(event):
if txn.type == DATASOURCE_TYPE.TRANSACTION:
perf_process_transaction(txn)
elif txn.type == DATASOURCE_TYPE.COMMISSION:
perf_process_commission(txn)
perf_process_order(order)
perf_process_trade(event)
# There is only one benchmark per snapshot, will be set to the current
# benchmark iff it occurs.
benchmark = None
# trades and customs are initialized as a list since process_snapshot
# is most often called on market bars, which could contain trades or
# custom events.
trades = []
customs = []

# splits and dividends are processed once a day.
#
# The avoidance of creating the list every time this is called is more
# to attempt to show that this is the infrequent case of the method,
# since the performance benefit from deferring the list allocation is
# marginal. splits list will be allocated when a split occurs in the
# snapshot.
splits = None
# dividends list will be allocated when a dividend occurs in the
# snapshot.
dividends = None

for event in snapshot:
if event.type == DATASOURCE_TYPE.TRADE:
trades.append(event)
elif event.type == DATASOURCE_TYPE.BENCHMARK:
benchmark_event_occurred = True
perf_process_benchmark(event)
for txn, order in blotter_process_benchmark(event):
benchmark = event
elif event.type == DATASOURCE_TYPE.SPLIT:
if splits is None:
splits = []
splits.append(event)
elif event.type == DATASOURCE_TYPE.CUSTOM:
customs.append(event)
elif event.type == DATASOURCE_TYPE.DIVIDEND:
if dividends is None:
dividends = []
dividends.append(event)
else:
raise log.warn("Unrecognized event=%s".format(event))

# Handle benchmark first.
#
# Internal broker implementation depends on the benchmark being
# processed first so that transactions and commissions reported from
# the broker can be injected.
if benchmark is not None:
benchmark_event_occurred = True
perf_process_benchmark(benchmark)
for txn, order in blotter_process_benchmark(benchmark):
if txn.type == DATASOURCE_TYPE.TRANSACTION:
perf_process_transaction(txn)
elif txn.type == DATASOURCE_TYPE.COMMISSION:
perf_process_commission(txn)
perf_process_order(order)

for trade in trades:
self.update_universe(trade)
any_trade_occurred = True
if instant_fill:
events_to_be_processed.append(trade)
else:
for txn, order in blotter_process_trade(trade):
if txn.type == DATASOURCE_TYPE.TRANSACTION:
perf_process_transaction(txn)
elif txn.type == DATASOURCE_TYPE.COMMISSION:
perf_process_commission(txn)
perf_process_order(order)
perf_process_trade(trade)

elif event.type == DATASOURCE_TYPE.CUSTOM:
self.update_universe(event)
for custom in customs:
self.update_universe(custom)

elif event.type == DATASOURCE_TYPE.SPLIT:
if splits is not None:
for split in splits:
# process_split is not assigned to a variable since it is
# called rarely compared to the other event processors.
self.algo.blotter.process_split(event)
perf_process_split(event)

elif event.type == DATASOURCE_TYPE.DIVIDEND:
perf_process_dividend(event)
self.algo.blotter.process_split(split)
perf_process_split(split)

else:
raise log.warn("Unrecognized event=%s".format(event))
if dividends is not None:
for dividend in dividends:
perf_process_dividend(dividend)

if any_trade_occurred:
new_orders = self._call_handle_data()
Expand All @@ -256,13 +302,13 @@ def _process_snapshot(self, dt, snapshot, instant_fill):
# Now that handle_data has been called and orders have been placed,
# process the event stream to fill user orders based on the events
# from this snapshot.
for event in events_to_be_processed:
for txn, order in blotter_process_trade(event):
for trade in events_to_be_processed:
for txn, order in blotter_process_trade(trade):
if txn is not None:
perf_process_transaction(txn)
if order is not None:
perf_process_order(order)
perf_process_trade(event)
perf_process_trade(trade)

if benchmark_event_occurred:
return self.get_message(dt)
Expand Down

0 comments on commit 862cfbb

Please sign in to comment.