Skip to content

Commit fbcf99a

Browse files
committed
Move replication connection to C level.
1 parent 433fb95 commit fbcf99a

File tree

9 files changed

+296
-53
lines changed

9 files changed

+296
-53
lines changed

lib/extensions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@
6262
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
6363

6464
from psycopg2._psycopg import QueryCanceledError, TransactionRollbackError
65-
from psycopg2._psycopg import ReplicationCursor, ReplicationMessage
65+
from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
66+
from psycopg2._psycopg import ReplicationConnection, ReplicationCursor, ReplicationMessage
6667

6768
try:
6869
from psycopg2._psycopg import set_wait_callback, get_wait_callback

lib/extras.py

Lines changed: 11 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
from psycopg2 import extensions as _ext
4040
from psycopg2.extensions import cursor as _cursor
4141
from psycopg2.extensions import connection as _connection
42+
from psycopg2.extensions import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
43+
from psycopg2.extensions import ReplicationConnection as _replicationConnection
4244
from psycopg2.extensions import ReplicationCursor as _replicationCursor
4345
from psycopg2.extensions import ReplicationMessage
4446
from psycopg2.extensions import adapt as _A, quote_ident
@@ -439,65 +441,28 @@ def callproc(self, procname, vars=None):
439441
return LoggingCursor.callproc(self, procname, vars)
440442

441443

442-
"""Replication connection types."""
443-
REPLICATION_LOGICAL = "LOGICAL"
444-
REPLICATION_PHYSICAL = "PHYSICAL"
445-
446-
447-
class ReplicationConnectionBase(_connection):
444+
class ReplicationConnectionBase(_replicationConnection):
448445
"""
449446
Base class for Logical and Physical replication connection
450447
classes. Uses `ReplicationCursor` automatically.
451448
"""
452449

453450
def __init__(self, *args, **kwargs):
454-
"""
455-
Initializes a replication connection by adding appropriate
456-
parameters to the provided DSN and tweaking the connection
457-
attributes.
458-
"""
459-
460-
# replication_type is set in subclasses
461-
if self.replication_type == REPLICATION_LOGICAL:
462-
replication = 'database'
463-
464-
elif self.replication_type == REPLICATION_PHYSICAL:
465-
replication = 'true'
466-
467-
else:
468-
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type)
469-
470-
items = _ext.parse_dsn(args[0])
471-
472-
# we add an appropriate replication keyword parameter, unless
473-
# user has specified one explicitly in the DSN
474-
items.setdefault('replication', replication)
475-
476-
dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v)))
477-
for (k, v) in items.iteritems()])
478-
479-
args = [dsn] + list(args[1:]) # async is the possible 2nd arg
480451
super(ReplicationConnectionBase, self).__init__(*args, **kwargs)
481-
482-
# prevent auto-issued BEGIN statements
483-
if not self.async:
484-
self.autocommit = True
485-
486-
if self.cursor_factory is None:
487-
self.cursor_factory = ReplicationCursor
452+
self.cursor_factory = ReplicationCursor
488453

489454

490455
class LogicalReplicationConnection(ReplicationConnectionBase):
491456

492457
def __init__(self, *args, **kwargs):
493-
self.replication_type = REPLICATION_LOGICAL
458+
kwargs['replication_type'] = REPLICATION_LOGICAL
494459
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
495460

496461

497462
class PhysicalReplicationConnection(ReplicationConnectionBase):
498463

499464
def __init__(self, *args, **kwargs):
500-
self.replication_type = REPLICATION_PHYSICAL
465+
kwargs['replication_type'] = REPLICATION_PHYSICAL
501466
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
502467

503468

@@ -528,16 +493,16 @@ def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None)
528493
if output_plugin is None:
529494
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
530495

531-
command += "%s %s" % (slot_type, quote_ident(output_plugin, self))
496+
command += "LOGICAL %s" % quote_ident(output_plugin, self)
532497

533498
elif slot_type == REPLICATION_PHYSICAL:
534499
if output_plugin is not None:
535500
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
536501

537-
command += slot_type
502+
command += "PHYSICAL"
538503

539504
else:
540-
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
505+
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
541506

542507
self.execute(command)
543508

@@ -562,15 +527,15 @@ def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
562527
else:
563528
raise psycopg2.ProgrammingError("slot name is required for logical replication")
564529

565-
command += "%s " % slot_type
530+
command += "LOGICAL "
566531

567532
elif slot_type == REPLICATION_PHYSICAL:
568533
if slot_name:
569534
command += "SLOT %s " % quote_ident(slot_name, self)
570535
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
571536

572537
else:
573-
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
538+
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
574539

575540
if type(start_lsn) is str:
576541
lsn = start_lsn.split('/')

psycopg/psycopg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ typedef struct connectionObject connectionObject;
120120
typedef struct replicationMessageObject replicationMessageObject;
121121

