Skip to content

Commit

Permalink
Allow mutiple sync.zfs resources on the same src
Browse files Browse the repository at this point in the history
This can be useful to setup different schedule or recursion policy for
different targets. For example recursion when syncing to nodes, but no
recursion (ie snaps replication) when syncing to drpnodes.

Encode the rid in the tosend/sent snaps.
  • Loading branch information
cvaroqui committed Jan 20, 2018
1 parent af04d14 commit 20080d4
Showing 1 changed file with 59 additions and 28 deletions.
87 changes: 59 additions & 28 deletions lib/resSyncZfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import rcStatus
import resSync
from rcZfs import a2pool_dataset, Dataset
from rcUtilities import bdecode

class SyncZfs(resSync.Sync):
"""define zfs sync resource to be zfs send/zfs receive between nodes
Expand Down Expand Up @@ -108,13 +109,16 @@ def create_snap(self, snap):
raise ex.excError

def get_src_info(self):
self.src_snap_sent = self.src_ds + '@sent'
self.src_snap_tosend = self.src_ds + '@tosend'
self.tosend = "tosend"
self.src_snap_sent_old = self.src_ds + "@sent"
self.src_snap_tosend_old = self.src_ds + "@tosend"
self.src_snap_sent = self.src_ds + "@"+self.rid.replace("#",".") + ".sent"
self.src_snap_tosend = self.src_ds + "@"+self.rid.replace("#",".") + ".tosend"

def get_dst_info(self):
self.dst_snap_sent = self.dst_ds + '@sent'
self.dst_snap_tosend = self.dst_ds + '@tosend'
self.dst_snap_sent_old = self.dst_ds + "@sent"
self.dst_snap_tosend_old = self.dst_ds + "@tosend"
self.dst_snap_sent = self.dst_ds + "@"+self.rid.replace("#",".") + ".sent"
self.dst_snap_tosend = self.dst_ds + "@"+self.rid.replace("#",".") + ".tosend"

def get_peersenders(self):
self.peersenders = set()
Expand Down Expand Up @@ -156,7 +160,10 @@ def destroy_all_snaps(self):

def zfs_send_incremental(self, node):
if not self.snap_exists(self.src_snap_sent, node):
return self.zfs_send_initial(node)
if dataset_exists(self.src_snap_sent_old, "snapshot"):
Dataset(self.src_snap_sent_old).rename(self.src_snap_sent)
else:
return self.zfs_send_initial(node)
if self.recursive:
send_cmd = [rcEnv.syspaths.zfs, "send", "-R", "-I",
self.src_snap_sent, self.src_snap_tosend]
Expand All @@ -166,19 +173,31 @@ def zfs_send_incremental(self, node):

receive_cmd = [rcEnv.syspaths.zfs, "receive", "-dF", self.dst_pool]
if node is not None:
receive_cmd = rcEnv.rsh.strip(' -n').split() + [node] + receive_cmd
_receive_cmd = rcEnv.rsh.strip(' -n').split()
if "-q" in _receive_cmd:
_receive_cmd.remove("-q")
receive_cmd = _receive_cmd + [node] + receive_cmd

self.log.info(' '.join(send_cmd + ["|"] + receive_cmd))
p1 = Popen(send_cmd, stdout=PIPE)
p2 = Popen(receive_cmd, stdin=p1.stdout, stdout=PIPE)
buff = p2.communicate()
if p2.returncode != 0:
if buff[1] is not None and len(buff[1]) > 0:
self.log.error(buff[1])
self.log.error("sync update failed")
raise ex.excError
if buff[0] is not None and len(buff[0]) > 0:
self.log.info(buff[0])

def do_it(retry=True):
p1 = Popen(send_cmd, stdout=PIPE)
p2 = Popen(receive_cmd, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)
buff = p2.communicate()
out = bdecode(buff[0])
err = bdecode(buff[1])
if p2.returncode != 0:
if retry and self.dst_snap_sent_old in err or self.dst_snap_tosend_old:
self.force_remove_snap(self.dst_snap_sent_old, node)
self.force_remove_snap(self.dst_snap_tosend_old, node)
return do_it(retry=False)
if err is not None and len(err) > 0:
self.log.error(err)
raise ex.excError("sync update failed")
if out is not None and len(out) > 0:
self.log.info(out)

do_it()

def zfs_send_initial(self, node=None):
if self.recursive:
Expand All @@ -190,19 +209,31 @@ def zfs_send_initial(self, node=None):

receive_cmd = [rcEnv.syspaths.zfs, "receive", "-dF", self.dst_pool]
if node is not None:
receive_cmd = rcEnv.rsh.strip(' -n').split() + [node] + receive_cmd
_receive_cmd = rcEnv.rsh.strip(' -n').split()
if "-q" in _receive_cmd:
_receive_cmd.remove("-q")
receive_cmd = _receive_cmd + [node] + receive_cmd

self.log.info(' '.join(send_cmd + ["|"] + receive_cmd))
p1 = Popen(send_cmd, stdout=PIPE)
p2 = Popen(receive_cmd, stdin=p1.stdout, stdout=PIPE)
buff = p2.communicate()
if p2.returncode != 0:
if buff[1] is not None and len(buff[1]) > 0:
self.log.error(buff[1])
self.log.error("full sync failed")
raise ex.excError
if buff[0] is not None and len(buff[0]) > 0:
self.log.info(buff[0])

def do_it(retry=True):
p1 = Popen(send_cmd, stdout=PIPE)
p2 = Popen(receive_cmd, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)
buff = p2.communicate()
out = bdecode(buff[0])
err = bdecode(buff[1])
if p2.returncode != 0:
if retry and self.dst_snap_sent_old in err or self.dst_snap_tosend_old:
self.force_remove_snap(self.dst_snap_sent_old, node)
self.force_remove_snap(self.dst_snap_tosend_old, node)
return do_it(retry=False)
if err is not None and len(err) > 0:
self.log.error(err)
raise ex.excError("full sync failed")
if out is not None and len(out) > 0:
self.log.info(out)

do_it()

def force_remove_snap(self, snap, node=None):
try:
Expand Down

0 comments on commit 20080d4

Please sign in to comment.