Permalink
Browse files

Initial version of parallel delivery

Parallel delivery works for omnipitr-archive.
  • Loading branch information...
1 parent 94e75e4 commit 6589edbc73aad76c92f581a32cc51b506a7de5f3 @depesz depesz committed Apr 13, 2012
Showing with 353 additions and 26 deletions.
  1. +5 −0 doc/omnipitr-archive.pod
  2. +59 −25 lib/OmniPITR/Program/Archive.pm
  3. +288 −0 lib/OmniPITR/Tools/ParallelSystem.pm
  4. +1 −1 test/test-lib/02.make.master
@@ -76,6 +76,11 @@ I<omnipitr-archive> (with this pidfile) can run at the same time.
Trying to run second copy of I<omnipitr-archive> will result in an error.
+=item --parallel-jobs (-PJ)
+
+Number of parallel jobs that I<omnipitr-archive> can spawn to deliver archives
+to remote destinations.
+
=item --verbose (-v)
Log verbosely what is happening.
@@ -13,7 +13,8 @@ use File::Spec;
use File::Path qw( mkpath rmtree );
use File::Copy;
use Storable;
-use Getopt::Long;
+use Getopt::Long qw( :config no_ignore_case );
+use OmniPITR::Tools::ParallelSystem;
=head1 run()
@@ -30,7 +31,9 @@ sub run {
$self->read_state();
$self->prepare_temp_directory( basename( $self->{ 'segment' } ) );
$self->make_all_necessary_compressions();
+ $self->log->time_start( 'Segment delivery' ) if $self->verbose;
$self->send_to_destinations();
+ $self->log->time_finish( 'Segment delivery' ) if $self->verbose;
$self->cleanup();
$self->log->log( 'Segment %s successfully sent to all destinations.', $self->{ 'segment' } );
return;
@@ -53,6 +56,31 @@ Important notice - this function has to have the ability to choose whether to us
sub send_to_destinations {
my $self = shift;
+ my $all_ok = 1;
+
+ my $handle_finish = sub {
+ my $job = shift;
+ $self->log->log( 'Sending %s to %s ended in %.6fs', $job->{ 'local_file' }, $job->{ 'destination_file_path' }, $job->{ 'ended' } - $job->{ 'started' } ) if $self->verbose;
+ if ( $job->{ 'status' } ) {
+ if ( $job->{ 'is_backup' } ) {
+ $self->log->error( "Sending segment %s to backup destination %s generated (ignored) error: %s", $job->{ 'local_file' }, $job->{ 'destination_file_path' }, $job );
+ }
+ else {
+ $self->log->error( "Cannot send segment %s to %s : %s", $job->{ 'local_file' }, $job->{ 'destination_file_path' }, $job );
+ $all_ok = 0;
+ }
+ }
+ else {
+ $self->{ 'state' }->{ 'sent' }->{ $job->{ 'destination_type' } }->{ $job->{ 'dst_path' } } = 1;
+ }
+ return;
+ };
+
+ my $runner = OmniPITR::Tools::ParallelSystem->new(
+ 'max_jobs' => $self->{ 'parallel-jobs' },
+ 'on_finish' => $handle_finish,
+ );
+
for my $destination_type ( qw( local remote ) ) {
next unless my $dst_list = $self->{ 'destination' }->{ $destination_type };
for my $dst ( @{ $dst_list } ) {
@@ -70,24 +98,25 @@ sub send_to_destinations {
$destination_file_path =~ s{/*\z}{};
$destination_file_path .= '/' . basename( $local_file );
- my $comment = 'Sending ' . $local_file . ' to ' . $destination_file_path;
-
- $self->log->time_start( $comment ) if $self->verbose;
- my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'rsync-path' }, $local_file, $destination_file_path );
- $self->log->time_finish( $comment ) if $self->verbose;
-
- if ( $response->{ 'error_code' } ) {
- if ( $is_backup ) {
- $self->log->error( "Sending segment %s to backup destination %s generated (ignored) error: %s", $local_file, $destination_file_path, $response );
- }
- else {
- $self->log->fatal( "Cannot send segment %s to %s : %s", $local_file, $destination_file_path, $response );
- }
- }
- $self->{ 'state' }->{ 'sent' }->{ $destination_type }->{ $dst->{ 'path' } } = 1;
- $self->save_state();
+ $runner->add_command(
+ 'command' => [ $self->{ 'rsync-path' }, $local_file, $destination_file_path ],
+ 'is_backup' => $is_backup,
+ 'local_file' => $local_file,
+ 'destination_file_path' => $destination_file_path,
+ 'destination_type' => $destination_type,
+ 'dst_path' => $dst->{ 'path' },
+ );
}
}
+
+ $ENV{ 'TMPDIR' } = $self->{ 'temp-dir' };
+
+ $runner->run();
+
+ $self->save_state();
+
+ $self->log->fatal( 'There are fatal errors. Dying.' ) unless $all_ok;
+
return;
}
@@ -254,13 +283,14 @@ sub read_args {
my @argv_copy = @ARGV;
my %args = (
- 'data-dir' => '.',
- 'temp-dir' => $ENV{ 'TMPDIR' } || '/tmp',
- 'gzip-path' => 'gzip',
- 'bzip2-path' => 'bzip2',
- 'lzma-path' => 'lzma',
- 'rsync-path' => 'rsync',
- 'nice-path' => 'nice',
+ 'data-dir' => '.',
+ 'temp-dir' => $ENV{ 'TMPDIR' } || '/tmp',
+ 'gzip-path' => 'gzip',
+ 'bzip2-path' => 'bzip2',
+ 'lzma-path' => 'lzma',
+ 'rsync-path' => 'rsync',
+ 'nice-path' => 'nice',
+ 'parallel-jobs' => 1,
);
croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-archive.pod' )
@@ -280,14 +310,15 @@ sub read_args {
'state-dir|s=s',
'temp-dir|t=s',
'nice-path|np=s',
+ 'parallel-jobs|PJ=i',
'verbose|v',
'not-nice|nn',
);
croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };
$args{ 'log' } =~ tr/^/%/;
- for my $key ( qw( data-dir dst-backup temp-dir state-dir pid-file verbose gzip-path bzip2-path lzma-path nice-path force-data-dir rsync-path not-nice ) ) {
+ for my $key ( qw( data-dir dst-backup temp-dir state-dir pid-file verbose gzip-path bzip2-path lzma-path nice-path force-data-dir rsync-path not-nice parallel-jobs ) ) {
$self->{ $key } = $args{ $key };
}
@@ -381,6 +412,9 @@ sub validate_args {
my $file_size = ( -s $segment_file_name );
$self->log->fatal( 'Given segment (%s) has incorrect size (%u vs %u).', $segment_file_name, $file_size, $expected_size ) unless $expected_size == $file_size;
}
+ $self->log->fatal( 'Parallel jobs value not given?!' ) unless defined $self->{ 'parallel-jobs' };
+ $self->log->fatal( 'Parallel jobs is not integer (%s)', $self->{ 'parallel-jobs' } ) unless $self->{ 'parallel-jobs' } =~ m{\A\d+\z};
+ $self->log->fatal( 'Parallel jobs is not >= 1 (%s)', $self->{ 'parallel-jobs' } ) unless $self->{ 'parallel-jobs' } >= 1;
$self->{ 'segment' } = $segment_file_name;
return;
Oops, something went wrong.

0 comments on commit 6589edb

Please sign in to comment.