Skip to content

Commit

Permalink
little refactoring of WriteBuffer object
Browse files Browse the repository at this point in the history
(better api and documentation)
  • Loading branch information
thefab committed Feb 22, 2015
1 parent cb2547e commit ef2547a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 43 deletions.
2 changes: 1 addition & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_write_buffer1(self):
self.assertFalse(b.is_empty())
self.assertEquals(b._total_length, 9)
b2 = self._make_test_buffer()
b.extend(b2)
b.append(b2)
s = bytes(b)
self.assertEquals(s, b"123456789123456789")
self.assertFalse(b.is_empty())
Expand Down
2 changes: 1 addition & 1 deletion tornadis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,5 +243,5 @@ def _pipelined_call(self, pipeline, callback):
for args in pipeline.pipelined_args:
self.__callback_queue.append(cb)
tmp_buf = format_args_in_redis_protocol(*args)
buf.extend(tmp_buf)
buf.append(tmp_buf)
self.__connection.write(buf)
2 changes: 1 addition & 1 deletion tornadis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def write(self, data):
"""
self._ensure_connected()
if isinstance(data, WriteBuffer):
self._write_buffer.extend(data)
self._write_buffer.append(data)
else:
if len(data) > 0:
self._write_buffer.append(data)
157 changes: 117 additions & 40 deletions tornadis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,99 +10,176 @@
import contextlib
import collections

WRITEBUFFER_USE_MEMORY_VIEW_MIN_SIZE = 4096


class WriteBuffer(object):
"""Write buffer implementation optimized for reading by max sized chunks.
It is built on a deque and memoryviews to avoid too much string copies.
Attributes:
use_memory_view_min_size (int): minimum size before using memoryview
objects (to avoid object creation overhead bigger than string
copy for this size)
_deque (collections.deque): deque object to store each write
(without copy)
_has_view (boolean): True if there is some memoryview objects inside
the deque (if _has_view=False, there are some
"fastpath optimizations")
_total_length (int): total size (in bytes) of the buffer content
"""

def __init__(self, use_memory_view_min_size=4096):
"""Constructor.
def __init__(self):
Args:
use_memory_view_min_size (int): minimum size before using
memoryview objects (advanced option, the default is probably
good for you).
"""
self.use_memory_view_min_size = use_memory_view_min_size
self._deque = collections.deque()
self._reset()

def _reset(self):
"""Resets the object at its initial (empty) state."""
self._deque.clear()
self._total_length = 0
self._has_view = False

def __str__(self):
return self._tobytes(self._deque)
return self._tobytes()

def __bytes__(self):
return self._tobytes(self._deque)
return self._tobytes()

def _tobytes(self, iterable):
def _tobytes(self):
"""Serializes the write buffer into a single string (bytes).
Returns:
a string (bytes) object.
"""
if not self._has_view:
# fast path
return b"".join(iterable)
# fast path optimization
if len(self._deque) == 0:
return b""
elif len(self._deque) == 1:
# no copy
return self._deque[0]
else:
return b"".join(self._deque)
else:
tmp = [x.tobytes() if isinstance(x, memoryview) else x
for x in iterable]
for x in self._deque]
return b"".join(tmp)

def is_empty(self):
"""Returns True if the buffer is empty.
Returns:
True or False.
"""
return self._total_length == 0

def append(self, data, right=True):
length = len(data)
if length == 0:
return
self._total_length += length
if right:
self._deque.append(data)
else:
self._deque.appendleft(data)
def append(self, data):
"""Appends some data to end of the buffer (right).
def extend(self, write_buffer):
self._deque.extend(write_buffer._deque)
self._total_length += write_buffer._total_length
No string copy is done during this operation.
Args:
data: data to put in the buffer (can be string, memoryview or
another WriteBuffer).
"""
self._append(data, True)

def appendleft(self, data):
self.append(data, right=False)
"""Appends some data at the beginning of the buffer (left).
No string copy is done during this operation.
Args:
data: data to put in the buffer (can be string, memoryview or
another WriteBuffer).
"""
self._append(data, False)

def _append(self, data, right):
if isinstance(data, WriteBuffer):
# data is another writebuffer
if right:
self._deque.extend(data._deque)
else:
self._deque.extendleft(data._deque)
self._total_length += data._total_length
else:
if isinstance(data, memoryview):
# data is a memory viewobject
# nothing spacial but now the buffer has views
self._has_view = True
length = len(data)
if length == 0:
return
self._total_length += length
if right:
self._deque.append(data)
else:
self._deque.appendleft(data)

def get_chunk(self, chunk_max_size):
"""Gets a chunk of the given max size.
Args:
chunk_max_size (int): max size of the returned chunk
Returns:
FIXME
"""
if self._total_length < chunk_max_size:
# fastpath (the whole queue fit in a single chunk)
res = self._tobytes(self._deque)
self._deque.clear()
self._has_view = False
self._total_length = 0
res = self._tobytes()
self._reset()
return res
chunk_size = 0
tmp_list = []
chunk_write_buffer = WriteBuffer()
while True:
try:
data = self._deque.popleft()
data_length = len(data)
self._total_length -= data_length
if chunk_size == 0:
if chunk_write_buffer._total_length == 0:
# first iteration
if data_length == chunk_max_size:
# we are lucky !
return data
elif data_length > chunk_max_size:
if data_length < WRITEBUFFER_USE_MEMORY_VIEW_MIN_SIZE \
# we have enough data at first iteration
# => fast path optimization
if data_length < self.use_memory_view_min_size \
or isinstance(data, memoryview):
view = data
else:
view = memoryview(data)
self._has_view = True
self.appendleft(view[chunk_max_size:])
return view[:chunk_max_size]
else:
# not first iteration
if chunk_size + data_length > chunk_max_size:
if data_length < WRITEBUFFER_USE_MEMORY_VIEW_MIN_SIZE \
if chunk_write_buffer._total_length + data_length \
> chunk_max_size:
if data_length < self.use_memory_view_min_size \
or isinstance(data, memoryview):
view = data
else:
view = memoryview(data)
self._has_view = True
limit = chunk_max_size - chunk_size - data_length
limit = chunk_max_size - \
chunk_write_buffer._total_length - data_length
self.appendleft(view[limit:])
data = view[:limit]
tmp_list.append(data)
chunk_size += data_length
if chunk_size >= chunk_max_size:
chunk_write_buffer.append(data)
if chunk_write_buffer._total_length >= chunk_max_size:
break
except IndexError:
self.has_view = False
# the buffer is empty (so no memoryview inside)
self._has_view = False
break
return self._tobytes(tmp_list)
return chunk_write_buffer._tobytes()


def format_args_in_redis_protocol(*args):
Expand Down

0 comments on commit ef2547a

Please sign in to comment.