Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for streaming replication protocol #322

Merged
merged 68 commits into from Aug 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
e32e1b8
Add support for streaming replication protocol
a1exsh Jun 1, 2015
80da76d
Get rid of postgres internal includes; check for Win32 for htonl()
a1exsh Jun 2, 2015
44b705f
Improve identify_system: don't hardcode column names
a1exsh Jun 2, 2015
f14521f
Add libpq_support.c and win32_support.c
a1exsh Jun 3, 2015
50df864
Add timersub for Win32. Fix gettimeofday on MinGW.
a1exsh Jun 4, 2015
f7b84ce
Add {libpq,win32}_support.* to the .cproj file
a1exsh Jun 4, 2015
453830f
Add ReplicationMessage object
a1exsh Jun 5, 2015
1ac385d
Fix logical decoding plugin options adaptation on python3
a1exsh Jun 10, 2015
9fc5bf4
Add handling of send_time field in replmsg
a1exsh Jun 10, 2015
35a3262
Expose ReplicationMessage type in extras
a1exsh Jun 11, 2015
9ed90b1
Refer cursor from ReplicationMessage object. At the same time, for th…
a1exsh Jun 11, 2015
e3c3a2c
Merge branch 'master' into feature/replication-message-object
a1exsh Jun 30, 2015
61e52ce
Rework replication protocol
a1exsh Jun 10, 2015
058db56
Merge remote-tracking branch 'zalando/feature/replication-protocol' i…
a1exsh Jun 30, 2015
318706f
Update docs for Replication protocol
a1exsh Jun 30, 2015
0d731aa
Comment on special handling of PGRES_COPY_BOTH
a1exsh Jun 30, 2015
9386653
Update docs on ReplicationCursor
a1exsh Jul 1, 2015
dab41c6
Fix PQconsumeInput usage.
a1exsh Jul 2, 2015
9c1f2ac
Check return value of PQsocket
a1exsh Jul 2, 2015
06f1823
Fix missing free in replmsg_dealloc
a1exsh Jul 3, 2015
eac16d0
Fix missing GC flag in ReplicationMessage type
a1exsh Jul 3, 2015
26fe1f2
Fix use of PQconsumeInput() in pq_read_replication_message()
a1exsh Jul 7, 2015
862eda1
Merge remote-tracking branch 'origin/master' into repl
a1exsh Oct 1, 2015
f872a2a
Remove typedef for uint32, include internal/c.h
a1exsh Sep 30, 2015
937a7a9
Cleanup start replication wrt. slot type a bit.
a1exsh Oct 1, 2015
95ee218
Update replication connection/cursor interface and docs.
a1exsh Oct 1, 2015
cac83da
Use parse_dsn in ReplicationConnectionBase
a1exsh Oct 1, 2015
0233620
Rework replication connection/cursor classes
a1exsh Oct 1, 2015
ea2b87e
Fix create_replication_slot doc signature
a1exsh Oct 13, 2015
6ad2999
Remove IDENTIFY_SYSTEM wrapper method (it can't work with async anyway).
a1exsh Oct 13, 2015
5407907
Fix ReplicationTest: no NotSupportedError now.
a1exsh Oct 14, 2015
fea2260
Fix stop_replication: always raise outside the loop.
a1exsh Oct 14, 2015
a0b42a1
Update stop_repl, require replication consumer to be a callable.
a1exsh Oct 14, 2015
e05b4fd
Add checks on replication state, have to have a separate check for co…
a1exsh Oct 14, 2015
822d671
Clear repl_stop flag after the consume loop.
a1exsh Oct 14, 2015
e3097ec
Fix select/timeout indication in async replication example
a1exsh Oct 14, 2015
28a1a00
Remove commented copy_both code in pqfetch.
a1exsh Oct 14, 2015
9ab38ee
Add psyco_curs_datetime_init
a1exsh Oct 14, 2015
8e518d4
Merge branch 'master' into feature/replication-protocol
a1exsh Oct 15, 2015
d14fea3
Use quote_ident from psycopg2.extensions
a1exsh Oct 15, 2015
cf4f241
Fix async replication and test.
a1exsh Oct 15, 2015
0435320
Fix PSYCOPG2_TEST_REPL_DSN handling.
a1exsh Oct 16, 2015
4ab7cf0
Replace stop_replication with requirement for an exception.
a1exsh Oct 19, 2015
7aea2ce
Improve async replication test.
a1exsh Oct 19, 2015
0bb81fc
Properly subclass ReplicationCursor on C level.
a1exsh Oct 19, 2015
23abe4f
Add quick start to the replication doc, minor doc fixes.
a1exsh Oct 20, 2015
b3f8e9a
Fix send_time printf format in replmsg_repr().
a1exsh Oct 20, 2015
089e745
Fix cursor_init() declaration for use in replication_cursor_type.c
a1exsh Oct 20, 2015
22cbfb2
Actually add replication tests to the test suite.
a1exsh Oct 20, 2015
76c7f4a
Use direct call to consume() callable in pq_copy_both()
a1exsh Oct 22, 2015
e69dafb
Move the `decode` parameter to `start_replication()`.
a1exsh Oct 23, 2015
dd6bcbd
Improve async replication example.
a1exsh Oct 23, 2015
8b79bf4
Drop ReplicationCursor.flush_feedback(), rectify pq_*_replication_*()…
a1exsh Oct 23, 2015
4b9a6f4
Merge branch 'master' into feature/replication-protocol
a1exsh Oct 27, 2015
7aba8b3
Rework psycopg2.connect() interface.
a1exsh Oct 27, 2015
433fb95
Merge branch 'feature/connect2' into feature/replication-protocol
a1exsh Oct 27, 2015
fbcf99a
Move replication connection to C level.
a1exsh Oct 27, 2015
e61db57
Add dbname=replication for physical replication type.
a1exsh Oct 30, 2015
09a4bb7
Allow retrying start_replication after syntax or data error.
a1exsh Jan 5, 2016
5d33b39
Fix error test for invalid START_REPLICATION command.
a1exsh Jan 21, 2016
cb70325
Merge branch 'master' into feature/replication-protocol-c-connection-…
a1exsh Mar 4, 2016
da6e061
Use python-defined make_dsn() for ReplicationConnection class
a1exsh Mar 8, 2016
1d52f34
We don't need to expose cursor_init(), call tp_init() on the type ins…
a1exsh Mar 8, 2016
2de2ed7
Remove some dead code
a1exsh Mar 8, 2016
b21c8f7
Move replication-related imports to extras.py
a1exsh Mar 8, 2016
3f10b4d
Remove duplicated doc for make_dsn()
a1exsh Mar 8, 2016
a7887fa
Merge remote-tracking branch 'zalando/feature/replication-protocol' i…
a1exsh Mar 8, 2016
d5443c6
Fix TODOs in ReplicationMessage inline docs
a1exsh Apr 21, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
428 changes: 428 additions & 0 deletions doc/src/extras.rst

Large diffs are not rendered by default.

123 changes: 122 additions & 1 deletion lib/extras.py
Expand Up @@ -39,8 +39,12 @@
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
from psycopg2.extensions import adapt as _A
from psycopg2.extensions import adapt as _A, quote_ident
from psycopg2.extensions import b
from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
from psycopg2._psycopg import ReplicationConnection as _replicationConnection
from psycopg2._psycopg import ReplicationCursor as _replicationCursor
from psycopg2._psycopg import ReplicationMessage


class DictCursorBase(_cursor):
Expand Down Expand Up @@ -437,6 +441,123 @@ def callproc(self, procname, vars=None):
return LoggingCursor.callproc(self, procname, vars)


class LogicalReplicationConnection(_replicationConnection):

def __init__(self, *args, **kwargs):
kwargs['replication_type'] = REPLICATION_LOGICAL
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)


