Skip to content

Commit

Permalink
bz2 no longer uses disk
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed May 10, 2015
1 parent a85ae41 commit 865ba05
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
15 changes: 8 additions & 7 deletions dask/bag/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,17 +744,18 @@ def stream_decompress(fmt, data):
if fmt == 'gz':
return gzip.GzipFile(fileobj=StringIO(data))
if fmt == 'bz2':
return bzip_stream(data)
return bz2_stream(data)
else:
return StringIO(data)


def bzip_stream(data):
with tmpfile() as fn:
with open(fn, 'wb') as f:
f.write(data)
file = bz2.BZ2File(fn)
for line in file:
def bz2_stream(compressed, chunksize=100000):
    """ Stream lines from a chunk of compressed bz2 data

    The compressed bytes are fed to a ``bz2.BZ2Decompressor`` ``chunksize``
    bytes at a time and the decompressed output is yielded line by line, as
    decoded text without the trailing newline.

    The decompressor hands back data in pieces whose boundaries have nothing
    to do with line boundaries (and often hands back nothing at all for a
    given input chunk), so the unterminated tail of every piece is held back
    and prepended to the next piece instead of being yielded as a line of its
    own.

    Parameters
    ----------
    compressed : bytes
        A complete bz2-compressed payload.
    chunksize : int
        Number of compressed bytes passed to the decompressor per step.

    Yields
    ------
    Each line of the decompressed text.  A final line without a trailing
    newline is still yielded; no empty string is produced after a
    terminating newline or for empty input.
    """
    decompressor = bz2.BZ2Decompressor()
    # Bytes of the current line seen so far; its newline has not arrived yet.
    pending = b''
    for i in range(0, len(compressed), chunksize):
        chunk = compressed[i: i + chunksize]
        # Split on the raw newline byte *before* decoding: that byte never
        # occurs inside a multi-byte UTF-8 sequence, so every complete line
        # is safe to decode even if a character straddled two pieces.
        lines = (pending + decompressor.decompress(chunk)).split(b'\n')
        # The last element is whatever follows the final newline, i.e. a
        # (possibly empty) partial line; keep it for the next iteration.
        pending = lines.pop()
        for line in lines:
            yield line.decode()
    if pending:
        yield pending.decode()


Expand Down
10 changes: 8 additions & 2 deletions dask/bag/tests/test_bag.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import absolute_import, division, print_function

from toolz import merge, join, pipe, filter, identity, merge_with
from toolz import merge, join, pipe, filter, identity, merge_with, take
import numpy as np
from dask.bag.core import (Bag, lazify, lazify_task, fuse, map, collect,
reduceby)
reduceby, bz2_stream)
from dask.utils import filetexts
import dask
from pbag import PBag
Expand Down Expand Up @@ -321,3 +321,9 @@ def test_to_textfiles():
f.close()
finally:
shutil.rmtree('_foo')


def test_bz2_stream():
    """ bz2_stream yields the leading lines of a compressed block of text """
    numbers = map(str, range(10000))
    payload = bz2.compress('\n'.join(numbers).encode())
    first_hundred = take(100, bz2_stream(payload))
    assert list(first_hundred) == list(map(str, range(100)))

0 comments on commit 865ba05

Please sign in to comment.