BUG: GH11786 Thread safety issue with read_csv #11790

Closed
4 participants
Contributor

jdeschenes commented Dec 7, 2015

closes #11786

Fixed an issue with thread safety when calling read_csv with a StringIO object.

The issue was caused by a misplaced PyGILState_Ensure()
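For reference, a minimal smoke-test sketch in the spirit of the report in #11786 (buffer sizes and thread count here are arbitrary): several in-memory CSV buffers read concurrently, which could crash the interpreter before this fix.

```python
from io import BytesIO
from multiprocessing.pool import ThreadPool

import pandas as pd

# Build several small CSV payloads in memory; sizes are arbitrary.
payloads = [
    "\n".join("%d,%d,%d" % (i, i, i) for i in range(1000)).encode()
    for _ in range(20)
]
files = [BytesIO(b) for b in payloads]

# Before the fix, reading the buffers concurrently could crash the
# interpreter; with correct GIL handling it simply returns the frames.
pool = ThreadPool(4)
results = pool.map(pd.read_csv, files)
pool.close()
pool.join()
```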

Contributor

jreback commented Dec 7, 2015

@jdeschenes gr8! can you add the example from the issue as a smoke test (e.g. just have it run), then read in with a single thread and compare.

and pls add a release note when you are satisfied.

@jreback jreback added Bug CSV labels Dec 7, 2015

jreback added this to the 0.18.0 milestone Dec 7, 2015

Contributor

jdeschenes commented Dec 7, 2015

Alright, I did this quickly as I don't have time to work on this right now. How long until the next release?

Contributor

jreback commented Dec 7, 2015

@jdeschenes oh, we have a while.....when you have a chance...thanks!

Contributor

jreback commented Dec 26, 2015

@jdeschenes if you can have a look at this again, that would be great.

Contributor

jdeschenes commented Jan 4, 2016

ping! @jreback

@jreback jreback commented on an outdated diff Jan 5, 2016

pandas/tests/test_frame.py
@@ -16064,6 +16064,18 @@ def bar(self):
with tm.assertRaisesRegexp(AttributeError, '.*i_dont_exist.*'):
A().bar
+ def test_multithread_stringio_read_csv(self):
+ from io import BytesIO
+ from multiprocessing.pool import ThreadPool
+
+ bytes = ['\n'.join(['%d,%d,%d' % (i, i, i) for i in range(10000)]).encode()
+ for j in range(100)]
+ files = [BytesIO(b) for b in bytes]
+
+ # Read all files in many threads
+ pool = ThreadPool(8)
@jreback

jreback Jan 5, 2016

Contributor

assert that the read in values match a single threaded reader. (e.g. compare frames)

@jreback jreback and 1 other commented on an outdated diff Jan 5, 2016

pandas/tests/test_frame.py
@@ -16064,6 +16064,18 @@ def bar(self):
with tm.assertRaisesRegexp(AttributeError, '.*i_dont_exist.*'):
A().bar
+ def test_multithread_stringio_read_csv(self):
+ from io import BytesIO
+ from multiprocessing.pool import ThreadPool
+
+ bytes = ['\n'.join(['%d,%d,%d' % (i, i, i) for i in range(10000)]).encode()
+ for j in range(100)]
+ files = [BytesIO(b) for b in bytes]
+
+ # Read all files in many threads
+ pool = ThreadPool(8)
+ pool.map(pd.read_csv, files)
@jreback

jreback Jan 5, 2016

Contributor

is this restricted to only a BytesIO? I think we need a test of this using actual files, e.g. create a frame, write it out, then read it back from the files.

@jdeschenes

jdeschenes Jan 6, 2016

Contributor

It only applies to BytesIO; the function is called only when you are not passing a path. It requires a 'file-like' object.
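To illustrate the dispatch being described (a sketch of the user-facing behavior, not the internal C code path): a path string goes through the file reader, while any object with a `read` method, such as StringIO, goes through the buffer reader where the bug lived.

```python
from io import StringIO

import pandas as pd

# A file-like object (anything with a ``read`` method) takes the
# buffer-reading code path in the C parser; a path string does not.
buf = StringIO("a,b\n1,2\n3,4\n")
df = pd.read_csv(buf)
print(df.shape)  # (2, 2)
```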

@jreback jreback commented on an outdated diff Jan 5, 2016

pandas/tests/test_frame.py
@@ -16064,6 +16064,18 @@ def bar(self):
with tm.assertRaisesRegexp(AttributeError, '.*i_dont_exist.*'):
A().bar
+ def test_multithread_stringio_read_csv(self):
+ from io import BytesIO
+ from multiprocessing.pool import ThreadPool
+
@jreback

jreback Jan 5, 2016

Contributor

move this to pandas/io/tests/test_parsers.py.

@jreback jreback and 1 other commented on an outdated diff Jan 5, 2016

doc/source/whatsnew/v0.18.0.txt
@@ -355,7 +355,7 @@ Bug Fixes
- Regression in ``.clip`` with tz-aware datetimes (:issue:`11838`)
- Bug in ``date_range`` when the boundaries fell on the frequency (:issue:`11804`)
- Bug in consistency of passing nested dicts to ``.groupby(...).agg(...)`` (:issue:`9052`)
-
+- Bug in ``read_csv`` when reading from a StringIO in threads (:issue:`11790`)
@jreback

jreback Jan 5, 2016

Contributor

this is a bug in read_csv when reading using multiple threads (not restricted to StringIO)

@jdeschenes

jdeschenes Jan 6, 2016

Contributor

It only applies if you pass in an object that has a 'read' attribute (BytesIO).

mrocklin commented Jan 5, 2016

Thank you both for keeping up on this.

Contributor

jreback commented Jan 6, 2016

@jdeschenes IIRC this issue is reproducible with actual files. Is that not the case? Is it only StringIO/BytesIO? Are they not thread-safe?

Contributor

jreback commented Jan 11, 2016

@jdeschenes can you respond to my comments?

mrocklin referenced this pull request in dask/dask Jan 12, 2016

Closed

Fatal error when running read_csv #841

Contributor

jdeschenes commented Jan 19, 2016

Hi @jreback,

the issue is solely reproducible with StringIO. The root cause of this bug is the function buffer_rd_bytes in pandas/src/parser/io.c, which is only used when a StringIO/BytesIO is passed to read_csv.

The function was calling Py_XDECREF before ensuring that the thread held the GIL. This could not be observed before, since the GIL was held throughout the read_csv call.

I am not aware of any issues when reading from disk, and this pull request will not change anything related to that path.

I think that the release notes should be kept as is.

Let me know what you think.
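A sketch of the regression check this implies (using the modern `pandas.testing` import path; sizes and thread counts are illustrative): every concurrently-read frame should match a single-threaded baseline read of the same bytes.

```python
from io import BytesIO
from multiprocessing.pool import ThreadPool

import pandas as pd
from pandas.testing import assert_frame_equal

# Single-threaded baseline read of the payload.
payload = "\n".join("%d,%d,%d" % (i, i, i) for i in range(1000)).encode()
expected = pd.read_csv(BytesIO(payload))

# The same bytes read concurrently must produce identical frames.
pool = ThreadPool(4)
results = pool.map(pd.read_csv, [BytesIO(payload) for _ in range(8)])
pool.close()
pool.join()

for result in results:
    assert_frame_equal(expected, result)
```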

Contributor

jreback commented Jan 19, 2016

ok, can you add a test that validates that reading from disk with multiple threads is ok (so we don't regress).

@jreback jreback commented on an outdated diff Jan 19, 2016

doc/source/whatsnew/v0.18.0.txt
@@ -476,11 +476,7 @@ Bug Fixes
- Regression in ``.clip`` with tz-aware datetimes (:issue:`11838`)
- Bug in ``date_range`` when the boundaries fell on the frequency (:issue:`11804`)
- Bug in consistency of passing nested dicts to ``.groupby(...).agg(...)`` (:issue:`9052`)
-- Accept unicode in ``Timedelta`` constructor (:issue:`11995`)
-- Bug in value label reading for ``StataReader`` when reading incrementally (:issue:`12014`)
-- Bug in vectorized ``DateOffset`` when ``n`` parameter is ``0`` (:issue:`11370`)
-- Compat for numpy 1.11 w.r.t. ``NaT`` comparison changes (:issue:`12049`)
-
+- Bug in ``read_csv`` when reading from a StringIO in threads (:issue:`11790`)
@jreback

jreback Jan 19, 2016

Contributor

use double backticks around StringIO

@jreback jreback commented on an outdated diff Jan 19, 2016

pandas/io/tests/test_parsers.py
+ results = pool.map(pd.read_csv, files)
+ first_result = results[0]
+
+ for result in results:
+ tm.assert_frame_equal(first_result, result)
+
+ def test_multithread_path_read_csv(self):
+ df = DataFrame(np.random.rand(5000, 20), columns=map(str, xrange(20)))
+ num_files = 20
+ with tm.ensure_clean('__passing_str_as_dtype__.csv') as path:
+ df.to_csv(path, index=False)
+ pool = ThreadPool(4)
+ read_dataframes = pool.map(pd.read_csv, [path]*num_files)
+
+ for single_dataframe in read_dataframes:
+ tm.assert_frame_equal(df, single_dataframe, check_names=False)
@jreback

jreback Jan 19, 2016

Contributor

ahh I see you already added this. ok then.

@jreback jreback and 1 other commented on an outdated diff Jan 19, 2016

pandas/io/tests/test_parsers.py
+
+ for result in results:
+ tm.assert_frame_equal(first_result, result)
+
+ def test_multithread_path_read_csv(self):
+ df = DataFrame(np.random.rand(5000, 20), columns=map(str, xrange(20)))
+ num_files = 20
+ with tm.ensure_clean('__passing_str_as_dtype__.csv') as path:
+ df.to_csv(path, index=False)
+ pool = ThreadPool(4)
+ read_dataframes = pool.map(pd.read_csv, [path]*num_files)
+
+ for single_dataframe in read_dataframes:
+ tm.assert_frame_equal(df, single_dataframe, check_names=False)
+
+
class TestMiscellaneous(tm.TestCase):
@jreback

jreback Jan 19, 2016

Contributor

actually, can you add a test where you read from a single file with multiple readers? similar to the examples in the original issue.

@jdeschenes

jdeschenes Jan 19, 2016

Contributor

Hmmm, is this ok? As per your comment above?

@jreback

jreback Jan 19, 2016

Contributor

sort of. I think this functionality will be used to read a single file in parts (not the same file several times)

something like this:

import pandas as pd
from pandas.util import testing as tm
import numpy as np
from multiprocessing.pool import ThreadPool
pd.options.display.width = 200
fn = 'test.csv'
np.random.seed(1234)

N = 1000000
df = pd.DataFrame(np.random.randn(N, 5), columns=list('abcde'))
df['foo'] = 'foo'
df['bar'] = 'bar'
df['baz'] = 'baz'
df['date'] = pd.date_range('20000101 09:00:00', periods=N, freq='s')
df['int'] = np.arange(N)

df.to_csv(fn)

def f():
    return pd.read_csv(fn, index_col=0, header=0)

def g(procs=4):

    def reader(arg):
        start, nrows = arg

        # read from the start line for nrows
        if not start:
            return pd.read_csv(fn, index_col=0, header=0, nrows=nrows)

        # skip the header entirely
        return pd.read_csv(fn, index_col=0, header=None,
                           skiprows=int(start) + 1, nrows=nrows)

    # our tasks (integer division so skiprows/nrows stay ints)
    tasks = [(N * i // procs, N // procs) for i in range(procs)]
    pool = ThreadPool(processes=procs)

    # execute
    results = pool.map(reader, tasks)

    # collate the results
    # use the header from the first piece
    header = results[0].columns
    for r in results[1:]:
        r.columns = header
    return pd.concat(results)

def compare(result, expected):
    tm.assert_frame_equal(result, expected)
@jreback

jreback Jan 19, 2016

Contributor

note AFAICT this DOES work! I just want to have this as a test for validation.

I'd like to test this with actual files and StringIO

@jreback jreback commented on an outdated diff Jan 19, 2016

pandas/io/tests/test_parsers.py
@@ -4128,6 +4130,34 @@ def test_bool_header_arg(self):
with tm.assertRaises(TypeError):
pd.read_fwf(StringIO(data), header=arg)
+ def test_multithread_stringio_read_csv(self):
+ max_row_range = 10000
@jreback

jreback Jan 19, 2016

Contributor

pls add the issue number as a comment (on all tests)

Contributor

jreback commented Jan 19, 2016

pls run `git diff master | flake8 --diff` as much PEP checking has been done on these files.

mrocklin commented Jan 19, 2016

FWIW using BytesIO has actual use cases in distributed computing, it isn't just a test case.

Many parallel storage systems won't give you access to the hard disk but will instead deliver a bunch of bytes. In this case the best way I've found to use pd.read_csv is to hand it a BytesIO object.

Contributor

jreback commented Jan 19, 2016

@mrocklin oh of course. just covering the bases. I suspect people have tried multi-threading to read files as well :)

@jdeschenes jdeschenes BUG: Fixed an issue with thread safety when calling read_csv with a StringIO object. #11786

The issue was caused by a misplaced PyGILState_Ensure()

505f6a6
Contributor

jdeschenes commented Jan 19, 2016

It would be very interesting to see if there is any benefit to using a ThreadPool for reading from a BytesIO. We spend a lot of time holding the GIL, thanks to the buffer_rd_bytes function. It should probably be benchmarked.

I have a suspicion that it doesn't help at all (it might even be a net loss).

I added the test for the file read. I didn't do it for the BytesIO; the code would effectively look a lot like what I did up top: grabbing a list of BytesIO objects and processing them in a ThreadPool. I can take a look at this a bit later, if that is required.
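A rough timing sketch for the benchmark being suggested (payload sizes and pool size are arbitrary placeholders; this measures wall time only and by itself proves nothing about GIL contention):

```python
import time
from io import BytesIO
from multiprocessing.pool import ThreadPool

import pandas as pd

# Arbitrary in-memory CSV payloads to read.
payloads = [
    "\n".join("%d,%d,%d" % (i, i, i) for i in range(20000)).encode()
    for _ in range(8)
]

# Serial baseline.
start = time.time()
serial = [pd.read_csv(BytesIO(b)) for b in payloads]
t_serial = time.time() - start

# ThreadPool version; if buffer_rd_bytes mostly holds the GIL, this
# may show little or no speedup over the serial loop.
pool = ThreadPool(4)
start = time.time()
threaded = pool.map(pd.read_csv, [BytesIO(b) for b in payloads])
t_threaded = time.time() - start
pool.close()
pool.join()

print("serial %.3fs, threaded %.3fs" % (t_serial, t_threaded))
```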

jreback closed this in 567bc5c Jan 19, 2016

Contributor

jreback commented Jan 19, 2016

@jdeschenes thanks!

certainly would take additional benchmarks / fixes!

@jreback jreback added a commit that referenced this pull request Jan 19, 2016

@jreback jreback TST: win32 testing fix, xref #11790 e8fbabd

kayvonr commented Jan 27, 2016

Hey all - any estimate of when this will go out in a production release? Encountering this bug very frequently with 0.17.1, and would like to get back up to a newer version of pandas again soon

Thanks

Contributor

jreback commented Jan 27, 2016

planning on an RC in about 2 weeks, so release should be roughly mid-February or so
