Skip to content

Commit

Permalink
Add lock= keyword to from_bcolz
Browse files Browse the repository at this point in the history
This allows for custom locks or no lock at all to be used.

Fixes dask#1033
  • Loading branch information
mrocklin committed Apr 22, 2016
1 parent a8d55b6 commit fee53b9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
61 changes: 34 additions & 27 deletions dask/dataframe/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ def from_pandas(data, npartitions=None, chunksize=None, sort=True):
return _Frame(dsk, name, data, divisions)


def from_bcolz(x, chunksize=None, categorize=True, index=None, **kwargs):
def from_bcolz(x, chunksize=None, categorize=True, index=None, lock=lock,
**kwargs):
""" Read dask Dataframe from bcolz.ctable
Parameters
Expand All @@ -423,12 +424,17 @@ def from_bcolz(x, chunksize=None, categorize=True, index=None, **kwargs):
Automatically categorize all string dtypes
index : string, optional
Column to make the index
lock : bool or Lock, optional
Lock to use when reading. True creates a new Lock; False disables locking (not thread-safe)
See Also
--------
from_array: more generic function not optimized for bcolz
"""
if lock is True:
lock = Lock()

import dask.array as da
import bcolz

Expand Down Expand Up @@ -459,10 +465,10 @@ def from_bcolz(x, chunksize=None, categorize=True, index=None, **kwargs):
new_name = 'from_bcolz-' + token

dsk = dict(((new_name, i),
(locked_df_from_ctable,
(dataframe_from_ctable,
x,
(slice(i * chunksize, (i + 1) * chunksize),),
columns, categories))
columns, categories, lock))
for i in range(0, int(ceil(len(x) / chunksize))))

result = DataFrame(dsk, new_name, columns, divisions)
Expand All @@ -477,7 +483,7 @@ def from_bcolz(x, chunksize=None, categorize=True, index=None, **kwargs):
return result


def dataframe_from_ctable(x, slc, columns=None, categories=None):
def dataframe_from_ctable(x, slc, columns=None, categories=None, lock=lock):
""" Get DataFrame from bcolz.ctable
Parameters
Expand Down Expand Up @@ -520,29 +526,30 @@ def dataframe_from_ctable(x, slc, columns=None, categories=None):
stop = slc[0].stop if slc[0].stop < len(x) else len(x)
idx = pd.Index(range(start, stop))

if isinstance(x, bcolz.ctable):
chunks = [x[name][slc] for name in columns]
if categories is not None:
chunks = [pd.Categorical.from_codes(np.searchsorted(categories[name],
chunk),
categories[name], True)
if name in categories else chunk
for name, chunk in zip(columns, chunks)]
return pd.DataFrame(dict(zip(columns, chunks)), columns=columns,
index=idx)

elif isinstance(x, bcolz.carray):
chunk = x[slc]
if categories is not None and columns and columns in categories:
chunk = pd.Categorical.from_codes(
np.searchsorted(categories[columns], chunk),
categories[columns], True)
return pd.Series(chunk, name=columns, index=idx)


def locked_df_from_ctable(*args, **kwargs):
with lock:
result = dataframe_from_ctable(*args, **kwargs)
if lock:
lock.acquire()
try:
if isinstance(x, bcolz.ctable):
chunks = [x[name][slc] for name in columns]
if categories is not None:
chunks = [pd.Categorical.from_codes(
np.searchsorted(categories[name], chunk),
categories[name], True)
if name in categories else chunk
for name, chunk in zip(columns, chunks)]
result = pd.DataFrame(dict(zip(columns, chunks)), columns=columns,
index=idx)

elif isinstance(x, bcolz.carray):
chunk = x[slc]
if categories is not None and columns and columns in categories:
chunk = pd.Categorical.from_codes(
np.searchsorted(categories[columns], chunk),
categories[columns], True)
result = pd.Series(chunk, name=columns, index=idx)
finally:
if lock:
lock.release()
return result


Expand Down
18 changes: 18 additions & 0 deletions dask/dataframe/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import dask
from operator import getitem
import pytest
from threading import Lock
from toolz import valmap
import tempfile
import shutil
Expand Down Expand Up @@ -323,6 +324,23 @@ def test_from_bcolz():
sorted(dsk)


def test_from_bcolz_no_lock():
    """Check that ``from_bcolz`` agrees across the supported ``lock`` options.

    The default call, ``lock=True`` and ``lock=False`` must all compare equal,
    and with ``lock=False`` no lock object may appear among the top-level
    items of any task in the resulting graph.
    """
    bcolz = pytest.importorskip('bcolz')
    # Take the concrete lock type from an instance so it is usable with
    # isinstance regardless of how ``Lock`` itself is exposed.
    lock_cls = type(Lock())

    table = bcolz.ctable([[1, 2, 3], [1., 2., 3.], ['a', 'b', 'a']],
                         names=['x', 'y', 'a'], chunklen=2)
    default_locked = dd.from_bcolz(table, chunksize=2)
    fresh_locked = dd.from_bcolz(table, chunksize=2, lock=True)
    unlocked = dd.from_bcolz(table, chunksize=2, lock=False)
    eq(default_locked, fresh_locked)
    eq(default_locked, unlocked)

    # Every item of every task in the lock-free graph must be something
    # other than a lock instance.
    for task in unlocked.dask.values():
        for item in task:
            assert not isinstance(item, lock_cls)


def test_from_bcolz_filename():
bcolz = pytest.importorskip('bcolz')

Expand Down

0 comments on commit fee53b9

Please sign in to comment.