Skip to content
  • 6 commits
  • 7 files changed
  • 0 commit comments
  • 2 contributors
View
5 lib/Pogo/Object/Event.pm
@@ -24,7 +24,7 @@ sub event_forward {
my ( $self, $opts, @events ) = @_;
if ( !exists $opts->{ forward_from } ) {
- LOGDIE "Missing mandatory param 'forward_from'";
+ LOGCONFESS "Missing mandatory param 'forward_from'";
}
for my $event ( @events ) {
@@ -64,7 +64,8 @@ Pogo::Object::Event - Additional Object::Event functions
my( $self ) = @_;
my $w = Pogo::Foo::Bar->new();
- $self->event_forward( $w, qw( foo_bar_this foo_bar_that ) );
+ $self->event_forward( {forward_from => $w },
+ qw( foo_bar_this foo_bar_that ) );
}
=head1 DESCRIPTION
View
99 lib/Pogo/Scheduler/Classic.pm
@@ -6,6 +6,9 @@ use Template::Parser;
use Template::Stash;
use YAML::Syck qw(Load LoadFile);
use Pogo::Util qw( array_intersection struct_traverse );
+use Pogo::Scheduler::Thread;
+use Pogo::Scheduler::Slot;
+use Pogo::Scheduler::Task;
use base qw( Pogo::Scheduler );
###########################################
@@ -14,8 +17,9 @@ sub new {
my( $class, %options ) = @_;
my $self = {
- threads => [],
- config => {},
+ thread_by_id => {},
+ threads => [],
+ config => {},
%options,
};
@@ -63,13 +67,13 @@ sub config_load {
leaf => sub {
my( $node, $path ) = @_;
- my $slot = join ".", @$path, $node;
+ my $slot = Pogo::Scheduler::Slot->new(
+ id => join(".", @$path, $node),
+ );
push @{ $self->{ slots } }, $slot;
}
} );
- $self->{ slots_vars } = $self->{ config }->{ tag };
-
# Traverse the configuration's "tag" structure and map hosts
# to slots (host slots constitute of the full path to the leaf nodes
# in the "tag" configuration).
@@ -99,7 +103,9 @@ sub slot_setup {
for my $slot ( @{ $self->{ slots } } ) {
my @parts = ();
- while( $slot =~ /(\$[^\$]*)/g ) {
+ my $slot_id = $slot->id();
+
+ while( $slot_id =~ /(\$[^\$]*)/g ) {
my $part = $1;
$part =~ s/^\$//;
$part =~ s/\.$//;
@@ -119,7 +125,7 @@ sub slot_setup {
delete $all_hosts->{ $host };
}
- $self->{ hosts_by_slot }->{ $slot } = \@hosts;
+ $self->{ hosts_by_slot }->{ $slot->id() } = \@hosts;
}
# what's left over is unconstrained
@@ -137,12 +143,30 @@ sub thread_setup {
array => sub {
my( $sequence, $path ) = @_;
- push @{ $self->{ threads } },
- [ map { join '.', @$path, $_ } @$sequence ];
+ my $thread = Pogo::Scheduler::Thread->new();
+
+ # for quick lookup later when tasks come back
+ $self->{ thread_by_id }->{ $thread->id() } = $thread;
+
+ for my $seq ( @$sequence ) {
+ my $slot = Pogo::Scheduler::Slot->new(
+ id => join('.', @$path, $seq),
+ );
+ $thread->slot_add( $slot );
+ }
+
+ push @{ $self->{ threads } }, $thread;
}
} );
- push @{ $self->{ threads } }, ["unconstrained"];
+ my $slot = Pogo::Scheduler::Slot->new(
+ id => "unconstrained",
+ );
+
+ my $thread = Pogo::Scheduler::Thread->new();
+ $thread->slot_add( $slot );
+
+ push @{ $self->{ threads } }, $thread;
}
###########################################
@@ -152,29 +176,54 @@ sub schedule {
$hosts = [] if !defined $hosts;
- my %hosts = map { $_ => 1 } @$hosts; # for faster lookups
+ DEBUG "Scheduling hosts ",
+ join( ", ", @$hosts );
- while( scalar @{ $self->{ threads } } ) {
+ my %host_lookup = map { $_ => 1 } @$hosts; # for faster lookups
- for my $thread ( @{ $self->{ threads } } ) {
+ $self->reg_cb( "task_mark_done", sub {
+ my( $c, $task ) = @_;
- # no more slots in the thread? get rid of the thread
- if( scalar @$thread == 0 ) {
- shift @{ $self->{ threads } };
- next;
- }
+ DEBUG "Scheduler received task_mark_done for task $task";
- my $slot = $thread->[ 0 ];
+ DEBUG "Forwarding 'task_mark_done' to thread ", $task->thread_id();
+ if( exists $self->{ thread_by_id }->{ $task->thread_id() } ) {
+ $self->{ thread_by_id }->{ $task->thread_id() }->event(
+ "task_mark_done", $task );
+ } else {
+ ERROR "Received task with unknown thread id: $task";
+ }
+ } );
- # run all the runnable hosts in the slot
- $self->slot_run( $slot, \%hosts );
+ for my $thread ( @{ $self->{ threads } } ) {
- # slot done, get rid of it
- shift @$thread;
+ next if $thread->is_done();
- # to into the next round immediately (later, we'll wait
- # until slot hosts are complete)
+ $thread->reg_cb( "task_run", sub {
+ my( $c, $task ) = @_;
+
+ DEBUG "Classic running task $task";
+ $self->event( "task_run", $task );
+ } );
+
+ for my $slot ( @{ $thread->{ slots } } ) {
+ for my $host (
+ @{ $self->{ hosts_by_slot }->{ $slot->id() } } ) {
+
+ if( exists $host_lookup{ $host } ) {
+ my $task = Pogo::Scheduler::Task->new(
+ id => $host,
+ slot_id => $slot,
+ thread_id => $thread,
+ host => $host,
+ );
+ $slot->task_add( $task );
+ }
+ }
}
+
+ # start thread in parallel with other threads
+ $thread->start();
}
}
View
10 lib/Pogo/Scheduler/Slot.pm
@@ -19,6 +19,7 @@ sub new {
my($class, %options) = @_;
my $self = {
+ tasks => [],
task_by_id => {},
next_task_idx => 0,
active_task_by_id => {},
@@ -35,6 +36,8 @@ sub task_add {
###########################################
my( $self, $task ) = @_;
+ DEBUG "Adding task $task to slot $self";
+
push @{ $self->{ tasks } }, $task;
$self->{ task_by_id }->{ $task->id() } = $task;
@@ -47,6 +50,9 @@ sub start {
###########################################
my( $self ) = @_;
+ DEBUG "Starting slot $self with tasks [",
+ join( ", ", @{ $self->{ tasks } } ), "]";
+
$self->reg_cb( "task_mark_done", sub {
my( $c, $task ) = @_;
@@ -55,6 +61,7 @@ sub start {
# Schedule all tasks (will change with constraints)
while( my $task = $self->task_next() ) {
+ # nothing to do here, task_next does everything
}
}
@@ -63,7 +70,10 @@ sub task_next {
###########################################
my( $self ) = @_;
+ DEBUG "Slot $self: Determine next task";
+
if( $self->{ next_task_idx } > $#{ $self->{ tasks } } ) {
+ DEBUG "Slot $self: No more tasks";
return undef;
}
View
8 lib/Pogo/Scheduler/Task.pm
@@ -9,7 +9,7 @@ use AnyEvent::Strict;
use base qw(Pogo::Object::Event);
use Pogo::Util qw( make_accessor id_gen );
-__PACKAGE__->make_accessor( $_ ) for qw( id slot thread);
+__PACKAGE__->make_accessor( $_ ) for qw( id slot_id thread_id);
use overload ( 'fallback' => 1, '""' => 'as_string' );
@@ -19,8 +19,8 @@ sub new {
my( $class, %options ) = @_;
my $self = {
- thread => "no_thread_defined",
- slot => "no_slot_defined",
+ thread_id => "no_thread_defined",
+ slot_id => "no_slot_defined",
%options,
};
@@ -38,7 +38,7 @@ sub as_string {
###########################################
my( $self ) = @_;
- return "$self->{ id }:$self->{ slot }:$self->{ thread }";
+ return "$self->{ id }:$self->{ slot_id }:$self->{ thread_id }";
}
1;
View
23 lib/Pogo/Scheduler/Thread.pm
@@ -9,7 +9,7 @@ use AnyEvent::Strict;
use base qw(Pogo::Object::Event);
use Pogo::Util qw( make_accessor id_gen );
-__PACKAGE__->make_accessor( $_ ) for qw( id slots );
+__PACKAGE__->make_accessor( $_ ) for qw( id slots is_done);
use overload ( 'fallback' => 1, '""' => 'as_string' );
@@ -22,12 +22,21 @@ sub new {
slots => [],
next_slot_idx => 0,
active_slot => undef,
+ is_done => 0,
%options,
};
$self->{ id } = id_gen( "thread" ) if ! defined $self->{ id };
bless $self, $class;
+
+ $self->reg_cb( "task_mark_done", sub {
+ my( $c, $task ) = @_;
+
+ $self->task_mark_done( $task );
+ } );
+
+ return $self;
}
###########################################
@@ -51,6 +60,9 @@ sub kick {
DEBUG "Thread $self: Next slot is $slot";
+ $self->event_forward( { forward_from => $slot },
+ "task_run" );
+
$slot->reg_cb( "slot_done", sub {
my( $c, $slot ) = @_;
DEBUG "Thread $self received slot_done from slot $slot";
@@ -60,6 +72,7 @@ sub kick {
$slot->start();
} else {
DEBUG "No more slots, thread $self done";
+ $self->is_done( 1 );
$self->event( "thread_done" );
}
}
@@ -69,6 +82,9 @@ sub start {
###########################################
my( $self ) = @_;
+ DEBUG "Starting thread $self with slots ",
+ join( ", ", @{ $self->{ slots } } );
+
$self->kick();
}
@@ -77,11 +93,10 @@ sub slot_next {
###########################################
my( $self ) = @_;
- DEBUG "thread calls slot_next";
-
if( ! $self->slots_left() ) {
$self->{ active_slot } = undef;
$self->event( "thread_done", $self );
+ DEBUG "No more slots left in thread $self";
return undef;
}
@@ -90,8 +105,6 @@ sub slot_next {
$self->{ next_slot_idx }++;
- DEBUG "Next slot is $slot";
-
return $slot;
}
View
13 t/014Sequence.t
@@ -42,19 +42,28 @@ my $bck = Pogo::Util::Bucketeer->new(
[ qw( host4 host5 host6 ) ],
] );
+my @timers = ();
+
$scheduler->reg_cb( "task_run", sub {
my( $c, $task ) = @_;
my $host = $task->{ host };
ok $bck->item( $host ), "host $host in seq";
- # Crunch, crunch, crunch. Task done. Report back.
- $scheduler->event( "task_finished", $task );
+ my $w = AnyEvent->timer( after => 0.1, cb => sub {
+ # Crunch, crunch, crunch. Task done. Report back.
+ DEBUG "Sending task_mark_done for task $task back to scheduler";
+ $scheduler->event( "task_mark_done", $task );
+ } );
+
+ push @timers, $w;
$bck->all_done and $cv->send(); # quit
} );
+$DB::single = 1;
+
# schedule all hosts
$scheduler->schedule( [ $scheduler->config_hosts() ] );
View
4 t/015SeqAttr.t
@@ -73,15 +73,17 @@ my @queue = ();
$scheduler->reg_cb( "task_run", sub {
my( $c, $task ) = @_;
+ DEBUG "*** Scheduled host $task";
my $host = $task->{ host };
push @queue, $host;
} );
$scheduler->schedule( [ $scheduler->config_hosts() ] );
+ # hosts 2/4 need to wait
cmp_deeply( \@queue,
- [ qw(host5 host1 host3 host6 host2 host4) ], "task queue" );
+ [ qw(host5 host1 host3 host6) ], "task queue" );
__END__

No commit comments for this range

Something went wrong with that request. Please try again.