Browse files

Merge branch '2.4.4-align' into develop

  • Loading branch information...
2 parents c861fc3 + 52ab326 commit e62cde293553afd14f793b1e0542157c53748dc5 @dvarrazzo dvarrazzo committed Dec 23, 2011
View
85 psycopg2ct/__init__.py
@@ -6,7 +6,7 @@
from psycopg2ct._impl.adapters import Binary, Date, Time, Timestamp
from psycopg2ct._impl.adapters import DateFromTicks, TimeFromTicks
from psycopg2ct._impl.adapters import TimestampFromTicks
-from psycopg2ct._impl.connection import connect
+from psycopg2ct._impl.connection import _connect
from psycopg2ct._impl.exceptions import *
from psycopg2ct._impl.typecasts import BINARY, DATETIME, NUMBER, ROWID, STRING
@@ -19,4 +19,87 @@
_ext.register_adapter(tuple, _ext.SQL_IN)
_ext.register_adapter(type(None), _ext.NoneAdapter)
+
+import re
+
+def _param_escape(s,
+ re_escape=re.compile(r"([\\'])"),
+ re_space=re.compile(r'\s')):
+ """
+ Apply the escaping rule required by PQconnectdb
+ """
+ if not s: return "''"
+
+ s = re_escape.sub(r'\\\1', s)
+ if re_space.search(s):
+ s = "'" + s + "'"
+
+ return s
+
+del re
+
+
+def connect(dsn=None,
+ database=None, user=None, password=None, host=None, port=None,
+ connection_factory=None, async=False, **kwargs):
+ """
+ Create a new database connection.
+
+ The connection parameters can be specified either as a string:
+
+ conn = psycopg2.connect("dbname=test user=postgres password=secret")
+
+ or using a set of keyword arguments:
+
+ conn = psycopg2.connect(database="test", user="postgres", password="secret")
+
+ The basic connection parameters are:
+
+ - *dbname*: the database name (only in dsn string)
+ - *database*: the database name (only as keyword argument)
+ - *user*: user name used to authenticate
+ - *password*: password used to authenticate
+ - *host*: database host address (defaults to UNIX socket if not provided)
+ - *port*: connection port number (defaults to 5432 if not provided)
+
+ Using the *connection_factory* parameter a different class or connections
+ factory can be specified. It should be a callable object taking a dsn
+ argument.
+
+ Using *async*=True an asynchronous connection will be created.
+
+ Any other keyword parameter will be passed to the underlying client
+ library: the list of supported parameter depends on the library version.
+
+ """
+ if dsn is None:
+ # Note: reproducing the behaviour of the previous C implementation:
+ # keyword are silently swallowed if a DSN is specified. I would have
+ # raised an exception. File under "histerical raisins".
+ items = []
+ if database is not None:
+ items.append(('dbname', database))
+ if user is not None:
+ items.append(('user', user))
+ if password is not None:
+ items.append(('password', password))
+ if host is not None:
+ items.append(('host', host))
+ # Reproducing the previous C implementation behaviour: swallow a
+ # negative port. The libpq would raise an exception for it.
+ if port is not None and int(port) > 0:
+ items.append(('port', port))
+
+ items.extend(
+ [(k, v) for (k, v) in kwargs.iteritems() if v is not None])
+ dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
+ for (k, v) in items])
+
+ if not dsn:
+ raise InterfaceError('missing dsn and no parameters')
+
+ return _connect(dsn,
+ connection_factory=connection_factory, async=async)
+
+
__all__ = filter(lambda k: not k.startswith('_'), locals().keys())
View
27 psycopg2ct/_impl/connection.py
@@ -764,33 +764,16 @@ def _have_wait_callback(self):
return bool(_green_callback)
-def connect(dsn=None, database=None, host=None, port=None, user=None,
- password=None, async=False, connection_factory=Connection):
-
- if dsn is None:
- args = []
- if database is not None:
- args.append('dbname=%s' % database)
- if host is not None:
- args.append('host=%s' % host)
- if port is not None:
- if isinstance(port, str):
- port = int(port)
-
- if not isinstance(port, int):
- raise TypeError('port must be a string or int')
- args.append('port=%d' % port)
- if user is not None:
- args.append('user=%s' % user)
- if password is not None:
- args.append('password=%s' % password)
- dsn = ' '.join(args)
+def _connect(dsn, connection_factory=None, async=False):
+ if connection_factory is None:
+ connection_factory = Connection
# Mimic the construction method as used by psycopg2, which notes:
# Here we are breaking the connection.__init__ interface defined
# by psycopg2. So, if not requiring an async conn, avoid passing
# the async parameter.
if async:
return connection_factory(dsn, async=True)
- return connection_factory(dsn)
+ else:
+ return connection_factory(dsn)
View
8 psycopg2ct/_impl/consts.py
@@ -5,10 +5,10 @@
# Isolation level values.
ISOLATION_LEVEL_AUTOCOMMIT = 0
-ISOLATION_LEVEL_READ_UNCOMMITTED = 1
-ISOLATION_LEVEL_READ_COMMITTED = 2
-ISOLATION_LEVEL_REPEATABLE_READ = 3
-ISOLATION_LEVEL_SERIALIZABLE = 4
+ISOLATION_LEVEL_READ_UNCOMMITTED = 4
+ISOLATION_LEVEL_READ_COMMITTED = 1
+ISOLATION_LEVEL_REPEATABLE_READ = 2
+ISOLATION_LEVEL_SERIALIZABLE = 3
# psycopg connection status values.
STATUS_SETUP = 0
View
37 psycopg2ct/extras.py
@@ -86,18 +86,28 @@ def fetchall(self):
res = _cursor.fetchall(self)
return res
- def next(self):
+ def __iter__(self):
if self._prefetch:
- res = _cursor.fetchone(self)
- if res is None:
- raise StopIteration()
+ res = _cursor.fetchmany(self, self.itersize)
+ if not res:
+ return
if self._query_executed:
self._build_index()
if not self._prefetch:
- res = _cursor.fetchone(self)
- if res is None:
- raise StopIteration()
- return res
+ res = _cursor.fetchmany(self, self.itersize)
+
+ for r in res:
+ yield r
+
+ # the above was the first itersize record. the following are
+ # in a repeated loop.
+ while 1:
+ res = _cursor.fetchmany(self, self.itersize)
+ if not res:
+ return
+ for r in res:
+ yield r
+
class DictConnection(_connection):
"""A connection that uses `DictCursor` automatically."""
@@ -694,7 +704,7 @@ def get_oids(self, conn_or_curs):
# revert the status of the connection as before the command
if (conn_status != _ext.STATUS_IN_TRANSACTION
- and conn.isolation_level != _ext.ISOLATION_LEVEL_AUTOCOMMIT):
+ and not conn.autocommit):
conn.rollback()
return tuple(rv0), tuple(rv1)
@@ -831,8 +841,8 @@ def parse(self, s, curs):
tokens = self.tokenize(s)
if len(tokens) != len(self.atttypes):
raise psycopg2.DataError(
- "expecting %d components for the type %s, %d found instead",
- (len(self.atttypes), self.name, len(self.tokens)))
+ "expecting %d components for the type %s, %d found instead" %
+ (len(self.atttypes), self.name, len(tokens)))
attrs = [ curs.cast(oid, token)
for oid, token in zip(self.atttypes, tokens) ]
@@ -903,15 +913,16 @@ def _from_db(self, name, conn_or_curs):
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
-WHERE typname = %%s and nspname = %%s
+WHERE typname = %%s AND nspname = %%s
+ AND attnum > 0 AND NOT attisdropped
ORDER BY attnum;
""" % typarray, (tname, schema))
recs = curs.fetchall()
# revert the status of the connection as before the command
if (conn_status != _ext.STATUS_IN_TRANSACTION
- and conn.isolation_level != _ext.ISOLATION_LEVEL_AUTOCOMMIT):
+ and not conn.autocommit):
conn.rollback()
if not recs:
View
43 psycopg2ct/tests/psycopg2_tests/__init__.py
@@ -22,28 +22,28 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
-import os
import sys
from testconfig import dsn
from testutils import unittest
-import test_bug_gc
+import test_async
import test_bugX000
-import test_extras_dictcursor
+import test_bug_gc
+import test_cancel
+import test_connection
+import test_copy
+import test_cursor
import test_dates
+import test_extras_dictcursor
+import test_green
+import test_lobject
+import test_module
+import test_notify
import test_psycopg2_dbapi20
import test_quote
-import test_connection
-import test_cursor
import test_transaction
import test_types_basic
import test_types_extras
-import test_lobject
-import test_copy
-import test_notify
-import test_async
-import test_green
-import test_cancel
def test_suite():
# If connection to test db fails, bail out early.
@@ -58,23 +58,24 @@ def test_suite():
cnn.close()
suite = unittest.TestSuite()
- suite.addTest(test_bug_gc.test_suite())
+ suite.addTest(test_async.test_suite())
suite.addTest(test_bugX000.test_suite())
- suite.addTest(test_extras_dictcursor.test_suite())
+ suite.addTest(test_bug_gc.test_suite())
+ suite.addTest(test_cancel.test_suite())
+ suite.addTest(test_connection.test_suite())
+ suite.addTest(test_copy.test_suite())
+ suite.addTest(test_cursor.test_suite())
suite.addTest(test_dates.test_suite())
+ suite.addTest(test_extras_dictcursor.test_suite())
+ suite.addTest(test_green.test_suite())
+ suite.addTest(test_lobject.test_suite())
+ suite.addTest(test_module.test_suite())
+ suite.addTest(test_notify.test_suite())
suite.addTest(test_psycopg2_dbapi20.test_suite())
suite.addTest(test_quote.test_suite())
- suite.addTest(test_connection.test_suite())
- suite.addTest(test_cursor.test_suite())
suite.addTest(test_transaction.test_suite())
suite.addTest(test_types_basic.test_suite())
suite.addTest(test_types_extras.test_suite())
- suite.addTest(test_lobject.test_suite())
- suite.addTest(test_copy.test_suite())
- suite.addTest(test_notify.test_suite())
- suite.addTest(test_async.test_suite())
- suite.addTest(test_green.test_suite())
- suite.addTest(test_cancel.test_suite())
return suite
if __name__ == '__main__':
View
29 psycopg2ct/tests/psycopg2_tests/test_connection.py
@@ -344,6 +344,16 @@ def test_isolation_level_serializable(self):
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
+ def test_isolation_level_closed(self):
+ cnn = self.connect()
+ cnn.close()
+ self.assertRaises(psycopg2.InterfaceError, getattr,
+ cnn, 'isolation_level')
+ self.assertRaises(psycopg2.InterfaceError,
+ cnn.set_isolation_level, 0)
+ self.assertRaises(psycopg2.InterfaceError,
+ cnn.set_isolation_level, 1)
+
class ConnectionTwoPhaseTests(unittest.TestCase):
def setUp(self):
@@ -725,6 +735,12 @@ def tearDown(self):
if not self.conn.closed:
self.conn.close()
+ def test_closed(self):
+ self.conn.close()
+ self.assertRaises(psycopg2.InterfaceError,
+ self.conn.set_session,
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+
def test_not_in_transaction(self):
cur = self.conn.cursor()
cur.execute("select 1")
@@ -868,6 +884,19 @@ def tearDown(self):
if not self.conn.closed:
self.conn.close()
+ def test_closed(self):
+ self.conn.close()
+ self.assertRaises(psycopg2.InterfaceError,
+ setattr, self.conn, 'autocommit', True)
+
+ # The getter doesn't have a guard. We may change this in future
+ # to make it consistent with other methods; meanwhile let's just check
+ # it doesn't explode.
+ try:
+ self.assert_(self.conn.autocommit in (True, False))
+ except psycopg2.InterfaceError:
+ pass
+
def test_default_no_autocommit(self):
self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
View
12 psycopg2ct/tests/psycopg2_tests/test_cursor.py
@@ -97,6 +97,18 @@ def test_mogrify_decimal_explodes(self):
self.assertEqual(b('SELECT 10.3;'),
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
+ def test_mogrify_leak_on_multiple_reference(self):
+ # issue #81: reference leak when a parameter value is referenced
+ # more than once from a dict.
+ cur = self.conn.cursor()
+ i = lambda x: x
+ foo = i('foo') * 10
+ import sys
+ nref1 = sys.getrefcount(foo)
+ cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
+ nref2 = sys.getrefcount(foo)
+ self.assertEqual(nref1, nref2)
+
def test_bad_placeholder(self):
cur = self.conn.cursor()
self.assertRaises(psycopg2.ProgrammingError,
View
12 psycopg2ct/tests/psycopg2_tests/test_dates.py
@@ -36,7 +36,7 @@ def execute(self, *args):
def test_parse_date(self):
value = self.DATE('2007-01-01', self.curs)
- self.assertNotEqual(value, None)
+ self.assert_(value is not None)
self.assertEqual(value.year, 2007)
self.assertEqual(value.month, 1)
self.assertEqual(value.day, 1)
@@ -51,7 +51,7 @@ def test_parse_incomplete_date(self):
def test_parse_time(self):
value = self.TIME('13:30:29', self.curs)
- self.assertNotEqual(value, None)
+ self.assert_(value is not None)
self.assertEqual(value.hour, 13)
self.assertEqual(value.minute, 30)
self.assertEqual(value.second, 29)
@@ -66,7 +66,7 @@ def test_parse_incomplete_time(self):
def test_parse_datetime(self):
value = self.DATETIME('2007-01-01 13:30:29', self.curs)
- self.assertNotEqual(value, None)
+ self.assert_(value is not None)
self.assertEqual(value.year, 2007)
self.assertEqual(value.month, 1)
self.assertEqual(value.day, 1)
@@ -336,15 +336,15 @@ def tearDown(self):
def test_parse_bc_date(self):
value = self.DATE('00042-01-01 BC', self.curs)
- self.assertNotEqual(value, None)
+ self.assert_(value is not None)
# mx.DateTime numbers BC dates from 0 rather than 1.
self.assertEqual(value.year, -41)
self.assertEqual(value.month, 1)
self.assertEqual(value.day, 1)
def test_parse_bc_datetime(self):
value = self.DATETIME('00042-01-01 13:30:29 BC', self.curs)
- self.assertNotEqual(value, None)
+ self.assert_(value is not None)
# mx.DateTime numbers BC dates from 0 rather than 1.
self.assertEqual(value.year, -41)
self.assertEqual(value.month, 1)
@@ -395,7 +395,7 @@ def test_parse_datetime_timezone(self):
def test_parse_interval(self):
value = self.INTERVAL('42 days 05:50:05', self.curs)
- self.assertNotEqual(value, None)
+ self.assert_(value is not None)
self.assertEqual(value.day, 42)
self.assertEqual(value.hour, 5)
self.assertEqual(value.minute, 50)
View
25 psycopg2ct/tests/psycopg2_tests/test_extras_dictcursor.py
@@ -65,6 +65,7 @@ def getter(curs):
return row
self._testWithPlainCursorReal(getter)
+
def testDictCursorWithNamedCursorFetchOne(self):
self._testWithNamedCursor(lambda curs: curs.fetchone())
@@ -80,6 +81,12 @@ def getter(curs):
return row
self._testWithNamedCursor(getter)
+ @skip_before_postgres(8, 2)
+ def testDictCursorWithNamedCursorNotGreedy(self):
+ curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
+ self._testNamedCursorNotGreedy(curs)
+
+
def testDictCursorRealWithNamedCursorFetchOne(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchone())
@@ -95,6 +102,12 @@ def getter(curs):
return row
self._testWithNamedCursorReal(getter)
+ @skip_before_postgres(8, 2)
+ def testDictCursorRealWithNamedCursorNotGreedy(self):
+ curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
+ self._testNamedCursorNotGreedy(curs)
+
+
def _testWithPlainCursor(self, getter):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
@@ -128,6 +141,18 @@ def testUpdateRow(self):
self.failUnless(row['foo'] == 'qux')
self.failUnless(row[0] == 'qux')
+ def _testNamedCursorNotGreedy(self, curs):
+ curs.itersize = 2
+ curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
+ recs = []
+ for t in curs:
+ time.sleep(0.01)
+ recs.append(t)
+
+ # check that the dataset was not fetched in a single gulp
+ self.assert_(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005))
+ self.assert_(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099))
+
class NamedTupleCursorTest(unittest.TestCase):
def setUp(self):
View
134 psycopg2ct/tests/psycopg2_tests/test_module.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+
+# test_module.py - unit test for the module interface
+#
+# Copyright (C) 2011 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+#
+# psycopg2 is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# In addition, as a special exception, the copyright holders give
+# permission to link this program with the OpenSSL library (or with
+# modified versions of OpenSSL that use the same license as OpenSSL),
+# and distribute linked combinations including the two.
+#
+# You must obey the GNU Lesser General Public License in all respects for
+# all of the code used other than OpenSSL.
+#
+# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+from testutils import unittest
+
+import psycopg2
+
+class ConnectTestCase(unittest.TestCase):
+ def setUp(self):
+ self.args = None
+ def conect_stub(dsn, connection_factory=None, async=False):
+ self.args = (dsn, connection_factory, async)
+
+ self._connect_orig = psycopg2._connect
+ psycopg2._connect = conect_stub
+
+ def tearDown(self):
+ psycopg2._connect = self._connect_orig
+
+ def test_there_has_to_be_something(self):
+ self.assertRaises(psycopg2.InterfaceError, psycopg2.connect)
+ self.assertRaises(psycopg2.InterfaceError, psycopg2.connect,
+ connection_factory=lambda dsn, async=False: None)
+ self.assertRaises(psycopg2.InterfaceError, psycopg2.connect,
+ async=True)
+
+ def test_no_keywords(self):
+ psycopg2.connect('')
+ self.assertEqual(self.args[0], '')
+ self.assertEqual(self.args[1], None)
+ self.assertEqual(self.args[2], False)
+
+ def test_dsn(self):
+ psycopg2.connect('dbname=blah x=y')
+ self.assertEqual(self.args[0], 'dbname=blah x=y')
+ self.assertEqual(self.args[1], None)
+ self.assertEqual(self.args[2], False)
+
+ def test_supported_keywords(self):
+ psycopg2.connect(database='foo')
+ self.assertEqual(self.args[0], 'dbname=foo')
+ psycopg2.connect(user='postgres')
+ self.assertEqual(self.args[0], 'user=postgres')
+ psycopg2.connect(password='secret')
+ self.assertEqual(self.args[0], 'password=secret')
+ psycopg2.connect(port=5432)
+ self.assertEqual(self.args[0], 'port=5432')
+ psycopg2.connect(sslmode='require')
+ self.assertEqual(self.args[0], 'sslmode=require')
+
+ psycopg2.connect(database='foo',
+ user='postgres', password='secret', port=5432)
+ self.assert_('dbname=foo' in self.args[0])
+ self.assert_('user=postgres' in self.args[0])
+ self.assert_('password=secret' in self.args[0])
+ self.assert_('port=5432' in self.args[0])
+ self.assertEqual(len(self.args[0].split()), 4)
+
+ def test_generic_keywords(self):
+ psycopg2.connect(foo='bar')
+ self.assertEqual(self.args[0], 'foo=bar')
+
+ def test_factory(self):
+ def f(dsn, async=False):
+ pass
+
+ psycopg2.connect(database='foo', bar='baz', connection_factory=f)
+ self.assertEqual(self.args[0], 'dbname=foo bar=baz')
+ self.assertEqual(self.args[1], f)
+ self.assertEqual(self.args[2], False)
+
+ psycopg2.connect("dbname=foo bar=baz", connection_factory=f)
+ self.assertEqual(self.args[0], 'dbname=foo bar=baz')
+ self.assertEqual(self.args[1], f)
+ self.assertEqual(self.args[2], False)
+
+ def test_async(self):
+ psycopg2.connect(database='foo', bar='baz', async=1)
+ self.assertEqual(self.args[0], 'dbname=foo bar=baz')
+ self.assertEqual(self.args[1], None)
+ self.assert_(self.args[2])
+
+ psycopg2.connect("dbname=foo bar=baz", async=True)
+ self.assertEqual(self.args[0], 'dbname=foo bar=baz')
+ self.assertEqual(self.args[1], None)
+ self.assert_(self.args[2])
+
+ def test_empty_param(self):
+ psycopg2.connect(database='sony', password='')
+ self.assertEqual(self.args[0], "dbname=sony password=''")
+
+ def test_escape(self):
+ psycopg2.connect(database='hello world')
+ self.assertEqual(self.args[0], "dbname='hello world'")
+
+ psycopg2.connect(database=r'back\slash')
+ self.assertEqual(self.args[0], r"dbname=back\\slash")
+
+ psycopg2.connect(database="quo'te")
+ self.assertEqual(self.args[0], r"dbname=quo\'te")
+
+ psycopg2.connect(database="with\ttab")
+ self.assertEqual(self.args[0], "dbname='with\ttab'")
+
+ psycopg2.connect(database=r"\every thing'")
+ self.assertEqual(self.args[0], r"dbname='\\every thing\''")
+
+
+def test_suite():
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
View
62 psycopg2ct/tests/psycopg2_tests/test_types_extras.py
@@ -596,7 +596,12 @@ def test_register_globally(self):
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], (1,2))
finally:
- del psycopg2.extensions.string_types[t.oid]
+ # drop the registered typecasters to help the refcounting
+ # script to return precise values.
+ del psycopg2.extensions.string_types[t.typecaster.values[0]]
+ if t.array_typecaster:
+ del psycopg2.extensions.string_types[
+ t.array_typecaster.values[0]]
finally:
conn1.close()
@@ -643,6 +648,61 @@ def test_composite_array(self):
self.assertEqual(v[1][1], "world")
self.assertEqual(v[1][2], date(2011,1,3))
+ @skip_if_no_composite
+ def test_wrong_schema(self):
+ oid = self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
+ from psycopg2.extras import CompositeCaster
+ c = CompositeCaster('type_ii', oid, [('a', 23), ('b', 23), ('c', 23)])
+ curs = self.conn.cursor()
+ psycopg2.extensions.register_type(c.typecaster, curs)
+ curs.execute("select (1,2)::type_ii")
+ self.assertRaises(psycopg2.DataError, curs.fetchone)
+
+ @skip_if_no_composite
+ @skip_before_postgres(8, 4)
+ def test_from_tables(self):
+ curs = self.conn.cursor()
+ curs.execute("""create table ctest1 (
+ id integer primary key,
+ temp int,
+ label varchar
+ );""")
+
+ curs.execute("""alter table ctest1 drop temp;""")
+
+ curs.execute("""create table ctest2 (
+ id serial primary key,
+ label varchar,
+ test_id integer references ctest1(id)
+ );""")
+
+ curs.execute("""insert into ctest1 (id, label) values
+ (1, 'test1'),
+ (2, 'test2');""")
+ curs.execute("""insert into ctest2 (label, test_id) values
+ ('testa', 1),
+ ('testb', 1),
+ ('testc', 2),
+ ('testd', 2);""")
+
+ psycopg2.extras.register_composite("ctest1", curs)
+ psycopg2.extras.register_composite("ctest2", curs)
+
+ curs.execute("""
+ select ctest1, array_agg(ctest2) as test2s
+ from (
+ select ctest1, ctest2
+ from ctest1 inner join ctest2 on ctest1.id = ctest2.test_id
+ order by ctest1.id, ctest2.label
+ ) x group by ctest1;""")
+
+ r = curs.fetchone()
+ self.assertEqual(r[0], (1, 'test1'))
+ self.assertEqual(r[1], [(1, 'testa', 1), (2, 'testb', 1)])
+ r = curs.fetchone()
+ self.assertEqual(r[0], (2, 'test2'))
+ self.assertEqual(r[1], [(3, 'testc', 2), (4, 'testd', 2)])
+
def _create_type(self, name, fields):
curs = self.conn.cursor()
try:

0 comments on commit e62cde2

Please sign in to comment.