
Deadlock when running complex query #14939

Closed
kszlim opened this issue Mar 9, 2024 · 39 comments
Labels
bug (Something isn't working) · needs triage (Awaiting prioritization by a maintainer) · python (Related to Python Polars)

Comments

@kszlim
Contributor

kszlim commented Mar 9, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

Couldn't produce an MRE; the code I previously suspected turned out not to be the cause anyway.

Log output

Output right before deadlock:

parquet file must be read, statistics not sufficient for predicate.
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table

Issue description

The query deadlocks when the collection is run in a loop many times.
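Roughly, the call pattern is the sketch below (heavily simplified and hypothetical; build_query() and the path stand in for the real query, so this is not a reproducer):

import polars as pl

def build_query() -> pl.LazyFrame:
    # Placeholder for the real, much more complex query: a cloud
    # scan_parquet followed by joins, window functions and aggregations.
    return pl.scan_parquet("s3://my-bucket/redacted.parquet")

# Collecting repeatedly; after some number of iterations the process
# hangs and never returns.
for i in range(1000):
    df = build_query().collect()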

Expected behavior

The query should complete without deadlocking.

Installed versions

Python 3.11.7 (main, Dec  5 2023, 22:00:36) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import polars
>>> polars.show_versions()
--------Version info---------
Polars:               0.20.19
Index type:           UInt32
Platform:             Linux-5.10.213-179.855.x86_64-x86_64-with-glibc2.26
Python:               3.11.7 (main, Dec  5 2023, 22:00:36) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          3.0.0
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               2024.3.1
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:             <not installed>
pandas:               2.2.1
pyarrow:              15.0.2
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>


Occurs since 0.20.3

EDIT:
I no longer think it has anything to do with map_batches or map_elements; I've removed them from my code and it still occurs. It seems to hang shortly after:

estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table

The query is a scan_parquet against cloud storage with a few joins.

@kszlim kszlim added the bug, needs triage, and python labels Mar 9, 2024
@orlp
Collaborator

orlp commented Mar 11, 2024

I'll try to make something more reproducible later if required.

A minimal working example reproducing the problem would be much appreciated.

@kszlim
Contributor Author

kszlim commented Apr 11, 2024

I'm having a hard time creating a reproducer and will keep trying, but I've found that it seemingly is caused by:

expr.map_batches(lambda x: x.to_frame().unnest(q.name).to_numpy(use_pyarrow=True))

That is, the default use_pyarrow=True parameter appears to be the cause: if I change it to False, the deadlocks seem to stop. It's hard to know for sure, as it takes minutes or longer to reproduce.

EDIT
Seems like that was a red herring; I still get deadlocks. Might it be related to the level of parallelism? (I'm using collect_all on many LazyFrames.)

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

Hmm, I'm actually not sure what the issue is. I'm still experiencing a deadlock; it happens very intermittently.

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

I managed to capture the verbose output from when it deadlocks. Any ideas?

parquet file must be read, statistics not sufficient for predicate.
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

Ran into another instance of it. It turns out it doesn't matter which combination of map_batches/map_elements and use_pyarrow=True/use_pyarrow=False is used; it can deadlock in any case.

parquet file must be read, statistics not sufficient for predicate.
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
dataframe filtered
dataframe filtered
group_by keys are sorted; running sorted key fast path
FOUND SORTED KEY: running default HASH AGGREGATION
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
group_by keys are sorted; running sorted key fast path
ASOF join dataframes finished
group_by keys are sorted; running sorted key fast path
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
group_by keys are sorted; running sorted key fast path
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
dataframe filtered
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table

@kszlim kszlim changed the title from "Deadlock when materializing column with map_batches" to "Deadlock when running complex query" Apr 12, 2024
@kszlim
Contributor Author

kszlim commented Apr 12, 2024

@ritchie46 does anything pop out to you? I'm also using a lot of window functions in this query, if that's relevant.

@ritchie46
Member

Do you use map_elements/map_batches? Then I think we can deadlock because of the Python GIL.

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

I removed the map_* and it still deadlocks.

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

Here's the obfuscated query plan:

SORT BY [col("crenulation"), col("semipurposiveness")]
  AGGREGATE
  	[] BY [col("crenulation").alias("crenulation"), col("semipurposiveness").alias("semipurposiveness")] FROM
     SELECT [col("semipurposiveness"), col("crenulation")] FROM
      DROP_NULLS by: [phase]
         WITH_COLUMNS:
         [.when(col("raob").is_between([col("mastopexy"), col("glomus")])).then(String(ungarment)).otherwise(null.strict_cast(String)).alias("crenulation")]
           SELECT [col("semipurposiveness"), col("raob"), col("mastopexy"), col("glomus")] FROM
            ASOF JOIN:
            LEFT PLAN ON: [col("raob")]
              SORT BY [col("semipurposiveness"), col("raob")]
                 WITH_COLUMNS:
                 [col("semipurposiveness").over([col("semipurposiveness")]), col("raob").over([col("semipurposiveness")])]

                    Parquet SCAN 20 files: first file: s3://my-bucket/redacted.parquet
                    PROJECT 2/215 COLUMNS
                    SELECTION: col("semipurposiveness").is_in([Series])
            RIGHT PLAN ON: [col("mastopexy")]
              SORT BY [col("mastopexy")]
                 WITH_COLUMNS:
                 [.when([(col("__POLARS_CSER_12832905341579707231")) < (col("mastopexy"))]).then(col("mastopexy")).otherwise(col("__POLARS_CSER_12832905341579707231")).alias("glomus"), [(col("glomus")) + (0.dt.duration([0, 0, 0, 10, 0, 0, 0]).dt.total_nanoseconds())].alias("__POLARS_CSER_12832905341579707231")]
                  SORT BY [col("mastopexy"), col("glomus")]
                     SELECT [col("semipurposiveness"), col("mastopexy"), col("glomus")] FROM
                      AGGREGATE
                      	[col("raob").min().alias("mastopexy"), col("raob").max().alias("glomus")] BY [col("semipurposiveness"), col("Deltaville")] FROM
                        FILTER [(col("antiwoman")) | (col("autobus"))].over([col("semipurposiveness")]) FROM

                         WITH_COLUMNS:
                         [[(col("antiwoman")) | (col("autobus"))].rle_id().over([col("semipurposiveness")]).alias("Deltaville")]
                           WITH_COLUMNS:
                           [col("semipurposiveness").over([col("semipurposiveness")]), col("raob").over([col("semipurposiveness")]), [(col("Farnese").alias("nonpersuadable").list.get([2]).alias("dabblingness").abs()) > (3.0)].over([col("semipurposiveness")]).alias("antiwoman"), [([([(col("Farnese").alias("nonpersuadable").list.get([2])) - (col("Farnese").alias("nonpersuadable").list.get([2]).alias("dabblingness").shift([1]))]) / ([([(col("raob").strict_cast(Float64)) * (1.0000e-9)]) - ([(col("raob").strict_cast(Float64)) * (1.0000e-9)].alias("semibachelor").shift([1]))])].alias("dabblingness").abs()) > (4.0)].over([col("semipurposiveness")]).alias("autobus")]

                              Parquet SCAN 20 files: first file: s3://my-bucket/redacted.parquet
                              PROJECT 3/89 COLUMNS
                              SELECTION: col("semipurposiveness").is_in([Series])
            END ASOF JOIN

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

I've tried compiling main at commit 92902e6 and it still deadlocks, though this time the deadlock came after this logging:

run PARTITIONED HASH AGGREGATION
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run PARTITIONED HASH AGGREGATION
dataframe filtered
estimated unique values: 62345
estimated unique count: 62345 exceeded the boundary: 1000, running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run PARTITIONED HASH AGGREGATION

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

Not sure if this is helpful, but it seems to help a little if I turn down the concurrency budget, though that slows down my query quite dramatically.
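For reference, this is roughly how the budget is being turned down (the value is only an example); the POLARS_CONCURRENCY_BUDGET environment variable is set before polars is imported:

import os

# Example value only; lowering this reduces how many concurrent
# downloads polars schedules for cloud scans.
os.environ["POLARS_CONCURRENCY_BUDGET"] = "32"

import polars as pl  # imported after the environment variable is set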

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

I've tried running it under Valgrind's Helgrind and am unable to reproduce; it probably modifies the runtime characteristics too much.

I haven't tried https://github.com/tokio-rs/loom or https://github.com/awslabs/shuttle; I'm curious whether they should be integrated into Polars' testing.

@kszlim
Contributor Author

kszlim commented Apr 12, 2024

@ritchie46 I locally compiled Polars and replaced all usages of OnceLock/RwLock/Mutex with tracing-mutex, and it didn't panic when running. So I'm guessing the deadlock either has something to do with the GIL (though no map_* methods are being called) or with thread starvation or async deadlocking.

@nameexhaustion
Collaborator

Can you try generating a flamegraph? We can maybe see where the threads stall using it - let it deadlock for a while before killing the PID from another terminal.

@kszlim
Contributor Author

kszlim commented Apr 13, 2024

Can you try generating a flamegraph? We can maybe see where the threads stall using it - let it deadlock for a while before killing the PID from another terminal.

I did try: I attempted to generate a flamegraph by attaching to the deadlocked process, but since it wasn't making any progress, nothing showed up in perf.

@ritchie46
Member

You could get a backtrace in gdb. 👀

@nameexhaustion
Collaborator

Alright, in that case I think we can try gdb. If you can, run this:

gdb -p <PID> -ex 'thread apply all bt' -ex detach -ex quit > backtrace.txt

@kszlim
Contributor Author

kszlim commented Apr 13, 2024

If I encounter this again I'll capture one. After upgrading to 0.20.20 I have yet to hit a deadlock; perhaps it's been fixed by #15626.

Let's keep this issue open for a while; I'll close it once I'm sure it's been fixed.

@kszlim
Contributor Author

kszlim commented Apr 14, 2024

@nameexhaustion @ritchie46 it looks like it's not fixed; I had to run a 30-second query over 500 times for it to deadlock. Here's the backtrace.

backtrace.txt

@nameexhaustion
Collaborator

Nice. I'm guessing a timeout for now, given I can see 4 occurrences of tokio::runtime::park::Inner::park and that you had run the query several times in succession.

Thread 162 (Thread 0x7f80111ff700 (LWP 10692)):
#0  0x00007f824e6c92a9 in syscall () from /lib64/libc.so.6
#1  0x00007f8245c90a43 in tokio::runtime::park::Inner::park () from /path/to/venv/lib/python3.11/site-packages/polars/polars.abi3.so
#2  0x00007f824499da6c in polars_io::pl_async::RuntimeManager::block_on_potential_spawn::{{closure}} () from /path/to/venv/lib/python3.11/site-packages/polars/polars.abi3.so
#3  0x00007f824499a780 in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} () from /path/to/venv/lib/python3.11/site-packages/polars/polars.abi3.so

Would you be able to give #15643 a try? I have re-added the connection timeout, so I'm hoping that instead of hanging it will display an error.

@kszlim
Contributor Author

kszlim commented Apr 14, 2024

I gave that a try and it didn't work; it still deadlocked/hung.

@kszlim
Contributor Author

kszlim commented Apr 15, 2024

Is there anything else I can do to help diagnose this, @nameexhaustion?

@kszlim
Contributor Author

kszlim commented Apr 15, 2024

Does this look like it might be the problem?

#0  0x00007f4eb53162a9 in syscall () from /lib64/libc.so.6
#1  0x00007f4eabf21e82 in std::sys::pal::unix::futex::futex_wait () at library/std/src/sys/pal/unix/futex.rs:62
#2  std::sys::sync::condvar::futex::Condvar::wait_optional_timeout () at library/std/src/sys/sync/condvar/futex.rs:49
#3  std::sys::sync::condvar::futex::Condvar::wait () at library/std/src/sys/sync/condvar/futex.rs:33
#4  0x00007f4eabda9e2e in std::sync::condvar::Condvar::wait (self=0x7f4aab614c50, guard=...) at /rustc/c9f8f3438a8134a413aa5d4903e0196e44e37bbc/library/std/src/sync/condvar.rs:189
#5  tokio::runtime::park::Inner::park (self=0x7f4aab614c40) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/park.rs:116
#6  0x00007f4ea8ca2a2a in tokio::runtime::park::CachedParkThread::block_on (self=0x7f4ca4dfb65f, f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/park.rs:285
#7  0x00007f4ea8c86338 in tokio::runtime::context::blocking::BlockingRegionGuard::block_on (self=0x7f4ca4dfb6a0, f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/context/blocking.rs:66
#8  tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}} (blocking=0x7f4ca4dfb6a0) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/multi_thread/mod.rs:87
#9  tokio::runtime::context::runtime::enter_runtime (handle=<optimized out>, allow_block_in_place=<optimized out>, f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/context/runtime.rs:65
#10 0x00007f4ea8c23bb6 in tokio::runtime::scheduler::multi_thread::MultiThread::block_on (handle=0x7f4ead912aa0 <_ZN9polars_io8pl_async7RUNTIME17h03c7e01a78cac90cE.llvm.12616630128375555060+48>, self=<optimized out>, future=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/multi_thread/mod.rs:86
#11 tokio::runtime::runtime::Runtime::block_on (self=<optimized out>, future=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/runtime.rs:350
#12 0x00007f4ea8c63ee3 in polars_io::pl_async::RuntimeManager::block_on_potential_spawn::{{closure}} () at crates/polars-io/src/pl_async.rs:246
#13 tokio::runtime::scheduler::multi_thread::worker::block_in_place (f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/multi_thread/worker.rs:440
#14 0x00007f4ea8d46a6a in tokio::runtime::scheduler::block_in_place::block_in_place (f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/block_in_place.rs:20
#15 tokio::task::blocking::block_in_place (f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/task/blocking.rs:78
#16 polars_io::pl_async::RuntimeManager::block_on_potential_spawn (self=0xfffffffffffffe00, future=<error reading variable: access outside bounds of object referenced via synthetic pointer>) at crates/polars-io/src/pl_async.rs:246
#17 polars_lazy::physical_plan::executors::scan::parquet::ParquetExec::read (self=0x7f4aab603c00) at crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs:363
#18 0x00007f4ea8cd62db in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} () at crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs:393
#19 polars_lazy::physical_plan::state::ExecutionState::record (self=<optimized out>, func=..., name=...) at crates/polars-lazy/src/physical_plan/state.rs:120
#20 0x00007f4ea8d47910 in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab603c00, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs:393
#21 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4c51070190, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#22 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4bbf26f5a0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#23 0x00007f4ea8d0d688 in <polars_lazy::physical_plan::executors::filter::FilterExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab614bd0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/filter.rs:74
#24 0x00007f4ea8c90c90 in <polars_lazy::physical_plan::executors::group_by_partitioned::PartitionGroupByExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab68c780, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs:354
#25 0x00007f4ea8ce1fa7 in polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple::execute_impl (self=0x7f4aab63f260, state=0x408, columns=...) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:17
#26 0x00007f4ea8ce213e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab63f260, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:42
#27 0x00007f4ea8cea4e9 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab6773c0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/sort.rs:48
#28 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4ab2a723b0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#29 0x00007f4ea8cea4e9 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab677300, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/sort.rs:48
#30 0x00007f4ea8aea828 in <polars_lazy::physical_plan::executors::join::JoinExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} () at crates/polars-lazy/src/physical_plan/executors/join.rs:64
#31 rayon_core::join::join::call::{{closure}} () at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:102
#32 rayon_core::join::join_context::call_b::{{closure}} (migrated=<optimized out>) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:129
#33 rayon_core::job::StackJob<L,F,R>::run_inline (self=..., stolen=<optimized out>) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/job.rs:102
#34 0x00007f4ea8bcd7f1 in rayon_core::join::join_context::{{closure}} (worker_thread=0x7f4ca4dfea80, injected=false) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:159
#35 0x00007f4ea8bd92e1 in rayon_core::registry::in_worker (op=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/registry.rs:951
#36 0x00007f4ea8bd66e4 in rayon_core::join::join_context (oper_a=..., oper_b=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:132
#37 rayon_core::join::join (oper_a=..., oper_b=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:105
#38 rayon_core::thread_pool::ThreadPool::join::{{closure}} () at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:280
#39 rayon_core::thread_pool::ThreadPool::install::{{closure}} () at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:147
#40 rayon_core::registry::Registry::in_worker (self=<optimized out>, op=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/registry.rs:502
#41 0x00007f4ea8ce7e4b in rayon_core::thread_pool::ThreadPool::install (self=0xfffffffffffffe00, op=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:147
#42 rayon_core::thread_pool::ThreadPool::join (self=0xfffffffffffffe00, oper_a=..., oper_b=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:280
#43 <polars_lazy::physical_plan::executors::join::JoinExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab6311c0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/join.rs:57
#44 0x00007f4ea8ce1fa7 in polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple::execute_impl (self=0x7f4cde83b3c0, state=0x408, columns=...) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:17
#45 0x00007f4ea8ce213e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4cde83b3c0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:42
#46 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4c6fa713c0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#47 0x00007f4ea8d1ce07 in <polars_lazy::physical_plan::executors::udf::UdfExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab670600, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/udf.rs:17
#48 0x00007f4ea8ce1fa7 in polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple::execute_impl (self=0x7f4bfec3d780, state=0x408, columns=...) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:17
#49 0x00007f4ea8ce213e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4bfec3d780, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:42
#50 0x00007f4ea8c90c90 in <polars_lazy::physical_plan::executors::group_by_partitioned::PartitionGroupByExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab68c5a0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs:354
#51 0x00007f4ea8cea4e9 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab677240, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/sort.rs:48

@nameexhaustion
Collaborator

Does this look like it might be the problem?

I think the backtrace is telling us we are stuck on an async future, but we need to try and confirm this.

I have updated #15643 to set the timeout to 3 minutes instead of 7, and also set the non-connect timeout - please give it another try.

Please also try using this function instead of directly calling collect_all; it needs to be given a heartbeat_lf (just use a pl.scan_parquet(...) of the same files):

import os

import polars as pl

def collect_all_with_timeout_and_retry(
    lfs: list[pl.LazyFrame],
    heartbeat_lf: pl.LazyFrame,
    *,
    timeout_secs=30 + int(os.getenv("POLARS_ASYNC_TIMEOUT", "3")) * 60,
    heartbeat_interval_secs=10,
) -> list[pl.DataFrame]:
    import warnings
    import asyncio
    from time import perf_counter

    print(
        f"collect_all_with_timeout_and_retry {timeout_secs = } {heartbeat_interval_secs = }"
    )

    is_complete = asyncio.Event()

    async def collect_loop():
        excs = []

        for i in range(1, 4):
            if i == 3:
                raise Exception(f"collect_loop failed after {i} tries ({excs = })")
            task = pl.collect_all_async(lfs)
            try:
                out = await asyncio.wait_for(task, timeout=timeout_secs)
            except Exception as e:
                excs.append(e)
                continue
            break

        if i > 1:
            raise Exception(
                f"collect_loop finished, but required {i} tries ({excs = })"
            )

        return out

    async def heartbeat_loop():
        while not is_complete.is_set():
            start = perf_counter()
            task = heartbeat_lf.select(pl.first()).head(1).collect_async()
            try:
                await asyncio.wait_for(task, timeout=heartbeat_interval_secs)
            except Exception as e:
                raise f"heartbeat_loop timed out {e = }"

            elapsed = perf_counter() - start
            sleep = heartbeat_interval_secs - elapsed
            sleep = max(0, sleep)

            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # Wait for either completion or the sleep interval, whichever comes first.
                await next(asyncio.as_completed([is_complete.wait(), asyncio.sleep(sleep)]))

    async def f():
        task = asyncio.create_task(heartbeat_loop())
        out = await collect_loop()
        is_complete.set()
        await task

        return out

    return asyncio.run(f())
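For reference, a minimal usage sketch of the function above (the bucket path and the query splitting are placeholders, not the real query):

import polars as pl

# Hypothetical path and queries, for illustration only.
files = "s3://my-bucket/redacted/*.parquet"
base = pl.scan_parquet(files)

# The LazyFrames that would normally be passed straight to pl.collect_all.
lfs = [base.filter(pl.col("id") == i) for i in range(8)]

# A cheap scan of the same files; the heartbeat only fetches one row of
# one column to check that object-store I/O is still making progress.
heartbeat_lf = pl.scan_parquet(files)

dfs = collect_all_with_timeout_and_retry(lfs, heartbeat_lf)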

@kszlim
Contributor Author

kszlim commented Apr 16, 2024

@ritchie46 @nameexhaustion

Is it possible that
https://github.com/pola-rs/polars/blob/main/crates/polars-lazy/src/physical_plan/expressions/window.rs#L466
needs to be changed to the following?

        // Try to get cached grouptuples
        let (mut groups, _, cache_key) = if state.cache_window() {
            let mut cache_key = String::with_capacity(32 * group_by_columns.len());
            write!(&mut cache_key, "{}", state.branch_idx).unwrap();
            for s in &group_by_columns {
                cache_key.push_str(s.name());
            }

            let mut gt_map = state.group_tuples.write().unwrap();
            // we run sequential and partitioned
            // and every partition run the cache should be empty so we expect a max of 1.
            debug_assert!(gt_map.len() <= 1);
            if let Some(gt) = gt_map.get_mut(&cache_key) {
                if df.height() > 0 {
                    assert!(!gt.is_empty());
                };

                // We take now, but it is important that we set this before we return!
                // a next windows function may get this cached key and get an empty if this
                // does not happen
                (std::mem::take(gt), true, cache_key)
            } else {
                drop(gt_map); // added: drop the guard here
                (create_groups()?, false, cache_key)
            }

Specifically dropping the guard in the else case. Just shooting in the dark.

This seems to prevent the deadlock (though I'm not 100% sure whether it's actually working or I'm just getting lucky). I am running a lot of window expressions, so it's possible this is a very hard deadlock to trigger that most people wouldn't encounter.

@nameexhaustion
I couldn't get your collect_all_with_timeout_and_retry script working; I'll dig into it more if this proves not to be the solution.

@kszlim
Contributor Author

kszlim commented Apr 16, 2024

Okay nvm that didn't work.

@kszlim
Contributor Author

kszlim commented Apr 16, 2024

@nameexhaustion I gave your change a try; it didn't seem to time out after deadlocking for 5+ minutes. I wasn't sure about the Python code you gave me: it didn't work, and I tried to tweak it but wasn't sure what I was looking for.

@nameexhaustion
Collaborator

The Python function was meant to:

  • If your collect_all took more than 3 1/2 minutes, cancel it and try again. After the 2nd try, print whether or not it succeeded and then exit.
  • During your collect_all, periodically download 1 row of 1 column every 10 seconds; if this didn't finish within 10 seconds, also print an error and exit.

Did you manage to get any output from it at all? You may have to ensure you don't have any map_elements / anything that would lock the GIL.

@ritchie46
Member

@kszlim that's a good find. We should always drop mutexes before we spawn work with rayon, as we don't want to hold the mutex when pushing other tasks onto the work scheduler. This snippet in the window functions was indeed holding an RwLock while parallelizing work with rayon. Will issue a PR.

@kszlim
Contributor Author

kszlim commented Apr 16, 2024

The Python function was meant to:

  • If your collect_all took more than 3 1/2 minutes, cancel it and try again. After the 2nd try, print whether or not it succeeded and then exit.
  • During your collect_all, periodically download 1 row of 1 column every 10 seconds; if this didn't finish within 10 seconds, also print an error and exit.

Did you manage to get any output from it at all? You may have to ensure you don't have any map_elements / anything that would lock the GIL.

When running with just a collect_all using your fork, it would just deadlock the same as before. I'll try to run your collect_all_with_timeout_and_retry sometime.

@kszlim
Contributor Author

kszlim commented May 29, 2024

Still getting a deadlock on 0.20.30, but this might be related to Series.to_numpy when used in map_batches: when I set use_pyarrow=True it seems to no longer deadlock (but may instead rarely segfault).

I unfortunately don't have code to reproduce this, but I have some backtraces captured by gdb.

latest_backtrace.txt
latest_backtrace2.txt

Not sure if these are helpful.

@stinodego this is related to what we discussed.

@kszlim
Contributor Author

kszlim commented May 30, 2024

Is it possible there's some sort of wrong assumption being made about the pyo3/GIL that's causing the deadlock?

@kszlim
Contributor Author

kszlim commented May 31, 2024

I wonder if this is the issue?

Thread 291 (Thread 0x7f7badffe700 (LWP 52886)):
#0  0x00007f7feebb32a9 in syscall () from /lib64/libc.so.6
#1  0x00007f7fe08230a3 in tokio::runtime::park::Inner::park ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#2  0x00007f7fdf3f970c in polars_io::pl_async::RuntimeManager::block_on_potential_spawn::{{closure}} ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#3  0x00007f7fdf3f5e4e in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#4  0x00007f7fdf3f549b in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#5  0x00007f7fdf2a5490 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#6  0x00007f7fdf371fa0 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#7  0x00007f7fdf35c82c in rayon_core::join::join_context::{{closure}} ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#8  0x00007f7fdf2b777b in <polars_lazy::physical_plan::executors::join::JoinExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#9  0x00007f7fdf37670e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#10 0x00007f7fdf2a5490 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#11 0x00007f7fdf4397e3 in <polars_lazy::physical_plan::executors::filter::FilterExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#12 0x00007f7fdf37670e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#13 0x00007f7fdf2a5490 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#14 0x00007f7fdf36b1d1 in <polars_lazy::physical_plan::executors::group_by::GroupByExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#15 0x00007f7fdf371fa0 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#16 0x00007f7fdf2947c6 in polars_lazy::frame::LazyFrame::collect ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#17 0x00007f7fdd82ea47 in rayon::iter::plumbing::bridge_producer_consumer::helper ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#18 0x00007f7fdd341420 in rayon_core::join::join_context::{{closure}} ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#19 0x00007f7fdd82ed2b in rayon::iter::plumbing::bridge_producer_consumer::helper ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#20 0x00007f7fddb4c8b3 in <rayon_core::job::StackJob<L,F,R> as rayon_core::job::Job>::execute ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#21 0x00007f7fdd2ea62e in rayon_core::registry::WorkerThread::wait_until_cold ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#22 0x00007f7fe05c4507 in std::sys_common::backtrace::__rust_begin_short_backtrace ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#23 0x00007f7fe05c42db in core::ops::function::FnOnce::call_once{{vtable-shim}} ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#24 0x00007f7fe081bdeb in std::sys::pal::unix::thread::Thread::new::thread_start ()
   from /path/to/project/.venv/my-project-name/lib/python3.11/site-packages/polars/polars.abi3.so
#25 0x00007f7fef5c444b in start_thread () from /lib64/libpthread.so.0
#26 0x00007f7feebb852f in clone () from /lib64/libc.so.6

IIRC, if you run blocking tasks on the tokio runtime at all, it can cause a deadlock?

@kszlim
Contributor Author

kszlim commented Jul 8, 2024

Still no MRE, but I've noticed that collecting with streaming=True prevents this deadlock from happening. This is still occurring on Polars 1.1.0.
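For completeness, the workaround looks roughly like this (the path, column names, and query are placeholders, not the real query):

import polars as pl

# Placeholder scan and aggregation, for illustration only.
ldf = pl.scan_parquet("s3://my-bucket/redacted/*.parquet")
query = ldf.group_by("id").agg(pl.col("value").mean())

# Collecting through the streaming engine avoids the hang for me;
# the default in-memory engine intermittently deadlocks on the same query.
df = query.collect(streaming=True)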

@kszlim
Contributor Author

kszlim commented Jul 8, 2024

Seems to also occur a lot more readily when doing something like:

from itertools import batched  # Python 3.12+; on older versions use an equivalent helper
import psutil

all_ids = [...] # list containing thousands of ids
results = []
for ids in batched(all_ids, max(len(all_ids) // psutil.cpu_count(), 1)):
    ldf = query_function(partition_on=ids)
    results.append(ldf)

dfs = pl.collect_all(results)
pl.concat(dfs)

vs

from itertools import batched  # Python 3.12+; on older versions use an equivalent helper
import psutil

all_ids = [...] # list containing thousands of ids
results = []
for ids in batched(all_ids, max(len(all_ids) // psutil.cpu_count(), 1)):
    ldf = query_function(partition_on=ids)
    results.append(ldf.collect().lazy())

dfs = pl.collect_all(results)
pl.concat(dfs)

I'm guessing that collect_all, when running everything in parallel partitioned on those ids, creates a lot more contention on some lock somewhere and makes it much more likely to deadlock.

@kszlim
Contributor Author

kszlim commented Jul 13, 2024

@nameexhaustion I'm starting to become convinced that this is caused somewhere around https://github.com/pola-rs/polars/blob/main/crates/polars-io/src/parquet/read/read_impl.rs#L734. I notice the oneshot channel in my backtrace when using rust-gdb, and it seems we're stuck on a futex and getting starved of threads or something.

@kszlim
Contributor Author

kszlim commented Aug 6, 2024

Okay, I've simplified my code to:

import os
os.environ["POLARS_CONCURRENCY_BUDGET"] = "1500"

import polars as pl
import psutil
import polars.selectors as cs

from collections.abc import Generator, Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")

def batched(iterable: Iterable[T], n: int) -> Generator[tuple[T, ...], None, None]:
    """Yield successive n-sized chunks from iterable."""
    if n < 1:
        msg = "n must be at least one"
        raise ValueError(msg)
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

def get_ldf():
    url = f"s3://my_s3_bucket/table_name/*/*.parquet"
    ldf = pl.scan_parquet(
        url,
        retries=10,  # TODO: Move a bunch of this stuff into proper config
        hive_partitioning=True,
    )
    return ldf

base_ldf = get_ldf()
NUM_IDS = 2000
IDS = list(range(NUM_IDS))

ID_COL = 'some_hive_partition_id'

for i in range(100):
    ldfs = []
    for batch in batched(IDS, NUM_IDS // psutil.cpu_count()):
        ldf = base_ldf.filter(pl.col(ID_COL).is_in(batch))
        ldf = ldf.group_by(ID_COL).agg(
            cs.float().mean().name.suffix("_mean"),
            cs.float().median().name.suffix("_median"),
            cs.float().min().name.suffix("_min"),
            cs.float().max().name.suffix("_max"),
            cs.float().std().name.suffix("_std"),
        )
        ldfs.append(ldf)
    print(f"On iteration: {i}")
    dfs = pl.collect_all(ldfs)
    df = pl.concat(dfs)
    print(df)

and I get this backtrace:
backtrace.txt

@nameexhaustion @ritchie46 I'm not sure, but maybe this is enough for you guys to track down the issue? It's not a full repro, but you can see there's nothing exotic being done in the query.

My polars version is 1.4.1.

@kszlim
Contributor Author

kszlim commented Aug 7, 2024

Perhaps there needs to be a cleaner delineation between sync and async code? I think block_on_potential_spawn might be the issue, and I/O might need to be 100% async-driven and completely decoupled from rayon/sync blocking work. That would be a largish overhaul, though.

@kszlim
Contributor Author

kszlim commented Aug 7, 2024

Closing this in favor of #18086.

@kszlim kszlim closed this as completed Aug 7, 2024