Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

--sync-watermark: limit watermark to specific nodes

This will allow specified nodes to sync global watermark
with each other, without sending it upwards.

This isolates root node from lag on downstream nodes.
  • Loading branch information...
commit 7c1861e7bbfde69da0904b68d38720786329af41 1 parent da4a0bc
@markokr authored
View
2  python/londiste.py
@@ -97,6 +97,8 @@ def init_optparse(self, parser=None):
help = "takeover: old node was root")
g.add_option("--dead-branch", action = 'store_true',
help = "takeover: old node was branch")
+ g.add_option("--sync-watermark",
+ help = "create-branch: list of node names to sync wm with")
p.add_option_group(g)
g = optparse.OptionGroup(p, "repair queue position")
g.add_option("--rewind", action = "store_true",
View
13 python/pgq/cascade/admin.py
@@ -94,6 +94,8 @@ def init_optparse(self, parser = None):
help = "tag some node as dead")
g.add_option("--dead-branch", action="store_true",
help = "tag some node as dead")
+ g.add_option("--sync-watermark",
+ help = "list of node names to sync with")
p.add_option_group(g)
return p
@@ -147,6 +149,7 @@ def create_node(self, node_type, node_name, node_location):
return
self.log.info("Initializing node")
+ node_attrs = {}
worker_name = self.options.worker
if not worker_name:
@@ -155,6 +158,11 @@ def create_node(self, node_type, node_name, node_location):
if combined_queue and node_type != 'leaf':
raise Exception('--merge can be used only for leafs')
+ if self.options.sync_watermark:
+ if node_type != 'branch':
+ raise UsageError('--sync-watermark can be used only for branch nodes')
+ node_attrs['sync_watermark'] = self.options.sync_watermark
+
# register member
if node_type == 'root':
global_watermark = None
@@ -221,6 +229,11 @@ def create_node(self, node_type, node_name, node_location):
self.extra_init(node_type, db, provider_db)
+ if node_attrs:
+ s_attrs = skytools.db_urlencode(node_attrs)
+ self.exec_cmd(db, "select * from pgq_node.set_node_attrs(%s, %s)",
+ [self.queue_name, s_attrs])
+
self.log.info("Done")
def extra_init(self, node_type, node_db, provider_db):
View
12 python/pgq/cascade/nodeinfo.py
@@ -6,6 +6,7 @@
__all__ = ['MemberInfo', 'NodeInfo', 'QueueInfo']
import datetime
+import skytools
# node types
ROOT = 'root'
@@ -49,6 +50,7 @@ class NodeInfo:
combined_queue = None
combined_type = None
last_tick = None
+ node_attrs = {}
def __init__(self, queue_name, row, main_worker = True, node_name = None):
self.queue_name = queue_name
@@ -83,6 +85,12 @@ def __init__(self, queue_name, row, main_worker = True, node_name = None):
self.combined_type = row['combined_type']
self.last_tick = row['worker_last_tick']
+ self.node_attrs = {}
+ if 'node_attrs' in row:
+ a = row['node_attrs']
+ if a:
+ self.node_attrs = skytools.db_urldecode(a)
+
def __get_target_queue(self):
qname = None
if self.type == LEAF:
@@ -129,6 +137,10 @@ def get_infolines(self):
txt += ", NOT UPTODATE"
lst.append(txt)
+ for k, v in self.node_attrs.items():
+ txt = "Attr: %s=%s" % (k, v)
+ lst.append(txt)
+
for cname, row in self.cascaded_consumer_map.items():
err = row['cur_error']
if err:
View
97 python/pgq/cascade/worker.py
@@ -4,10 +4,11 @@
"""
-import sys, time
+import sys, time, skytools
from pgq.cascade.consumer import CascadedConsumer
from pgq.producer import bulk_insert_events
+from pgq.event import Event
__all__ = ['CascadedWorker']
@@ -32,10 +33,20 @@ class WorkerState:
filtered_copy = 0 # ok
process_global_wm = 0 # ok
+ sync_watermark = 0 # ?
+ wm_sync_nodes = []
+
def __init__(self, queue_name, nst):
self.node_type = nst['node_type']
self.node_name = nst['node_name']
self.local_watermark = nst['local_watermark']
+ self.global_watermark = nst['global_watermark']
+
+ self.node_attrs = {}
+ attrs = nst.get('node_attrs', '')
+ if attrs:
+ self.node_attrs = skytools.db_urldecode(attrs)
+
ntype = nst['node_type']
ctype = nst['combined_type']
if ntype == 'root':
@@ -49,7 +60,12 @@ def __init__(self, queue_name, nst):
self.process_tick_event = 1
self.keep_event_ids = 1
self.create_tick = 1
- self.process_global_wm = 1
+ if 'sync_watermark' in self.node_attrs:
+ slist = self.node_attrs['sync_watermark']
+ self.sync_watermark = 1
+ self.wm_sync_nodes = slist.split(',')
+ else:
+ self.process_global_wm = 1
elif ntype == 'leaf' and not ctype:
self.process_batch = 1
self.process_events = 1
@@ -139,8 +155,6 @@ def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
self.process_remote_event(src_curs, dst_curs, ev)
if ev.ev_id > max_id:
max_id = ev.ev_id
- if st.local_wm_publish:
- self.publish_local_wm(src_db)
if max_id > self.cur_max_id:
self.cur_max_id = max_id
@@ -195,22 +209,72 @@ def is_batch_done(self, state, batch_info, dst_db):
self.create_branch_tick(dst_db, cur_tick, tick_time)
return True
- def publish_local_wm(self, src_db):
+ def publish_local_wm(self, src_db, dst_db):
"""Send local watermark to provider.
"""
- if not self.main_worker:
- return
+
t = time.time()
if t - self.local_wm_publish_time < self.local_wm_publish_period:
return
st = self._worker_state
- self.log.debug("Publishing local watermark: %d" % st.local_watermark)
+ wm = st.local_watermark
+ if st.sync_watermark:
+ # dont send local watermark upstream
+ wm = self.batch_info['prev_tick_id']
+
+ self.log.debug("Publishing local watermark: %d" % wm)
src_curs = src_db.cursor()
q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
- src_curs.execute(q, [self.pgq_queue_name, st.node_name, st.local_watermark])
+ src_curs.execute(q, [self.pgq_queue_name, st.node_name, wm])
+
+ # if last part fails, dont repeat it immediately
self.local_wm_publish_time = t
+ if st.sync_watermark:
+ # instead sync 'global-watermark' with specific nodes
+ dst_curs = dst_db.cursor()
+ nmap = self._get_node_map(dst_curs)
+ dst_db.commit()
+
+ wm = st.local_watermark
+ for node in st.wm_sync_nodes:
+ if node == st.node_name:
+ continue
+ if node not in nmap:
+ # dont ignore missing nodes - cluster may be partially set up
+ self.log.warning('Unknown node in sync_watermark list: %s' % node)
+ return
+ n = nmap[node]
+ if n['dead']:
+ # ignore dead nodes
+ continue
+ wmdb = self.get_database('wmdb', connstr = n['node_location'], autocommit = 1)
+ wmcurs = wmdb.cursor()
+ q = 'select local_watermark from pgq_node.get_node_info(%s)'
+ wmcurs.execute(q, [self.queue_name])
+ row = wmcurs.fetchone()
+ if not row:
+ # partially set up node?
+ self.log.warning('Node not working: %s' % node)
+ elif row['local_watermark'] < wm:
+ # keep lowest wm
+ wm = row['local_watermark']
+ self.close_database('wmdb')
+
+ # now we have lowest wm, store it
+ q = "select pgq_node.set_global_watermark(%s, %s)"
+ dst_curs.execute(q, [self.queue_name, wm])
+ dst_db.commit()
+
+ def _get_node_map(self, curs):
+ q = "select node_name, node_location, dead from pgq_node.get_queue_locations(%s)"
+ curs.execute(q, [self.queue_name])
+ res = {}
+ for row in curs.fetchall():
+ res[row['node_name']] = row
+ return res
+
def process_remote_event(self, src_curs, dst_curs, ev):
"""Handle cascading events.
"""
@@ -245,7 +309,10 @@ def process_remote_event(self, src_curs, dst_curs, ev):
q = "select * from pgq_node.unregister_location(%s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, node])
elif t == "pgq.global-watermark":
- if st.process_global_wm:
+ if st.sync_watermark:
+ tick_id = int(ev.ev_data)
+ self.log.info('Ignoring global watermark %s' % tick_id)
+ elif st.process_global_wm:
tick_id = int(ev.ev_data)
q = "select * from pgq_node.set_global_watermark(%s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, tick_id])
@@ -288,6 +355,8 @@ def finish_remote_batch(self, src_db, dst_db, tick_id):
tick_id = self.batch_info['tick_id']
tick_time = self.batch_info['batch_end']
self.create_branch_tick(dst_db, tick_id, tick_time)
+ if st.local_wm_publish:
+ self.publish_local_wm(src_db, dst_db)
def create_branch_tick(self, dst_db, tick_id, tick_time):
q = "select pgq.ticker(%s, %s, %s, %s)"
@@ -308,6 +377,14 @@ def copy_event(self, dst_curs, ev, filtered_copy):
return
if len(self.ev_buf) >= self.max_evbuf:
self.flush_events(dst_curs)
+
+ if ev.type == 'pgq.global-watermark':
+ st = self._worker_state
+ if st.sync_watermark:
+ # replace payload with synced global watermark
+ row = ev._event_row.copy()
+ row['ev_data'] = str(st.global_watermark)
+ ev = Event(self.queue_name, row)
self.ev_buf.append(ev)
def flush_events(self, dst_curs):
View
48 sql/pgq_node/expected/pgq_node_test.out
@@ -4,6 +4,10 @@
0
(1 row)
+ sub_consumer | sub_id | co_name
+--------------+--------+---------
+(0 rows)
+
upgrade_schema
----------------
0
@@ -159,10 +163,16 @@ select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
aqueue | node2_worker | 1
(3 rows)
+select * from pgq_node.set_node_attrs('aqueue', 'test=1');
+ ret_code | ret_note
+----------+-------------------------
+ 200 | Node attributes updated
+(1 row)
+
select * from pgq_node.get_node_info('aqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 3
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+ 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 3 | test=1
(1 row)
select * from pgq_node.get_subscriber_info('aqueue');
@@ -218,28 +228,28 @@ select * from pgq_node.local_state;
(4 rows)
select * from pgq_node.node_info;
- queue_name | node_type | node_name | worker_name | combined_queue
-------------+-----------+-----------+--------------+----------------
- aqueue | root | node1 | node1_worker |
- bqueue | branch | node2 | node2_worker |
+ queue_name | node_type | node_name | worker_name | combined_queue | node_attrs
+------------+-----------+-----------+--------------+----------------+------------
+ aqueue | root | node1 | node1_worker | | test=1
+ bqueue | branch | node2 | node2_worker | |
(2 rows)
select * from pgq_node.get_node_info('aqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 3
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+ 100 | Ok | root | node1 | 1 | 1 | node1 | dbname=node1 | | | node1_worker | f | f | 3 | test=1
(1 row)
select * from pgq_node.get_node_info('bqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+ 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1 |
(1 row)
select * from pgq_node.get_node_info('cqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
-----------+-----------------------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+-------------+---------------+-----------------+------------------
- 404 | Unknown queue: cqueue | | | | | | | | | | | |
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs
+----------+-----------------------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+-------------+---------------+-----------------+------------------+------------
+ 404 | Unknown queue: cqueue | | | | | | | | | | | | |
(1 row)
select * from pgq_node.get_worker_state('aqueue');
@@ -347,9 +357,9 @@ select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
(1 row)
select * from pgq_node.get_node_info('bqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
- 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+ 100 | Ok | branch | node2 | 1 | 1 | node1 | dbname=node1 | | | node2_worker | f | f | 1 |
(1 row)
set session_replication_role = 'replica';
View
11 sql/pgq_node/functions/pgq_node.get_node_info.sql
@@ -1,4 +1,6 @@
+drop function if exists pgq_node.get_node_info(text);
+
create or replace function pgq_node.get_node_info(
in i_queue_name text,
@@ -17,7 +19,8 @@ create or replace function pgq_node.get_node_info(
out worker_name text,
out worker_paused bool,
out worker_uptodate bool,
- out worker_last_tick bigint
+ out worker_last_tick bigint,
+ out node_attrs text
) returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.get_node_info(1)
@@ -46,10 +49,12 @@ declare
begin
select 100, 'Ok', n.node_type, n.node_name,
c.node_type, c.queue_name, w.provider_node, l.node_location,
- n.worker_name, w.paused, w.uptodate, w.last_tick_id
+ n.worker_name, w.paused, w.uptodate, w.last_tick_id,
+ n.node_attrs
into ret_code, ret_note, node_type, node_name,
combined_type, combined_queue, provider_node, provider_location,
- worker_name, worker_paused, worker_uptodate, worker_last_tick
+ worker_name, worker_paused, worker_uptodate, worker_last_tick,
+ node_attrs
from pgq_node.node_info n
left join pgq_node.node_info c on (c.queue_name = n.combined_queue)
left join pgq_node.local_state w on (w.queue_name = n.queue_name and w.consumer_name = n.worker_name)
View
35 sql/pgq_node/functions/pgq_node.set_node_attrs.sql
@@ -0,0 +1,35 @@
+
+create or replace function pgq_node.set_node_attrs(
+ in i_queue_name text,
+ in i_node_attrs text,
+ out ret_code int4,
+ out ret_note text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_node.set_node_attrs(2)
+--
+-- Set node attributes.
+--
+-- Parameters:
+--      i_queue_name - cascaded queue name
+-- i_node_attrs - urlencoded node attrs
+--
+-- Returns:
+-- 200 - ok
+-- 404 - node not found
+-- ----------------------------------------------------------------------
+begin
+ update pgq_node.node_info
+ set node_attrs = i_node_attrs
+ where queue_name = i_queue_name;
+ if not found then
+ select 404, 'Node not found' into ret_code, ret_note;
+ return;
+ end if;
+
+ select 200, 'Node attributes updated'
+ into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql security definer;
+
View
10 sql/pgq_node/functions/pgq_node.upgrade_schema.sql
@@ -5,6 +5,16 @@ returns int4 as $$
declare
cnt int4 = 0;
begin
+ -- node_info.node_attrs
+ perform 1 from information_schema.columns
+ where table_schema = 'pgq_node'
+ and table_name = 'node_info'
+ and column_name = 'node_attrs';
+ if not found then
+ alter table pgq_node.node_info add column node_attrs text;
+ cnt := cnt + 1;
+ end if;
+
return cnt;
end;
$$ language plpgsql;
View
2  sql/pgq_node/sql/pgq_node_test.sql
@@ -35,6 +35,8 @@ select * from pgq.ticker('aqueue');
select * from pgq_node.set_subscriber_watermark('aqueue', 'node2', 3);
select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
+select * from pgq_node.set_node_attrs('aqueue', 'test=1');
+
select * from pgq_node.get_node_info('aqueue');
select * from pgq_node.get_subscriber_info('aqueue');
View
1  sql/pgq_node/structure/functions.sql
@@ -51,6 +51,7 @@ select pgq_node.upgrade_schema();
\i functions/pgq_node.demote_root.sql
\i functions/pgq_node.promote_branch.sql
+\i functions/pgq_node.set_node_attrs.sql
-- Group: Provider side operations - worker
\i functions/pgq_node.register_subscriber.sql
View
2  sql/pgq_node/structure/tables.sql
@@ -50,6 +50,7 @@ create table pgq_node.node_location (
-- provider_node - provider node name
-- worker_name - consumer name that maintains this node
-- combined_queue - on 'leaf' the target combined set name
+-- node_attrs - urlencoded fields for worker
--
-- Node types:
-- root - data + batches is generated here
@@ -62,6 +63,7 @@ create table pgq_node.node_info (
node_name text not null,
worker_name text,
combined_queue text,
+ node_attrs text,
foreign key (queue_name, node_name) references pgq_node.node_location,
check (node_type in ('root', 'branch', 'leaf')),
View
6 tests/londiste/regen.sh
@@ -71,15 +71,15 @@ msg "Install londiste3 and initialize nodes"
run londiste3 $v conf/londiste_db1.ini create-root node1 'dbname=db1'
run londiste3 $v conf/londiste_db2.ini create-branch node2 'dbname=db2' --provider='dbname=db1'
run londiste3 $v conf/londiste_db3.ini create-branch node3 'dbname=db3' --provider='dbname=db1'
-run londiste3 $v conf/londiste_db4.ini create-branch node4 'dbname=db4' --provider='dbname=db2'
-run londiste3 $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db3'
+run londiste3 $v conf/londiste_db4.ini create-branch node4 'dbname=db4' --provider='dbname=db2' --sync-watermark=node4,node5
+run londiste3 $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db3' --sync-watermark=node4,node5
msg "Run ticker"
run pgqd $v -d conf/pgqd.ini
run sleep 5
msg "See topology"
-run londiste3 $v conf/londiste_db4.ini status
+run londiste3 $v conf/londiste_db1.ini status
msg "Run londiste3 daemon for each node"
for db in $db_list; do

0 comments on commit 7c1861e

Please sign in to comment.
Something went wrong with that request. Please try again.