Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: tradespring/TradeSpring
base: e8c71108ca
...
head fork: tradespring/TradeSpring
compare: 66824ac88e
Checking mergeability… Don't worry, you can still create the pull request.
  • 12 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
View
192 bin/livespring
@@ -4,10 +4,11 @@ use strict;
use AnyMQ;
use AnyEvent;
use YAML::Syck qw(LoadFile);
-use Getopt::Long qw(:config pass_through);
+use Getopt::Long qw(GetOptionsFromArray :config pass_through);
+
use Log::Log4perl;
use TradeSpring;
-use TradeSpring::Util qw(broker_args_from_spec init_quote local_broker);
+use TradeSpring::Util qw(broker_args_from_spec init_quote_simple init_quote_history local_broker);
my $test_broker;
my $logconf;
@@ -66,76 +67,125 @@ if ($ps) {
$logger->info( $ps_object->info );
}
-my $localbus = AnyMQ->new;
-my $bar = $localbus->topic('agbar');
-init_quote( bus => $bus,
- node => $quote->{node},
- loadcnt => $quote->{loadcnt},
- tf => $deployment->{timeframe},
- code => $deployment->{code},
- on_load => sub {
- my ($session, $calc) = @_;
- my $broker_client = $bus->new_listener($bus->topic($session->{tick_channel}));
- $broker_client->on_error(sub { $logger->fatal(join(',',@_)) } );
- $broker_client->poll(sub {
- my $msg = shift;
- $broker->on_price($msg->{price}, $msg->{volume}, { timestamp => $msg->{timestamp} } );
- });
- TradeSpring::init_terminal($bus, $session, $calc, $deployment->{timeframe});
- my $end = $session->{session_end};
- my $start = $session->{session_start};
- if ($init_cb) {
- if ($start - 450 > AnyEvent->time) {
- my $w; $w = AnyEvent->timer(
- after => $start - 450 - AnyEvent->time,
- cb => sub {
- $init_cb->($session);
- undef $w;
- });
- }
- else {
- $init_cb->($session);
- }
- }
- for my $strategy_spec (@{$deployment->{strategy}}) {
- my ($strategy_name, @args) = split(/\s+/, $strategy_spec);
- local @ARGV = (@args, '--contract_code', $deployment->{code});
-
- local $_;
- my $strategy = TradeSpring::load_strategy($strategy_name, $calc, $broker);
- if ($ps_object) {
- $strategy->ps($ps_object);
- $strategy->ps_store($ps_store);
- $strategy->cost($ps->{cost})
- if $ps->{cost};
- }
- my $client = $localbus->new_listener($bar);
- $client->on_error(sub {
- $logger->fatal(join(',',@_));
- });
- $client->poll(sub {
- $strategy->i($calc->prices->count-1);
- $strategy->run();
- });
-
- TradeSpring::pre_run_strategy($session, $strategy) if $deployment->{daytrade};
- if ($end > AnyEvent->time) {
- my $w; $w = AnyEvent->timer(
- after => $end - AnyEvent->time,
- cb => sub {
- $strategy->end;
- undef $w;
- });
- }
-
-
- }
- },
- on_bar => sub {
+sub init_broker_quote {
+ my $session = shift;
+ my $broker_client = $bus->new_listener($bus->topic($session->{tick_channel}));
+ $broker_client->on_error(sub { $logger->fatal(join(',',@_)) } );
+ $broker_client->poll(sub {
my $msg = shift;
- $bar->publish($msg);
- }
- );
+ $broker->on_price($msg->{price}, $msg->{volume}, { timestamp => $msg->{timestamp} } );
+ });
+}
+
+sub invoke_init_cb {
+ my ($when, $session, $init_cb) = @_;
+ if ($when > AnyEvent->time) {
+ my $w; $w = AnyEvent->timer(
+ after => $when - AnyEvent->time,
+ cb => sub {
+ $init_cb->($session);
+ undef $w;
+ });
+ }
+ else {
+ $init_cb->($session);
+ }
+}
+
+my $localbus = AnyMQ->new;
+
+sub init_strategy {
+ my ($strategy_name, $session, $calc, $broker, $ps_object, $bar, $args) = @_;
+ local @ARGV = (@$args, '--contract_code', $deployment->{code});
+
+ local $_;
+ my $strategy = TradeSpring::load_strategy($strategy_name, $calc, $broker);
+ if ($ps_object) {
+ $strategy->ps($ps_object);
+ $strategy->ps_store($ps_store);
+ $strategy->cost($ps->{cost})
+ if $ps->{cost};
+ }
+ my $client = $localbus->new_listener($bar);
+ $client->on_error(sub {
+ $logger->fatal(join(',',@_));
+ });
+ $client->poll(sub {
+ $strategy->i($calc->prices->count-1);
+ $strategy->run();
+ });
+
+ TradeSpring::pre_run_strategy($session, $strategy) if $deployment->{daytrade};
+ my $end = $session->{session_end} + 10;
+ if ($end > AnyEvent->time) {
+ my $w; $w = AnyEvent->timer(
+ after => $end - AnyEvent->time,
+ cb => sub {
+ $strategy->end;
+ undef $w;
+ });
+ }
+}
+
+my %by_tf;
+for my $strategy_spec (@{$deployment->{strategy}}) {
+ my ($strategy_name, @args) = split(/\s+/, $strategy_spec);
+ my %entry = ( timeframe => $deployment->{timeframe},
+ loadcnt => $quote->{loadcnt},
+ name => $strategy_name,
+ args => \@args,
+ );
+ eval {
+ GetOptionsFromArray(\@args,
+ "timeframe=s" => \$entry{timeframe},
+ "loadcnt=i" => \$entry{loadcnt})
+ } or die "incorrect strategy spec '$strategy_spec': $@";
+
+ push @{$by_tf{$entry{timeframe}} ||= []}, \%entry;
+}
+
+sub init_strategy_by_tf {
+ my ($session, $tf, $cb) = @_;
+ my $bar = $localbus->topic("agbar/$tf");
+
+ init_quote_history(
+ $bus->topic({name => 'pagmctrl.'.$quote->{node}}),
+ $session, $tf,
+ code => $deployment->{code},
+ loadcnt => $quote->{loadcnt},
+ on_load => sub {
+ my $calc = shift;
+ for my $strategy_entry (@{$by_tf{$tf}}) {
+ init_strategy($strategy_entry->{name}, $session, $calc, $broker, $ps_object, $bar, $strategy_entry->{args});
+ }
+ $cb->($calc) if $cb;
+ },
+ on_bar => sub {
+ my $msg = shift;
+ $bar->publish($msg);
+ });
+}
+
+init_quote_simple(
+ bus => $bus,
+ node => $quote->{node},
+ code => $deployment->{code},
+ on_load => sub {
+ my ($session) = @_;
+ init_broker_quote($session);
+ invoke_init_cb($session->{session_start} - 450, $session, $init_cb)
+ if $init_cb;
+
+ for my $tf (keys %by_tf) {
+ init_strategy_by_tf($session, $tf,
+ $deployment->{timeframe} && $deployment->{timeframe} eq $tf
+ ? sub {
+ my $calc = shift;
+ TradeSpring::init_terminal($bus, $session, $calc, $tf);
+ } : ());
+ }
+ }
+);
$runloop->(__FILE__, $ready_cv) if $runloop;
exit $quit->recv;
View
7 lib/TradeSpring/BracketOrder.pm
@@ -45,15 +45,14 @@ method new_bracket_order ($entry_order, $stp, $tp, %args) {
my $p = TradeSpring::Position->new(
broker => $self->broker, %args,
on_entry => sub {
- my ($pos, $price, $qty) = @_;
- $self->format_order($entry_order, $price, $qty);
+ my ($pos, $price, $qty, $o) = @_;
+ $self->format_order($o->{order}, $price, $qty);
$self->fill_position($pos->direction, $price, $qty, $submit_i,
$entry_annotation->());
$on_entry->(@_) if $on_entry;
},
on_exit => sub {
- my ($pos, $type, $price, $qty) = @_;
- my $o = $self->broker->get_order( $pos->exit_id_map->{$type} );
+ my ($pos, $type, $price, $qty, $o) = @_;
$self->format_order($o->{order}, $price, $qty);
$self->fill_position($pos->direction*-1, $price, $qty, $self->i, exit_type => $type);
$on_exit->(@_) if $on_exit;
View
2  lib/TradeSpring/Config.pm
@@ -132,7 +132,7 @@ method load_synth_broker($contract, $config, $deployment) {
require TradeSpring::Broker::Partition;
(TradeSpring::Broker::Partition->new_with_traits
( backends => $backends,
- traits => ['Position', 'Stop', 'Timed', 'Update', 'Attached', 'OCA'],
+ traits => ['Position', 'Stop', 'Timed', 'Attached', 'OCA'],
), @loops);
}
View
29 lib/TradeSpring/FSA.pm
@@ -57,7 +57,10 @@ method new_fsa($dir, $price, $qty, $stp_price) {
type => 'stp',
qty => $qty,
};
+ $self->new_raw_fsa($order, $stp_price);
+}
+method new_raw_fsa($order, $stp_price) {
$self->new_fsa2(
{ submit => {
do => sub {
@@ -77,8 +80,7 @@ method new_fsa2($conditions, $stp_price, $on_enter) {
pending => {
do => sub {
my $state = shift;
- my $order = $state->notes('order');#, undef);
- $state->notes('order', undef);
+ my $order = $state->notes('order');
my $dir = $order->{dir};
$state->notes('dir', $dir);
$state->notes('qty', $order->{qty});
@@ -86,7 +88,7 @@ method new_fsa2($conditions, $stp_price, $on_enter) {
my $submit_i = $self->i;
my $id = $self->broker->register_order(
- $order,
+ { %$order },
on_match => sub {
my ($price, $qty) = @_;
$self->debug("matched!");
@@ -98,20 +100,20 @@ method new_fsa2($conditions, $stp_price, $on_enter) {
on_error => sub {
},
on_summary => sub {
- my $id = $state->notes('order');
+ my $id = $state->notes('order_id');
if ($_[0]) {
my $o = $self->broker->get_order($id);
$state->result($_[0]);
- $self->format_order($o->{order}, $state->notes('order_price'), $_[0]);
+ $self->format_order($o->{order}, $o->{order}{price}, $_[0], $state->notes('order_price'));
$self->log('TradeSpring.Position')->info("position entered: ($o->{order}{dir}) $o->{order}{price} x $_[0] @ $o->{last_fill_time}");
$state->notes(submit_i => $submit_i);
$state->notes(entry_price =>$o->{order}{price});
my $new = $state->machine->try_switch();
}
});
- $state->notes(order => $id);
+ $state->notes(order_id => $id);
$state->notes(stp_price => $stp_price) if $stp_price;
- my $default_stp = $state->notes('order_price') * ( 1 - $self->initial_stp * $dir);
+ my $default_stp = $self->initial_stp_price($dir, $state->notes('order_price'));
my $stp = $state->notes('stp_price');
$state->notes(stp_price => $default_stp)
if !$stp || $default_stp * $dir > $stp * $dir;
@@ -171,7 +173,7 @@ around order_annotation => sub {
method initial_stp { 0.005 };
method _submit_exit_order($type, $order, $state) {
- my $entry_id = $state->notes('order');
+ my $entry_id = $state->notes('order_id');
$order->{oca_group} = $entry_id;
$state->notes('exit_id_map', {}) unless $state->notes('exit_id_map');
my $exit_id_map = $state->notes('exit_id_map');
@@ -197,7 +199,7 @@ method _submit_exit_order($type, $order, $state) {
on_summary => sub {
my $o = $self->broker->get_order($id);
if ($_[0]) {
- $self->format_order($o->{order}, $order->{price} || $self->broker->{last_price}, $_[0]);
+ $self->format_order($o->{order}, $o->{order}{price}, $_[0]);
$self->log('TradeSpring.Position')->info("position exited: ($o->{order}{dir}) $o->{order}{price} x $_[0] @ $o->{last_fill_time}");
$self->fill_position($o->{order}{dir}, $o->{order}{price}, $_[0], $self->i, exit_type => $type, $self->exit_map($state));
$state->machine->curr_state->result($_[0]);
@@ -215,11 +217,12 @@ method exit_map($state) { return () }
method load_from_state($state) {
for my $entry (@$state) {
my $notes = $entry->{notes};
- my $fsa = $self->new_fsa($notes->{dir}, $notes->{price},
- $notes->{qty}, $notes->{stp_price});
+ my $fsa = $self->new_raw_fsa($notes->{order});
%{ $fsa->notes } = %{ $entry->{notes} };
$fsa->curr_state($entry->{curr_state});
push @{$self->fsa}, $fsa;
+ $fsa->try_switch();
+
}
}
@@ -234,8 +237,8 @@ after 'end' => sub {
for my $f (@{ $self->fsa } ) {
if ($f->at('pending')) {
- $self->broker->cancel_order( $f->notes('order'), sub {
- $self->debug("order @{[ $f->notes('order') ]} cancelled: ".join(',', @_) );
+ $self->broker->cancel_order( $f->notes('order_id'), sub {
+ $self->debug("order @{[ $f->notes('order_id') ]} cancelled: ".join(',', @_) );
});
}
elsif ($f->at('entered')) {
View
7 lib/TradeSpring/OrderReport.pm
@@ -17,15 +17,16 @@ method _build_order_report_fh {
return $fh;
}
-method format_order($order, $price, $qty) {
+method format_order($order, $filled_price, $qty, $order_price) {
return unless $self->order_report_fh;
+ $order_price ||= $order->{orig_order} ? $order->{orig_order}{price} : $self->broker->{last_price};
syswrite $self->order_report_fh,
join(',',
$self->date,
$order->{dir},
$qty,
- $order->{price} || $self->broker->{last_price},
- $price,
+ $order_price,
+ $filled_price,
0, # triggering time
0, # submission time
AnyEvent->now, # report time
View
4 lib/TradeSpring/Position.pm
@@ -53,7 +53,7 @@ method _submit_exit_order($type, $order) {
my $o = $self->broker->get_order($id);
$self->status('exited');
if ($_[0]) {
- $self->on_exit->($self, $type, $o->{order}{price}, $_[0]);
+ $self->on_exit->($self, $type, $o->{order}{price}, $_[0], $o);
$self->log->info("position exited: ($o->{order}{dir}) $o->{order}{price} x $_[0] @ $o->{last_fill_time}");
}
});
@@ -105,7 +105,7 @@ method create ($entry, $stp, $tp) {
if ($_[0]) {
my $o = $self->broker->get_order($self->entry_id);
$self->status('entered');
- $self->on_entry->($self, $o->{order}{price}, $_[0]);
+ $self->on_entry->($self, $o->{order}{price}, $_[0], $o);
$self->log->info("position entered: ($o->{order}{dir}) $o->{order}{price} x $_[0] @ $o->{last_fill_time}");
}
}));
View
10 lib/TradeSpring/Strategy.pm
@@ -4,6 +4,7 @@ use DateTime;
use Method::Signatures::Simple;
use MooseX::ClassAttribute;
use TradeSpring::Position;
+use POSIX qw(ceil floor);
use List::Util qw(sum);
@@ -34,6 +35,7 @@ has ps => (is => "rw");
has cost => (is => "rw", isa => 'Num', default => sub { 0 });
+has initial_stp => (is => "rw", isa => "Num", default => sub { 0.01 });
method BUILD {
if (my $class = $self->ps_class) {
@@ -55,6 +57,14 @@ method BUILD {
method load($prev, $first, $last) {
}
+method dir_round($dir, $price) {
+ $dir > 0 ? ceil($price) : floor($price);
+}
+
+method initial_stp_price($dir, $price) {
+ $self->dir_round(-$dir, $price * ( 1 - $self->initial_stp * $dir));
+}
+
method get_position_qty($r) {
return $self->position_qty unless $self->ps;
View
90 lib/TradeSpring/Util.pm
@@ -8,6 +8,9 @@ our @EXPORT = our @EXPORT_OK =
qw(parse_broker_spec
broker_args_from_spec
init_quote
+ init_quote_simple
+ init_quote_history
+ init_quote_with_history
local_broker
);
@@ -47,7 +50,94 @@ sub broker_args_from_spec {
%args );
}
+sub init_quote_simple {
+ my %args = @_;
+ my $bus = $args{bus};
+
+ my $myself = $bus->topic("livespring-$$");
+ my $client = $bus->new_listener($myself);
+ $client->on_error(sub {
+ logger->fatal(join(',',@_));
+ });
+
+ my $pagm = $args{pagm} || $bus->topic({name => 'pagmctrl.'.$args{node}});
+
+ my $session;
+ $client->poll(
+ sub {
+ my $msg = shift;
+
+ if ($msg->{type} eq 'pagm.session') {
+ $session = $msg;
+ $args{on_load}->($session);
+ undef $client;
+ }
+ else {
+ logger->error("unhandled message: ".Dumper($msg)); use Data::Dumper;
+ }
+ return 1;
+ });
+
+ $pagm->publish({ type => 'pagm.session',
+ code => $args{code},
+ reply => $myself->name });
+ return $pagm;
+}
+
+sub init_quote_with_history {
+ my %args = @_;
+ my $on_load = delete $args{on_load};
+ my $bus = $args{bus};
+ my $pagm = $args{pagm} || $bus->topic({name => 'pagmctrl.'.$args{node}});
+ init_quote_simple(
+ %args,
+ on_load => sub {
+ my $session = shift;
+ my $tf = $args{tf};
+ init_quote_history($pagm, $session, $tf,
+ %args, on_load => sub { $on_load->($session, @_) } );
+ });
+}
+
+sub init_quote_history {
+ my ($pagm, $session, $tf, %args) = @_;
+ my $bus = $pagm->bus;
+ my $myself = $bus->topic("livespring-$$-$tf");
+ my $client = $bus->new_listener($myself);
+ my $calc;
+ $client->on_error(sub {
+ logger->fatal(join(',',@_));
+ });
+ $client->poll(
+ sub {
+ my $msg = shift;
+ if ($msg->{type} eq 'history') {
+ my $prices = $msg->{prices};
+ logger->info("loaded ".(scalar @{$prices})." items for $args{code}/$tf from pagm: $prices->[0][5] - $prices->[-1][5]");
+ my $p = Finance::GeniusTrader::Prices->new;
+ my $timeframe = Finance::GeniusTrader::DateTime::name_to_timeframe($tf);
+ $p->{prices} = $prices;
+ $p->set_timeframe($timeframe);
+ $calc = Finance::GeniusTrader::Calculator->new($p);
+ $client->subscribe($bus->topic($session->{ag_channel}.$tf));
+ $args{on_load}->($calc);
+ }
+ elsif ($msg->{type} eq 'agbar') {
+ my $prices = $msg->{data};
+ $calc->prices->add_prices($prices);
+ $args{on_bar}->();
+ }
+ else {
+ logger->error("unhandled message: ".Dumper($msg)); use Data::Dumper;
+ }
+ });
+ $pagm->publish({type => 'pagm.history', code => $args{code},
+ timeframe => $tf, count => $args{loadcnt},
+ reply => $myself->name });
+}
+
sub init_quote {
+ Carp::carp "deprecated";
my %args = @_;
my $calc;

No commit comments for this range

Something went wrong with that request. Please try again.