Skip to content

Commit

Permalink
walmgr: new command createslave and new option --synch-standby
Browse files Browse the repository at this point in the history
  • Loading branch information
Tarvi Pillessaar committed Jul 9, 2013
1 parent a49b2b8 commit bc1bf3c
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 32 deletions.
13 changes: 13 additions & 0 deletions doc/walmgr3.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ listed below.
Remove .pgpass entry, which was used for streaming replication
(used in Slave)

--synch-standby='synchronous_standby_names'::
Does the same thing as the synch-standby command, but without using the walmgr ini file.
This option can be used when the walmgr ini file is not available. It tries to guess
the postgres config location; the --pgdata option may also be needed.
(used in Master)

== DAEMON OPTIONS ==

-r, --reload::
Expand Down Expand Up @@ -151,6 +157,13 @@ Pauses WAL playback.

Continues previously paused WAL playback.

=== createslave ===

Creates a backup from the Master database using streaming replication.
Also creates recovery.conf and starts the slave as a standby.
The backup is created with pg_basebackup and pg_receivexlog (available in 9.2 and
up).

== COMMON COMMANDS ==

=== listbackups ===
Expand Down
251 changes: 219 additions & 32 deletions python/walmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
boot Stop playback, accept queries
pause Just wait, don't play WAL-s
continue Start playing WAL-s again
createslave Create streaming replication slave
Common commands:
init Create configuration files, set up ssh keys.
Expand Down Expand Up @@ -231,26 +232,6 @@ def write(self):
f.writelines(self.contents)
f.close()

def pgpass_fields_from_conninfo(self,conninfo):
    """Extract host, port and user from a primary-conninfo string.

    The string is read as whitespace separated ``keyword = value`` pairs.
    Spaces around ``=`` are accepted, and a value may be single-quoted with
    backslash escapes.  When a keyword is repeated the last one wins.

    Missing (or empty) fields fall back to 'localhost', '5432' and the
    current OS user.

    Returns a (host, port, user) tuple of strings.
    """
    # Keywords are anchored on a word boundary and quoted values are
    # consumed as a whole, so text such as "host=..." inside another
    # value is never mistaken for the real host keyword.
    pair_re = re.compile(r"(?<!\w)(\w+)\s*=\s*(?:'((?:[^'\\]|\\.)*)'|(\S+))")

    fields = {}
    for m in pair_re.finditer(conninfo):
        key, quoted, plain = m.groups()
        if quoted is not None:
            # drop the backslash from \' and \\ escapes
            fields[key] = re.sub(r"\\(.)", r"\1", quoted)
        else:
            fields[key] = plain

    host = fields.get('host') or 'localhost'
    port = fields.get('port') or '5432'
    user = fields.get('user')
    if not user:
        # USER may be unset (cron, daemons); do not die with KeyError
        user = os.environ.get('USER')
        if not user:
            import getpass
            user = getpass.getuser()

    return host,port,user


class PostgresConfiguration:
"""Postgres configuration manipulation"""
Expand Down Expand Up @@ -385,6 +366,8 @@ def init_optparse(self, parser=None):
help = "slave: connect string for streaming replication master")
p.add_option("", "--init-slave", action="store_true", dest="init_slave",
help = "Initialize slave walmgr.", default=False)
p.add_option("", "--synch-standby", action="store", dest="synchronous_standby_names", default=None,
help = "master: do the same thing as command synch-standby, but do not use INI file")
return p

def load_config(self):
Expand Down Expand Up @@ -477,6 +460,8 @@ def __init__(self, args):
self.cmd = 'init_master'
elif self.options.init_slave:
self.cmd = 'init_slave'
elif self.options.synchronous_standby_names is not None:
self.cmd = "synch-standby"
else:
usage(1)

Expand All @@ -502,6 +487,7 @@ def __init__(self, args):
'pause': self.slave_pause,
'continue': self.slave_continue,
'boot': self.slave_boot,
'createslave': self.slave_createslave,
'cleanup': self.walmgr_cleanup,
'synch-standby': self.master_synch_standby,
'xlock': self.slave_lock_backups_exit,
Expand Down Expand Up @@ -663,6 +649,33 @@ def chdir(self, loc):
self.pg_stop_backup()
sys.exit(1)

