Skip to content

Commit b0b836b

Browse files
committed
Rewrite protocol implementation
1 parent 336f7a6 commit b0b836b

File tree

11 files changed

+621
-757
lines changed

11 files changed

+621
-757
lines changed

asyncpg/__init__.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import getpass
33
import os
4-
import socket
54
import urllib.parse
65

76
from .exceptions import * # NOQA
@@ -20,18 +19,14 @@ async def connect(dsn=None, *,
2019
loop=None,
2120
timeout=60,
2221
statement_cache_size=100,
23-
**kwargs):
22+
**opts):
2423

2524
if loop is None:
2625
loop = asyncio.get_event_loop()
2726

28-
host, port, user, password, database, kwargs = _parse_connect_params(
27+
host, port, opts = _parse_connect_params(
2928
dsn=dsn, host=host, port=port, user=user, password=password,
30-
database=database, kwargs=kwargs)
31-
32-
if kwargs:
33-
raise RuntimeError(
34-
'arbitrary connection arguments are not yet supported')
29+
database=database, opts=opts)
3530

3631
last_ex = None
3732
for h in host:
@@ -42,25 +37,18 @@ async def connect(dsn=None, *,
4237
# UNIX socket name
4338
sname = os.path.join(h, '.s.PGSQL.{}'.format(port))
4439
conn = loop.create_unix_connection(
45-
lambda: protocol.Protocol(sname, connected, user,
46-
password, database, loop),
40+
lambda: protocol.Protocol(sname, connected, opts, loop),
4741
sname)
4842
else:
4943
conn = loop.create_connection(
50-
lambda: protocol.Protocol((h, port), connected, user,
51-
password, database, loop),
44+
lambda: protocol.Protocol((h, port), connected, opts, loop),
5245
h, port)
5346

5447
try:
5548
tr, pr = await asyncio.wait_for(conn, timeout=timeout, loop=loop)
5649
except (OSError, asyncio.TimeoutError) as ex:
5750
last_ex = ex
5851
else:
59-
if not unix:
60-
sock = tr.get_extra_info('socket')
61-
if hasattr(socket, 'TCP_NODELAY'):
62-
sock.setsockopt(socket.IPPROTO_TCP,
63-
socket.TCP_NODELAY, 1)
6452
break
6553
else:
6654
raise last_ex
@@ -76,7 +64,7 @@ async def connect(dsn=None, *,
7664

7765

7866
def _parse_connect_params(*, dsn, host, port, user,
79-
password, database, kwargs):
67+
password, database, opts):
8068

8169
if dsn:
8270
parsed = urllib.parse.urlparse(dsn)
@@ -140,7 +128,7 @@ def _parse_connect_params(*, dsn, host, port, user,
140128
password = val
141129

142130
if query:
143-
kwargs = {**query, **kwargs}
131+
opts = {**query, **opts}
144132

145133
# On env-var -> connection parameter conversion read here:
146134
# https://www.postgresql.org/docs/current/static/libpq-envars.html
@@ -176,7 +164,24 @@ def _parse_connect_params(*, dsn, host, port, user,
176164
if database is None:
177165
database = os.getenv('PGDATABASE')
178166

179-
return host, port, user, password, database, kwargs
167+
if user is not None:
168+
opts['user'] = user
169+
if password is not None:
170+
opts['password'] = password
171+
if database is not None:
172+
opts['database'] = database
173+
174+
for param in opts:
175+
if not isinstance(param, str):
176+
raise ValueError(
177+
'invalid connection parameter {!r} (str expected)'
178+
.format(param))
179+
if not isinstance(opts[param], str):
180+
raise ValueError(
181+
'invalid connection parameter {!r}: {!r} (str expected)'
182+
.format(param, opts[param]))
183+
184+
return host, port, opts
180185

181186

182187
def _create_future(loop):

asyncpg/connection.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,12 @@ async def close(self):
169169
return
170170
self._close_stmts()
171171
self._aborted = True
172-
self._transport.abort()
173172
await self._protocol.close()
174173

175174
def terminate(self):
176175
self._close_stmts()
177176
self._aborted = True
178-
self._transport.abort()
177+
self._protocol.abort()
179178

180179
def _get_unique_id(self):
181180
self._uid += 1

asyncpg/protocol/buffer.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ cdef class ReadBuffer:
9696
cdef inline read_cstr(self)
9797
cdef int32_t has_message(self) except -1
9898
cdef inline char* try_consume_message(self, int32_t* len)
99-
cdef consume_message(self)
99+
cdef Memory consume_message(self)
100100
cdef discard_message(self)
101101
cdef inline _discard_message(self)
102102
cdef inline char get_message_type(self)

asyncpg/protocol/buffer.pyx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ cdef class ReadBuffer:
484484
self._discard_message()
485485
return buf
486486

487-
cdef consume_message(self):
487+
cdef Memory consume_message(self):
488488
if not self._current_message_ready:
489489
raise BufferError('no message to consume')
490490
mem = self.read(self._current_message_len_unread)
@@ -500,10 +500,14 @@ cdef class ReadBuffer:
500500
raise BufferError('no message to discard')
501501

502502
if self._current_message_len_unread:
503+
IF DEBUG:
504+
mtype = chr(self._current_message_type)
505+
503506
discarded = self.consume_message()
507+
504508
IF DEBUG:
505509
print('!!! discarding message {!r} unread data: {!r}'.format(
506-
chr(self._current_message_type),
510+
mtype,
507511
(<Memory>discarded).as_bytes()))
508512

509513
self._discard_message()

asyncpg/protocol/coreproto.pxd

Lines changed: 67 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,139 +1,99 @@
11
cdef enum ConnectionStatus:
2-
CONNECTION_OK = 0
3-
CONNECTION_BAD = 1
4-
CONNECTION_STARTED = 2 # Waiting for connection to be made.
5-
CONNECTION_MADE = 3 # Connection OK; waiting to send.
6-
CONNECTION_AWAITING_RESPONSE = 4 # Waiting for a response from the
7-
# postmaster.
8-
CONNECTION_AUTH_OK = 5 # Received authentication; waiting for
9-
# backend startup.
10-
CONNECTION_SETENV = 6 # Negotiating environment.
11-
CONNECTION_SSL_STARTUP = 7 # Negotiating SSL.
12-
CONNECTION_NEEDED = 8 # Internal state: connect() needed
13-
14-
cdef enum AsyncStatus:
15-
# defines the state of the query-execution state machine
16-
PGASYNC_IDLE = 0 # nothing's happening, dude
17-
PGASYNC_BUSY = 1 # query in progress
18-
PGASYNC_READY = 2 # result ready for PQgetResult
19-
PGASYNC_COPY_IN = 3 # Copy In data transfer in progress
20-
PGASYNC_COPY_OUT = 4 # Copy Out data transfer in progress
21-
PGASYNC_COPY_BOTH = 5 # Copy In/Out data transfer in progress
22-
23-
cdef enum QueryClass:
24-
# tracks which query protocol we are now executing
25-
PGQUERY_SIMPLE = 0 # simple Query protocol (PQexec)
26-
PGQUERY_EXTENDED = 1 # full Extended protocol (PQexecParams)
27-
PGQUERY_PREPARE = 2 # Parse only (PQprepare)
28-
PGQUERY_DESCRIBE = 3 # Describe Statement or Portal
29-
PGQUERY_CLOSE = 4
30-
31-
cdef enum ExecStatusType:
32-
PGRES_EMPTY_QUERY = 0 # empty query string was executed
33-
PGRES_COMMAND_OK = 1 # a query command that doesn't return
34-
# anything was executed properly by the
35-
# backend
36-
PGRES_TUPLES_OK = 2 # a query command that returns tuples was
37-
# executed properly by the backend,
38-
# PGresult contains the result tuples
39-
PGRES_COPY_OUT = 3 # Copy Out data transfer in progress
40-
PGRES_COPY_IN = 4 # Copy In data transfer in progress
41-
PGRES_BAD_RESPONSE = 5 # an unexpected response was recv'd from
42-
# the backend
43-
PGRES_NONFATAL_ERROR = 6 # notice or warning message
44-
PGRES_FATAL_ERROR = 7 # query failed
45-
PGRES_COPY_BOTH = 8 # Copy In/Out data transfer in progress
46-
PGRES_SINGLE_TUPLE = 9 # single tuple from larger resultset
2+
CONNECTION_OK = 1
3+
CONNECTION_BAD = 2
4+
CONNECTION_STARTED = 3 # Waiting for connection to be made.
475

48-
cdef enum TransactionStatus:
49-
PQTRANS_IDLE = 0 # connection idle
50-
PQTRANS_ACTIVE = 1 # command in progress
51-
PQTRANS_INTRANS = 2 # idle, within transaction block
52-
PQTRANS_INERROR = 3 # idle, within failed transaction
53-
PQTRANS_UNKNOWN = 4 # cannot determine status
546

55-
cdef enum MessageDispatchLoop:
56-
DISPATCH_CONTINUE = 0
57-
DISPATCH_STOP = 1
58-
DISPATCH_CONTINUE_NO_DISCARD = 2
7+
cdef enum ProtocolState:
8+
PROTOCOL_IDLE = 0
599

10+
PROTOCOL_FAILED = 1
6011

61-
cdef class Result:
62-
cdef:
63-
ExecStatusType status
12+
PROTOCOL_ERROR_CONSUME = 2
13+
14+
PROTOCOL_AUTH = 10
15+
PROTOCOL_PREPARE = 11
16+
PROTOCOL_BIND_EXECUTE = 12
17+
PROTOCOL_CLOSE_STMT_PORTAL = 13
18+
PROTOCOL_SIMPLE_QUERY = 14
6419

65-
# message broken into fields
66-
dict err_fields
67-
# text of triggering query, if available
68-
str err_query
6920

70-
# cmd status from the query
71-
bytes cmd_status
21+
cdef enum ResultType:
22+
RESULT_OK = 1
23+
RESULT_FAILED = 2
7224

73-
object parameters_desc
74-
object row_desc
75-
list rows
7625

77-
@staticmethod
78-
cdef Result new(ExecStatusType status)
26+
cdef enum TransactionStatus:
27+
PQTRANS_IDLE = 0 # connection idle
28+
PQTRANS_ACTIVE = 1 # command in progress
29+
PQTRANS_INTRANS = 2 # idle, within transaction block
30+
PQTRANS_INERROR = 3 # idle, within failed transaction
31+
PQTRANS_UNKNOWN = 4 # cannot determine status
7932

8033

8134
ctypedef object (*decode_row_method)(object, const char*, int32_t)
8235

8336

8437
cdef class CoreProtocol:
8538
cdef:
86-
object transport
8739
ReadBuffer buffer
40+
bint _skip_discard
8841

89-
####### Options:
42+
ConnectionStatus con_status
43+
ProtocolState state
44+
TransactionStatus xact_status
9045

91-
str _user
92-
str _password
93-
str _dbname
94-
str _encoding
46+
str encoding
9547

96-
####### Connection State:
97-
int _backend_pid
98-
int _backend_secret
99-
100-
# result being constructed
101-
Result _result
48+
object transport
10249

103-
ConnectionStatus _status
104-
AsyncStatus _async_status
105-
QueryClass _query_class
106-
TransactionStatus _xact_status
50+
# Dict with all connection arguments
51+
dict con_args
52+
53+
int32_t backend_pid
54+
int32_t backend_secret
55+
56+
## Result
57+
ResultType result_type
58+
object result
59+
bytes result_param_desc
60+
bytes result_row_desc
61+
bytes result_status_msg
62+
63+
cdef _process__auth(self, char mtype)
64+
cdef _process__prepare(self, char mtype)
65+
cdef _process__bind_exec(self, char mtype)
66+
cdef _process__close_stmt_portal(self, char mtype)
67+
cdef _process__simple_query(self, char mtype)
68+
69+
cdef _parse_msg_authentication(self)
70+
cdef _parse_msg_parameter_status(self)
71+
cdef _parse_msg_backend_key_data(self)
72+
cdef _parse_msg_ready_for_query(self)
73+
cdef _parse_data_msgs(self)
74+
cdef _parse_msg_error_response(self, is_error)
75+
cdef _parse_msg_command_complete(self)
76+
77+
cdef _write(self, buf)
78+
cdef inline _write_sync_message(self)
10779

108-
WriteBuffer _after_sync
80+
cdef _read_server_messages(self)
10981

110-
cdef inline _write(self, WriteBuffer buf)
111-
cdef inline _write_sync_message(self)
112-
cdef inline _read_server_messages(self)
113-
cdef inline MessageDispatchLoop _dispatch_server_message(self, char mtype)
114-
cdef _parse_server_authentication(self)
115-
cdef _parse_server_parameter_status(self)
116-
cdef _parse_server_backend_key_data(self)
117-
cdef _parse_server_parameter_description(self)
118-
cdef _parse_server_ready_for_query(self)
119-
cdef _parse_server_row_description(self)
120-
cdef _parse_server_data_rows(self)
121-
cdef _parse_server_error_response(self, is_error)
122-
cdef _fatal_error(self, exc)
12382
cdef _push_result(self)
124-
cdef _sync(self)
125-
cdef _ensure_ready_state(self)
83+
cdef _reset_result(self)
84+
cdef _set_state(self, ProtocolState new_state)
85+
86+
cdef _ensure_connected(self)
12687

12788
cdef _connect(self)
128-
cdef _query(self, str query)
12989
cdef _prepare(self, str stmt_name, str query)
130-
cdef _bind(self, str portal_name, str stmt_name,
131-
WriteBuffer bind_data, int32_t limit)
90+
cdef _bind_and_execute(self, str portal_name, str stmt_name,
91+
WriteBuffer bind_data, int32_t limit)
13292
cdef _close(self, str name, bint is_portal)
93+
cdef _simple_query(self, str query)
13394

13495
cdef _decode_row(self, const char* buf, int32_t buf_len)
13596

136-
cdef _on_result(self, Result result)
137-
cdef _on_fatal_error(self, exc)
97+
cdef _on_result(self)
98+
cdef _set_server_parameter(self, name, val)
13899
cdef _on_connection_lost(self, exc)
139-
cdef _set_server_parameter(self, key, val)

0 commit comments

Comments
 (0)