Skip to content

Commit

Permalink
Proper non blocking, buffered stream, finally fixes #24
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Jan 4, 2013
1 parent fdd5a5c commit deec902
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
73 changes: 71 additions & 2 deletions messytables/core.py
@@ -1,5 +1,5 @@
from messytables.util import OrderedDict
import io
import cStringIO


def seekable_stream(fileobj):
Expand All @@ -9,10 +9,79 @@ def seekable_stream(fileobj):
except:
# otherwise seek failed, so slurp in stream and wrap
# it in a BytesIO
fileobj = io.BytesIO(fileobj.read())
fileobj = BufferedFile(fileobj)
return fileobj


class BufferedFile(object):
''' A buffered file that preserves the beginning of a stream up to buffer_size
'''
def __init__(self, fp, buffer_size=1024):
self.data = cStringIO.StringIO()
self.fp = fp
self.offset = 0
self.len = 0
self.fp_offset = 0
self.buffer_size = buffer_size

def _next_line(self):
try:
return self.fp.readline()
except AttributeError:
return self.fp.next()

def _read(self, n):
return self.fp.read(n)

@property
def _buffer_full(self):
return self.len >= self.buffer_size

def readline(self):
if self.len < self.offset < self.fp_offset:
raise BufferError('Line is not available anymore')
if self.offset >= self.len:
line = self._next_line()
self.fp_offset += len(line)

self.offset += len(line)

if not self._buffer_full:
self.data.write(line)
self.len += len(line)
else:
line = self.data.readline()
self.offset += len(line)
return line

def read(self, n):
if self.len < self.offset < self.fp_offset:
raise BufferError('Data is not available anymore')
if self.offset >= self.len:
byte = self._read(n)
self.fp_offset += len(byte)

self.offset += len(byte)

if not self._buffer_full:
self.data.write(byte)
self.len += len(byte)
else:
byte = self.data.read(n)
self.offset += len(byte)
return byte

def tell(self):
return self.offset

def seek(self, offset):
if self.len < offset < self.fp_offset:
raise BufferError('Cannot seek because data is not buffered here')
self.offset = offset
if offset < self.len:
self.data.seek(offset)


class Cell(object):
""" A cell is the basic value type. It always has a ``value`` (that
may be ``None`` and may optionally also have a type and column name
Expand Down
15 changes: 15 additions & 0 deletions test/test_rowset.py
Expand Up @@ -2,6 +2,7 @@
import unittest
import StringIO
import urllib2
import requests

from nose.tools import assert_equal
from httpretty import HTTPretty
Expand Down Expand Up @@ -237,6 +238,20 @@ def test_http_csv(self):
data = list(row_set)
assert_equal(4000, len(data))

@httprettified
def test_http_csv_requests(self):
url = 'http://www.messytables.org/static/long.csv'
HTTPretty.register_uri(HTTPretty.GET, url,
body=horror_fobj('long.csv').read(),
content_type="application/csv")
r = requests.get(url, stream=True)
# no full support for non blocking version yet, use urllib2
fh = StringIO.StringIO(r.raw.read())
table_set = CSVTableSet.from_fileobj(fh, encoding='utf-8')
row_set = table_set.tables[0]
data = list(row_set)
assert_equal(4000, len(data))

@httprettified
def test_http_csv_encoding(self):
url = 'http://www.messytables.org/static/utf-16le_encoded.csv'
Expand Down

0 comments on commit deec902

Please sign in to comment.