def parse_conninfo(self,conninfo):
    """Extract host, port, user and sslmode from a primary-conninfo string.

    The string is read as whitespace separated ``keyword = value`` pairs.
    Spaces around ``=`` are accepted, and a value may be single-quoted with
    backslash escapes.  When a keyword is repeated the last one wins.

    Defaults for missing (or empty) fields:
      host    -> 'localhost'
      port    -> '5432'
      user    -> current OS user
      sslmode -> None

    Returns a (host, port, user, sslmode) tuple.
    """
    # One pass over all pairs.  Keywords are anchored on a word boundary
    # and quoted values are consumed as a whole, so "host=..." appearing
    # inside another value is not mistaken for the host keyword, and the
    # surrounding quotes do not leak into the result.
    pair_re = re.compile(r"(?<!\w)(\w+)\s*=\s*(?:'((?:[^'\\]|\\.)*)'|(\S+))")

    params = {}
    for m in pair_re.finditer(conninfo):
        key, quoted, plain = m.groups()
        if quoted is not None:
            # drop the backslash from \' and \\ escapes
            params[key] = re.sub(r"\\(.)", r"\1", quoted)
        else:
            params[key] = plain

    host = params.get('host') or 'localhost'
    port = params.get('port') or '5432'
    sslmode = params.get('sslmode') or None

    user = params.get('user')
    if not user:
        # USER may be unset (cron, daemons); do not die with KeyError
        user = os.environ.get('USER')
        if not user:
            import getpass
            user = getpass.getuser()

    return host,port,user,sslmode


def get_last_complete(self):
"""Get the name of last xarchived segment."""

Expand Down Expand Up @@ -762,7 +775,7 @@ def walmgr_cleanup(self):
primary_conninfo = self.cf.get("primary_conninfo", "")
if self.options.remove_password and primary_conninfo and not self.not_really:
pg = Pgpass('~/.pgpass')
host, port, user = pg.pgpass_fields_from_conninfo(primary_conninfo)
host, port, user, _ = self.parse_conninfo(primary_conninfo)
if pg.remove_user(host, port, user):
self.log.info("Removing line from .pgpass")
pg.write()
Expand All @@ -775,13 +788,25 @@ def walmgr_cleanup(self):
def master_synch_standby(self):
"""Manage synchronous_standby_names parameter"""

if len(self.args) < 1:
die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES")
if self.options.synchronous_standby_names is None:
if len(self.args) < 1:
die(1, "usage: synch-standby SYNCHRONOUS_STANDBY_NAMES")

names = self.args[0]
cf = PostgresConfiguration(self, self.cf.getfile("master_config"))
names = self.args[0]
self.assert_is_master(True)
else:
# as synchronous_standby_names is available since 9.1
# we can override DEFAULT_PG_VERSION
global DEFAULT_PG_VERSION
DEFAULT_PG_VERSION = "9.1"

self.assert_is_master(True)
self.guess_locations()
self.override_cf_option('master_config', self.postgres_conf)
self.override_cf_option('master_data', self.pgdata)
self.override_cf_option('master_db', 'dbname=template1')
names = self.options.synchronous_standby_names

cf = PostgresConfiguration(self, self.cf.getfile("master_config"))

# list of slaves
db = self.get_database("master_db")
Expand All @@ -791,12 +816,14 @@ def master_synch_standby(self):
self.close_database("master_db")

if names.strip() == "":
cf.set_synchronous_standby_names("")
if not self.not_really:
cf.set_synchronous_standby_names("")
return

if names.strip() == "*":
if slave_names:
cf.set_synchronous_standby_names(names)
if not self.not_really:
cf.set_synchronous_standby_names(names)
return
else:
die(1,"At least one slave must be available when enabling synchronous mode")
Expand All @@ -812,7 +839,7 @@ def master_synch_standby(self):

if not slave_found:
die(1,"At least one slave must be available from new list when enabling synchronous mode")
else:
elif not self.not_really:
cf.set_synchronous_standby_names(names)

def master_configure_archiving(self, enable_archiving, can_restart):
Expand Down Expand Up @@ -1202,7 +1229,7 @@ def walmgr_init_slave(self):
pwd = open(self.options.add_password).readline().rstrip('\n\r')

pg = Pgpass('~/.pgpass')
host, port, user = pg.pgpass_fields_from_conninfo(self.options.primary_conninfo)
host, port, user, _ = self.parse_conninfo(self.options.primary_conninfo)
pg.ensure_user(host, port, user, pwd)
pg.write()

Expand Down Expand Up @@ -1783,6 +1810,7 @@ def slave_xrestore_unsafe(self, srcname, dstpath, parent_pid, lstname = None):
pausefile = os.path.join(srcdir, "PAUSE")
stopfile = os.path.join(srcdir, "STOP")
prgrfile = os.path.join(srcdir, "PROGRESS")
prxlogfile = os.path.join(srcdir,"PG_RECEIVEXLOG")
srcfile = os.path.join(srcdir, srcname)
partfile = os.path.join(partdir, srcname)

