Skip to content

Commit

Permalink
- add document for notes on the protocol specification
Browse files Browse the repository at this point in the history
- add debugging signal handler (nothing yet implemented)
- add events to dispatcher (i.e. responses without requests)
- make stream signed short (see protocol notes)
- refactor decoding (all Struct is inside protocol.py, yeah!)
- create EventMessage and implement event_handler contract
- testasync: make event tester work
  • Loading branch information
toppk committed Nov 20, 2019
1 parent ce5be23 commit ee84d0c
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 96 deletions.
8 changes: 8 additions & 0 deletions docs/protocol.md
@@ -0,0 +1,8 @@


Here are some thoughts on the protocol specification (currently using v4)

stream (sec 2.3) is listed as [short] which is a 2 byte unsigned value (sec 3.) , but has a maximum value of
2 ** 15 (sec 2.3) and can contain a flag value of -1 (sec 2.3 & sec 4.2.6)

How to remove registration
1 change: 1 addition & 0 deletions mkdocs.yml
Expand Up @@ -12,6 +12,7 @@ nav:
- Introduction: 'index.md'
- Developing: 'developing.md'
- Contributors: 'contributors.md'
- Protocol: 'protocol.md'
- V4 Spec: 'native_protocol_v4.spec.txt'
- V5 Spec: 'native_protocol_v5.spec.txt'

Expand Down
28 changes: 25 additions & 3 deletions pysandra/client.py
@@ -1,9 +1,11 @@
import asyncio
from typing import List, Tuple
from os import getpid
from signal import Signals
from typing import List, Optional, Tuple

from .constants import REQUEST_TIMEOUT, STARTUP_TIMEOUT, Events # noqa: F401
from .dispatcher import Dispatcher
from .exceptions import RequestTimeout, StartupTimeout
from .exceptions import RequestTimeout, StartupTimeout, TypeViolation
from .protocol import Protocol
from .types import ExpectedResponses # noqa: F401
from .utils import get_logger
Expand All @@ -13,12 +15,32 @@


class Client:
def __init__(self) -> None:
def __init__(self, debug_signal: Optional["Signals"] = None) -> None:
self._proto = V4Protocol()
self._dispatcher = Dispatcher(protocol=self._proto, **default_host())
self._is_ready = False
self._in_startup = False
self._is_ready_event = asyncio.Event()
if debug_signal is not None:
self._install_signal(debug_signal)

def _dump_state(self) -> None:
logger.debug("in signal handler")

def _install_signal(self, debug_signal: "Signals") -> None:
if isinstance(debug_signal, Signals):
signal = debug_signal
elif isinstance(signal, str):
try:
signal = Signals(debug_signal)
except ValueError:
raise TypeViolation(
f"signal={debug_signal} is not valid. Please use signal.SIG*"
)

loop = asyncio.get_event_loop()
logger.debug(f" adding debug handler at signal={signal!r} for pid={getpid()}")
loop.add_signal_handler(signal, self._dump_state)

@property
def protocol(self) -> "Protocol":
Expand Down
1 change: 1 addition & 0 deletions pysandra/constants.py
Expand Up @@ -11,6 +11,7 @@

CQL_VERSION = "3.0.0"
SERVER_SENT = 0x80
EVENT_STREAM_ID = -1


class Opcode(int, Enum):
Expand Down
35 changes: 23 additions & 12 deletions pysandra/dispatcher.py
Expand Up @@ -3,6 +3,7 @@
import traceback
from typing import Callable, Dict, Optional, Tuple, Union

from .constants import EVENT_STREAM_ID
from .exceptions import InternalDriverError, MaximumStreamsException, ServerError
from .protocol import ErrorMessage, Protocol, RequestMessage # noqa: F401
from .types import ExpectedResponses # noqa: F401
Expand Down Expand Up @@ -95,19 +96,24 @@ async def _receive(self) -> None:
assert self._reader is not None
head = await self._reader.read(9)
logger.debug(f"in _receive head={head!r}")
version, flags, stream, opcode, length = self._proto.decode_header(head)
version, flags, stream_id, opcode, length = self._proto.decode_header(head)
body = await self._reader.read(length)
request, response_handler, event = self._rm_stream_id(stream)
# exceptions are stashed here (in the wrong task)
try:
self._data[event] = response_handler(
request, version, flags, stream, opcode, length, body
if stream_id == EVENT_STREAM_ID:
await self._proto.event_handler(
version, flags, stream_id, opcode, length, body
)
except ServerError as e:
self._data[event] = e
except InternalDriverError as e:
self._data[event] = e
event.set()
else:
request, response_handler, event = self._rm_stream_id(stream_id)
# exceptions are stashed here (in the wrong task)
try:
self._data[event] = response_handler(
request, version, flags, stream_id, opcode, length, body
)
except ServerError as e:
self._data[event] = e
except InternalDriverError as e:
self._data[event] = e
event.set()

def retrieve(self, event: "asyncio.Event") -> "ExpectedResponses":
try:
Expand Down Expand Up @@ -156,8 +162,9 @@ async def close(self) -> None:


if __name__ == "__main__":
from .v4protocol import V4Protocol

client = Dispatcher()
client = Dispatcher(protocol=V4Protocol())
move = 0
while True:
move += 1
Expand All @@ -167,6 +174,10 @@ async def close(self) -> None:
print(len(client._streams))
raise e
print("got new streamid=%s" % streamid)
client._update_stream_id(
streamid,
(RequestMessage(0, 0, 0), client._proto.build_response, asyncio.Event()),
)
if (move % 19) == 0:

print("remove streamid = %s" % streamid)
Expand Down

0 comments on commit ee84d0c

Please sign in to comment.