122122
/* some utility functions */
123+
HIDDEN PyObject *parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs);
123124
HIDDEN PyObject *psyco_parse_args(PyObject *self, PyObject *args, PyObject *kwargs);
124125
HIDDEN PyObject *psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs);
125126
HIDDEN PyObject *psyco_make_dsn(PyObject *self, PyObject *args, PyObject *kwargs);

psycopg/psycopgmodule.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "psycopg/connection.h"
3030
#include "psycopg/cursor.h"
31+
#include "psycopg/replication_connection.h"
3132
#include "psycopg/replication_cursor.h"
3233
#include "psycopg/replication_message.h"
3334
#include "psycopg/green.h"
@@ -74,7 +75,7 @@ HIDDEN PyObject *psyco_DescriptionType = NULL;
7475

7576

7677
/* finds a keyword or positional arg (pops it from kwargs if found there) */
77-
static PyObject *
78+
PyObject *
7879
parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs)
7980
{
8081
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
@@ -1114,6 +1115,9 @@ INIT_MODULE(_psycopg)(void)
11141115
Py_TYPE(&cursorType) = &PyType_Type;
11151116
if (PyType_Ready(&cursorType) == -1) goto exit;
11161117

1118+
Py_TYPE(&replicationConnectionType) = &PyType_Type;
1119+
if (PyType_Ready(&replicationConnectionType) == -1) goto exit;
1120+
11171121
Py_TYPE(&replicationCursorType) = &PyType_Type;
11181122
if (PyType_Ready(&replicationCursorType) == -1) goto exit;
11191123

@@ -1237,13 +1241,16 @@ INIT_MODULE(_psycopg)(void)
12371241
PyModule_AddStringConstant(module, "__version__", PSYCOPG_VERSION);
12381242
PyModule_AddStringConstant(module, "__doc__", "psycopg PostgreSQL driver");
12391243
PyModule_AddIntConstant(module, "__libpq_version__", PG_VERSION_NUM);
1244+
PyModule_AddIntMacro(module, REPLICATION_PHYSICAL);
1245+
PyModule_AddIntMacro(module, REPLICATION_LOGICAL);
12401246
PyModule_AddObject(module, "apilevel", Text_FromUTF8(APILEVEL));
12411247
PyModule_AddObject(module, "threadsafety", PyInt_FromLong(THREADSAFETY));
12421248
PyModule_AddObject(module, "paramstyle", Text_FromUTF8(PARAMSTYLE));
12431249

12441250
/* put new types in module dictionary */
12451251
PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
12461252
PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
1253+
PyModule_AddObject(module, "ReplicationConnection", (PyObject*)&replicationConnectionType);
12471254
PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
12481255
PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
12491256
PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
@@ -1285,6 +1292,9 @@ INIT_MODULE(_psycopg)(void)
12851292
if (0 != psyco_errors_init()) { goto exit; }
12861293
psyco_errors_fill(dict);
12871294

1295+
replicationPhysicalConst = PyDict_GetItemString(dict, "REPLICATION_PHYSICAL");
1296+
replicationLogicalConst = PyDict_GetItemString(dict, "REPLICATION_LOGICAL");
1297+
12881298
Dprintf("initpsycopg: module initialization complete");
12891299

12901300
exit:

psycopg/replication_connection.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/* replication_connection.h - definition for the psycopg replication connection type
2+
*
3+
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
4+
*
5+
* This file is part of psycopg.
6+
*
7+
* psycopg2 is free software: you can redistribute it and/or modify it
8+
* under the terms of the GNU Lesser General Public License as published
9+
* by the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* In addition, as a special exception, the copyright holders give
13+
* permission to link this program with the OpenSSL library (or with
14+
* modified versions of OpenSSL that use the same license as OpenSSL),
15+
* and distribute linked combinations including the two.
16+
*
17+
* You must obey the GNU Lesser General Public License in all respects for
18+
* all of the code used other than OpenSSL.
19+
*
20+
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
21+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
22+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
23+
* License for more details.
24+
*/
25+
26+
#ifndef PSYCOPG_REPLICATION_CONNECTION_H
27+
#define PSYCOPG_REPLICATION_CONNECTION_H 1
28+
29+
#include "psycopg/connection.h"
30+
31+
#ifdef __cplusplus
32+
extern "C" {
33+
#endif
34+
35+
extern HIDDEN PyTypeObject replicationConnectionType;
36+
37+
typedef struct replicationConnectionObject {
38+
connectionObject conn;
39+
40+
long int type;
41+
} replicationConnectionObject;
42+
43+
#define REPLICATION_PHYSICAL 1
44+
#define REPLICATION_LOGICAL 2
45+
46+
extern HIDDEN PyObject *replicationPhysicalConst;
47+
extern HIDDEN PyObject *replicationLogicalConst;
48+
49+
#ifdef __cplusplus
50+
}
51+
#endif
52+
53+
#endif /* !defined(PSYCOPG_REPLICATION_CONNECTION_H) */

0 commit comments

Comments
 (0)