sink_csv hangs when scanning multiple larger files #12918

Closed
2 tasks done
KoenD opened this issue Dec 6, 2023 · 4 comments · Fixed by #12991
Labels
accepted Ready for implementation bug Something isn't working python Related to Python Polars

Comments

@KoenD

KoenD commented Dec 6, 2023

Checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl
from pathlib import Path

test_dir = Path(r'C:\Tmp\polars_sink_test')
test_dir.mkdir(parents=True, exist_ok=True)

dummy_data = list(range(100000))
dummy_df = pl.DataFrame({'a': dummy_data, 'b': dummy_data})

for i in range(10):
    dummy_df.write_csv(test_dir.joinpath(f'dummy_{i}.csv'))

df = pl.scan_csv(test_dir.joinpath('dummy_*.csv'))
df.sink_csv(test_dir.joinpath('sink.csv'))

Log output

RUN STREAMING PIPELINE
union -> parquet_sink
RefCell { value: [] }
STREAMING CHUNK SIZE: 25000 rows

Issue description

When scanning multiple CSV files (10 in this test, with 100 000 records each, as generated by the script above) into a LazyFrame and sinking that LazyFrame to a single CSV file, Polars hangs. With Polars 0.19.3 I do not encounter this issue; the same test script finishes within a minute.

Expected behavior

The sink_csv method should not hang, but finish within a reasonable amount of time (for this small test case).

Installed versions

--------Version info---------
Polars:               0.19.19
Index type:           UInt32
Platform:             Windows-10-10.0.19045-SP0
Python:               3.9.18 | packaged by conda-forge | (main, Aug 30 2023, 03:40:31) [MSC v.1929 64 bit (AMD64)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          2.2.1
connectorx:           <not installed>
deltalake:            <not installed>
fsspec:               2023.6.0
gevent:               <not installed>
matplotlib:           3.7.2
numpy:                1.24.4
openpyxl:             3.1.2
pandas:               2.0.3
pyarrow:              10.0.1
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.8
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
@KoenD KoenD added bug Something isn't working python Related to Python Polars labels Dec 6, 2023
@dittos

dittos commented Dec 7, 2023

I'm experiencing the same hang. It is reproducible by just reading from a single large enough CSV file.

import polars as pl
import os

dummy_data = list(range(100000 * 10))
dummy_df = pl.DataFrame({'a': dummy_data, 'b': dummy_data})
dummy_df.write_csv('dummy.csv')
print("written")

print("pid", os.getpid())

pl.scan_csv('dummy.csv').sink_csv('sink.csv')

Some lines are written to sink.csv, but the output is incomplete.

I suspect that the CSV reader and writer both using the thread pool leads to a deadlock.

gdb stack trace shows all threads are waiting:

(gdb) thread apply all bt

Thread 10 (Thread 0x7efdca5ff700 (LWP 230209)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb657673 in rayon_core::latch::LockLatch::wait_and_reset () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#2  0x00007efdd88007c7 in rayon_core::registry::Registry::in_worker_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#3  0x00007efddb117977 in <polars_io::csv::write::BatchedWriter<std::fs::File> as polars_pipe::executors::sinks::file_sink::SinkWriter>::_write_batch () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#4  0x00007efddb123184 in std::sys_common::backtrace::__rust_begin_short_backtrace () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#5  0x00007efddb121c51 in core::ops::function::FnOnce::call_once{{vtable-shim}} () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#6  0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#7  <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#8  std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#9  0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#10 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 9 (Thread 0x7efdcbfff700 (LWP 230207)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 8 (Thread 0x7efdd05ff700 (LWP 230206)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 7 (Thread 0x7efdd0fff700 (LWP 230205)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 6 (Thread 0x7efdd15fb700 (LWP 230204)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 5 (Thread 0x7efdd17fc700 (LWP 230203)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 4 (Thread 0x7efdd19fd700 (LWP 230202)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 3 (Thread 0x7efdd1bfe700 (LWP 230201)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#11 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#12 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#13 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#14 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#15 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#16 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 2 (Thread 0x7efdd1dff700 (LWP 230200)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb8c1184 in std::sys::unix::futex::futex_wait () at library/std/src/sys/unix/futex.rs:62
#2  std::sys_common::thread_parking::futex::Parker::park () at library/std/src/sys_common/thread_parking/futex.rs:52
#3  std::thread::park () at library/std/src/thread/mod.rs:1070
#4  0x00007efddb12533b in crossbeam_channel::context::Context::wait_until () at library/alloc/src/alloc.rs:117
#5  0x00007efddb142ade in crossbeam_channel::flavors::array::Channel<T>::send::{{closure}} () at library/alloc/src/alloc.rs:117
#6  0x00007efddb141e93 in crossbeam_channel::channel::Sender<T>::send () at library/alloc/src/alloc.rs:117
#7  0x00007efddb141865 in <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () at library/alloc/src/alloc.rs:117
#8  0x00007efddb1a6fed in <rayon_core::job::HeapJob<BODY> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#9  0x00007efdd8830805 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efddb1a6404 in rayon_core::scope::scope::{{closure}} () at library/alloc/src/alloc.rs:117
#11 0x00007efddb1a7afa in <rayon_core::job::StackJob<L,F,R> as rayon_core::job::Job>::execute () at library/alloc/src/alloc.rs:117
#12 0x00007efdd8830fa9 in rayon_core::registry::WorkerThread::wait_until_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#13 0x00007efddb658de7 in std::sys_common::backtrace::__rust_begin_short_backtrace () at library/alloc/src/alloc.rs:117
#14 0x00007efddb658ace in core::ops::function::FnOnce::call_once{{vtable-shim}} () at library/alloc/src/alloc.rs:117
#15 0x00007efddb8fe926 in <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#16 <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once () at library/alloc/src/boxed.rs:2007
#17 std::sys::unix::thread::Thread::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#18 0x00007efddd0c9609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#19 0x00007efddd203133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 1 (Thread 0x7efddcf18740 (LWP 230199)):
#0  syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1  0x00007efddb657673 in rayon_core::latch::LockLatch::wait_and_reset () at library/alloc/src/alloc.rs:117
#2  0x00007efdd8802e57 in rayon_core::registry::Registry::in_worker_cold () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#3  0x00007efddb1a4492 in polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline_no_finalize () at library/alloc/src/alloc.rs:117
#4  0x00007efddb1a3d0b in polars_pipe::pipeline::dispatcher::PipeLine::run_pipeline () at library/alloc/src/alloc.rs:117
#5  0x00007efdda7f26e7 in <F as polars_plan::logical_plan::apply::DataFrameUdfMut>::call_udf () at library/alloc/src/alloc.rs:117
#6  0x00007efddb48acf1 in polars_plan::logical_plan::functions::FunctionNode::evaluate () at library/alloc/src/alloc.rs:117
#7  0x00007efdda801e91 in <polars_lazy::physical_plan::executors::udf::UdfExec as polars_lazy::physical_plan::executors::executor::Executor>::execute () at library/alloc/src/alloc.rs:117
#8  0x00007efdda91e0ba in polars_lazy::frame::LazyFrame::sink () at library/alloc/src/alloc.rs:117
#9  0x00007efdd93026b0 in polars::lazyframe::PyLazyFrame::sink_csv () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#10 0x00007efdd9312774 in polars::lazyframe::_::<impl polars::lazyframe::PyLazyFrame>::__pymethod_sink_csv__ () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#11 0x00007efdd8c91523 in pyo3::impl_::trampoline::trampoline () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#12 0x00007efdd93dc7d1 in polars::lazyframe::_::_::__INVENTORY::trampoline () from /home/ditto/.cache/pypoetry/virtualenvs/thinkingface-IAymwpp_-py3.8/lib/python3.8/site-packages/polars/polars.abi3.so
#13 0x00000000005d5499 in PyCFunction_Call ()
#14 0x00000000005d6066 in _PyObject_MakeTpCall ()
#15 0x000000000054d44a in _PyEval_EvalFrameDefault ()
#16 0x000000000054552a in _PyEval_EvalCodeWithName ()
#17 0x00000000005d5a23 in _PyFunction_Vectorcall ()
#18 0x00000000005d4c12 in PyObject_Call ()
#19 0x0000000000548a66 in _PyEval_EvalFrameDefault ()
#20 0x000000000054552a in _PyEval_EvalCodeWithName ()
#21 0x00000000005d5a23 in _PyFunction_Vectorcall ()
#22 0x00000000005d4c12 in PyObject_Call ()
#23 0x0000000000548a66 in _PyEval_EvalFrameDefault ()
#24 0x000000000054552a in _PyEval_EvalCodeWithName ()
#25 0x00000000005d5a23 in _PyFunction_Vectorcall ()
#26 0x0000000000547447 in _PyEval_EvalFrameDefault ()
#27 0x000000000054552a in _PyEval_EvalCodeWithName ()
#28 0x0000000000684327 in PyEval_EvalCode ()
#29 0x0000000000673a41 in ?? ()
#30 0x0000000000673abb in ?? ()
#31 0x0000000000673b61 in ?? ()
#32 0x00000000006747e7 in PyRun_SimpleFileExFlags ()
#33 0x00000000006b4072 in Py_RunMain ()
#34 0x00000000006b43fd in Py_BytesMain ()
#35 0x00007efddd108083 in __libc_start_main (main=0x4c4510 <main>, argc=2, argv=0x7ffe23e9e8a8, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffe23e9e898) at ../csu/libc-start.c:308
#36 0x00000000005da67e in _start ()

Thread 1 is the main thread.
Threads 2 to 9 are blocked at <polars_pipe::executors::sinks::file_sink::FilesSink as polars_pipe::operators::sink::Sink>::sink () (my machine has 8 cores).
Thread 10 is blocked at <polars_io::csv::write::BatchedWriter<std::fs::File> as polars_pipe::executors::sinks::file_sink::SinkWriter>::_write_batch ()
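
If the deadlock hypothesis above is right, the pattern in the stack trace can be reproduced in miniature with the Python standard library. This is only a toy analogy, not Polars' actual Rust code: `ThreadPoolExecutor` stands in for the rayon pool, `queue.Queue(maxsize=1)` for the bounded crossbeam channel, and the dedicated `write_all` thread for thread 10. All names here (`run_demo`, `produce`, `write_all`) are made up for illustration.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_demo(serialize_in_pool, n_workers=2, n_chunks=8, timeout=1.0):
    """Return (deadlocked, chunks_written) for a toy producer/writer pipeline."""
    pool = ThreadPoolExecutor(max_workers=n_workers)
    channel = queue.Queue(maxsize=1)  # bounded channel between sink and writer
    stop = threading.Event()          # lets the demo unwind instead of hanging
    state = {"deadlocked": False, "written": 0}

    def produce(chunk):
        # Runs on a pool worker and occupies it until the channel has room.
        while not stop.is_set():
            try:
                channel.put(chunk, timeout=0.05)
                return
            except queue.Full:
                pass

    def write_all():
        # Runs on a dedicated thread outside the pool.
        while state["written"] < n_chunks and not stop.is_set():
            try:
                chunk = channel.get(timeout=0.05)
            except queue.Empty:
                continue
            if serialize_in_pool:
                # This job queues behind the pending producers, which cannot
                # finish until this thread drains the channel again.
                job = pool.submit(str, chunk)
                try:
                    job.result(timeout=timeout)
                except FutureTimeout:
                    state["deadlocked"] = True
                    stop.set()
                    return
            else:
                str(chunk)  # serialise inline, no pool worker needed
            state["written"] += 1

    # Queue all producers first, then start the writer.
    for i in range(n_chunks):
        pool.submit(produce, i)
    writer = threading.Thread(target=write_all)
    writer.start()
    writer.join()
    stop.set()
    pool.shutdown(wait=True)
    return state["deadlocked"], state["written"]


if __name__ == "__main__":
    print(run_demo(serialize_in_pool=True))
    print(run_demo(serialize_in_pool=False))
```

With `serialize_in_pool=True` every pool worker ends up parked in `produce` while the writer waits for a job that no worker is free to run, which mirrors threads 2 to 9 and thread 10 in the trace. The inline variant is shown only for contrast and is not a description of how the issue was actually fixed in Polars.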

@c-peters c-peters assigned c-peters and unassigned c-peters Dec 11, 2023
@c-peters c-peters added the accepted Ready for implementation label Dec 11, 2023
@ritchie46 ritchie46 self-assigned this Dec 11, 2023
@naterichman

Was this fixed in 0.20.1? I'm still having the same issue. I can work around it by either

  1. collecting the lazy frame and then writing with write_csv instead of using sink_csv, or
  2. writing to another format, e.g. scan_csv followed by sink_parquet. Only the combination of scan_csv and sink_csv doesn't seem to work.

@cmdlineluser
Contributor

The example still reproduces for me on 0.20.1

It works correctly with POLARS_MAX_THREADS=1, so it still seems to be an issue.

Perhaps @ritchie46 can confirm?
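
One way to run the POLARS_MAX_THREADS=1 check mentioned above without risking a stuck terminal is to run the reproduction script in a child process with a timeout. The sketch below uses only the standard library. The helper name `run_with_max_threads` and the script name `repro.py` are hypothetical. The variable is set in the child's environment so that it is in place before Polars is imported there.

```python
import os
import subprocess
import sys


def run_with_max_threads(cmd, max_threads, timeout):
    """Run cmd with POLARS_MAX_THREADS set; return stdout, or None on timeout."""
    env = dict(os.environ, POLARS_MAX_THREADS=str(max_threads))
    try:
        done = subprocess.run(
            cmd, env=env, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # The child was killed after `timeout` seconds, which suggests a hang.
        return None
    return done.stdout


if __name__ == "__main__":
    # Stand-in child that just echoes the variable; for a real check, replace
    # it with [sys.executable, "repro.py"] (hypothetical file name).
    child = [sys.executable, "-c", "import os; print(os.environ['POLARS_MAX_THREADS'])"]
    print(run_with_max_threads(child, max_threads=1, timeout=30))
```

Running the same script once with `max_threads=1` and once with a larger value, and comparing which run returns `None`, gives a quick indication of whether the hang depends on the thread count.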

@cmdlineluser
Contributor

@naterichman #13239 has fixed this.