Expand All @@ -1791,6 +1819,11 @@ def slave_xrestore_unsafe(self, srcname, dstpath, parent_pid, lstname = None):
primary_conninfo = self.cf.get("primary_conninfo", "")
if primary_conninfo and not os.path.isfile(srcfile):
self.log.info("%s: not found (ignored)", srcname)

# remove PG_RECEIVEXLOG file if it's present
if os.path.isfile(prxlogfile):
os.remove(prxlogfile)

sys.exit(1)

# assume that postgres has processed the WAL file and is
Expand Down Expand Up @@ -1858,7 +1891,7 @@ def slave_xrestore_unsafe(self, srcname, dstpath, parent_pid, lstname = None):
self.stat_add('count', 1)
self.send_stats()

def restore_database(self):
def restore_database(self, restore_config=True):
"""Restore the database from backup
If setname is specified, the contents of that backup set directory are
Expand Down Expand Up @@ -2063,7 +2096,8 @@ def restore_database(self):
# attempt to restore configuration. Note that we cannot
# postpone this to boot time, as the configuration is needed
# to start postmaster.
self.slave_restore_config()
if restore_config:
self.slave_restore_config()

# run database in recovery mode
self.log.info("Starting postmaster: %s", start_cmd)
Expand Down Expand Up @@ -2119,6 +2153,159 @@ def slave_boot(self):
open(stopfile, "w").write("1")
self.log.info("Stopping recovery mode")

def slave_createslave(self):
    """Create a streaming replication standby from the master.

    Steps, in order:
      1. refuse to run if another pg_receivexlog holds the PG_RECEIVEXLOG
         lock file in the completed_wals directory (a stale lock file is
         removed),
      2. create directories and take the backup lock,
      3. start pg_receivexlog in the background and record its pid in
         the lock file,
      4. run pg_basebackup into the full_backup directory,
      5. call restore_database(False), then wait until the lock file
         disappears,
      6. always stop pg_receivexlog, remove leftovers and release the
         backup lock.

    Connection parameters come from the "primary_conninfo" config option.
    With self.not_really set, nothing is spawned and no file is touched.

    Exits the process with status 1 on any failure.
    """
    # stat.S_IRWXU is 0700, spelled so that the literal is valid for
    # every python version
    import stat

    self.assert_is_master(False)

    errors = False
    xlog_dir = self.cf.getfile("completed_wals")
    full_dir = self.cf.getfile("full_backup")
    prxloglock = os.path.join(xlog_dir, "PG_RECEIVEXLOG")
    slave_bin = self.cf.getfile("slave_bin")
    pg_receivexlog = os.path.join(slave_bin, "pg_receivexlog")
    pg_basebackup = os.path.join(slave_bin, "pg_basebackup")

    # check if pg_receivexlog is available
    if not os.access(pg_receivexlog, os.X_OK):
        die(1, "pg_receivexlog not available")

    # check if pg_receivexlog is already running
    if os.path.isfile(prxloglock):
        lockf = open(prxloglock, "r")
        try:
            pidstring = lockf.read()
        finally:
            lockf.close()

        try:
            pid = int(pidstring)
        except ValueError:
            self.log.fatal("pg_receivexlog lock file %s does not contain a pid: %s", prxloglock, pidstring)
            sys.exit(1)

        try:
            # signal 0 only probes for existence of the process
            os.kill(pid, 0)
        except OSError:
            err = sys.exc_info()[1]
            if err.errno == errno.EPERM:
                # process exists but belongs to somebody else
                self.log.fatal("Found pg_receivexlog lock file %s, pid %d in use", prxloglock, pid)
                sys.exit(1)
            elif err.errno == errno.ESRCH:
                self.log.info("Ignoring stale pg_receivexlog lock file")
                if not self.not_really:
                    os.remove(prxloglock)
            # any other errno is ignored, as before
        else:
            self.log.fatal("pg_receivexlog is already running in %s, pid %d", xlog_dir, pid)
            sys.exit(1)

    # get host and user from primary_conninfo.  Validated before the
    # backup lock is taken, so that a missing option cannot leave the
    # lock behind (presumably die() exits the process - TODO confirm).
    primary_conninfo = self.cf.get("primary_conninfo", "")
    if not primary_conninfo:
        die(1, "primary_conninfo missing")
    host, port, user, sslmode = self.parse_conninfo(primary_conninfo)

    # change sslmode for pg_receivexlog and pg_basebackup.  The variable
    # is added to a copy of our own environment: an env mapping given to
    # Popen replaces the inherited environment, it does not extend it.
    envssl = None
    if sslmode:
        envssl = dict(os.environ)
        envssl["PGSSLMODE"] = sslmode

    # create directories
    self.walmgr_setup()

    # ensure that backup destination is 0700
    if not self.not_really:
        os.chmod(full_dir, stat.S_IRWXU)

    self.args = [str(os.getpid())]
    if self.slave_lock_backups() != 0:
        self.log.fatal("Cannot obtain backup lock.")
        sys.exit(1)

    # stays None until pg_receivexlog is really started, so that cleanup
    # knows whether there is anything to stop
    p_rxlog = None

    try:
        # determine postgres version, we cannot use pg_control version number since
        # 9.0 and 9.1 are using the same number in controlfile
        pg_ver = ""
        try:
            cmdline = [os.path.join(slave_bin, "postgres"), '-V']
            process = subprocess.Popen(cmdline, stdout=subprocess.PIPE)
            output = process.communicate()
            pg_ver = output[0].split()[2]
            self.log.debug("PostgreSQL version: %s" % pg_ver)
        except Exception:
            # best effort: the version is only needed for the 9.1 fixup below
            self.log.debug("Could not determine PostgreSQL version: %s", sys.exc_info()[1])

        # create pg_receivexlog process
        cmdline = [pg_receivexlog, '-D', xlog_dir, '-h', host, '-U', user, '-p', port, '-w']
        self.log.info("Starting pg_receivexlog")

        if not self.not_really:
            p_rxlog = subprocess.Popen(cmdline, env=envssl)

            # create pg_receivexlog lock file
            lockf = open(prxloglock, "w")
            try:
                lockf.write(str(p_rxlog.pid))
            finally:
                lockf.close()

        # leave error checking for pg_basebackup
        # if pg_basebackup command fails then pg_receivexlog is not working either

        # start backup
        self.log.info("Starting pg_basebackup")
        cmdline = [pg_basebackup, '-D', full_dir, '-h', host, '-U', user, '-p', port, '-w']
        if not self.not_really:
            p_basebackup = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=envssl)
            output = p_basebackup.communicate()
            res = p_basebackup.returncode

            if res != 0:
                raise Exception("exec failed, res=%d (%r), %s" % (res, cmdline, output[1]))

            # fix skipped ssl symlinks (only relevant for 9.1)
            # NOTE(review): the link targets are hard-coded snakeoil
            # certificate paths - confirm they suit the target system
            if pg_ver.startswith('9.1.'):
                for line in output[1].splitlines():
                    m = re.match(r'WARNING: skipping special file "\./server\.(crt|key)"', line)
                    if not m:
                        continue
                    # create symlinks
                    if m.group(1) == 'crt':
                        os.symlink('/etc/ssl/certs/ssl-cert-snakeoil.pem',
                                   os.path.join(full_dir, 'server.crt'))
                    elif m.group(1) == 'key':
                        os.symlink('/etc/ssl/private/ssl-cert-snakeoil.key',
                                   os.path.join(full_dir, 'server.key'))

        self.log.info("pg_basebackup finished successfully")

        # restore
        self.args = []
        self.restore_database(False)

        # wait for recovery: we go on once the lock file has disappeared
        while os.path.isfile(prxloglock) and not self.not_really:
            time.sleep(5)

    except Exception:
        self.log.error(sys.exc_info()[1])
        errors = True

    finally:
        # stop pg_receivexlog
        if p_rxlog is not None:
            try:
                os.kill(p_rxlog.pid, signal.SIGTERM)
                self.log.info("pg_receivexlog stopped")
            except Exception:
                self.log.warning("Failed to stop pg_receivexlog: %s", sys.exc_info()[1])

        # cleanup, nothing is removed in dry-run mode
        if not self.not_really:
            if os.path.isfile(prxloglock):
                os.remove(prxloglock)

            for f in os.listdir(xlog_dir):
                if f.endswith('.partial'):
                    self.log.debug("Removing %s", os.path.join(xlog_dir, f))
                    os.remove(os.path.join(xlog_dir, f))

            if os.path.isdir(full_dir):
                shutil.rmtree(full_dir)

        self.slave_resume_backups()

    if not errors:
        self.log.info("Streaming replication standby created successfully")
    else:
        self.log.error("Failed to create streaming replication standby")
        sys.exit(1)


def slave_pause(self, waitcomplete=0):
"""Pause the WAL apply, wait until last file applied if needed"""
Expand Down

0 comments on commit bc1bf3c

Please sign in to comment.