#! /usr/bin/env python
"""Londiste setup and sanity checker."""
import sys, os, re, skytools
from pgq.cascade.admin import CascadeAdmin
from londiste.exec_attrs import ExecAttrs
from londiste.util import find_copy_source
import londiste.handler
__all__ = ['LondisteSetup']
class LondisteSetup(CascadeAdmin):
    """Londiste-specific admin commands.

    Adds table/sequence attach and detach, resync, SQL script execution
    and sync-wait commands on top of the generic CascadeAdmin commands.
    """

    initial_db_name = 'node_db'
    # Connect string of the provider node; filled in by get_provider_db().
    provider_location = None

    commands_without_pidfile = CascadeAdmin.commands_without_pidfile + [
        'tables', 'seqs', 'missing', 'show-handlers']

    def install_code(self, db):
        """Register londiste SQL objects and run the CascadeAdmin install."""
        self.extra_objs = [
            skytools.DBSchema("londiste", sql_file='londiste.sql'),
            skytools.DBFunction("londiste.global_add_table", 2,
                                sql_file='londiste.upgrade_2.1_to_3.1.sql'),
        ]
        CascadeAdmin.install_code(self, db)

    def __init__(self, args):
        """Londiste setup init."""
        CascadeAdmin.__init__(self, 'londiste3', 'db', args, worker_setup=True)

        # compat
        self.queue_name = self.cf.get('pgq_queue_name', '')
        # real
        if not self.queue_name:
            self.queue_name = self.cf.get('queue_name')

        self.set_name = self.queue_name

        # Seconds; set_lock_timeout() converts this to milliseconds.
        self.lock_timeout = self.cf.getfloat('lock_timeout', 10)

        # NOTE(review): handler modules are presumably loaded from config
        # here so that --handler names resolve later; confirm against
        # londiste.handler.
        londiste.handler.load_handler_modules(self.cf)

    def init_optparse(self, parser=None):
        """Add londiste switches to CascadeAdmin ones."""
        p = CascadeAdmin.init_optparse(self, parser)
        p.add_option("--expect-sync", action="store_true", dest="expect_sync",
                     help="no copy needed", default=False)
        p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                     help="do not delete old data", default=False)
        p.add_option("--find-copy-node", action="store_true", dest="find_copy_node",
                     help="add: find table source for copy by walking upwards")
        p.add_option("--copy-node", metavar="NODE", dest="copy_node",
                     help="add: use NODE as source for initial copy")
        p.add_option("--force", action="store_true",
                     help="force", default=False)
        p.add_option("--all", action="store_true",
                     help="include all tables", default=False)
        p.add_option("--wait-sync", action="store_true",
                     help="add: wait until all tables are in sync")
        p.add_option("--create", action="store_true",
                     help="create, minimal", default=False)
        p.add_option("--create-full", action="store_true",
                     help="create, full")
        p.add_option("--trigger-flags",
                     help="set trigger flags (BAIUDLQ)")
        p.add_option("--trigger-arg", action="append",
                     help="custom trigger arg")
        p.add_option("--no-triggers", action="store_true",
                     help="no triggers on table")
        p.add_option("--handler", action="store",
                     help="add: custom handler for table")
        p.add_option("--handler-arg", action="append",
                     help="add: argument to custom handler")
        p.add_option("--merge-all", action="store_true",
                     help="merge tables from all source queues", default=False)
        p.add_option("--no-merge", action="store_true",
                     help="do not merge tables from source queues", default=False)
        p.add_option("--max-parallel-copy", metavar="NUM", type="int",
                     help="max number of parallel copy processes")
        p.add_option("--dest-table", metavar="NAME",
                     help="add: name for actual table")
        p.add_option("--skip-non-existing", action="store_true",
                     help="add: skip object that does not exist")
        return p

    def extra_init(self, node_type, node_db, provider_db):
        """Callback from CascadeAdmin init.

        Copies the provider's table and sequence lists into the new node.
        Does nothing when there is no provider connection.
        """
        if not provider_db:
            return
        pcurs = provider_db.cursor()
        ncurs = node_db.cursor()

        # sync tables
        q = "select table_name from londiste.get_table_list(%s)"
        pcurs.execute(q, [self.set_name])
        for row in pcurs.fetchall():
            tbl = row['table_name']
            q = "select * from londiste.global_add_table(%s, %s)"
            ncurs.execute(q, [self.set_name, tbl])

        # sync seqs
        q = "select seq_name, last_value from londiste.get_seq_list(%s)"
        pcurs.execute(q, [self.set_name])
        for row in pcurs.fetchall():
            seq = row['seq_name']
            val = row['last_value']
            q = "select * from londiste.global_update_seq(%s, %s, %s)"
            ncurs.execute(q, [self.set_name, seq, val])

        # done
        node_db.commit()
        provider_db.commit()

    def is_root(self):
        """Return True when the local node type is 'root'."""
        return self.queue_info.local_node.type == 'root'

    def set_lock_timeout(self, curs):
        """Apply self.lock_timeout (seconds) as a local statement_timeout.

        A non-positive timeout leaves the session setting untouched.
        """
        ms = int(1000 * self.lock_timeout)
        if ms > 0:
            q = "SET LOCAL statement_timeout = %d" % ms
            self.log.debug(q)
            curs.execute(q)

    def cmd_add_table(self, *args):
        """Attach table(s) to local node."""
        # NOTE(review): is_root()/get_provider_db() read self.queue_info,
        # which is presumably populated by CascadeAdmin.load_local_info().
        self.load_local_info()

        src_db = self.get_provider_db()
        if not self.is_root():
            src_curs = src_db.cursor()
            src_tbls = self.fetch_set_tables(src_curs)
            src_db.commit()

        dst_db = self.get_database('db')
        dst_curs = dst_db.cursor()
        dst_tbls = self.fetch_set_tables(dst_curs)
        if self.is_root():
            # root has no provider: its own list is authoritative
            src_tbls = dst_tbls
        else:
            self.sync_table_list(dst_curs, src_tbls, dst_tbls)
        dst_db.commit()

        needs_tbl = self.handler_needs_table()
        args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)

        # pick proper create flags
        if self.options.create_full:
            create_flags = skytools.T_ALL
        elif self.options.create:
            create_flags = skytools.T_TABLE | skytools.T_PKEY
        else:
            create_flags = 0

        # search for usable copy node if requested & needed
        if (self.options.find_copy_node and create_flags != 0
                and needs_tbl and not self.is_root()):
            src_name, src_loc, _ = find_copy_source(
                self, self.queue_name, args, None, self.provider_location)
            self.options.copy_node = src_name
            # NOTE(review): drop the cached provider connection so that
            # get_provider_db() reconnects to the newly chosen node.
            self.close_database('provider_db')
            src_db = self.get_provider_db()
            src_curs = src_db.cursor()
            src_tbls = self.fetch_set_tables(src_curs)
            src_db.commit()

        # dont check for exist/not here (root handling)
        if (not self.is_root() and not self.options.expect_sync
                and not self.options.find_copy_node):
            problems = False
            for tbl in args:
                tbl = skytools.fq_name(tbl)
                if (tbl in src_tbls) and not src_tbls[tbl]['local']:
                    if self.options.skip_non_existing:
                        self.log.warning("Table %s does not exist on provider", tbl)
                    else:
                        self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
                        problems = True
            if problems:
                self.log.error("Problems, canceling operation")
                sys.exit(1)

        # sanity check
        if self.options.dest_table and len(args) > 1:
            self.log.error("--dest-table can be given only for single table")
            sys.exit(1)

        # seems ok
        for tbl in args:
            self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)

        # wait
        if self.options.wait_sync:
            self.wait_for_sync(dst_db)

    def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls):
        """Optionally create, then register one table on the local node.

        create_flags selects which parts of the provider's table structure
        to create locally (0 = create nothing); src_tbls is the provider's
        table map as returned by fetch_set_tables().
        """
        # use full names
        tbl = skytools.fq_name(tbl)
        dest_table = self.options.dest_table or tbl
        dest_table = skytools.fq_name(dest_table)

        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()
        tbl_exists = skytools.exists_table(dst_curs, dest_table)
        dst_db.commit()

        self.set_lock_timeout(dst_curs)

        if dest_table == tbl:
            desc = tbl
        else:
            desc = "%s(%s)" % (tbl, dest_table)

        if create_flags:
            if tbl_exists:
                self.log.info('Table %s already exist, not touching', desc)
            else:
                src_dest_table = src_tbls[tbl]['dest_table']
                if not skytools.exists_table(src_curs, src_dest_table):
                    # table not present on provider - nowhere to get the DDL from
                    self.log.warning('Table %s missing on provider, cannot create, skipping', desc)
                    return
                schema = skytools.fq_name_parts(dest_table)[0]
                if not skytools.exists_schema(dst_curs, schema):
                    q = "create schema %s" % skytools.quote_ident(schema)
                    dst_curs.execute(q)
                s = skytools.TableStruct(src_curs, src_dest_table)
                src_db.commit()

                # create, using rename logic only when necessary
                newname = None
                if src_dest_table != dest_table:
                    newname = dest_table
                s.create(dst_curs, create_flags, log=self.log, new_table_name=newname)
        elif not tbl_exists and self.options.skip_non_existing:
            self.log.warning('Table %s does not exist on local node, skipping', desc)
            return

        tgargs = self.build_tgargs()

        attrs = {}

        if self.options.handler:
            attrs['handler'] = self.build_handler(tbl, tgargs, self.options.dest_table)

        if self.options.find_copy_node:
            attrs['copy_node'] = '?'
        elif self.options.copy_node:
            attrs['copy_node'] = self.options.copy_node

        if not self.options.expect_sync:
            if self.options.skip_truncate:
                attrs['skip_truncate'] = 1

        if self.options.max_parallel_copy:
            attrs['max_parallel_copy'] = self.options.max_parallel_copy

        # actual table registration
        args = [self.set_name, tbl, tgargs, None, None]
        if attrs:
            args[3] = skytools.db_urlencode(attrs)
        if dest_table != tbl:
            args[4] = dest_table
        q = "select * from londiste.local_add_table(%s, %s, %s, %s, %s)"
        self.exec_cmd(dst_curs, q, args)
        dst_db.commit()

    def build_tgargs(self):
        """Build trigger args.

        Returns a fresh list on every call, so repeated calls (one per
        table) never accumulate flags in self.options.trigger_arg.
        """
        tgargs = list(self.options.trigger_arg or [])
        tgflags = self.options.trigger_flags
        if tgflags:
            tgargs.append('tgflags=' + tgflags)
        if self.options.no_triggers:
            tgargs.append('no_triggers')
        if self.options.merge_all:
            tgargs.append('merge_all')
        if self.options.no_merge:
            tgargs.append('no_merge')
        if self.options.expect_sync:
            tgargs.append('expect_sync')
        return tgargs

    def build_handler(self, tbl, tgargs, dest_table=None):
        """Build handler and return handler string.

        The handler object is given the tgargs list via its add() method.
        """
        hstr = londiste.handler.create_handler_string(
            self.options.handler, self.options.handler_arg)
        p = londiste.handler.build_handler(tbl, hstr, dest_table)
        p.add(tgargs)
        return hstr

    def handler_needs_table(self):
        """Return the handler's needs_table(), or True without --handler."""
        if self.options.handler:
            hstr = londiste.handler.create_handler_string(
                self.options.handler, self.options.handler_arg)
            p = londiste.handler.build_handler('unused.string', hstr, None)
            return p.needs_table()
        return True

    def sync_table_list(self, dst_curs, src_tbls, dst_tbls):
        """Make the subscriber's global table list match the provider's.

        dst_tbls is updated in place to reflect the additions and removals.
        """
        q = "select * from londiste.global_add_table(%s, %s)"
        for tbl in list(src_tbls):
            if tbl not in dst_tbls:
                self.log.info("Table %s info missing from subscriber, adding", tbl)
                self.exec_cmd(dst_curs, q, [self.set_name, tbl])
                dst_tbls[tbl] = {'local': False, 'dest_table': tbl}
        q = "select * from londiste.global_remove_table(%s, %s)"
        # iterate over a snapshot: entries are deleted inside the loop
        for tbl in list(dst_tbls):
            if tbl not in src_tbls:
                self.log.info("Table %s gone but exists on subscriber, removing", tbl)
                self.exec_cmd(dst_curs, q, [self.set_name, tbl])
                del dst_tbls[tbl]

    def fetch_set_tables(self, curs):
        """Return {table_name: row} for all tables known to the set."""
        q = "select table_name, local, "\
            " coalesce(dest_table, table_name) as dest_table "\
            " from londiste.get_table_list(%s)"
        curs.execute(q, [self.set_name])
        res = {}
        for row in curs.fetchall():
            res[row[0]] = row
        return res

    def cmd_remove_table(self, *args):
        """Detach table(s) from local node."""
        db = self.get_database('db')
        args = self.expand_arg_list(db, 'r', True, args)
        q = "select * from londiste.local_remove_table(%s, %s)"
        self.exec_cmd_many(db, q, [self.set_name], args)

    def cmd_change_handler(self, tbl):
        """Change handler (table_attrs) of the replicated table."""
        # NOTE(review): presumably needed before node info is used; confirm.
        self.load_local_info()

        tbl = skytools.fq_name(tbl)

        db = self.get_database('db')
        curs = db.cursor()
        q = "select table_attrs, dest_table "\
            " from londiste.get_table_list(%s) "\
            " where table_name = %s and local"
        curs.execute(q, [self.set_name, tbl])
        if curs.rowcount == 0:
            self.log.error("Table %s not found on this node", tbl)
            sys.exit(1)

        attrs, dest_table = curs.fetchone()
        attrs = skytools.db_urldecode(attrs or '')
        old_handler = attrs.get('handler')

        tgargs = self.build_tgargs()
        if self.options.handler:
            new_handler = self.build_handler(tbl, tgargs, dest_table)
        else:
            new_handler = None

        if old_handler == new_handler:
            self.log.info("Handler is already set to desired value, nothing done")
            sys.exit(0)

        if new_handler:
            attrs['handler'] = new_handler
        elif 'handler' in attrs:
            del attrs['handler']

        args = [self.set_name, tbl, tgargs, None]
        if attrs:
            args[3] = skytools.db_urlencode(attrs)

        q = "select * from londiste.local_change_handler(%s, %s, %s, %s)"
        self.exec_cmd(curs, q, args)
        db.commit()

    def cmd_add_seq(self, *args):
        """Attach seqs(s) to local node."""
        dst_db = self.get_database('db')
        dst_curs = dst_db.cursor()
        src_db = self.get_provider_db()
        src_curs = src_db.cursor()

        src_seqs = self.fetch_seqs(src_curs)
        dst_seqs = self.fetch_seqs(dst_curs)
        src_db.commit()
        self.sync_seq_list(dst_curs, src_seqs, dst_seqs)
        dst_db.commit()

        args = self.expand_arg_list(dst_db, 'S', False, args)

        # pick proper create flags (both create modes mean the same for seqs)
        if self.options.create_full or self.options.create:
            create_flags = skytools.T_SEQUENCE
        else:
            create_flags = 0

        # seems ok
        for seq in args:
            seq = skytools.fq_name(seq)
            self.add_seq(src_db, dst_db, seq, create_flags)
        dst_db.commit()

    def add_seq(self, src_db, dst_db, seq, create_flags):
        """Optionally create, then register one sequence on the local node.

        Raises skytools.UsageError when the sequence is missing locally and
        neither creation nor --skip-non-existing was requested.
        """
        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()
        seq_exists = skytools.exists_sequence(dst_curs, seq)
        if create_flags:
            if seq_exists:
                self.log.info('Sequence %s already exist, not creating', seq)
            else:
                if not skytools.exists_sequence(src_curs, seq):
                    # sequence not present on provider - nowhere to get the DDL from
                    self.log.warning('Sequence "%s" missing on provider, skipping', seq)
                    return
                s = skytools.SeqStruct(src_curs, seq)
                src_db.commit()
                s.create(dst_curs, create_flags, log=self.log)
        elif not seq_exists:
            if self.options.skip_non_existing:
                self.log.warning('Sequence "%s" missing on local node, skipping', seq)
                return
            raise skytools.UsageError("Sequence %r missing on local node" % (seq,))

        q = "select * from londiste.local_add_seq(%s, %s)"
        self.exec_cmd(dst_curs, q, [self.set_name, seq])

    def fetch_seqs(self, curs):
        """Return {seq_name: row} for all sequences known to the set."""
        q = "select seq_name, last_value, local from londiste.get_seq_list(%s)"
        curs.execute(q, [self.set_name])
        res = {}
        for row in curs.fetchall():
            res[row[0]] = row
        return res

    def sync_seq_list(self, dst_curs, src_seqs, dst_seqs):
        """Make the subscriber's global sequence list match the provider's.

        dst_seqs is updated in place to reflect the additions and removals.
        """
        q = "select * from londiste.global_update_seq(%s, %s, %s)"
        for seq in list(src_seqs):
            if seq not in dst_seqs:
                self.log.info("Sequence %s info missing from subscriber, adding", seq)
                self.exec_cmd(dst_curs, q, [self.set_name, seq, src_seqs[seq]['last_value']])
                tmp = src_seqs[seq].copy()
                tmp['local'] = False
                dst_seqs[seq] = tmp
        q = "select * from londiste.global_remove_seq(%s, %s)"
        # iterate over a snapshot: entries are deleted inside the loop
        for seq in list(dst_seqs):
            if seq not in src_seqs:
                self.log.info("Sequence %s gone but exists on subscriber, removing", seq)
                self.exec_cmd(dst_curs, q, [self.set_name, seq])
                del dst_seqs[seq]

    def cmd_remove_seq(self, *args):
        """Detach seqs(s) from local node."""
        q = "select * from londiste.local_remove_seq(%s, %s)"
        db = self.get_database('db')
        args = self.expand_arg_list(db, 'S', True, args)
        self.exec_cmd_many(db, q, [self.set_name], args)

    def cmd_resync(self, *args):
        """Reload data from provider node."""
        db = self.get_database('db')
        args = self.expand_arg_list(db, 'r', True, args)

        if not self.options.find_copy_node:
            # NOTE(review): get_provider_db() reads self.queue_info, which is
            # presumably populated by CascadeAdmin.load_local_info().
            self.load_local_info()
            src_db = self.get_provider_db()
            src_curs = src_db.cursor()
            src_tbls = self.fetch_set_tables(src_curs)
            src_db.commit()

            problems = 0
            for tbl in args:
                tbl = skytools.fq_name(tbl)
                if tbl not in src_tbls or not src_tbls[tbl]['local']:
                    self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
                    problems += 1
            if problems > 0:
                self.log.error("Problems, cancelling operation")
                sys.exit(1)

        if self.options.find_copy_node or self.options.copy_node:
            q = "select table_name, table_attrs from londiste.get_table_list(%s) where local"
            cur = db.cursor()
            cur.execute(q, [self.set_name])
            for row in cur.fetchall():
                if row['table_name'] not in args:
                    continue
                attrs = skytools.db_urldecode(row['table_attrs'] or '')

                if self.options.find_copy_node:
                    attrs['copy_node'] = '?'
                elif self.options.copy_node:
                    attrs['copy_node'] = self.options.copy_node

                attrs = skytools.db_urlencode(attrs)
                q = "select * from londiste.local_set_table_attrs (%s, %s, %s)"
                self.exec_cmd(db, q, [self.set_name, row['table_name'], attrs])

        q = "select * from londiste.local_set_table_state(%s, %s, null, null)"
        self.exec_cmd_many(db, q, [self.set_name], args)

    def cmd_tables(self):
        """Show attached tables."""
        q = """select table_name, merge_state, table_attrs
        from londiste.get_table_list(%s) where local
        order by table_name"""
        db = self.get_database('db')

        def show_attr(a):
            # table_attrs is stored urlencoded; decode it for display
            if a:
                return skytools.db_urldecode(a)
            return ''

        self.display_table(db, "Tables on node", q, [self.set_name],
                           fieldfmt={'table_attrs': show_attr})

    def cmd_seqs(self):
        """Show attached seqs."""
        q = "select seq_name, local, last_value from londiste.get_seq_list(%s)"
        db = self.get_database('db')
        self.display_table(db, "Sequences on node", q, [self.set_name])

    def cmd_missing(self):
        """Show missing tables on local node."""
        q = "select * from londiste.local_show_missing(%s)"
        db = self.get_database('db')
        self.display_table(db, "Missing objects on node", q, [self.set_name])

    def cmd_check(self):
        """TODO: check if structs match"""
        pass

    def cmd_fkeys(self):
        """TODO: show removed fkeys."""
        pass

    def cmd_triggers(self):
        """TODO: show removed triggers."""
        pass

    def cmd_show_handlers(self, *args):
        """Show help about handlers."""
        londiste.handler.show(args)

    def cmd_execute(self, *files):
        """Run SQL script files through londiste.execute_start/_finish.

        All files are processed in one transaction that is committed at
        the end; a script is skipped when execute_start returns a
        ret_code above 200.
        """
        db = self.get_database('db')
        curs = db.cursor()

        tables = self.fetch_set_tables(curs)
        seqs = self.fetch_seqs(curs)

        # generate local maps
        local_tables = {}
        local_seqs = {}
        for tbl in tables.values():
            if tbl['local']:
                local_tables[tbl['table_name']] = tbl['dest_table']
        for seq in seqs.values():
            if seq['local']:
                local_seqs[seq['seq_name']] = seq['seq_name']

        # set replica role for EXECUTE transaction
        if db.server_version >= 80300:
            curs.execute("set local session_replication_role = 'local'")

        for fn in files:
            fname = os.path.basename(fn)
            f = open(fn, "r")
            try:
                sql = f.read()
            finally:
                f.close()
            attrs = ExecAttrs(sql=sql)
            q = "select * from londiste.execute_start(%s, %s, %s, true, %s)"
            res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit=False)
            ret = res[0]['ret_code']
            if ret > 200:
                self.log.warning("Skipping execution of '%s'", fname)
                continue
            if attrs.need_execute(curs, local_tables, local_seqs):
                self.log.info("%s: executing sql", fname)
                xsql = attrs.process_sql(sql, local_tables, local_seqs)
                for stmt in skytools.parse_statements(xsql):
                    curs.execute(stmt)
            else:
                self.log.info("%s: This SQL does not need to run on this node.", fname)
            q = "select * from londiste.execute_finish(%s, %s)"
            self.exec_cmd(db, q, [self.queue_name, fname], commit=False)
        # every exec_cmd above ran with commit=False
        db.commit()

    def get_provider_db(self):
        """Return a connection to the provider (or --copy-node) database.

        Raises skytools.UsageError when --copy-node names an unknown node
        or the local node itself.
        """
        if self.options.copy_node:
            # use custom node for copy
            source_node = self.options.copy_node
            m = self.queue_info.get_member(source_node)
            if not m:
                raise skytools.UsageError("Cannot find node <%s>" % (source_node,))
            if source_node == self.local_node:
                raise skytools.UsageError("Cannot use itself as provider")
            self.provider_location = m.location

        if not self.provider_location:
            db = self.get_database('db')
            q = 'select * from pgq_node.get_node_info(%s)'
            res = self.exec_cmd(db, q, [self.queue_name], quiet=True)
            self.provider_location = res[0]['provider_location']

        return self.get_database('provider_db', connstr=self.provider_location, profile='remote')

    def expand_arg_list(self, db, kind, existing, args, needs_tbl=True):
        """Resolve command-line object names/globs to a list of objects.

        kind is 'r' for tables or 'S' for sequences.  With existing=True
        names are matched against locally attached objects, otherwise
        against objects reported missing on the local node.  With no args
        and --all, the whole corresponding list is returned.
        """
        curs = db.cursor()

        if kind == 'S':
            q1 = "select seq_name, local from londiste.get_seq_list(%s) where local"
        elif kind == 'r':
            q1 = "select table_name, local from londiste.get_table_list(%s) where local"
        else:
            raise Exception("bug")
        # kind is one of two fixed literals checked above, never user input
        q2 = "select obj_name from londiste.local_show_missing(%%s) where obj_kind = '%s'" % kind

        lst_exists = []
        map_exists = {}
        curs.execute(q1, [self.set_name])
        for row in curs.fetchall():
            lst_exists.append(row[0])
            map_exists[row[0]] = 1

        lst_missing = []
        map_missing = {}
        curs.execute(q2, [self.set_name])
        for row in curs.fetchall():
            lst_missing.append(row[0])
            map_missing[row[0]] = 1

        db.commit()

        if not args and self.options.all:
            if existing:
                return lst_exists
            return lst_missing

        allow_nonexist = not needs_tbl
        if existing:
            res = self.solve_globbing(args, lst_exists, map_exists, map_missing, allow_nonexist)
        else:
            res = self.solve_globbing(args, lst_missing, map_missing, map_exists, allow_nonexist)

        if not res:
            self.log.info("what to do ?")
        return res

    def solve_globbing(self, args, full_list, full_map, reverse_map, allow_nonexist):
        """Expand names and '*'/'?' globs against full_list.

        Returns matching names in first-seen order without duplicates.
        Raises skytools.UsageError when a plain name is unavailable and
        neither allow_nonexist nor --force permits it.
        """
        def glob2regex(s):
            s = s.replace('.', '[.]').replace('?', '.').replace('*', '.*')
            return '^%s$' % s

        res_map = {}
        res_list = []
        err = 0
        for a in args:
            if '*' in a or '?' in a:
                # unqualified patterns are matched in the public schema
                if '.' not in a:
                    a = 'public.' + a
                rc = re.compile(glob2regex(a))
                for x in full_list:
                    if rc.match(x) and x not in res_map:
                        res_map[x] = 1
                        res_list.append(x)
            else:
                a = skytools.fq_name(a)
                if a in res_map:
                    continue
                elif a in full_map:
                    res_list.append(a)
                    res_map[a] = 1
                elif a in reverse_map:
                    self.log.info("%s already processed", a)
                elif allow_nonexist:
                    res_list.append(a)
                    res_map[a] = 1
                elif self.options.force:
                    self.log.warning("%s not available, but --force is used", a)
                    res_list.append(a)
                    res_map[a] = 1
                else:
                    self.log.warning("%s not available", a)
                    err = 1
        if err:
            raise skytools.UsageError("Cannot proceed")
        return res_list

    def load_extra_status(self, curs, node):
        """Fetch extra info."""
        # must be thread-safe (!)
        CascadeAdmin.load_extra_status(self, curs, node)
        curs.execute("select * from londiste.get_table_list(%s)", [self.queue_name])
        # counters: in sync / attached but not yet 'ok' / not attached locally
        n_ok = n_half = n_ign = 0
        for tbl in curs.fetchall():
            if not tbl['local']:
                n_ign += 1
            elif tbl['merge_state'] == 'ok':
                n_ok += 1
            else:
                n_half += 1
        node.add_info_line('Tables: %d/%d/%d' % (n_ok, n_half, n_ign))

    def cmd_wait_sync(self):
        """Block until all local tables are in sync."""
        # NOTE(review): presumably needed before node info is used; confirm.
        self.load_local_info()

        dst_db = self.get_database('db')
        self.wait_for_sync(dst_db)

    def wait_for_sync(self, dst_db):
        """Poll the local table list until every table has merge_state 'ok'.

        Logs progress as tables finish and pauses via self.sleep(2)
        between polls.
        """
        self.log.info("Waiting until all tables are in sync")
        q = "select table_name, merge_state, local"\
            " from londiste.get_table_list(%s) where local"
        dst_curs = dst_db.cursor()

        # table -> 0 while still copying, 1 once reported as finished
        partial = {}
        startup_info = 0
        while 1:
            dst_curs.execute(q, [self.queue_name])
            rows = dst_curs.fetchall()
            dst_db.commit()

            total_count = 0
            cur_count = 0
            done_list = []
            for row in rows:
                if not row['local']:
                    continue
                total_count += 1
                tbl = row['table_name']
                if row['merge_state'] != 'ok':
                    partial[tbl] = 0
                    cur_count += 1
                elif tbl in partial:
                    if partial[tbl] == 0:
                        partial[tbl] = 1
                        done_list.append(tbl)

            done_count = total_count - cur_count

            if not startup_info:
                self.log.info("%d/%d table(s) to copy", cur_count, total_count)
                startup_info = 1

            for done in done_list:
                self.log.info("%s: finished (%d/%d)", done, done_count, total_count)

            if cur_count == 0:
                break

            self.sleep(2)

        self.log.info("All done")

    def resurrect_dump_event(self, ev, stats, batch_info):
        """Collect per-table stats.

        stats gains a 'truncated_tables' list and, per table, a
        'table: NAME' entry of [inserts, updates, deletes, rollback-note].
        """
        super(LondisteSetup, self).resurrect_dump_event(ev, stats, batch_info)

        ROLLBACK = 'can rollback'
        NO_ROLLBACK = 'cannot rollback'

        if ev.ev_type == 'TRUNCATE':
            if 'truncated_tables' not in stats:
                stats['truncated_tables'] = []
            tlist = stats['truncated_tables']
            tbl = ev.ev_extra1
            if tbl not in tlist:
                tlist.append(tbl)
        elif ev.ev_type[:2] in ('I:', 'U:', 'D:', 'I', 'U', 'D'):
            op = ev.ev_type[0]
            tbl = ev.ev_extra1
            bak = ev.ev_extra3
            tblkey = 'table: %s' % tbl
            if tblkey not in stats:
                stats[tblkey] = [0, 0, 0, ROLLBACK]
            tinfo = stats[tblkey]
            if op == 'I':
                tinfo[0] += 1
            elif op == 'U':
                tinfo[1] += 1
                if not bak:
                    tinfo[3] = NO_ROLLBACK
            elif op == 'D':
                tinfo[2] += 1
                if not bak and ev.ev_type == 'D':
                    tinfo[3] = NO_ROLLBACK