class PhysicalReplicationConnection(_replicationConnection):

def __init__(self, *args, **kwargs):
kwargs['replication_type'] = REPLICATION_PHYSICAL
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)


class StopReplication(Exception):
"""
Exception used to break out of the endless loop in
`~ReplicationCursor.consume_stream()`.

Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
`~psycopg2.Error` as occurrence of this exception does not indicate an
error.
"""
pass


class ReplicationCursor(_replicationCursor):
"""A cursor used for communication on replication connections."""

def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
"""Create streaming replication slot."""

command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)

if slot_type is None:
slot_type = self.connection.replication_type

if slot_type == REPLICATION_LOGICAL:
if output_plugin is None:
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")

command += "LOGICAL %s" % quote_ident(output_plugin, self)

elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")

command += "PHYSICAL"

else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))

self.execute(command)

def drop_replication_slot(self, slot_name):
"""Drop streaming replication slot."""

command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
self.execute(command)

def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
timeline=0, options=None, decode=False):
"""Start replication stream."""

command = "START_REPLICATION "

if slot_type is None:
slot_type = self.connection.replication_type

if slot_type == REPLICATION_LOGICAL:
if slot_name:
command += "SLOT %s " % quote_ident(slot_name, self)
else:
raise psycopg2.ProgrammingError("slot name is required for logical replication")

