Skip to content

Commit

Permalink
Merge branch 'master' of github.com:oetiker/znapzend
Browse files Browse the repository at this point in the history
  • Loading branch information
oetiker committed Aug 5, 2014
2 parents 8558e5b + f814cc6 commit ad2d8ac
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 13 deletions.
11 changes: 9 additions & 2 deletions bin/znapzendzetup
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ B<znapzendzetup> I<command> [I<options...>]
where 'command' is one of the following:
create [--recursive] [--mbuffer=<path>] [--mbuffersize=<size>] \
create [--recursive] [--mbuffer=<path>[:<port>]] [--mbuffersize=<size>] \
[--pre-snap-command=<command>] \
[--post-snap-command=<command>] \
[--tsformat=<format>] --donotask \
Expand All @@ -371,7 +371,7 @@ where 'command' is one of the following:
delete [--dst=key] <src_dataset>
edit [--recursive=on|off] [--mbuffer=<path>|off] [--mbuffersize=<size>] \
edit [--recursive=on|off] [--mbuffer=<path>[:<port>]|off] [--mbuffersize=<size>] \
[--pre-snap-command=<command>|off] \
[--post-snap-command=<command>|off] \
[--tsformat=<format>] --donotask \
Expand Down Expand Up @@ -466,6 +466,13 @@ If B<--tsformat> string is suffixed by a 'Z', times will be in UTC. E.g.:
Specify the path to your copy of the mbuffer utility.
=item B<--mbuffer>=I</usr/bin/mbuffer:31337>
Specifiy the path to your copy of the mbuffer utility and the port used
on the destination. Caution: znapzend will send the data directly
from source mbuffer to destination mbuffer, thus data stream is B<not>
encrypted.
=item B<--mbuffersize>=I<number>{B<b>|B<k>|B<M>|B<G>}
The size of the mbuffer can be set with the B<--mbuffersize> option. It
Expand Down
4 changes: 4 additions & 0 deletions lib/ZnapZend.pm
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ my $sendRecvCleanup = sub {
$self->zZfs->destroySnapshots($toDestroy);
}
$self->zLog->info('done with backupset ' . $backupSet->{src} . ' ' . (time - $startTime). ' seconds');

return 1;
};

my $createSnapshot = sub {
Expand Down Expand Up @@ -222,6 +224,8 @@ my $createSnapshot = sub {
system($backupSet->{post_znap_cmd})
&& $self->zLog->warn("running post snapshot command on $backupSet->{src} failed");
}

return 1;
};

