
Implement reduce() and fold() #403

Merged: 14 commits merged into main from reduce-fold on Oct 9, 2023

Conversation

@etiennebacher (Collaborator) commented Sep 28, 2023

I started to look into how to implement those two functions, but for now I run into a panic in a sub-thread:

df = pl$DataFrame(mtcars)

df$with_columns(
  pl$fold(
     acc = pl$lit(1), lambda = \(acc, x) acc, exprs = pl$col("mpg", "drat")
  )
)
Error: Execution halted with the following contexts
0: In R: in $with_columns()
0: During function call [df$with_columns(pl$reduce(lambda = function(acc, x) acc, exprs = pl$col("mpg",
"drat")))]
1: A polars sub-thread panicked. See panic msg, which is likely more informative than this error: Any { .. }

and I can't find a way to get the panic message. Also, this error comes from concurrent_handler() in Rust but I don't understand when this function is actually called.


@sorhawell this is mostly me dabbling with Rust so this might be completely off. Let me know if you want to rewrite that from scratch, otherwise I'd greatly appreciate some pointers on this if you have the time 😄

Related to #400

@sorhawell (Collaborator) commented Sep 28, 2023

This polars fold is more dynamic than the macro-like fold I proposed in #400: this fold folds over the Series at query time, whereas in #400 we folded over the "abstract" expressions. This fold is more related to map.
The advantage of this map-like fold over a macro-like fold is that it can access the actual values via the Series. However, it will also be a bit slower if there are a LOT of exprs to fold over, and likely less optimization is possible, because polars cannot see the entire query at once, cannot reason about what happens in the R/Python code, and side effects are possible.
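
A rough sketch of the contrast, assuming the (acc, lambda, exprs) signature used for fold() later in this PR; the macro-like version is just base R's Reduce() over expressions:

library(polars)

df = pl$DataFrame(mtcars)

# Macro-like fold (#400): fold over the expressions at query-build time,
# producing one big expression that polars can optimize as a whole.
macro_fold = Reduce(\(acc, x) acc + x, list(pl$col("mpg"), pl$col("drat")), init = pl$lit(1))
df$with_columns(macro_fold$alias("macro_fold"))

# Map-like fold (this PR): the R lambda runs on the materialized Series at
# query time, so it can see actual values but polars cannot optimize across it.
df$with_columns(
  pl$fold(acc = pl$lit(1), lambda = \(acc, x) acc + x, exprs = pl$col("mpg", "drat"))$alias("series_fold")
)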

Polars and R lambdas / map is pretty hairy stuff. I will try to write a hint as to how I think it should be done.

This seems to be the py-polars call stack for fold
https://github.com/pola-rs/polars/blob/py-0.19.5/py-polars/polars/functions/lazy.py#L999-L1104
https://github.com/pola-rs/polars/blob/7c2098cd77bcd25aae1931fc9e3337059d070082/py-polars/src/functions/lazy.rs#L330
https://github.com/pola-rs/polars/blob/7c2098cd77bcd25aae1931fc9e3337059d070082/py-polars/src/map/lazy.rs#L71

py-polars can benefit from pyo3 to interface lambdas, whereas we have our own extendr_concurrent.rs. It looks like you knew that :)

Our lambda interface has 4 "sections":

1 - the core polars-R interface module in extendr_concurrent.rs
2 - the initialization of ThreadCom
3 - the more specific derived "host"-side functions for collect/profile/fetch in utils/concurrent.rs
4 - the "client"-side functions like $map (and the fn that $map calls)

It seems you have used snippets from 2, 3 and 4. You should likely only need something like 4. Perhaps you may be able to derive $fold from $map() and just piggy-back on all the hairy stuff.

This is what the rpolars map looks like:

Expr_map = function(f, output_type = NULL, agg_list = FALSE, in_background = FALSE) {

The signature of binary_lambda is (Series, Series) -> Series, whereas it is (Series) -> Series for map. I hope we can fairly cheaply reuse the map implementation by using a struct to stitch the acc Series together with the x Series for input.
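
A toy illustration of that stitching idea, with plain R values standing in for Series (the wrapper and names below are made up for illustration, not actual package code):

# Toy sketch only: plain R values stand in for Series.
unary_map_lambda   = function(s) s * 2          # (Series) -> Series, as used by $map()
binary_fold_lambda = function(acc, x) acc + x   # (Series, Series) -> Series, as needed by fold()

# Stitch acc and x into one "struct"-like input so the binary lambda fits the
# existing unary (Series) -> Series interface, then unpack it on the R side.
wrap_binary = function(f) {
  function(stitched) f(stitched$acc, stitched$x)
}

wrap_binary(binary_fold_lambda)(list(acc = 1, x = 2))  # returns 3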

@etiennebacher (Collaborator, Author) commented:

Thanks for all the details!

Perhaps you may be able to derive $fold from $map() and just piggy-back on all the hairy stuff.
...
The signature of binary_lambda is (Series, Series) -> Series, whereas it is (Series) -> Series for map. I hope we can fairly cheaply reuse the map implementation by using a struct to stitch the acc Series together with the x Series for input.

This is actually what I tried to do so far. The problem is that, as you say, the move call in map() has (Series) -> (Series) while it should be (Series, Series) -> (Series) in the case of fold(). Although I haven't tried yet, I think stitching the two input Series together and passing them as a single arg will be a problem, since the R function still expects two inputs.

Therefore, I thought the issue was that the thread_com object in map() can only take one Series because of how it is initialized with &CONFIG. This is why I defined my own FOO config in fold() that accepts two input Series. Given the error message in my initial post, it looks like something is wrong in the subthread, either when sending the request or when receiving the results.

@sorhawell (Collaborator) commented Sep 29, 2023

I pursued the wrap-struct approach. I will also try to look into what your error is exactly. I'm on a long family visit and only have a few hours with a computer per day.

I have a panic in fold2 because the global threadcom is destroyed if folding over multiple exprs instead of just 1.

Otherwise it works. I'm not sure why this error happens; it seems someone "hangs up the phone" a little too early. Will try to look more into it.

Merge remote-tracking branch 'origin/main' into reduce-fold

# Conflicts:
#	man/pl_pl.Rd
@sorhawell (Collaborator) commented:

@etiennebacher The reasons your approach gave errors are:

Both the host-side lambda (collect, fetch, ...) called by serve_r and the client-side lambdas (map, fold, reduce) must have compatible signatures, fn(Series) -> Series. Even though concurrent_handler is a generic function, we must currently pick a single signature for the entire package. We could, now or in the future, rework it such that the signature is an Enum of various signatures. @Sicheng-Pan used a similar pattern for sending various types of IPC R jobs.

Also, there can only be one global storage CONFIG from which a thread can acquire a ThreadCom and send R jobs to the main thread.

@sorhawell (Collaborator) commented Sep 29, 2023

I found a subtle bug in the concurrent handling: if a user writes a lambda that uses e.g. collect inside the concurrent handler, then a new concurrent handler will start and drop the old one. The result is that no new job can be submitted and the query will stop. If all R jobs were already in the queue, the error would not manifest. With $fold(), new R jobs are submitted sequentially, and therefore the error showed up. Now the collectors (collect(), fetch() & profile()) check whether there is already a concurrent handler running.

I think any recursive polars query inside embedded R code inside a polars query ... and so on can still submit R jobs to the single outer concurrent handler. So it should be fine now.
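
For context, a minimal sketch of the kind of nested query described above, using the $map() signature shown earlier in this thread; the inner collect() inside the lambda is what used to spawn a second concurrent handler:

df = pl$DataFrame(mtcars)
lf = pl$DataFrame(mtcars)$lazy()

df$with_columns(
  pl$col("mpg")$map(\(s) {
    # an R lambda that itself runs another polars query: before the fix, this
    # inner collect() started a new concurrent handler and dropped the old one
    inner = lf$select(pl$col("drat")$mean())$collect()
    s
  })
)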

@sorhawell (Collaborator) commented Sep 29, 2023

@etiennebacher we need to decide if we want to go with Enum-signatures over struct-wrappers. Enum has near-zero overhead, whereas the struct overhead is also quite small in any practical sense. Enum would allow more future flexibility. I don't know how cleanly the Enum-signatures can be implemented, or whether it would be mentally too abstract.

I can give it a try. Otherwise we could leave it for another time.

@sorhawell (Collaborator) commented:

pl$fold2(pl$lit(1:5),\(acc,x) acc + 2L*x, list(pl$lit(5:1), pl$lit(11:15)))$lit_to_s()
polars Series: shape: (5,)
Series: '' [i32]
[
	33
	34
	35
	36
	37
]
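
(Sanity check: element-wise this is 1:5 + 2*(5:1) + 2*(11:15), e.g. 1 + 2*5 + 2*11 = 33 for the first element, so both exprs were indeed folded over.)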

@etiennebacher (Collaborator, Author) commented:

For info, the current implementation doesn't work with the simple example from py-polars:

df = pl$DataFrame(mtcars)

# Make the rowwise sum of two columns and add 1 to it
df$with_columns(
  pl$fold2(
     acc = pl$lit(1), lambda = \(acc, x) acc + x, exprs = pl$col("mpg", "drat")
  )
)
Error: Execution halted with the following contexts
0: In R: in $with_columns()
0: During function call [df$with_columns(pl$fold2(acc = pl$lit(1), lambda = function(acc,
x) acc + x, exprs = pl$col("mpg", "drat")))]
1: Encountered the following error in Rust-Polars:
lengths don't match: could not create a new dataframe: series "_a" has length 1 while series "mpg" has length 32

we need to decide if we want to go with Enum-signatures over struct-wrappers. Enum has near-zero overhead, whereas the struct overhead is also quite small in any practical sense. Enum would allow more future flexibility. I don't know how cleanly the Enum-signatures can be implemented, or whether it would be mentally too abstract.

Maybe I'm missing some info, but from what you describe it sounds like Enum only has advantages so I'd say we should use it.

I found a subtle bug in the concurrent handling

Is this bug important outside of this PR? If so, it should probably be fixed in a separate smaller PR because this one might take a while to be merged.

@sorhawell (Collaborator) commented:

Makes sense, I will separate out the bugfix and the enum-signature work in one or two PRs.

@etiennebacher mentioned this pull request Oct 3, 2023
Merge remote-tracking branch 'origin/main' into reduce-fold

# Conflicts:
#	src/rust/Cargo.toml
#	src/rust/src/concurrent.rs
#	src/rust/src/rlib.rs
@sorhawell changed the title from "[WIP] Start implementing reduce() and fold()" to "Start implementing reduce() and fold()" Oct 8, 2023
@sorhawell (Collaborator) commented:

fold and reduce seem to work well now

@etiennebacher (Collaborator, Author) left a review comment:

Thanks for taking over this PR, @sorhawell. This looks good to me but I can't find a nice way to adapt the rowwise-median example of #400. Do you want to update your answer there and close it at the same time?

Also, I can't approve this PR since I created it

@etiennebacher marked this pull request as ready for review October 9, 2023 11:29
@etiennebacher changed the title from "Start implementing reduce() and fold()" to "Implement reduce() and fold()" Oct 9, 2023
@etiennebacher merged commit 58f83f3 into main Oct 9, 2023
11 checks passed
@etiennebacher deleted the reduce-fold branch October 9, 2023 11:39
@sorhawell (Collaborator) commented:

Thanks for taking over this PR, @sorhawell. This looks good to me but I can't find a nice way to adapt the rowwise-median example of #400. Do you want to update your answer there and close it at the same time?

Also, I can't approve this PR since I created it

I will assign the issue to myself. The corresponding data.table example was not very efficient either, because the OP accessed a single row at a time, likely without making use of data.table indexing.

Probably the best solution is to chunk the frame lazily and then transpose, compute medians on 100-1000 rows at a time, and then concat the results back together. It is likely fairly cache friendly. Any computation which can be performed vectorized can be done with some fold.
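
For example, a vectorizable rowwise computation such as a rowwise sum can be expressed with a fold (a rowwise median itself is not a plain fold, hence the chunk-and-transpose idea above); this sketch assumes the merged pl$fold() keeps the (acc, lambda, exprs) signature used earlier in this thread:

df = pl$DataFrame(mtcars)

# Rowwise sum over all columns, accumulated one Series at a time by the R lambda.
df$with_columns(
  pl$fold(
    acc = pl$lit(0),
    lambda = \(acc, x) acc + x,
    exprs = pl$col("*")
  )$alias("row_sum")
)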

@etiennebacher mentioned this pull request Nov 6, 2023