Skip to content

Commit

Permalink
Merge e8838d9 into bed5221
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-blanchard committed Oct 1, 2018
2 parents bed5221 + e8838d9 commit 5a47f61
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 52 deletions.
12 changes: 12 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,24 @@ python:
- "3.6"
- "3.7-dev"
- "pypy"
env:
global:
- NO_MSGPACK=0

matrix:
fast_finish: true
include:
- python: '3.6'
env: NO_MSGPACK=1

install:
- travis_retry pip install --upgrade pip setuptools
- travis_retry pip install --upgrade --upgrade-strategy only-if-needed -r test-requirements.txt
- travis_retry pip install --upgrade --upgrade-strategy only-if-needed -r requirements.txt
- echo NO_MSGPACK=${NO_MSGPACK}
- echo TRAVIS_PYTHON_VERSION=${TRAVIS_PYTHON_VERSION}
- if [ ${NO_MSGPACK:0:1} == "1" ]; then travis_retry pip uninstall msgpack msgpack-python; fi
- if [ ${NO_MSGPACK:0:1} == "0" ]; then travis_retry pip install msgpack; fi
- python setup.py --version

# Run test
Expand Down
108 changes: 59 additions & 49 deletions pystorm/serializers/msgpack_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,74 @@

import io
import os
import warnings

import msgpack
try:
import msgpack
HAVE_MSGPACK = True
except:
warnings.warn('Cannot import msgpack. It is necessary for using MsgpackSerializer',
category=ImportWarning)
HAVE_MSGPACK = False

from ..exceptions import StormWentAwayError
from .serializer import Serializer


class MsgpackSerializer(Serializer):
if HAVE_MSGPACK:
class MsgpackSerializer(Serializer):

CHUNK_SIZE = 1024 ** 2
CHUNK_SIZE = 1024 ** 2

def __init__(self, input_stream, output_stream, reader_lock, writer_lock):
super(MsgpackSerializer, self).__init__(input_stream,
self._raw_stream(output_stream),
reader_lock, writer_lock)
self._messages = self._messages_generator()
def __init__(self, input_stream, output_stream, reader_lock, writer_lock):
super(MsgpackSerializer, self).__init__(input_stream,
self._raw_stream(output_stream),
reader_lock, writer_lock)
self._messages = self._messages_generator()

@staticmethod
def _raw_stream(stream):
"""Returns the raw buffer used by stream, so we can write bytes."""
if hasattr(stream, 'buffer'):
return stream.buffer
else:
return stream
@staticmethod
def _raw_stream(stream):
"""Returns the raw buffer used by stream, so we can write bytes."""
if hasattr(stream, 'buffer'):
return stream.buffer
else:
return stream

def _messages_generator(self):
unpacker = msgpack.Unpacker()
while True:
# f.read(n) on sys.stdin blocks until n bytes are read, causing
# serializer to hang.
# os.read(fileno, n) will block if there is nothing to read, but will
# return as soon as it is able to read at most n bytes.
with self._reader_lock:
try:
line = os.read(self.input_stream.fileno(), self.CHUNK_SIZE)
except io.UnsupportedOperation:
line = self.input_stream.read(self.CHUNK_SIZE)
if not line:
# Handle EOF, which usually means Storm went away
raise StormWentAwayError()
# As python-msgpack docs suggest, we feed data to the unpacker
# internal buffer in order to let the unpacker deal with message
# boundaries recognition and uncomplete messages. In case input ends
# with a partial message, unpacker raises a StopIteration and will be
# able to continue after being feeded with the rest of the message.
unpacker.feed(line)
for i in unpacker:
yield i
def _messages_generator(self):
unpacker = msgpack.Unpacker()
while True:
# f.read(n) on sys.stdin blocks until n bytes are read, causing
# serializer to hang.
# os.read(fileno, n) will block if there is nothing to read, but will
# return as soon as it is able to read at most n bytes.
with self._reader_lock:
try:
line = os.read(self.input_stream.fileno(), self.CHUNK_SIZE)
except io.UnsupportedOperation:
line = self.input_stream.read(self.CHUNK_SIZE)
if not line:
# Handle EOF, which usually means Storm went away
raise StormWentAwayError()
# As python-msgpack docs suggest, we feed data to the unpacker
# internal buffer in order to let the unpacker deal with message
# boundaries recognition and uncomplete messages. In case input ends
# with a partial message, unpacker raises a StopIteration and will be
# able to continue after being feeded with the rest of the message.
unpacker.feed(line)
for i in unpacker:
yield i

def read_message(self):
""""Messages are delimited by msgpack itself, no need for Storm
multilang end line.
"""
return next(self._messages)
def read_message(self):
""""Messages are delimited by msgpack itself, no need for Storm
multilang end line.
"""
return next(self._messages)

def serialize_dict(self, msg_dict):
""""Messages are delimited by msgpack itself, no need for Storm
multilang end line.
"""
# TODO: Determine if we need use_bin_type here
return msgpack.packb(msg_dict)
def serialize_dict(self, msg_dict):
""""Messages are delimited by msgpack itself, no need for Storm
multilang end line.
"""
# TODO: Determine if we need use_bin_type here
return msgpack.packb(msg_dict)
else:
MsgpackSerializer = Serializer
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
contextlib2; python_version < '3.2'
msgpack
setuptools
simplejson>=2.2.0
six>=1.5
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def readme():
install_requires = [
'six>=1.5',
'simplejson>=2.2.0',
'msgpack-python'
]

if sys.version_info.major < 3:
Expand Down
13 changes: 12 additions & 1 deletion test/pystorm/serializers/test_msgpack_serializer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
from __future__ import absolute_import, print_function, unicode_literals

from io import BytesIO
import warnings
try:
from unittest import mock
except ImportError:
import mock

import msgpack
try:
import msgpack
HAVE_MSGPACK = True
except:
warnings.warn('Cannot import msgpack. It is necessary for using MsgpackSerializer',
category=ImportWarning)
HAVE_MSGPACK = False
import pytest

from pystorm.exceptions import StormWentAwayError
Expand All @@ -19,22 +26,26 @@ class TestMsgpackSerializer(SerializerTestCase):

INSTANCE_CLS = MsgpackSerializer

@pytest.mark.skipif(not HAVE_MSGPACK, reason='msgpack not installed')
def test_read_message_dict(self):
msg_dict = {b'hello': b"world",}
self.instance.input_stream = BytesIO(msgpack.packb(msg_dict))
assert self.instance.read_message() == msg_dict

@pytest.mark.skipif(not HAVE_MSGPACK, reason='msgpack not installed')
def test_read_message_list(self):
msg_list = [3, 4, 5]
self.instance.input_stream = BytesIO(msgpack.packb(msg_list))
assert self.instance.read_message() == msg_list

@pytest.mark.skipif(not HAVE_MSGPACK, reason='msgpack not installed')
def test_send_message(self):
msg_dict = {'hello': "world"}
expected_output = msgpack.packb(msg_dict)
self.instance.send_message(msg_dict)
assert self.instance.output_stream.getvalue() == expected_output

@pytest.mark.skipif(not HAVE_MSGPACK, reason='msgpack not installed')
def test_send_message_raises_stormwentaway(self):
bytes_io_mock = mock.MagicMock(autospec=True)
def raiser(): # lambdas can't raise
Expand Down

0 comments on commit 5a47f61

Please sign in to comment.