forked from pgq/skytools-legacy
-
Notifications
You must be signed in to change notification settings - Fork 17
/
londiste.py
executable file
·163 lines (141 loc) · 6.73 KB
/
londiste.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#! /usr/bin/env python
"""Londiste launcher.
"""
import sys, os, os.path, optparse
import pkgloader
pkgloader.require('skytools', '3.0')
import skytools
# Python 2.3 will try londiste.py first: when this launcher file sits in
# sys.path[0] and there is no 'londiste' package directory beside it, drop
# that entry (presumably so the 'import londiste' that follows does not
# resolve to the launcher file itself).
if (not os.path.isdir(os.path.join(sys.path[0], 'londiste'))
        and os.path.exists(os.path.join(sys.path[0], 'londiste.py'))):
    del sys.path[0]
import londiste, pgq.cascade.admin
# Usage text for the command-line parser: the generic usage string from
# pgq.cascade.admin followed by the londiste-specific command list.
# Londiste.init_optparse() installs it via p.set_usage(command_usage.strip()).
command_usage = pgq.cascade.admin.command_usage + """
Replication Daemon:
worker replay events to subscriber
Replication Administration:
add-table TBL ... add table to queue
remove-table TBL ... remove table from queue
add-seq SEQ ... add sequence to provider
remove-seq SEQ ... remove sequence from provider
tables show all tables on provider
seqs show all sequences on provider
missing list tables subscriber has not yet attached to
resync TBL ... do full copy again
wait-sync wait until all tables are in sync
Replication Extra:
check compare table structure on both sides
fkeys print out fkey drop/create commands
compare [TBL ...] compare table contents on both sides
repair [TBL ...] repair data on subscriber
execute [FILE ...] execute SQL files on set
show-handlers [..] show info about all or specific handler
Internal Commands:
copy copy table logic
"""
# Dispatch table.  Each entry is (tuple of command names, implementing class).
# Londiste.__init__ walks the entries in order and instantiates the class of
# the first entry whose name tuple contains the requested command.
cmd_handlers = (
    # Commands not described in the londiste-specific part of the usage text
    # (presumably the generic cascade commands -- confirm in pgq.cascade.admin).
    (
        (
            'create-root', 'create-branch', 'create-leaf', 'members',
            'tag-dead', 'tag-alive', 'change-provider', 'rename-node',
            'status', 'pause', 'resume', 'node-info', 'drop-node',
            'takeover',
        ),
        londiste.LondisteSetup,
    ),
    # Table / sequence administration and the wait-* helpers.
    (
        (
            'add-table', 'remove-table', 'add-seq', 'remove-seq',
            'tables', 'seqs', 'missing', 'resync', 'wait-sync',
            'wait-root', 'wait-provider', 'check', 'fkeys', 'execute',
        ),
        londiste.LondisteSetup,
    ),
    (
        ('show-handlers',),
        londiste.LondisteSetup,
    ),
    # One dedicated class per remaining command.
    (
        ('worker',),
        londiste.Replicator,
    ),
    (
        ('compare',),
        londiste.Comparator,
    ),
    (
        ('repair',),
        londiste.Repairer,
    ),
    (
        ('copy',),
        londiste.CopyTable,
    ),
)
class Londiste(skytools.DBScript):
    """Command-line front end that dispatches one londiste command.

    The command name is looked up in ``cmd_handlers``; the matching class is
    instantiated with the same argument list and ``start()`` hands control
    over to that instance.
    """

    def __init__(self, args):
        """Parse *args* and create the script object for the requested command.

        args -- command-line arguments without the program name.

        Prints a message and exits with status 1 when no command was given
        or when the command does not appear in ``cmd_handlers``.
        """
        skytools.DBScript.__init__(self, 'londiste3', args)

        # self.args is not assigned in this class; presumably the base class
        # fills it with the positional arguments, the command being the
        # second one -- TODO confirm against skytools.DBScript.
        if len(self.args) < 2:
            print("need command")
            sys.exit(1)
        cmd = self.args[1]

        # First entry whose name tuple contains the command wins.
        self.script = None
        for names, cls in cmd_handlers:
            if cmd in names:
                self.script = cls(args)
                break

        if self.script is None:
            print("Unknown command '%s', use --help for help" % cmd)
            sys.exit(1)

    def start(self):
        """Run the script object selected in ``__init__``."""
        self.script.start()

    def print_ini(self):
        """Let the Replicator print the default config."""
        # Only the constructor is called; presumably Replicator handles
        # '--ini' itself while parsing its arguments -- TODO confirm.
        londiste.Replicator(['--ini'])

    def init_optparse(self, parser=None):
        """Extend the base option parser with the londiste options.

        parser -- optional parser passed through to the base class.

        Returns the parser obtained from ``skytools.DBScript.init_optparse``
        after setting the usage text and adding the option groups below.
        """
        p = skytools.DBScript.init_optparse(self, parser)
        p.set_usage(command_usage.strip())

        g = optparse.OptionGroup(p, "options for cascading")
        g.add_option("--provider",
                     help="init: upstream node temp connect string")
        g.add_option("--target",
                     help="switchover: target node")
        g.add_option("--merge",
                     help="create-leaf: combined queue name")
        g.add_option("--dead", action='append',
                     help="cascade: assume node is dead")
        g.add_option("--dead-root", action='store_true',
                     help="takeover: old node was root")
        g.add_option("--dead-branch", action='store_true',
                     help="takeover: old node was branch")
        g.add_option("--sync-watermark",
                     help="create-branch: list of node names to sync wm with")
        p.add_option_group(g)

        g = optparse.OptionGroup(p, "repair queue position")
        g.add_option("--rewind", action="store_true",
                     help="change queue position according to destination")
        g.add_option("--reset", action="store_true",
                     help="reset queue pos on destination side")
        p.add_option_group(g)

        g = optparse.OptionGroup(p, "options for add")
        g.add_option("--all", action="store_true",
                     help="add: include all possible tables")
        g.add_option("--wait-sync", action="store_true",
                     help="add: wait until all tables are in sync")
        g.add_option("--dest-table",
                     help="add: redirect changes to different table")
        g.add_option("--expect-sync", action="store_true", dest="expect_sync",
                     help="add: no copy needed", default=False)
        g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                     help="add: keep old data", default=False)
        g.add_option("--create", action="store_true",
                     help="add: create table/seq if not exist, with minimal schema")
        g.add_option("--create-full", action="store_true",
                     help="add: create table/seq if not exist, with full schema")
        g.add_option("--trigger-flags",
                     help="add: Set trigger flags (BAIUDLQ)")
        g.add_option("--trigger-arg", action="append",
                     help="add: Custom trigger arg (can be specified multiple times)")
        g.add_option("--no-triggers", action="store_true",
                     help="add: Dont put triggers on table (makes sense on leaf)")
        g.add_option("--handler", action="store",
                     help="add: Custom handler for table")
        g.add_option("--handler-arg", action="append",
                     help="add: Argument to custom handler")
        g.add_option("--find-copy-node", dest="find_copy_node", action="store_true",
                     help="add: walk upstream to find node to copy from")
        g.add_option("--copy-node", dest="copy_node",
                     help="add: use NODE as source for initial COPY")
        g.add_option("--copy-condition", dest="copy_condition",
                     help="add: set WHERE expression for copy")
        g.add_option("--merge-all", action="store_true",
                     help="merge tables from all source queues", default=False)
        g.add_option("--no-merge", action="store_true",
                     help="don't merge tables from source queues", default=False)
        g.add_option("--max-parallel-copy", type="int",
                     help="max number of parallel copy processes")
        p.add_option_group(g)

        g = optparse.OptionGroup(p, "other options")
        g.add_option("--force", action="store_true",
                     help="add: ignore table differences, repair: ignore lag")
        g.add_option("--apply", action="store_true",
                     help="repair: apply fixes automatically")
        p.add_option_group(g)

        return p
# Script entry point: build the launcher from the command-line arguments
# (program name stripped) and run the selected command.
if __name__ == '__main__':
    Londiste(sys.argv[1:]).start()