command += "LOGICAL "

elif slot_type == REPLICATION_PHYSICAL:
if slot_name:
command += "SLOT %s " % quote_ident(slot_name, self)
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

else:
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))

if type(start_lsn) is str:
lsn = start_lsn.split('/')
lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
else:
lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF)

command += lsn

if timeline != 0:
if slot_type == REPLICATION_LOGICAL:
raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")

command += " TIMELINE %d" % timeline

if options:
if slot_type == REPLICATION_PHYSICAL:
raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication")

command += " ("
for k,v in options.iteritems():
if not command.endswith('('):
command += ", "
command += "%s %s" % (quote_ident(k, self), _A(str(v)))
command += ")"

self.start_replication_expert(command, decode=decode)

# allows replication cursors to be used in select.select() directly
def fileno(self):
return self.connection.fileno()


# a dbtype and adapter for Python UUID type

class UUID_adapter(object):
Expand Down
104 changes: 104 additions & 0 deletions psycopg/libpq_support.c
@@ -0,0 +1,104 @@
/* libpq_support.c - functions not provided by libpq, but which are
* required for advanced communication with the server, such as
* streaming replication
*
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/

#define PSYCOPG_MODULE
#include "psycopg/psycopg.h"

#include "psycopg/libpq_support.h"

/* htonl(), ntohl() */
#ifdef _WIN32
#include <winsock2.h>
/* gettimeofday() */
#include "psycopg/win32_support.h"
#else
#include <arpa/inet.h>
#endif

/* support routines taken from pg_basebackup/streamutil.c */

/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
* backend code. The protocol always uses integer timestamps, regardless of
* server setting.
*/
pg_int64
feGetCurrentTimestamp(void)
{
pg_int64 result;
struct timeval tp;

gettimeofday(&tp, NULL);

result = (pg_int64) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);

result = (result * USECS_PER_SEC) + tp.tv_usec;

return result;
}

/*
* Converts an int64 to network byte order.
*/
void
fe_sendint64(pg_int64 i, char *buf)
{
uint32 n32;

/* High order half first, since we're doing MSB-first */
n32 = (uint32) (i >> 32);
n32 = htonl(n32);
memcpy(&buf[0], &n32, 4);

/* Now the low order half */
n32 = (uint32) i;
n32 = htonl(n32);
memcpy(&buf[4], &n32, 4);
}

/*
* Converts an int64 from network byte order to native format.
*/
pg_int64
fe_recvint64(char *buf)
{
pg_int64 result;
uint32 h32;
uint32 l32;

memcpy(&h32, buf, 4);
memcpy(&l32, buf + 4, 4);
h32 = ntohl(h32);
l32 = ntohl(l32);

result = h32;
result <<= 32;
result |= l32;

return result;
}
49 changes: 49 additions & 0 deletions psycopg/libpq_support.h
@@ -0,0 +1,49 @@
/* libpq_support.h - definitions for libpq_support.c
*
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_LIBPQ_SUPPORT_H
#define PSYCOPG_LIBPQ_SUPPORT_H 1

#include "psycopg/config.h"
#include "internal/c.h"

/* type and constant definitions from internal postgres includes not available otherwise */
typedef unsigned PG_INT64_TYPE XLogRecPtr;

/* have to use lowercase %x, as PyString_FromFormat can't do %X */
#define XLOGFMTSTR "%x/%x"
#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF))

/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */
#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */

#define SECS_PER_DAY 86400
#define USECS_PER_SEC 1000000LL

HIDDEN pg_int64 feGetCurrentTimestamp(void);
HIDDEN void fe_sendint64(pg_int64 i, char *buf);
HIDDEN pg_int64 fe_recvint64(char *buf);

#endif /* !defined(PSYCOPG_LIBPQ_SUPPORT_H) */