Support for backing up non-running PG cluster
Mike Krieger authored and Daniel Farina committed Aug 4, 2012
1 parent 6de5822 commit f7986b7
Showing 4 changed files with 116 additions and 8 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.5.11
+0.5.12
27 changes: 25 additions & 2 deletions wal_e/cmd.py
@@ -33,6 +33,7 @@ def gevent_monkey(*args, **kwargs):
 from wal_e.piper import popen_sp
 from wal_e.worker.psql_worker import PSQL_BIN, psql_csv_run
 from wal_e.worker.s3_worker import LZOP_BIN, MBUFFER_BIN
+from wal_e.worker.pg_controldata_worker import CONFIG_BIN, PgControlDataParser
 
 # TODO: Make controllable from userland
 log_help.configure(
@@ -177,6 +178,13 @@ def main(argv=None):
             'tunable number of bytes per second', dest='rate_limit',
             metavar='BYTES_PER_SECOND',
             type=int, default=None)
+        backup_push_parser.add_argument(
+            '--while-offline',
+            help='Back up a PG cluster that is in a stopped state '
+            '(for example, a replica that you stop/start when taking '
+            'a backup)',
+            dest='while_offline',
+            action='store_true', default=False)
 
         wal_fetch_parser = subparsers.add_parser(
             'wal-fetch', help='fetch a WAL file from S3',
@@ -287,7 +295,19 @@ def main(argv=None):
     elif subcommand == 'backup-list':
         backup_cxt.backup_list(query=args.QUERY, detail=args.detail)
     elif subcommand == 'backup-push':
-        external_program_check([LZOP_BIN, PSQL_BIN, MBUFFER_BIN])
+        if args.while_offline:
+            # Query pg_config first for the location of the
+            # pg_controldata binary.
+            external_program_check([CONFIG_BIN])
+            parser = PgControlDataParser(args.PG_CLUSTER_DIRECTORY)
+            controldata_bin = parser.controldata_bin()
+            external_programs = [
+                LZOP_BIN,
+                MBUFFER_BIN,
+                controldata_bin]
+        else:
+            external_programs = [LZOP_BIN, PSQL_BIN, MBUFFER_BIN]
+        external_program_check(external_programs)
         rate_limit = args.rate_limit
         if rate_limit is not None and rate_limit < 8192:
             logger.error(
@@ -297,8 +317,11 @@ def main(argv=None):
                 'greater than 8192'))
             sys.exit(1)
 
+        while_offline = args.while_offline
         backup_cxt.database_s3_backup(
-            args.PG_CLUSTER_DIRECTORY, rate_limit=rate_limit,
+            args.PG_CLUSTER_DIRECTORY,
+            rate_limit=rate_limit,
+            while_offline=while_offline,
             pool_size=args.pool_size)
     elif subcommand == 'wal-fetch':
         external_program_check([LZOP_BIN])
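
A standalone sketch (not part of the commit) of how the new flag parses: with action='store_true' the switch takes no value, whereas argparse's type=bool would treat any non-empty argument string, including 'False', as true.

import argparse

# Hypothetical, minimal reproduction of the backup-push flag.
parser = argparse.ArgumentParser()
parser.add_argument('--while-offline', dest='while_offline',
                    action='store_true', default=False)

print(parser.parse_args([]).while_offline)                   # False
print(parser.parse_args(['--while-offline']).while_offline)  # True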
32 changes: 27 additions & 5 deletions wal_e/operator/s3_operator.py
@@ -27,6 +27,7 @@
 from wal_e.piper import popen_sp
 from wal_e.storage import s3_storage
 from wal_e.worker.psql_worker import PSQL_BIN, PgBackupStatements
+from wal_e.worker.pg_controldata_worker import PgControlDataParser
 
 
 logger = log_help.WalELogger(__name__, level=logging.INFO)
@@ -295,7 +296,8 @@ def database_s3_fetch(self, pg_cluster_dir, backup_name, pool_size):
 
         p.join(raise_error=True)
 
-    def database_s3_backup(self, *args, **kwargs):
+
+    def database_s3_backup(self, data_directory, *args, **kwargs):
         """
         Uploads a PostgreSQL file cluster to S3
@@ -309,12 +311,29 @@ def database_s3_backup(self, *args, **kwargs):
 
         upload_good = False
         backup_stop_good = False
+        while_offline = False
+        start_backup_info = None
+        if 'while_offline' in kwargs:
+            while_offline = kwargs.pop('while_offline')
 
         try:
-            start_backup_info = PgBackupStatements.run_start_backup()
-            version = PgBackupStatements.pg_version()['version']
+            if not while_offline:
+                start_backup_info = PgBackupStatements.run_start_backup()
+                version = PgBackupStatements.pg_version()['version']
+            else:
+                if os.path.exists(os.path.join(data_directory, 'postmaster.pid')):
+                    raise UserException(
+                        msg='while_offline set, but pg looks to be running',
+                        detail='Found a postmaster.pid lockfile; aborting.',
+                        hint='Shut down postgres.  If the lockfile is '
+                        'stale, remove it only after being very sure '
+                        'postgres is not running.')
+
+                controldata = PgControlDataParser(data_directory)
+                start_backup_info = controldata.last_xlog_file_name_and_offset()
+                version = controldata.pg_version()
             uploaded_to, expanded_size_bytes = self._s3_upload_pg_cluster_dir(
-                start_backup_info, version=version, *args, **kwargs)
+                start_backup_info, data_directory, version=version, *args, **kwargs)
             upload_good = True
         finally:
             if not upload_good:
@@ -324,7 +343,10 @@ def database_s3_backup(self, *args, **kwargs):
                     'but we have to wait anyway.  '
                     'See README: TODO about pg_cancel_backup'))
 
-            stop_backup_info = PgBackupStatements.run_stop_backup()
+            if not while_offline:
+                stop_backup_info = PgBackupStatements.run_stop_backup()
+            else:
+                stop_backup_info = start_backup_info
             backup_stop_good = True
 
         # XXX: Ugly, this is more of a 'worker' task because it might
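
The offline branch above amounts to two steps: refuse to run against a cluster that may still be live, then read the backup coordinates from pg_controldata instead of pg_start_backup(). Below is a minimal standalone sketch of that flow; the function name is hypothetical and the error type is simplified from the UserException used in the diff.

import os

from wal_e.worker.pg_controldata_worker import PgControlDataParser


def offline_backup_coordinates(data_directory):
    # A postmaster.pid lockfile means postgres may still be running;
    # bail out rather than upload an inconsistent copy of the cluster.
    if os.path.exists(os.path.join(data_directory, 'postmaster.pid')):
        raise RuntimeError('while_offline set, but pg looks to be running')

    # pg_controldata supplies the same facts pg_start_backup() would:
    # the WAL coordinates to record with the backup, and the PG version.
    controldata = PgControlDataParser(data_directory)
    return (controldata.last_xlog_file_name_and_offset(),
            controldata.pg_version())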
63 changes: 63 additions & 0 deletions wal_e/worker/pg_controldata_worker.py
@@ -0,0 +1,63 @@
+from subprocess import PIPE
+import os
+from wal_e.piper import popen_sp
+
+CONTROLDATA_BIN = 'pg_controldata'
+CONFIG_BIN = 'pg_config'
+
+class PgControlDataParser(object):
+    """
+    When we're backing up a PG cluster that is not running,
+    we can't query it for information like the current
+    restartpoint's WAL index or the current PG version.
+    Fortunately, pg_controldata provides this information
+    and doesn't require a running PG process.
+    """
+
+    def __init__(self, data_directory):
+        self.data_directory = data_directory
+        pg_config_proc = popen_sp([CONFIG_BIN],
+                                  stdout=PIPE)
+        output = pg_config_proc.communicate()[0]
+        for line in output.split('\n'):
+            parts = line.split('=')
+            if len(parts) != 2:
+                continue
+            key, val = map(lambda x: x.strip(), parts)
+            if key == 'BINDIR':
+                self._controldata_bin = os.path.join(val, CONTROLDATA_BIN)
+            elif key == 'VERSION':
+                self._pg_version = val
+
+    def _read_controldata(self):
+        controldata_proc = popen_sp(
+            [self._controldata_bin, self.data_directory], stdout=PIPE)
+        stdout = controldata_proc.communicate()[0]
+        controldata = {}
+        for line in stdout.split('\n'):
+            split_values = line.split(':')
+            if len(split_values) == 2:
+                key, val = split_values
+                controldata[key.strip()] = val.strip()
+        return controldata
+
+    def controldata_bin(self):
+        return self._controldata_bin
+
+    def pg_version(self):
+        return self._pg_version
+
+    def last_xlog_file_name_and_offset(self):
+        controldata = self._read_controldata()
+        last_checkpoint_offset = \
+            controldata["Latest checkpoint's REDO location"]
+        current_timeline = controldata["Latest checkpoint's TimeLineID"]
+        x, offset = last_checkpoint_offset.split('/')
+        # pg_controldata reports the timeline in decimal; WAL names use hex.
+        timeline = '%08X' % int(current_timeline)
+        wal = x.zfill(8)
+        # The segment number is the high byte of the zero-padded offset.
+        offset = offset.zfill(8)[0:2].zfill(8)
+        return {
+            'file_name': ''.join([timeline, wal, offset]),
+            'file_offset': offset.zfill(8)}
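
A worked example (values illustrative) of the file-name arithmetic in last_xlog_file_name_and_offset: a REDO location of 0/2000020 on timeline 1 falls in WAL segment 000000010000000000000002, because with the default 16MB segments the segment number is the high byte of the zero-padded byte offset.

redo_location = '0/2000020'  # "Latest checkpoint's REDO location"
timeline_id = '1'            # "Latest checkpoint's TimeLineID" (decimal)

x, offset = redo_location.split('/')
timeline = '%08X' % int(timeline_id)      # '00000001'
wal = x.zfill(8)                          # '00000000'
segment = offset.zfill(8)[0:2].zfill(8)   # '00000002'
print(''.join([timeline, wal, segment]))  # 000000010000000000000002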
