Start embedding a manifest.yaml into backup tarballs (#2867)
vEpiphyte committed Oct 5, 2022
1 parent 6f3264a commit 91cef91
Showing 2 changed files with 78 additions and 17 deletions.
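The commit writes a manifest.yaml entry at the top of every streamed backup tarball, carrying the cell metadata ('cell:info'), a 'backup:date' timestamp, a 'backup:version' tuple, and the archive's top-level directory name ('backup:arcname'). As a rough sketch only (not part of this commit, and using a made-up tarball path), the embedded manifest could be read back out of a resulting archive like this:

# Illustrative sketch: inspect the manifest.yaml embedded by this commit.
# The tarball path below is hypothetical.
import tarfile
import yaml

def read_backup_manifest(tarpath):
    with tarfile.open(tarpath, 'r:gz') as tgz:
        try:
            fd = tgz.extractfile('manifest.yaml')
        except KeyError:
            return None  # backups made before this change have no manifest
        return yaml.safe_load(fd.read().decode())

manifest = read_backup_manifest('/tmp/mycell-backup.tar.gz')
if manifest is not None:
    print(manifest.get('backup:version'), manifest.get('backup:arcname'))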
79 changes: 68 additions & 11 deletions synapse/lib/cell.py
@@ -1,3 +1,4 @@
+import io
 import os
 import time
 import fcntl
@@ -13,6 +14,7 @@
 import contextlib
 import multiprocessing

+import yaml
 import aiohttp
 import tornado.web as t_web

@@ -80,7 +82,7 @@ def wrapped(*args, **kwargs):

     return decrfunc

-async def _doIterBackup(path, chunksize=1024):
+async def _doIterBackup(path, meta, chunksize=1024):
     '''
     Create tarball and stream bytes.
@@ -95,7 +97,17 @@ async def _doIterBackup(path, chunksize=1024):

     def dowrite(fd):
         with tarfile.open(output_filename, 'w|gz', fileobj=fd) as tar:
-            tar.add(path, arcname=os.path.basename(path))
+            arcname = os.path.basename(path)
+            meta['backup:arcname'] = arcname + '/'
+            buf = yaml.safe_dump(meta, default_flow_style=False, default_style='',
+                                 explicit_start=True, explicit_end=True, sort_keys=True).encode()
+            tarinfo = tarfile.TarInfo(name='manifest.yaml')
+            tarinfo.size = len(buf)
+            bffd = io.BytesIO(buf)
+            tar.addfile(tarinfo, fileobj=bffd)
+            # Add the actual backup data
+            tar.add(path, arcname=arcname)
+
         fd.close()

     coro = s_coro.executor(dowrite, file1)
@@ -109,7 +121,7 @@ def dowrite(fd):
     await coro
     await link0.fini()

-async def _iterBackupWork(path, linkinfo):
+async def _iterBackupWork(path, linkinfo, meta):
     '''
     Inner setup for backup streaming.
@@ -123,12 +135,12 @@ async def _iterBackupWork(path, linkinfo):
     logger.info(f'Getting backup streaming link for [{path}].')
     link = await s_link.fromspawn(linkinfo)

-    await s_daemon.t2call(link, _doIterBackup, (path,), {})
+    await s_daemon.t2call(link, _doIterBackup, (path, meta), {})
     await link.fini()

     logger.info(f'Backup streaming for [{path}] completed.')

-def _iterBackupProc(path, linkinfo):
+def _iterBackupProc(path, linkinfo, meta):
     '''
     Multiprocessing target for streaming a backup.
     '''
@@ -137,7 +149,7 @@ def _iterBackupProc(path, linkinfo):
     s_common.setlogging(logger, **linkinfo.get('logconf'))

     logger.info(f'Backup streaming process for [{path}] starting.')
-    asyncio.run(_iterBackupWork(path, linkinfo))
+    asyncio.run(_iterBackupWork(path, linkinfo, meta))

 class CellApi(s_base.Base):

@@ -1796,6 +1808,13 @@ async def iterBackupArchive(self, name, user):
         linkinfo = await link.getSpawnInfo()
         linkinfo['logconf'] = await self._getSpawnLogConf()

+        cellinfo = await self.getCellInfo()
+        meta = {
+            'cell:info': cellinfo,
+            'backup:date': s_common.now(),
+            'backup:version': (0, 1, 0),
+        }
+
         await self.boss.promote('backup:stream', user=user, info={'name': name})

         ctx = multiprocessing.get_context('spawn')
@@ -1804,7 +1823,7 @@ async def iterBackupArchive(self, name, user):
         mesg = 'Streaming complete'

         def getproc():
-            proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
+            proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo, meta))
             proc.start()
             return proc

@@ -1870,6 +1889,13 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
                     logger.debug(f'Removed {path}')
                 raise

+            cellinfo = await self.getCellInfo()
+            meta = {
+                'cell:info': cellinfo,
+                'backup:date': s_common.now(),
+                'backup:version': (0, 1, 0),
+            }
+
             await self.boss.promote('backup:stream', user=user, info={'name': name})

             ctx = multiprocessing.get_context('spawn')
@@ -1878,7 +1904,7 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
             mesg = 'Streaming complete'

             def getproc():
-                proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
+                proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo, meta))
                 proc.start()
                 return proc

@@ -2671,10 +2697,41 @@ async def _initBootRestore(cls, dirn):
             logger.warning(f'Extracting {tarpath} to {dirn}')

             with tarfile.open(tarpath) as tgz:
+
+                # Check for manifest.yaml
+                mnfo = {}
+                try:
+                    mbfd = tgz.extractfile('manifest.yaml')
+                except KeyError:
+                    logger.warning('No manifest.yaml data found in the backup. Will be unable to confirm information about the backup.')
+                else:
+                    logger.warning('Extracting manifest.yaml file from backup.')
+                    mnfo = yaml.safe_load(mbfd.read().decode())
+
+                arcname = None
+
+                if mnfo:
+                    cnfo = mnfo.get('cell:info')
+                    btyp = cnfo.get('cell', {}).get('type')
+                    etyp = cls.getCellType()
+                    if btyp != etyp:
+                        logger.warning(f'Backup type mismatch: Backup was made for {btyp}, current cell is {etyp}')
+                    arcname = mnfo.get('backup:arcname')  # type: str
+
                 for memb in tgz.getmembers():
-                    if memb.name.find('/') == -1:
-                        continue
-                    memb.name = memb.name.split('/', 1)[1]
+                    if arcname:
+                        if memb.name.startswith(arcname):
+                            memb.name = memb.name.split(arcname, 1)[1]
+                        else:
+                            logger.warning(f'Skipping file {memb.name}')
+                            continue
+                    else:
+                        # Old convention, retained for backwards compatible restores.
+                        if memb.name.find('/') == -1:
+                            logger.warning(f'Skipping file {memb.name}')
+                            continue
+                        memb.name = memb.name.split('/', 1)[1]
+
                     logger.warning(f'Extracting {memb.name}')
                     tgz.extract(memb, dirn)

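To make the restore-side name handling above easier to follow, here is a standalone sketch (not from the repository) of the same prefix-stripping logic, using made-up member and archive names; 'arcname' stands in for the manifest's 'backup:arcname' value:

# Illustrative sketch of the member-name handling in _initBootRestore above.
def strip_member_name(name, arcname):
    if arcname:
        if name.startswith(arcname):
            return name.split(arcname, 1)[1]
        return None  # skip entries outside the recorded backup directory (e.g. manifest.yaml)
    # Old convention: drop the leading path component, skip bare top-level names.
    if name.find('/') == -1:
        return None
    return name.split('/', 1)[1]

assert strip_member_name('mybackup/cell.guid', 'mybackup/') == 'cell.guid'
assert strip_member_name('manifest.yaml', 'mybackup/') is None
assert strip_member_name('mybackup/slabs/cell.lmdb', None) == 'slabs/cell.lmdb'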
16 changes: 10 additions & 6 deletions synapse/tests/test_lib_cell.py
@@ -35,20 +35,20 @@ def _exiterProc(pipe, srcdir, dstdir, lmdbpaths, logconf):
     pipe.send('captured')
     sys.exit(1)

-def _backupSleep(path, linkinfo):
+def _backupSleep(path, linkinfo, meta):
     time.sleep(3.0)

 async def _doEOFBackup(path):
     return

-async def _iterBackupEOF(path, linkinfo):
+async def _iterBackupEOF(path, linkinfo, meta):
     link = await s_link.fromspawn(linkinfo)
     await s_daemon.t2call(link, _doEOFBackup, (path,), {})
     link.writer.write_eof()
     await link.fini()

-def _backupEOF(path, linkinfo):
-    asyncio.run(_iterBackupEOF(path, linkinfo))
+def _backupEOF(path, linkinfo, meta):
+    asyncio.run(_iterBackupEOF(path, linkinfo, meta))

 def lock_target(dirn, evt1): # pragma: no cover
     '''
@@ -1369,7 +1369,9 @@ async def _iterNewDup(self, user, name=None, remove=False):
             self.len(1, nodes)

         with tarfile.open(bkuppath4, 'r:gz') as tar:
-            bkupname = os.path.commonprefix(tar.getnames())
+            names = tar.getnames()
+            names.remove('manifest.yaml')
+            bkupname = os.path.commonprefix(names)
             tar.extractall(path=dirn)

         bkupdirn4 = os.path.join(dirn, bkupname)
@@ -1392,7 +1394,9 @@ async def _iterNewDup(self, user, name=None, remove=False):
             await s_t_utils.alist(proxy.iterNewBackupArchive('eof', remove=True))

         with tarfile.open(bkuppath5, 'r:gz') as tar:
-            bkupname = os.path.commonprefix(tar.getnames())
+            names = tar.getnames()
+            names.remove('manifest.yaml')
+            bkupname = os.path.commonprefix(names)
             tar.extractall(path=dirn)

         bkupdirn5 = os.path.join(dirn, bkupname)
