
Background execution and R process pool #311

Merged
merged 79 commits into from
Aug 9, 2023

Conversation

Sicheng-Pan (Collaborator)

This PR implements an interface for creating asynchronous jobs that are executed in background threads.

Specifically, this provides an alternative way to run a lazy query in the background without blocking the R console.

To run map and apply on a LazyFrame without using the existing R interpreter, new R processes have to be created in the background to handle the evaluation tasks.

The user gets thread handles for the background jobs and can check whether they are done (non-blocking) or wait for them to finish (blocking).

Example:

> handle <- pl$LazyFrame(mtcars)$with_column(pl$col("mpg")$map_in_background(function(x) x * 0.43)$alias("kml"))$collect_in_background()
> handle$is_finished() |> unwrap()
[1] TRUE
> handle$join() |> unwrap()
shape: (32, 12)
┌──────┬─────┬───────┬───────┬───┬─────┬──────┬──────┬───────┐
│ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ am  ┆ gear ┆ carb ┆ kml   │
│ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ ---  ┆ ---  ┆ ---   │
│ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64  ┆ f64  ┆ f64   │
╞══════╪═════╪═══════╪═══════╪═══╪═════╪══════╪══════╪═══════╡
│ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 4.0  ┆ 4.0  ┆ 9.03  │
│ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 4.0  ┆ 4.0  ┆ 9.03  │
│ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0  ┆ … ┆ 1.0 ┆ 4.0  ┆ 1.0  ┆ 9.804 │
│ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 3.0  ┆ 1.0  ┆ 9.202 │
│ …    ┆ …   ┆ …     ┆ …     ┆ … ┆ …   ┆ …    ┆ …    ┆ …     │
│ 15.8 ┆ 8.0 ┆ 351.0 ┆ 264.0 ┆ … ┆ 1.0 ┆ 5.0  ┆ 4.0  ┆ 6.794 │
│ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 1.0 ┆ 5.0  ┆ 6.0  ┆ 8.471 │
│ 15.0 ┆ 8.0 ┆ 301.0 ┆ 335.0 ┆ … ┆ 1.0 ┆ 5.0  ┆ 8.0  ┆ 6.45  │
│ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 4.0  ┆ 2.0  ┆ 9.202 │
└──────┴─────┴───────┴───────┴───┴─────┴──────┴──────┴───────┘

@sorhawell (Collaborator) commented Jul 5, 2023

hi @Sicheng-Pan this PR is very interesting! :)

I'd like to hear your thoughts on the final syntax; what do you think of the below?

I think $collect_in_background() is a meaningful variation of $collect(). The former does not halt the R session, at the expense of launching at least one more R process (no extra process is needed unless map or apply is used). Also, $collect_in_background() returns a handle, not a <DataFrame>.

However, for $map_in_background() I suggest using the regular $map() and resolving whether map is pushed to another R process based on whether $collect() or $collect_in_background() was used, and/or some global polars_options.

It is also meaningful to map in another process even without calling $collect_in_background(), when several R lambdas (pure functions) could be mapped in parallel.

An option like pl$set_polars_options(r_extra_process_pool = 5) could also be advantageous, with a default of 0. If the user sets this, processes are spun up and recycled for further jobs.

@eitsupi (Collaborator) commented Jul 5, 2023

Looks amazing!
Thanks for working on this!

Could you please resolve merge conflicts?

@Sicheng-Pan (Collaborator, Author)

@sorhawell Since we currently do not have a global options store on the Rust side, I think it is better to leave map and map_in_background as separate expressions and leave the choice directly to the users. Besides, map and map_in_background do have some differences: map can have side effects in the main process, while map_in_background cannot have side effects and only supports pure R functions for now.

@sorhawell (Collaborator)

before: spawning processes in parallel

[1] "test 3a"
usr:   51ms sys:   15ms elp:   85ms ||  - 3a +io %bitrate %cpu foreground 
usr:   65ms sys:   86ms elp: 2054ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   63ms sys:   87ms elp:   76ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   58ms sys:   68ms elp:  186ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:   60ms sys:   69ms elp:  182ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:   60ms sys:   58ms elp:  325ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b"
usr:10888ms sys:  452ms elp:11047ms ||  - 3b %io %bitrate +cpu foreground 
usr:    4ms sys:   12ms elp: 4415ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    3ms sys:    7ms elp: 2891ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:    8ms elp: 3273ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    4ms sys:    7ms elp: 3709ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    4ms sys:    6ms elp: 5648ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    4ms sys:    8ms elp:11798ms ||  - 3b %io %bitrate +cpu pool=1 background 

after: lower burn-in times

[1] "test 3a - parallel"
usr:   55ms sys:   16ms elp:   96ms ||  - 3a +io %bitrate %cpu foreground 
usr:   65ms sys:   74ms elp:  713ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   62ms sys:   85ms elp:   99ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   70ms sys:  105ms elp:  142ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:   59ms sys:   67ms elp:  174ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:   54ms sys:   56ms elp:  321ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b - parallel"
usr:10941ms sys:  459ms elp:11152ms ||  - 3b %io %bitrate +cpu foreground 
usr:    5ms sys:   13ms elp: 3474ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    3ms sys:    9ms elp: 2656ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:    7ms elp: 2993ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    4ms sys:    7ms elp: 3562ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    4ms sys:    7ms elp: 5637ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    3ms sys:    6ms elp:11131ms ||  - 3b %io %bitrate +cpu pool=1 background 

@sorhawell (Collaborator) commented Aug 2, 2023

MacBook Pro 2015, 4 cores, Intel x64, 16 GB RAM. rust-polars defaults to n_workers = cores x 2.

previous pool impl

[1] "test 1a - sequential"
usr:  257ms sys:   31ms elp:  344ms ||  - 1a +io %bitrate %cpu foreground 
usr:  126ms sys:   47ms elp:  518ms ||  - 1a +io %bitrate %cpu background1 
usr:  132ms sys:   48ms elp:  534ms ||  - 1a +io %bitrate %cpu background2 
[1] "test 1b - sequential"
usr:  196ms sys:  114ms elp:  311ms ||  - 1b -io +bitrate %cpu foreground 
usr:  277ms sys:  353ms elp: 1651ms ||  - 1b -io +bitrate %cpu background1 
usr:  285ms sys:  338ms elp: 1665ms ||  - 1b -io +bitrate %cpu background2 
[1] "test 2a - sequential"
usr: 2444ms sys:  585ms elp: 3032ms ||  - 2a %io ~bitrate +cpu foreground 
usr:    1ms sys:    5ms elp: 3003ms ||  - 2a %io ~bitrate +cpu background 
usr:    2ms sys:    8ms elp: 2927ms ||  - 2a %io ~bitrate +cpu background2 
[1] "test 3a - parallel"
usr:   69ms sys:   22ms elp:  131ms ||  - 3a +io %bitrate %cpu foreground 
usr:   68ms sys:   91ms elp: 2112ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   65ms sys:   90ms elp:   80ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   66ms sys:   93ms elp: 2609ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:  150ms sys:  412ms elp:114714ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:  165ms sys:  506ms elp:149871ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b - parallel"
usr:10863ms sys:  429ms elp:11048ms ||  - 3b %io %bitrate +cpu foreground 
usr:    5ms sys:   12ms elp: 4166ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    4ms sys:    9ms elp: 2642ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:   11ms elp: 3288ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    4ms sys:   13ms elp: 3807ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    5ms sys:   14ms elp: 4245ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    6ms sys:   19ms elp: 4979ms ||  - 3b %io %bitrate +cpu pool=1 background 
[1] "test 3c - parallel"
usr:10988ms sys:  715ms elp:11511ms ||  - 3c %io +bitrate +cpu foreground  
usr:  166ms sys:  132ms elp: 4276ms ||  - 3c %io +bitrate +cpu pool=8 background burn-in  
usr:  246ms sys:  123ms elp: 2798ms ||  - 3c %io +bitrate +cpu pool=8 background 
usr:  162ms sys:   95ms elp: 3393ms ||  - 3c %io +bitrate +cpu pool=6 background 
usr:  166ms sys:  108ms elp: 3810ms ||  - 3c %io +bitrate +cpu pool=4 background 
usr:  131ms sys:   90ms elp: 4257ms ||  - 3c %io +bitrate +cpu pool=2 background 
usr:  124ms sys:   97ms elp: 4929ms ||  - 3c %io +bitrate +cpu pool=1 background 
[1] "test 3d - parallel + r-polars conversion"
usr:  418ms sys:   78ms elp:  494ms ||  - 3d %io +bitrate +cpu foreground  
usr:  513ms sys: 2706ms elp: 2869ms ||  - 3d %io +bitrate +cpu pool=8 background burn-in  
usr:  619ms sys: 6368ms elp: 1548ms ||  - 3d %io +bitrate +cpu pool=8 background 
usr:  568ms sys: 2132ms elp: 2079ms ||  - 3d %io +bitrate +cpu pool=6 background 
usr:  480ms sys:  986ms elp: 4128ms ||  - 3d %io +bitrate +cpu pool=4 background 
usr:  414ms sys:  655ms elp: 6512ms ||  - 3d %io +bitrate +cpu pool=2 background 
usr:  437ms sys:  518ms elp: 7901ms ||  - 3d %io +bitrate +cpu pool=1 background 

new queued pool impl

EDIT: closed the browser and reran the benchmark. ~8% improvement

[1] "test 1a - sequential"
usr:  283ms sys:   35ms elp:  414ms ||  - 1a +io %bitrate %cpu foreground 
usr:  130ms sys:   50ms elp:  534ms ||  - 1a +io %bitrate %cpu background1 
usr:  132ms sys:   51ms elp:  542ms ||  - 1a +io %bitrate %cpu background2 
[1] "test 1b - sequential"
usr:  201ms sys:  123ms elp:  324ms ||  - 1b -io +bitrate %cpu foreground 
usr:  284ms sys:  357ms elp: 1692ms ||  - 1b -io +bitrate %cpu background1 
usr:  282ms sys:  331ms elp: 1679ms ||  - 1b -io +bitrate %cpu background2 
[1] "test 2a - sequential"
usr: 2437ms sys:  522ms elp: 2967ms ||  - 2a %io ~bitrate +cpu foreground 
usr:    1ms sys:    5ms elp: 2974ms ||  - 2a %io ~bitrate +cpu background 
usr:    1ms sys:    7ms elp: 3057ms ||  - 2a %io ~bitrate +cpu background2 
[1] "test 3a - parallel"
usr:   68ms sys:   21ms elp:  151ms ||  - 3a +io %bitrate %cpu foreground 
usr:   66ms sys:   76ms elp:  526ms ||  - 3a +io %bitrate %cpu pool=8 background burn-in  
usr:   63ms sys:   87ms elp:   83ms ||  - 3a +io %bitrate %cpu pool=8 background 
usr:   69ms sys:   92ms elp:  111ms ||  - 3a +io %bitrate %cpu pool=4 background 
usr:   54ms sys:   57ms elp:  164ms ||  - 3a +io %bitrate %cpu pool=2 background 
usr:   56ms sys:   54ms elp:  314ms ||  - 3a +io %bitrate %cpu pool=1 background 
[1] "test 3b - parallel"
usr:10911ms sys:  531ms elp:11068ms ||  - 3b %io %bitrate +cpu foreground 
usr:    4ms sys:   11ms elp: 3150ms ||  - 3b %io %bitrate +cpu pool=8 background burn-in  
usr:    3ms sys:    9ms elp: 2510ms ||  - 3b %io %bitrate +cpu pool=8 background 
usr:    4ms sys:    8ms elp: 3061ms ||  - 3b %io %bitrate +cpu pool=6 background 
usr:    3ms sys:    7ms elp: 3325ms ||  - 3b %io %bitrate +cpu pool=4 background 
usr:    4ms sys:    6ms elp: 5594ms ||  - 3b %io %bitrate +cpu pool=2 background 
usr:    4ms sys:    6ms elp:11030ms ||  - 3b %io %bitrate +cpu pool=1 background 
[1] "test 3c - parallel"
usr:11018ms sys:  732ms elp:11368ms ||  - 3c %io +bitrate +cpu foreground  
usr:  171ms sys:  192ms elp: 3127ms ||  - 3c %io +bitrate +cpu pool=8 background burn-in  
usr:  257ms sys:  123ms elp: 2437ms ||  - 3c %io +bitrate +cpu pool=8 background 
usr:  182ms sys:  100ms elp: 2947ms ||  - 3c %io +bitrate +cpu pool=6 background 
usr:  127ms sys:   61ms elp: 3306ms ||  - 3c %io +bitrate +cpu pool=4 background 
usr:   95ms sys:   59ms elp: 5712ms ||  - 3c %io +bitrate +cpu pool=2 background 
usr:   89ms sys:   48ms elp:11436ms ||  - 3c %io +bitrate +cpu pool=1 background 
[1] "test 3d - parallel + r-polars conversion"
usr:  469ms sys:   78ms elp:  592ms ||  - 3d %io +bitrate +cpu foreground  
usr:  502ms sys: 5470ms elp: 1812ms ||  - 3d %io +bitrate +cpu pool=8 background burn-in  
usr:  599ms sys: 6734ms elp: 1593ms ||  - 3d %io +bitrate +cpu pool=8 background 
usr:  532ms sys: 2434ms elp:  906ms ||  - 3d %io +bitrate +cpu pool=6 background 
usr:  434ms sys:  835ms elp:  658ms ||  - 3d %io +bitrate +cpu pool=4 background 
usr:  355ms sys:  387ms elp:  819ms ||  - 3d %io +bitrate +cpu pool=2 background 
usr:  324ms sys:  286ms elp: 1383ms ||  - 3d %io +bitrate +cpu pool=1 background 

@sorhawell (Collaborator) commented Aug 2, 2023

Conclusion, as far as I can see:
The previous pool impl is equally fast if the number of processes equals the number of polars workers, or for any sequential maps. See "test 3abc pool=8 not burn-in" and "1x+2x".

The previous pool impl was slower in parallel, especially for IO-bound tasks at lower pool sizes (see test 3a), because some or most processes would be created and destroyed after each worker iteration. For the CPU-bound test 3c, the previous pool impl is actually faster, but only because it was never limited in the number of processes and could use 8 on this 4-core machine.

In general, the IO overhead of background execution is about 2x that of foreground.
The bitrate overhead (inter-process communication) is about 5x (or perhaps up to 100x), but it is tough to compete with Arrow's zero-copy abstraction.
When using the r-polars conversion to R vectors, which is CPU bound, bitrate is not especially limiting; see test 3d.

The downside of the new impl is that it adds ~200 lines of code. The upside is that it allows using background R processes in more scenarios that are not extremely CPU bound, e.g. aggregating over groups. Also, users can now set the actual maximum number of R processes via pl$set_global_rpool(), which may be useful for reducing memory usage.

So far the old impl seemed to hit a deadlock 3 times out of perhaps 100 tests. I have not seen any deadlocks with the new pool impl yet, but the data is limited, so who knows. Having multiple workers contend for two Mutexes could perhaps lead to a deadlock if two workers each held one and waited for the other. This is just a theory, maybe not an issue.
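The lock-ordering hazard described above is a classic one: if two threads each hold one Mutex while waiting for the other, neither can proceed. A minimal Rust sketch of the standard mitigation — every worker acquires the two locks in the same fixed order — might look like this (a hypothetical standalone illustration, not the actual pool code):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Increment two shared counters from `n_workers` threads, always
/// taking the locks in the same order (`a` before `b`) to avoid the
/// deadlock scenario described above.
fn run_workers(n_workers: usize) -> (u32, u32) {
    let a = Arc::new(Mutex::new(0u32));
    let b = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (0..n_workers)
        .map(|_| {
            let (a, b) = (Arc::clone(&a), Arc::clone(&b));
            thread::spawn(move || {
                // Consistent lock order: if one thread locked b-then-a
                // while another locked a-then-b, each could hold one
                // Mutex and wait forever on the other.
                let mut ga = a.lock().unwrap();
                let mut gb = b.lock().unwrap();
                *ga += 1;
                *gb += 1;
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let totals = (*a.lock().unwrap(), *b.lock().unwrap());
    totals
}

fn main() {
    // Both counters end up equal to the number of workers.
    assert_eq!(run_workers(8), (8, 8));
}
```

With a consistent acquisition order, the "each holds one and waits for the other" cycle cannot form, which is why auditing lock order is usually the first thing to check when a pool intermittently hangs.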

@sorhawell (Collaborator)

I will write a counter-suggestion and we will compare on the benchmark. It might be OK with an unbounded pool, but maybe some users would like to bound their memory usage with a pool size. Also, polars uses 2x threads per physical core, e.g. 8 threads on my machine. I don't think it is always beneficial to have that many R sessions open at once. With 8 threads and cap=2, this explains the slowness, because about 6 threads must continuously create new processes and destroy them again. Maybe you could patch the unbounded pool to not continuously open and close processes, but that seems difficult to me.

I can also add an option for the user to configure whether the upper limit is a soft limit or a hard limit.

I could not see an easy way to add a hard limit without some global accounting of the total leased/pooled process handlers. How do you find this updated pool impl?

@Sicheng-Pan (Collaborator, Author)

I could not see an easy way to add a hard limit without some global accounting of the total leased/pooled process handlers. How do you find this updated pool impl?

@sorhawell I was thinking about this and came to the conclusion that a counter for leased processes is necessary. We would also need either a Condvar or a queue of mpsc::Sender so that threads can wait for the in-use R processes, and I planned to make this an Option<_> so that we could tell whether we have a hard limit (Some(condvar/queue)) or a soft limit (None). But this would change a lot in the old implementation, especially because I would inevitably need to write an inner struct for the thread pool, so I did not proceed with it. It seems that you implemented all of this except for the Option<_> part, and the code looks pretty good to me. Thanks for the help!
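The counter-plus-Condvar scheme discussed here can be sketched as follows. This is a hypothetical standalone illustration, not the PR's actual implementation: a leased-process counter behind a Mutex, with an Option<Condvar> so that Some enforces a hard cap (lessees block until a lease is returned) while None leaves it a soft cap.

```rust
use std::sync::{Condvar, Mutex};

/// Tracks how many background R processes are leased out.
/// `limit` is a hard cap when `cv` is Some (lessees block until a
/// process is returned) and only a soft cap when `cv` is None.
struct LeaseCounter {
    leased: Mutex<usize>,
    limit: usize,
    cv: Option<Condvar>,
}

impl LeaseCounter {
    fn new(limit: usize, hard: bool) -> Self {
        LeaseCounter {
            leased: Mutex::new(0),
            limit,
            cv: if hard { Some(Condvar::new()) } else { None },
        }
    }

    /// Take a lease; under a hard limit this blocks until a slot frees up.
    fn lease(&self) {
        let mut n = self.leased.lock().unwrap();
        if let Some(cv) = &self.cv {
            while *n >= self.limit {
                n = cv.wait(n).unwrap();
            }
        }
        *n += 1;
    }

    /// Return a lease and wake one waiter, if any.
    fn release(&self) {
        let mut n = self.leased.lock().unwrap();
        *n -= 1;
        if let Some(cv) = &self.cv {
            cv.notify_one();
        }
    }

    fn leased(&self) -> usize {
        *self.leased.lock().unwrap()
    }
}

fn main() {
    // Soft limit: exceeding the cap is tolerated, only tracked.
    let soft = LeaseCounter::new(2, false);
    soft.lease();
    soft.lease();
    soft.lease();
    assert_eq!(soft.leased(), 3);

    // Hard limit: a third lease fits only after one is returned.
    let hard = LeaseCounter::new(2, true);
    hard.lease();
    hard.lease();
    hard.release();
    hard.lease();
    assert_eq!(hard.leased(), 2);
}
```

The Condvar wait loop is the global accounting mentioned above: a thread that finds the counter at the cap sleeps until release() notifies it, so no process slot is ever over-committed under the hard limit.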

Merge branch 'main' into lazy_in_background

# Conflicts:
#	R/extendr-wrappers.R
#	src/rust/src/lazy/dsl.rs
Merge branch 'main' into lazy_in_background

# Conflicts:
#	man/pl_pl.Rd
#	src/rust/src/lazy/dsl.rs
#	src/rust/src/rpolarserr.rs
sorhawell requested a review from eitsupi on August 8, 2023
@sorhawell (Collaborator)

@eitsupi I have changed the Makefile's make install to not handle dependencies; I guess make requirements could do that. I have added install to make docs because some examples require polars to be built as a package so it can be loaded in background R processes.

@sorhawell (Collaborator)

@Sicheng-Pan As far as I can see, this huge PR is ready to go. Many thanks for your patience!

@eitsupi (Collaborator) left a comment

Thanks for this great work!

A few questions.

Review comments on src/Makevars and inst/misc/benchmark_rbackground.R (outdated, resolved)
Merge branch 'main' into lazy_in_background

# Conflicts:
#	NEWS.md
#	R/expr__expr.R
#	R/extendr-wrappers.R
#	R/lazyframe__lazy.R
#	man/LazyFrame_collect_background.Rd
#	src/rust/src/lazy/dataframe.rs
@sorhawell (Collaborator)

@Sicheng-Pan many thanks !!

sorhawell merged commit aa7acdd into main on Aug 9, 2023
11 checks passed
sorhawell deleted the lazy_in_background branch on August 9, 2023