my $sendWorker = sub {
Expand Down
10 changes: 8 additions & 2 deletions lib/ZnapZend/Config.pm
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,19 @@ my $checkBackupSets = sub {

# mbuffer property set? check if executable is available on remote host
if ($backupSet->{mbuffer} ne 'off'){
my ($mbuffer, $mbufferPort) = split /:/, $backupSet->{mbuffer}, 2;
my ($remote, $dataset) = $splitHostDataSet->($backupSet->{$dst});
my $file = ($remote ? "$remote:" : '') . $backupSet->{mbuffer};
my $file = ($remote ? "$remote:" : '') . $mbuffer;
$self->zfs->fileExistsAndExec($file)
or die "ERROR: executable '" . $backupSet->{mbuffer} . "' does not exist on $remote\n";
or die "ERROR: executable '" . $mbuffer . "' does not exist on $remote\n";
#check if mbuffer size is valid
$backupSet->{mbuffer_size} =~ /^\d+[bkMG%]?$/
or die "ERROR: mbuffer size '" . $backupSet->{mbuffer_size} . "' invalid\n";
#check if port is numeric
$mbufferPort && do {
$mbufferPort =~ /^\d{1,5}$/ && int($mbufferPort) < 65535
or die "ERROR: $mbufferPort not a valid port number\n";
};
}
}
#drop destination plans where destination is not given (e.g. calling create w/o a destination but a plan
Expand Down
99 changes: 90 additions & 9 deletions lib/ZnapZend/ZFS.pm
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package ZnapZend::ZFS;

use Mojo::Base -base;
use Mojo::IOLoop::ForkCall;
use POSIX qw(WNOHANG SIGTERM SIGKILL);

### attributes ###
has debug => sub { 0 };
has noaction => sub { 0 };
has nodestroy => sub { 1 };
has combinedDestroy => sub { 0 };
has sendDelay => sub { 3 };
has propertyPrefix => sub { q{org.znapzend} };
has sshCmdArray => sub { [qw(ssh -o Compression=yes -o CompressionLevel=1 -o Cipher=arcfour -o batchmode=yes)] };
has mbufferParam => sub { [qw(-q -s 128k -m)] }; #don't remove the -m as the buffer size will be added
Expand Down Expand Up @@ -242,7 +245,7 @@ sub lastAndCommonSnapshots {
my $dstDataSet = shift;
my $snapshotFilter = $_[0] || qr/.*/;

my $srcSnapshots = $self->listSnapshots($srcDataSet, $snapshotFilter);
my $srcSnapshots = $self->listSnapshots($srcDataSet, $snapshotFilter);
my $dstSnapshots = $self->listSnapshots($dstDataSet, $snapshotFilter);

return (undef, undef) if !@$srcSnapshots;
Expand All @@ -266,6 +269,7 @@ sub sendRecvSnapshots {
my $mbufferSize = shift;
my $snapFilter = $_[0] || qr/.*/;
my $remote;
my $mbufferPort;
my ($lastSnapshot, $lastCommon)
= $self->lastAndCommonSnapshots($srcDataSet, $dstDataSet, $snapFilter);

Expand All @@ -279,6 +283,7 @@ sub sendRecvSnapshots {
. "clean up destination (i.e. destroy existing snapshots on destination dataset)\n";

($remote, $dstDataSet) = $splitHostDataSet->($dstDataSet);
($mbuffer, $mbufferPort) = split /:/, $mbuffer, 2;

my @cmd;
if ($lastCommon){
Expand All @@ -288,17 +293,93 @@ sub sendRecvSnapshots {
@cmd = (['zfs', 'send', $lastSnapshot]);
}

my @mbCmd = $mbuffer ne 'off' ? ([$mbuffer, @{$self->mbufferParam}, $mbufferSize]) : () ;
my $recvCmd = ['zfs', 'recv' , '-F', $dstDataSet];

push @cmd, $self->$buildRemoteRefArray($remote, @mbCmd, $recvCmd);
#if mbuffer port is set, run in 'network mode'
if ($mbufferPort && $mbuffer ne 'off'){
my $recvPid;

my @recvCmd = $self->$buildRemoteRefArray($remote, [$mbuffer, @{$self->mbufferParam},
$mbufferSize, '-I', $mbufferPort], ['zfs', 'recv', '-F', $dstDataSet]);

my $cmd = $shellQuote->(@recvCmd);

my $fc = Mojo::IOLoop::ForkCall->new;
$fc->run(
#receive worker fork
sub {
my $cmd = shift;
my $debug = shift;
my $noaction = shift;

print STDERR "# $cmd\n" if $debug;

system($cmd) && die "ERROR: executing receive process\n" if !$noaction;
},
#arguments
[$cmd, $self->debug, $self->noaction],
#callback
sub {
my ($fc, $err) = @_;
print STDERR "# receive process on $remote done ($recvPid)\n" if $self->debug;
die $err if $err;
}
);
#spawn event
$fc->on(
spawn => sub {
my ($fc, $pid) = @_;

$recvPid = $pid;

$remote =~ s/^[^@]+\@//; #remove username if given
print STDERR "# receive process on $remote spawned ($pid)\n" if $self->debug;

push @cmd, [$mbuffer, @{$self->mbufferParam}, $mbufferSize,
'-O', "$remote:$mbufferPort"];

$cmd = $shellQuote->(@cmd);
print STDERR "# $cmd\n" if $self->debug;

#wait so remote mbuffer has enough time to start listening
sleep $self->sendDelay;

if (!$self->noaction && system($cmd)){
#command failed. check if child is alive and try to cleanup
kill SIGTERM, $pid;
sleep 1;
waitpid($pid, WNOHANG) || do {
kill SIGKILL, $pid;
sleep 1;
waitpid($pid, WNOHANG);
};
die "ERROR: cannot send snapshots to $dstDataSet"
. ($remote ? " on $remote\n" : "\n");
}
}
);
#error event
$fc->on(
error => sub {
#not yet implemented (will be in ForkCall 0.13)
my ($fc, $err) = @_;
die $err if $err;
}
);
#start forkcall event loop
$fc->ioloop->start if !$fc->ioloop->is_running;
}
else{
my @mbCmd = $mbuffer ne 'off' ? ([$mbuffer, @{$self->mbufferParam}, $mbufferSize]) : () ;
my $recvCmd = ['zfs', 'recv' , '-F', $dstDataSet];

my $cmd = $shellQuote->(@cmd);
print STDERR "# $cmd\n" if $self->debug;
push @cmd, $self->$buildRemoteRefArray($remote, @mbCmd, $recvCmd);

system($cmd) && die "ERROR: cannot send snapshots to $dstDataSet"
. ($remote ? " on $remote\n" : "\n") if !$self->noaction;
my $cmd = $shellQuote->(@cmd);
print STDERR "# $cmd\n" if $self->debug;

system($cmd) && die "ERROR: cannot send snapshots to $dstDataSet"
. ($remote ? " on $remote\n" : "\n") if !$self->noaction;
}

return 1;
}

Expand Down

0 comments on commit ad2d8ac

Please sign in to comment.