Add support for streaming replication protocol #322

Merged
merged 68 commits into from
Aug 15, 2016
Commits
e32e1b8
Add support for streaming replication protocol
a1exsh Jun 1, 2015
80da76d
Get rid of postgres internal includes; check for Win32 for htonl()
a1exsh Jun 2, 2015
44b705f
Improve identify_system: don't hardcode column names
a1exsh Jun 2, 2015
f14521f
Add libpq_support.c and win32_support.c
a1exsh Jun 3, 2015
50df864
Add timersub for Win32. Fix gettimeofday on MinGW.
a1exsh Jun 4, 2015
f7b84ce
Add {libpq,win32}_support.* to the .cproj file
a1exsh Jun 4, 2015
453830f
Add ReplicationMessage object
a1exsh Jun 5, 2015
1ac385d
Fix logical decoding plugin options adaptation on python3
a1exsh Jun 10, 2015
9fc5bf4
Add handling of send_time field in replmsg
a1exsh Jun 10, 2015
35a3262
Expose ReplicationMessage type in extras
a1exsh Jun 11, 2015
9ed90b1
Refer cursor from ReplicationMessage object. At the same time, for th…
a1exsh Jun 11, 2015
e3c3a2c
Merge branch 'master' into feature/replication-message-object
a1exsh Jun 30, 2015
61e52ce
Rework replication protocol
a1exsh Jun 10, 2015
058db56
Merge remote-tracking branch 'zalando/feature/replication-protocol' i…
a1exsh Jun 30, 2015
318706f
Update docs for Replication protocol
a1exsh Jun 30, 2015
0d731aa
Comment on special handling of PGRES_COPY_BOTH
a1exsh Jun 30, 2015
9386653
Update docs on ReplicationCursor
a1exsh Jul 1, 2015
dab41c6
Fix PQconsumeInput usage.
a1exsh Jul 2, 2015
9c1f2ac
Check return value of PQsocket
a1exsh Jul 2, 2015
06f1823
Fix missing free in replmsg_dealloc
a1exsh Jul 3, 2015
eac16d0
Fix missing GC flag in ReplicationMessage type
a1exsh Jul 3, 2015
26fe1f2
Fix use of PQconsumeInput() in pq_read_replication_message()
a1exsh Jul 7, 2015
862eda1
Merge remote-tracking branch 'origin/master' into repl
a1exsh Oct 1, 2015
f872a2a
Remove typedef for uint32, include internal/c.h
a1exsh Sep 30, 2015
937a7a9
Cleanup start replication wrt. slot type a bit.
a1exsh Oct 1, 2015
95ee218
Update replication connection/cursor interface and docs.
a1exsh Oct 1, 2015
cac83da
Use parse_dsn in ReplicationConnectionBase
a1exsh Oct 1, 2015
0233620
Rework replication connection/cursor classes
a1exsh Oct 1, 2015
ea2b87e
Fix create_replication_slot doc signature
a1exsh Oct 13, 2015
6ad2999
Remove IDENTIFY_SYSTEM wrapper method (it can't work with async anyway).
a1exsh Oct 13, 2015
5407907
Fix ReplicationTest: no NotSupportedError now.
a1exsh Oct 14, 2015
fea2260
Fix stop_replication: always raise outside the loop.
a1exsh Oct 14, 2015
a0b42a1
Update stop_repl, require replication consumer to be a callable.
a1exsh Oct 14, 2015
e05b4fd
Add checks on replication state, have to have a separate check for co…
a1exsh Oct 14, 2015
822d671
Clear repl_stop flag after the consume loop.
a1exsh Oct 14, 2015
e3097ec
Fix select/timeout indication in async replication example
a1exsh Oct 14, 2015
28a1a00
Remove commented copy_both code in pqfetch.
a1exsh Oct 14, 2015
9ab38ee
Add psyco_curs_datetime_init
a1exsh Oct 14, 2015
8e518d4
Merge branch 'master' into feature/replication-protocol
a1exsh Oct 15, 2015
d14fea3
Use quote_ident from psycopg2.extensions
a1exsh Oct 15, 2015
cf4f241
Fix async replication and test.
a1exsh Oct 15, 2015
0435320
Fix PSYCOPG2_TEST_REPL_DSN handling.
a1exsh Oct 16, 2015
4ab7cf0
Replace stop_replication with requirement for an exception.
a1exsh Oct 19, 2015
7aea2ce
Improve async replication test.
a1exsh Oct 19, 2015
0bb81fc
Properly subclass ReplicationCursor on C level.
a1exsh Oct 19, 2015
23abe4f
Add quick start to the replication doc, minor doc fixes.
a1exsh Oct 20, 2015
b3f8e9a
Fix send_time printf format in replmsg_repr().
a1exsh Oct 20, 2015
089e745
Fix cursor_init() declaration for use in replication_cursor_type.c
a1exsh Oct 20, 2015
22cbfb2
Actually add replication tests to the test suite.
a1exsh Oct 20, 2015
76c7f4a
Use direct call to consume() callable in pq_copy_both()
a1exsh Oct 22, 2015
e69dafb
Move the `decode` parameter to `start_replication()`.
a1exsh Oct 23, 2015
dd6bcbd
Improve async replication example.
a1exsh Oct 23, 2015
8b79bf4
Drop ReplicationCursor.flush_feedback(), rectify pq_*_replication_*()…
a1exsh Oct 23, 2015
4b9a6f4
Merge branch 'master' into feature/replication-protocol
a1exsh Oct 27, 2015
7aba8b3
Rework psycopg2.connect() interface.
a1exsh Oct 27, 2015
433fb95
Merge branch 'feature/connect2' into feature/replication-protocol
a1exsh Oct 27, 2015
fbcf99a
Move replication connection to C level.
a1exsh Oct 27, 2015
e61db57
Add dbname=replication for physical replication type.
a1exsh Oct 30, 2015
09a4bb7
Allow retrying start_replication after syntax or data error.
a1exsh Jan 5, 2016
5d33b39
Fix error test for invalid START_REPLICATION command.
a1exsh Jan 21, 2016
cb70325
Merge branch 'master' into feature/replication-protocol-c-connection-…
a1exsh Mar 4, 2016
da6e061
Use python-defined make_dsn() for ReplicationConnection class
a1exsh Mar 8, 2016
1d52f34
We don't need to expose cursor_init(), call tp_init() on the type ins…
a1exsh Mar 8, 2016
2de2ed7
Remove some dead code
a1exsh Mar 8, 2016
b21c8f7
Move replication-related imports to extras.py
a1exsh Mar 8, 2016
3f10b4d
Remove duplicated doc for make_dsn()
a1exsh Mar 8, 2016
a7887fa
Merge remote-tracking branch 'zalando/feature/replication-protocol' i…
a1exsh Mar 8, 2016
d5443c6
Fix TODOs in ReplicationMessage inline docs
a1exsh Apr 21, 2016
122 changes: 122 additions & 0 deletions doc/src/extras.rst
@@ -141,6 +141,128 @@ Logging cursor

Review comment:

Quick start for docs:

You must be using PostgreSQL 9.4 or above to run this quick start.

Make sure that replication connections are permitted for user postgres from localhost in pg_hba.conf, then run SELECT pg_reload_conf(). Create a database named psycopg2test.

Then run something like the following to quickly try the replication support out. This is not production code - it has no error handling, it sends feedback too often, etc - and it's only intended as a simple demo of logical replication functionality.

from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras

conn = psycopg2.connect('dbname=psycopg2test user=postgres',
   connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
try:
    cur.start_replication(slot_name = 'pytest')
except psycopg2.ProgrammingError:
    cur.create_replication_slot('pytest',
        slot_type=psycopg2.extras.REPLICATION_LOGICAL,
        output_plugin='test_decoding')
    cur.start_replication(slot_name = 'pytest')

class DemoConsumer(object):
    def consume(self, msg):
        print(msg.payload)
        msg.cursor.send_replication_feedback(flush_lsn=msg.data_start)

democonsumer = DemoConsumer()

print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
    cur.consume_replication_stream(democonsumer)
except KeyboardInterrupt:
    cur.close()
    conn.close()
    print("The slot 'pytest' still exists. Drop it with SELECT pg_drop_replication_slot('pytest'); if no longer needed")
    print("WARNING: Transaction logs will accumulate in pg_xlog until the slot is dropped")

This will continue running until cancelled with control-C.

You can now make changes to the psycopg2test database using a normal psycopg2 session, psql, etc and see the logical decoding stream printed by this demo client.

.. autoclass:: MinTimeLoggingCursor

Replication cursor
^^^^^^^^^^^^^^^^^^

.. autoclass:: ReplicationConnection

This connection factory class can be used to open a special type of
connection that is used for streaming replication.

Review comment:

Should note that normal SQL is not permitted or supported on a replication connection.


Example::

from psycopg2.extras import ReplicationConnection, REPLICATION_PHYSICAL, REPLICATION_LOGICAL
conn = psycopg2.connect(dsn, connection_factory=ReplicationConnection)

Review comment:

Does the user need to supply replication=database or replication=on in the dsn? Or do you add that?

If connecting to 9.3 or older, which don't support replication=database and only do "physical" replication, how's that handled?

(I'm intentionally reading docs before code so I see from a user perspective and know what the docs don't cover).

Review comment (Contributor Author):

First, thanks for your interest!

We add the replication keyword unless already supplied by the user. Good point about 9.3 compatibility, I don't think I've ever tested that as my main focus was logical replication. I will revisit this.

cur = conn.cursor()

.. seealso::

- PostgreSQL `Replication protocol`__

.. __: http://www.postgresql.org/docs/current/static/protocol-replication.html

.. autoclass:: ReplicationCursor

.. method:: identify_system()

Get information about the cluster status in form of a dict with
``systemid``, ``timeline``, ``xlogpos`` and ``dbname`` as keys.

Example::

>>> print cur.identify_system()
{'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'}

.. method:: create_replication_slot(slot_type, slot_name, output_plugin=None)

Create streaming replication slot.

:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to be created
:param output_plugin: name of the logical decoding output plugin to use
(logical replication only)

Example::

cur.create_replication_slot(REPLICATION_LOGICAL, "testslot", "test_decoding")

.. method:: drop_replication_slot(slot_name)

Drop streaming replication slot.

:param slot_name: name of the replication slot to drop

Example::

cur.drop_replication_slot("testslot")

.. method:: start_replication(file, slot_type, slot_name=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)

Start and consume replication stream.

:param file: a file-like object to write replication stream messages to
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to use (required for
logical replication)

Review comment:

For "physical" replication, no slot is required; it's permissible to replicate w/o a slot. It'd be nice to expose this in the API, e.g. by passing None as the slot name.

:param start_lsn: the point in replication stream (WAL position) to start
from, in the form ``XXX/XXX`` (forward-slash separated
pair of hexadecimals)
:param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive

Review comment:

Should this be documented as "synchronous mode only, ignored for asynchronous mode"? Or is it used? If so, how?

messages to the server, in case there was no
communication during that period of time
:param options: a dictionary of options to pass to the logical replication
slot

The ``keepalive_interval`` must be greater than zero.

This method never returns unless an error message is sent from the
server, or the server closes connection, or there is an exception in the
``write()`` method of the ``file`` object.

One can even use ``sys.stdout`` as the destination (this is only good for
testing purposes, however)::

>>> cur.start_replication(sys.stdout, REPLICATION_LOGICAL, "testslot")
...

This method acts much like `~cursor.copy_to()`, with the important
distinction that the return value of the ``write()`` method drives the
server-side replication cursor. To report to the server that all the
messages up to the current one have been stored reliably, return a true
value (i.e. something that satisfies the ``if retval:`` condition) from
the ``write()`` callback::

class ReplicationStreamWriter(object):
    def write(self, msg):
        # store_message_reliably() is a placeholder for application code
        if store_message_reliably(msg):
            return True

writer = ReplicationStreamWriter()
cur.start_replication(writer, REPLICATION_LOGICAL, "testslot")
...

.. note::

Be aware that failing to advance the server-side cursor on any
replication slot, by continuously consuming messages and reporting
success to the server, can eventually lead to a "disk full" condition
on the server, because the server retains all the WAL segments that
might be needed to stream the changes via currently open replication
slots.

Drop any open replication slots that are no longer being used. The
list of open slots can be obtained by running a query like ``SELECT *
FROM pg_replication_slots``.

.. data:: REPLICATION_PHYSICAL

.. data:: REPLICATION_LOGICAL

.. index::
pair: Cursor; Replication


.. index::

138 changes: 138 additions & 0 deletions lib/extras.py
@@ -437,6 +437,144 @@ def callproc(self, procname, vars=None):
        return LoggingCursor.callproc(self, procname, vars)


class ReplicationConnection(_connection):
    """A connection that uses `ReplicationCursor` automatically."""

    def __init__(self, *args, **kwargs):
        """Initializes a replication connection, by adding appropriate replication parameter to the provided dsn arguments."""

        if len(args):
            dsn = args[0]

            # FIXME: could really use parse_dsn here

Review comment (Member):

Agreed

Review comment:

Yep, needs fixing before merge really


            if dsn.startswith('postgres://') or dsn.startswith('postgresql://'):
                # poor man's url parsing
                if dsn.rfind('?') > 0:
                    if not dsn.endswith('?'):
                        dsn += '&'
                else:
                    dsn += '?'
            else:
                dsn += ' '
            dsn += 'replication=database'

Review comment:

replication=database won't work on 9.3 or older, fwiw... but it's useful to have replication=true supported for 9.4+ too, because that lets you connect without knowing a particular database name when you plan to do streaming replication.

I suggest appending replication=database only if you don't already find a replication dsn term. That way the user can explicitly pass replication=on if they really want it, and it doesn't require any complicated API that's more effort than it's worth to add.
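
The suggestion above (append `replication=database` only when the DSN carries no replication term) could be sketched roughly as follows. This is an illustration under assumptions, not the code in this PR: `ensure_replication_param` is a hypothetical helper name, and the keyword/value branch uses a naive regex scan that does not understand quoted values.

```python
import re


def ensure_replication_param(dsn, default="database"):
    """Return dsn with a replication term appended unless one is present.

    Hypothetical helper for illustration only; not part of psycopg2.
    """
    if dsn.startswith(("postgres://", "postgresql://")):
        # URL form: look for the parameter in the query string only.
        _, _, query = dsn.partition("?")
        has_term = any(part.split("=", 1)[0] == "replication"
                       for part in query.split("&") if part)
        if has_term:
            return dsn
        if "?" not in dsn:
            sep = "?"
        elif dsn.endswith(("?", "&")):
            sep = ""
        else:
            sep = "&"
        return dsn + sep + "replication=" + default

    # Keyword/value form: naive scan for a "replication = ..." term.
    if re.search(r"(^|\s)replication\s*=", dsn):
        return dsn
    prefix = dsn + " " if dsn else ""
    return prefix + "replication=" + default
```

With this shape, a user who passes `replication=on` explicitly keeps that value, and everyone else gets the default, without any additional API surface.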

            args = [dsn] + list(args[1:])
        else:
            dbname = kwargs.get('dbname', None)
            if dbname is None:
                kwargs['dbname'] = 'replication'

            if kwargs.get('replication', None) is None:
                kwargs['replication'] = 'database' if dbname else 'true'

        super(ReplicationConnection, self).__init__(*args, **kwargs)

        # prevent auto-issued BEGIN statements
        self.autocommit = True

    def cursor(self, *args, **kwargs):
        kwargs.setdefault('cursor_factory', ReplicationCursor)
        return super(ReplicationConnection, self).cursor(*args, **kwargs)


"""Streaming replication types."""
REPLICATION_PHYSICAL = 0
REPLICATION_LOGICAL = 1

class ReplicationCursor(_cursor):
    """A cursor used for replication commands."""

    def identify_system(self):
        """Get information about the cluster status."""

        self.execute("IDENTIFY_SYSTEM")
        return dict(zip(['systemid', 'timeline', 'xlogpos', 'dbname'],
                        self.fetchall()[0]))

    def quote_ident(self, ident):
        # FIXME: use PQescapeIdentifier or psycopg_escape_identifier_easy, somehow

Review comment (Member):

By the time this is released, we will have PQescapeIdentifier exposed, so yes, it will happen.

Review comment:

Why's this exposed on the ReplicationCursor?

This is an oft-requested piece of functionality, but it should really be on the base psycopg2 Connection object IMO.

        return '"%s"' % ident.replace('"', '""')

    def create_replication_slot(self, slot_type, slot_name, output_plugin=None):
        """Create streaming replication slot."""

        command = "CREATE_REPLICATION_SLOT %s " % self.quote_ident(slot_name)

Review comment:

Looks like some more validation's needed here. Allowing slot_name to be None if physical replication is in use, but raising an exception if it's None for logical. Right now you'll get an error like

AttributeError: 'NoneType' object has no attribute 'replace'

which isn't very enlightening.

Review comment (Contributor Author):

Slot name is optional in case of START_REPLICATION, but not with CREATE_REPLICATION_SLOT.
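
Putting the two comments above together, an up-front check could give a clearer error than the `AttributeError`. The sketch below is only an illustration: `check_slot_name` is a hypothetical helper, the constants mirror the values defined in this diff, and `ValueError` is an assumed choice of exception (the PR itself raises `RuntimeError` elsewhere).

```python
# Values mirror the constants defined in lib/extras.py in this diff.
REPLICATION_PHYSICAL = 0
REPLICATION_LOGICAL = 1


def check_slot_name(slot_name, slot_type, creating):
    """Raise a descriptive error when a required slot name is missing.

    Hypothetical helper for illustration; `creating` is True for
    CREATE_REPLICATION_SLOT and False for START_REPLICATION.
    """
    if slot_name is not None:
        return
    if creating:
        # CREATE_REPLICATION_SLOT always needs a name.
        raise ValueError("slot_name is required to create a replication slot")
    if slot_type == REPLICATION_LOGICAL:
        raise ValueError("slot_name is required for logical replication")
    # Physical START_REPLICATION may proceed without a slot.
```

Calling this before `quote_ident()` would let `slot_name=None` remain valid for physical `START_REPLICATION` while rejecting it everywhere else.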


        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise RuntimeError("output_plugin is required for logical replication slot")

            command += "LOGICAL %s" % self.quote_ident(output_plugin)

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise RuntimeError("output_plugin is not applicable to physical replication")

            command += "PHYSICAL"

        else:
            raise RuntimeError("unrecognized replication slot type")

        return self.execute(command)

    def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
        return self.execute(command)

    def start_replication(self, o, slot_type, slot_name=None, start_lsn=None,
                          timeline=0, keepalive_interval=10, options=None):
        """Start and consume replication stream."""

        if keepalive_interval <= 0:
            raise RuntimeError("keepalive_interval must be > 0: %d" % keepalive_interval)

        command = "START_REPLICATION "

        if slot_type == REPLICATION_LOGICAL and slot_name is None:
            raise RuntimeError("slot_name is required for logical replication slot")

        if slot_name:
            command += "SLOT %s " % self.quote_ident(slot_name)

        if slot_type == REPLICATION_LOGICAL:
            command += "LOGICAL "
        elif slot_type == REPLICATION_PHYSICAL:
            command += "PHYSICAL "
        else:
            raise RuntimeError("unrecognized replication slot type")

        if start_lsn is None:
            start_lsn = '0/0'

        # reparse lsn to catch possible garbage
        lsn = start_lsn.split('/')
        command += "%X/%X" % (int(lsn[0], 16), int(lsn[1], 16))

Review comment:

IMO the LSN should internally be a uint64, formatted in Pg's x/y format for output on the wire.

Review comment (Contributor Author):

If we would store this in an object attribute or pass the arg to C code, yes. But it is so much easier to just construct the replication protocol command in Python, so there's no actual difference: we append it to the string and that's it.
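
For reference, the conversion discussed in this thread is small either way. A minimal sketch, assuming the usual `X/Y` text form in which the two hexadecimal halves are the high and low 32 bits of a 64-bit position; `parse_lsn` and `format_lsn` are hypothetical names, not functions added by this PR:

```python
def parse_lsn(text):
    """Convert an 'X/Y' LSN string into a single 64-bit integer."""
    hi, lo = text.split("/")  # raises ValueError unless exactly two parts
    hi, lo = int(hi, 16), int(lo, 16)
    if not (0 <= hi <= 0xFFFFFFFF and 0 <= lo <= 0xFFFFFFFF):
        raise ValueError("LSN halves must fit in 32 bits: %r" % (text,))
    return (hi << 32) | lo


def format_lsn(lsn):
    """Format a 64-bit integer LSN in the 'X/Y' form used on the wire."""
    return "%X/%X" % (lsn >> 32, lsn & 0xFFFFFFFF)
```

Keeping the integer form internally makes comparisons and arithmetic on positions straightforward, while `format_lsn()` produces the same text that the `"%X/%X"` expression in the diff builds.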


        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise RuntimeError("cannot specify timeline for logical replication")

            if timeline < 0:
                raise RuntimeError("timeline must be >= 0: %d" % timeline)

            command += " TIMELINE %d" % timeline

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise RuntimeError("cannot specify plugin options for physical replication")

            command += " ("
            for k,v in options.iteritems():
                if not command.endswith('('):
                    command += ", "
                command += "%s %s" % (self.quote_ident(k), _A(str(v)).getquoted())
            command += ")"

        return self.start_replication_expert(o, command, keepalive_interval)


# a dbtype and adapter for Python UUID type

class UUID_adapter(object):

6 changes: 6 additions & 0 deletions psycopg/cursor.h
@@ -72,6 +72,8 @@ struct cursorObject {
#define DEFAULT_COPYSIZE 16384
#define DEFAULT_COPYBUFF 8192

    int keepalive_interval; /* interval for keepalive messages in replication mode */

    PyObject *tuple_factory; /* factory for result tuples */
    PyObject *tzinfo_factory; /* factory for tzinfo objects */

@@ -88,6 +90,10 @@ struct cursorObject {

};

/* streaming replication modes */
#define CURSOR_REPLICATION_PHYSICAL 0
#define CURSOR_REPLICATION_LOGICAL 1


/* C-callable functions in cursor_int.c and cursor_type.c */
BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);

39 changes: 39 additions & 0 deletions psycopg/cursor_type.c
@@ -1579,6 +1579,43 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
    return res;
}

#define psyco_curs_start_replication_expert_doc \
"start_replication_expert(file, command, keepalive_interval) -- Start and consume replication stream with direct command."

static PyObject *
psyco_curs_start_replication_expert(cursorObject *self, PyObject *args)
{
    PyObject *file, *res = NULL;
    char *command;
    int keepalive_interval;

    if (!PyArg_ParseTuple(args, "O&si",
                          _psyco_curs_has_write_check, &file,
                          &command, &keepalive_interval)) {
        return NULL;
    }

    EXC_IF_CURS_CLOSED(self);
    EXC_IF_CURS_ASYNC(self, start_replication_expert);
    EXC_IF_GREEN(start_replication_expert);
    EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);

    Dprintf("psyco_curs_start_replication_expert: command = %s", command);

    self->copysize = 0;
    Py_INCREF(file);
    self->copyfile = file;
    self->keepalive_interval = keepalive_interval;

    if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) {
        res = Py_None;
        Py_INCREF(Py_None);
    }
    Py_CLEAR(self->copyfile);

    return res;
}

/* extension: closed - return true if cursor is closed */

#define psyco_curs_closed_doc \
@@ -1753,6 +1790,8 @@ static struct PyMethodDef cursorObject_methods[] = {
     METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
    {"copy_expert", (PyCFunction)psyco_curs_copy_expert,
     METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
    {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
     METH_VARARGS, psyco_curs_start_replication_expert_doc},
    {NULL}
};
