Permalink
Browse files

Enable cuncurrent execution using Coro.

  • Loading branch information...
1 parent 947f741 commit bac788552c5d6ca66d972334e03630567b0c9141 @kentaro committed Feb 27, 2012
Showing with 174 additions and 72 deletions.
  1. +2 −0 Build.PL
  2. +1 −22 bin/cinnamon
  3. +26 −7 lib/Cinnamon.pm
  4. +44 −8 lib/Cinnamon/Config.pm
  5. +35 −0 lib/Cinnamon/Config/Loader.pm
  6. +12 −23 lib/Cinnamon/DSL.pm
  7. +8 −4 lib/Cinnamon/Local.pm
  8. +6 −5 lib/Cinnamon/Remote.pm
  9. +40 −3 lib/Cinnamon/Runner.pm
View
@@ -19,6 +19,8 @@ my $build = Module::Build->new(
'IPC::Run',
'Net::OpenSSH',
'Log::Dispatch',
+ 'Class::Load',
+ 'Coro',
},
no_index => { 'directory' => [ 'inc' ] },
View
@@ -1,27 +1,6 @@
#!/usr/bin/env perl
use strict;
use warnings;
-
use Cinnamon;
-use Cinnamon::Logger;
-
-my ($role, $task) = (shift, shift);
-my @args = @ARGV;
-my $config = 'config/deploy.pl';
-
-if (@args && -e $args[-1]) {
- $config = pop @args;
-}
-
-if (!-e $config) {
- log error => 'usage: cinnamon $role $task [@args $config]';
- exit 1;
-}
-
-do $config;
-if ($@) {
- log error => $@;
- exit 1;
-}
-Cinnamon->run($role, $task, @args);
+Cinnamon->run(@ARGV);
View
@@ -5,18 +5,37 @@ use 5.008008;
our $VERSION = '0.01';
+use Class::Load ();
+
use Cinnamon::Config;
use Cinnamon::Runner;
+use Cinnamon::Logger;
sub run {
- my ($class, $role, $task, @args) = @_;
-
- Cinnamon::Config::set role => $role;
- Cinnamon::Config::set task => $task;
-
- for my $host (@{Cinnamon::Config::get_role || []}) {
- Cinnamon::Runner->start($host, @args);
+ my $class = shift;
+ my @args = Cinnamon::Config::load @_;
+ my $hosts = Cinnamon::Config::get_role || [];
+ my $runner = Cinnamon::Config::get('runner_class') || 'Cinnamon::Runner';
+
+ Class::Load::load_class $runner;
+
+ my $result = $runner->start($hosts, @args);
+ my (@success, @error);
+
+ for my $key (keys %{$result || {}}) {
+ if ($result->{$key}->{error}) {
+ push @error, $key;
+ }
+ else {
+ push @success, $key;
+ }
}
+
+ log info => sprintf(
+ "\n========================\n[success]: %s\n[error]: %s",
+ (join(', ', @success) || ''),
+ (join(', ', @error) || ''),
+ );
}
!!1;
@@ -2,46 +2,73 @@ package Cinnamon::Config;
use strict;
use warnings;
+use Coro;
+use Cinnamon::Config::Loader;
+
my %CONFIG;
my %ROLES;
my %TASKS;
+my $lock = new Coro::RWLock;
+
sub set ($$) {
my ($key, $value) = @_;
+
+ $lock->wrlock;
$CONFIG{$key} = $value;
+ $lock->unlock;
}
-sub get ($) {
- my ($key) = @_;
- $CONFIG{$key};
+sub get ($@) {
+ my ($key, @args) = @_;
+
+ $lock->rdlock;
+ my $value = $CONFIG{$key};
+ $lock->unlock;
+
+ $value = $value->(@args) if ref $value eq 'CODE';
+ $value;
}
sub set_role ($$) {
my ($role, $hosts) = @_;
+
+ $lock->wrlock;
$ROLES{$role} = $hosts;
+ $lock->unlock;
}
sub get_role (@) {
- my $role = ($_[0] || get 'role') or die "no role";
+ my $role = ($_[0] || get('role')) or die "no role";
+
+ $lock->rdlock;
my $hosts = $ROLES{$role};
- $hosts = $hosts->() if ref $hosts eq 'CODE';
+ $lock->unlock;
+ $hosts = $hosts->() if ref $hosts eq 'CODE';
ref $hosts eq 'ARRAY' ? $hosts : [$hosts];
}
sub set_task ($$$) {
my ($role, $task, $code) = @_;
$TASKS{$role} ||= {};
+
+ $lock->wrlock;
$TASKS{$role}->{$task} = $code;
+ $lock->unlock;
}
sub get_task (@) {
my ($role, $task) = @_;
- $role ||= get 'role' or die "no role";
- $task ||= get 'task' or die "no task";
+ $role ||= get('role') or die "no role";
+ $task ||= get('task') or die "no task";
+
+ $lock->rdlock;
+ my $value = $TASKS{$role}->{$task};
+ $lock->unlock;
- $TASKS{$role}->{$task};
+ $value;
}
sub user () {
@@ -52,4 +79,13 @@ sub user () {
};
}
+sub load (@) {
+ my ($role, $task) = (shift, shift);
+
+ set role => $role;
+ set task => $task;
+
+ Cinnamon::Config::Loader->load(@_);
+}
+
!!1;
@@ -0,0 +1,35 @@
+package Cinnamon::Config::Loader;
+use strict;
+use warnings;
+
+use Cinnamon::Logger;
+
+sub load {
+ my ($class, @args) = @_;
+ my $config = 'config/deploy.pl';
+
+ if (@args && -e $args[-1]) {
+ $config = pop @args;
+ }
+
+ if (!-e $config) {
+ log error => 'usage: cinnamon $role $task [@args $config]';
+ exit 1;
+ }
+
+ do $config;
+
+ if ($@) {
+ log error => $@;
+ exit 1;
+ }
+
+ if ($!) {
+ log error => $!;
+ exit 1;
+ }
+
+ @args;
+}
+
+!!1;
View
@@ -26,9 +26,7 @@ sub set ($$) {
sub get ($@) {
my ($name, @args) = @_;
- my $value = Cinnamon::Config::get $name;
- $value = $value->(@args) if ref $value eq 'CODE';
- $value;
+ Cinnamon::Config::get $name, @args;
}
sub role ($$) {
@@ -59,36 +57,27 @@ sub run (@) {
my (@cmd) = @_;
my ($stdout, $stderr);
my $host;
+ my $result;
if (ref $_ eq 'Cinnamon::Remote') {
- $host = $_->host;
- ($stdout, $stderr) = eval { $_->execute(@cmd) };
+ $host = $_->host;
+ $result = $_->execute(@cmd);
}
else {
- $host = 'localhost';
- ($stdout, $stderr) = eval { Cinnamon::Local->execute(@cmd) };
+ $host = 'localhost';
+ $result = Cinnamon::Local->execute(@cmd);
}
- if ($@) {
- my $message = sprintf "[%s] %s\n%s", $host, $@, join(' ', @cmd);
- log error => $message;
- exit 1;
+ if ($result->{has_error}) {
+ my $message = sprintf "%s: %s", $host, $result->{error}, join(' ', @cmd);
+ die $message;
}
else {
- my $message = sprintf "[%s] %s", $host, join(' ', @cmd);
- log info => $message;
- }
+ my $message = sprintf "[%s] %s: %s",
+ $host, join(' ', @cmd), ($result->{stdout} || $result->{stderr});
- if ($stdout) {
- chomp $stdout;
- print " STDOUT: $stdout\n";
- }
- if ($stderr) {
- chomp $stderr;
- print " STDERR: $stderr\n";
+ log info => $message;
}
-
- ($stdout, $stderr);
}
sub sudo (@) {
View
@@ -6,11 +6,15 @@ use IPC::Run ();
sub execute {
my ($class, @cmd) = @_;
- my $result = IPC::Run::run \@cmd, undef, \my $stdout, \my $stderr;
+ my $result = IPC::Run::run \@cmd, \my $stdin, \my $stdout, \my $stderr;
+ chomp for ($stdout, $stderr);
- Carp::croak $? if !$result;
-
- ($stdout, $stderr);
+ +{
+ stdout => $stdout,
+ stderr => $stderr,
+ has_error => !$result,
+ error => $?,
+ };
}
!!1;
@@ -1,7 +1,6 @@
package Cinnamon::Remote;
use strict;
use warnings;
-use Carp ();
use Net::OpenSSH;
sub new {
@@ -22,10 +21,12 @@ sub execute {
my ($self, @cmd) = @_;
my ($stdout, $stderr) = $self->connection->capture2(join(' ', @cmd));
- Carp::croak $self->connection->error
- if $self->connection->error;
-
- ($stdout, $stderr);
+ +{
+ stdout => $stdout,
+ stderr => $stderr,
+ has_error => !$self->connection->error,
+ error => $self->connection->error,
+ };
}
sub DESTROY {
@@ -2,12 +2,49 @@ package Cinnamon::Runner;
use strict;
use warnings;
+use Coro;
+
+use Cinnamon::Logger;
use Cinnamon::Config;
sub start {
- my ($class, $host, @args) = @_;
- my $task = Cinnamon::Config::get_task;
- $task->($host, @args);
+ my ($class, $hosts, @args) = @_;
+ my $task = Cinnamon::Config::get_task;
+ my $concurrency = Cinnamon::Config::get('concurrency') || 1;
+
+ my %result;
+ my @workers;
+ for my $job (@{$hosts || []}) {
+ my $i = scalar(@$hosts) % $concurrency;
+
+ $workers[$i] ||= [];
+ push @{$workers[$i]}, $job;
+
+ $result{$job} = +{ error => 0 };
+ }
+
+ my @coros;
+ for my $jobs (@workers) {
+ push @coros, async {
+ for my $job (@$jobs) {
+ eval { $task->($job, @args) };
+
+ if ($@) {
+ chomp $@;
+ log error => sprintf '[%s] %s', $job, $@;
+ $result{$job}->{error}++ ;
+ }
+ }
+ };
+ }
+
+ $_->join for @coros;
+ \%result;
+}
+
+sub execute {
+ my ($class, $host, $task, @args) = @_;
+ $task->($host, @args);
}
!!1;

0 comments on commit bac7885

Please sign in to comment.