-
Notifications
You must be signed in to change notification settings - Fork 1
/
psycopg2.py
52 lines (42 loc) · 1.56 KB
/
psycopg2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""Implement the Akiban dialect via an extended version of the psycopg2
driver.
"""
from __future__ import absolute_import
from .base import AkibanDialect, AkibanExecutionContext
# Module-level placeholder for the connection class from ``akiban.psycopg2``.
# It stays ``None`` until ``AkibanPsycopg2Dialect.dbapi()`` rebinds it through
# a ``global`` import; ``create_connect_args()`` then passes whatever is bound
# here as the ``connection_factory`` connect argument.
Connection = None
class AkibanPsycopg2ExecutionContext(AkibanExecutionContext):
    """Execution context used by the psycopg2 flavour of the Akiban dialect."""

    def set_ddl_autocommit(self, connection, value):
        """Commit on ``connection`` and then assign its ``autocommit`` flag.

        :param connection: DBAPI connection object; the attribute written
         here is psycopg2's ``connection.autocommit``, documented at
         http://initd.org/psycopg/docs/connection.html#connection.autocommit
        :param value: the value stored into ``connection.autocommit``.
        """
        # Commit before touching the flag.  NOTE(review): presumably needed
        # because psycopg2 does not allow switching autocommit while a
        # transaction is open -- confirm against the psycopg2 docs.
        connection.commit()
        connection.autocommit = value
class AkibanPsycopg2Dialect(AkibanDialect):
    """Akiban dialect that talks to the database through psycopg2.

    Connect arguments always carry the module-level ``Connection`` object as
    ``connection_factory``; that global is bound by :meth:`dbapi`.
    """

    use_native_unicode = True
    execution_ctx_cls = AkibanPsycopg2ExecutionContext

    @classmethod
    def dbapi(cls):
        """Return the ``psycopg2`` module.

        As a side effect, rebinds the module-level ``Connection`` global to
        ``akiban.psycopg2.Connection``.
        """
        global Connection
        from akiban.psycopg2 import Connection

        driver = __import__("psycopg2")
        return driver

    def on_connect(self):
        """Return a callable to run against each new connection, or ``None``.

        The callable invokes every collected setup hook, in order, with the
        raw connection.  ``None`` is returned when no hook was collected.
        """
        hooks = []

        # TODO: not sure what the unicode situation is yet
        if self.dbapi and self.use_native_unicode:
            from psycopg2 import extensions

            def setup_unicode_extension(conn):
                # Register psycopg2's UNICODE type on this connection only.
                extensions.register_type(extensions.UNICODE, conn)

            hooks.append(setup_unicode_extension)

        # Guard clause: nothing to run means there is no callable to return.
        if not hooks:
            return None

        def on_connect(conn):
            for hook in hooks:
                hook(conn)

        return on_connect

    def create_connect_args(self, url):
        """Translate ``url`` into ``(args, kwargs)`` for the DBAPI connect call.

        :param url: URL object providing ``translate_connect_args()`` and a
         ``query`` mapping.
        :return: a 2-tuple of an empty positional-argument list and the
         keyword-argument dict.
        """
        connect_kwargs = url.translate_connect_args(username='user')

        # The port is coerced to an integer whenever the key is present.
        if 'port' in connect_kwargs:
            connect_kwargs['port'] = int(connect_kwargs['port'])

        # Query-string options are layered over the translated arguments;
        # the connection factory is set last, so it always wins.
        connect_kwargs.update(url.query)
        connect_kwargs['connection_factory'] = Connection
        return ([], connect_kwargs)