
Error for the weight argument in dplyr::sample_frac #2592

Closed
ajing opened this issue Jul 7, 2020 · 8 comments · Fixed by #2606

@ajing ajing commented Jul 7, 2020

The weight argument in dplyr::sample_frac does not seem to work on a Spark data frame. Is there any way to resolve that?

library(sparklyr)
library(dplyr, warn.conflicts = FALSE)

spark_version <- "2.4.5"
sc <- spark_connect(master = "local", version = spark_version)
m <- sdf_copy_to(sc, mtcars)

m %>%
  filter(am == 0) %>%
  sample_frac(0.5, weight = gear)
Error in structure(list(name = name, x = x, dots = dots, args = args), : object 'gear' not found
Traceback:

1. m %>% filter(am == 0) %>% sample_frac(0.5, weight = gear)
2. withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
3. eval(quote(`_fseq`(`_lhs`)), env, env)
4. eval(quote(`_fseq`(`_lhs`)), env, env)
5. `_fseq`(`_lhs`)
6. freduce(value, `_function_list`)
7. withVisible(function_list[[k]](value))
8. function_list[[k]](value)
9. sample_frac(., 0.5, weight = gear)
10. sample_frac.tbl_spark(., 0.5, weight = gear)
11. add_op_single("sample_frac", .data = tbl, args = list(size = size, 
  .     replace = replace, weight = weight, .env = .env))
12. op_single(name, x = .data$ops, dots = dots, args = args)
13. structure(list(name = name, x = x, dots = dots, args = args), 
  .     class = c(paste0("op_", name), "op_single", "op"))
devtools::session_info()
─ Session info ───────────────────────────────────────────────────────────────
 setting  value                       
 version  R version 3.6.1 (2019-07-05)
 os       Ubuntu 18.04.3 LTS          
 system   x86_64, linux-gnu           
 ui       X11                         
 language (EN)                        
 collate  C.UTF-8                     
 ctype    C.UTF-8                     
 tz       America/Los_Angeles         
 date     2020-07-07                  

─ Packages ───────────────────────────────────────────────────────────────────
 package     * version date       lib source        
 askpass       1.0     2019-01-02 [2] CRAN (R 3.6.0)
 assertthat    0.2.1   2019-03-21 [2] CRAN (R 3.6.0)
 backports     1.1.4   2019-04-10 [2] CRAN (R 3.6.0)
 base64enc     0.1-3   2015-07-28 [2] CRAN (R 3.6.0)
 callr         3.4.3   2020-03-28 [2] CRAN (R 3.6.1)
 cli           2.0.2   2020-02-28 [2] CRAN (R 3.6.1)
 config        0.3     2018-03-27 [1] CRAN (R 3.6.1)
 crayon        1.3.4   2017-09-16 [2] CRAN (R 3.6.0)
 DBI           1.0.0   2018-05-02 [2] CRAN (R 3.6.0)
 dbplyr        1.4.0   2019-04-23 [2] CRAN (R 3.6.0)
 desc          1.2.0   2018-05-01 [2] CRAN (R 3.6.1)
 devtools    * 2.3.0   2020-04-10 [2] CRAN (R 3.6.1)
 digest        0.6.25  2020-02-23 [2] CRAN (R 3.6.1)
 dplyr       * 0.8.0.1 2019-02-15 [2] CRAN (R 3.6.0)
 ellipsis      0.3.0   2019-09-20 [2] CRAN (R 3.6.1)
 evaluate      0.13    2019-02-12 [2] CRAN (R 3.6.0)
 fansi         0.4.0   2018-10-05 [2] CRAN (R 3.6.0)
 forge         0.2.0   2019-02-26 [1] CRAN (R 3.6.1)
 fs            1.4.2   2020-06-30 [2] CRAN (R 3.6.1)
 generics      0.0.2   2018-11-29 [2] CRAN (R 3.6.0)
 glue          1.4.1   2020-05-13 [2] CRAN (R 3.6.1)
 htmltools     0.3.6   2017-04-28 [2] CRAN (R 3.6.0)
 htmlwidgets   1.3     2018-09-30 [2] CRAN (R 3.6.0)
 httr          1.4.1   2019-08-05 [2] CRAN (R 3.6.1)
 IRdisplay     0.7.0   2018-11-29 [2] CRAN (R 3.6.1)
 IRkernel      1.1     2019-12-06 [2] CRAN (R 3.6.1)
 jsonlite      1.7.0   2020-06-25 [2] CRAN (R 3.6.1)
 lifecycle     0.2.0   2020-03-06 [2] CRAN (R 3.6.1)
 magrittr      1.5     2014-11-22 [2] CRAN (R 3.6.0)
 memoise       1.1.0   2017-04-21 [2] CRAN (R 3.6.0)
 openssl       1.3     2019-03-22 [2] CRAN (R 3.6.0)
 pbdZMQ        0.3-3   2018-05-05 [2] CRAN (R 3.6.1)
 pillar        1.4.3   2019-12-20 [2] CRAN (R 3.6.1)
 pkgbuild      1.0.8   2020-05-07 [2] CRAN (R 3.6.1)
 pkgconfig     2.0.2   2018-08-16 [2] CRAN (R 3.6.0)
 pkgload       1.1.0   2020-05-29 [2] CRAN (R 3.6.1)
 prettyunits   1.0.2   2015-07-13 [2] CRAN (R 3.6.0)
 processx      3.4.3   2020-07-05 [2] CRAN (R 3.6.1)
 ps            1.3.0   2018-12-21 [2] CRAN (R 3.6.0)
 purrr         0.3.4   2020-04-17 [2] CRAN (R 3.6.1)
 r2d3          0.2.3   2018-12-18 [1] CRAN (R 3.6.1)
 R6            2.4.0   2019-02-14 [2] CRAN (R 3.6.0)
 Rcpp          1.0.1   2019-03-17 [2] CRAN (R 3.6.0)
 remotes       2.1.1   2020-02-15 [2] CRAN (R 3.6.1)
 repr          1.1.0   2020-01-28 [2] CRAN (R 3.6.1)
 rlang         0.4.6   2020-05-02 [2] CRAN (R 3.6.1)
 rprojroot     1.3-2   2018-01-03 [1] CRAN (R 3.6.1)
 rstudioapi    0.11    2020-02-07 [2] CRAN (R 3.6.1)
 sessioninfo   1.1.1   2018-11-05 [2] CRAN (R 3.6.1)
 sparklyr    * 1.2.0   2020-04-20 [1] CRAN (R 3.6.1)
 testthat      2.3.2   2020-03-02 [2] CRAN (R 3.6.1)
 tibble        3.0.1   2020-04-20 [2] CRAN (R 3.6.1)
 tidyselect    0.2.5   2018-10-11 [2] CRAN (R 3.6.0)
 usethis     * 1.6.1   2020-04-29 [2] CRAN (R 3.6.1)
 uuid          0.1-2   2015-07-28 [2] CRAN (R 3.6.0)
 vctrs         0.3.1   2020-06-05 [2] CRAN (R 3.6.1)
 withr         2.1.2   2018-03-15 [2] CRAN (R 3.6.0)
 yaml          2.2.0   2018-07-25 [2] CRAN (R 3.6.0)


@yitao-li yitao-li commented Jul 8, 2020

@ajing Thanks for reporting this!

Given that the query works on an R data frame (i.e., mtcars %>% filter(am == 0) %>% sample_frac(0.5, weight = gear) works) but not on a Spark data frame, it's clear that something is missing in sparklyr for dbplyr to translate sample_frac with a weight = gear argument into Spark SQL. I'll look into it.

@yitao-li yitao-li self-assigned this Jul 8, 2020

@yitao-li yitao-li commented Jul 8, 2020

@ajing Further to my comment above: I think one reason the dplyr sample_frac(..., weight = ...) manipulation is not yet supported for a Spark data frame is that Spark SQL has no equivalent of weighted sampling yet (the only sampling constructs I could find in https://docs.databricks.com/spark/latest/spark-sql/language-manual/select.html#sampling are TABLESAMPLE (...) PERCENT and TABLESAMPLE (...) ROWS, neither of which performs the type of weighted sampling required).

Because dplyr relies on a translation layer that converts all data manipulation verbs in R into SQL queries that Apache Spark can process, it is limited by what Spark SQL can actually support.

However, while weighted sampling from a Spark data frame will not be directly feasible with dplyr, it could still be supported by a separate helper method in sparklyr. The only distinction is that such a helper method will live under a different name, and will most likely have to create a temporary view with some additional columns to make weighted random sampling expressible as a Spark SQL select query. It could be a fun exercise and could potentially be part of the next sparklyr release. Let me know what you think.
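To make the idea concrete, here is a minimal sketch (in Python for brevity, with a made-up function name; this is not sparklyr code) of how one extra random-key column makes weighted sampling without replacement expressible as an ordinary sort-and-limit query:

```python
import math
import random

def weighted_sample_without_replacement(rows, weights, k):
    # Assign each row the key -ln(u)/w with u ~ Uniform(0, 1]
    # (the exponential-variate technique); the k rows with the
    # smallest keys form a weighted sample without replacement.
    # Because the key is an ordinary column expression over a
    # random column, the same idea can be phrased as a
    # "ORDER BY key LIMIT k" query over a temporary view.
    keyed = [(-math.log(1.0 - random.random()) / w, row)
             for row, w in zip(rows, weights)]
    keyed.sort(key=lambda pair: pair[0])
    return [row for _, row in keyed[:k]]
```

Rows with larger weights get stochastically smaller keys, so they are more likely to land in the top k.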


@ajing ajing commented Jul 8, 2020

@yl790 Sounds good to me. We need a view with a weight column and a column of random numbers. Will you work on that, or should I look into it?


@yitao-li yitao-li commented Jul 8, 2020

@ajing I'll definitely aim to have that as part of sparklyr 1.4 -- it feels like something that should be in sparklyr ASAP.


@ajing ajing commented Jul 8, 2020

@yl790 thanks for giving it a high priority, Yitao! What is the expected release date for sparklyr 1.4? I would like to try it.


@yitao-li yitao-li commented Jul 8, 2020

@ajing It could be 1-2 months away (sparklyr 1.3 was released only recently). Meanwhile, instead of waiting for a release, there is always the option of devtools::install_github("sparklyr/sparklyr", ref = "master") to try out something that is not released yet. Usually only sufficiently stable changes are checked in to the master branch of sparklyr, and the weighted sampling feature in question should be fairly straightforward: it doesn't depend much on other components of sparklyr, and apart from Spark itself it has few external dependencies.


@ajing ajing commented Jul 8, 2020

@yl790 cool, thanks! I'll definitely try it when you have a working version.


@yitao-li yitao-li commented Jul 20, 2020

@ajing You can now run devtools::install_github("yl790/sparklyr", ref = "weighted_sampling") to install a version of sparklyr with the sparklyr::sdf_weighted_sample(...) functionalities.

sparklyr::sdf_weighted_sample(...) is designed around the exponential-variate technique described in https://dl.acm.org/doi/10.5555/1138831.1711169, which gives it the following desirable properties:

  • Correctness: copy_to(sc, df) %>% sparklyr::sdf_weighted_sample(weight_col = "weight", replacement = [replacement]) and df %>% dplyr::slice_sample(weight_by = weight, replace = [replacement]) have equivalent sampling behaviors, for both replacement = FALSE and replacement = TRUE (verified by Kolmogorov–Smirnov tests on a large number of samples produced by both methods)
  • Efficiency: sparklyr::sdf_weighted_sample(...) requires only one pass through all data points (whereas a naive approach would require at least 2 passes for replacement = TRUE and 2+ passes for replacement = FALSE)
  • Scalability: if your Spark data frame contains multiple partitions, sparklyr::sdf_weighted_sample(...) processes those partitions in parallel, fully utilizing the available executors in a Spark cluster
  • (bonus) sparklyr::sdf_weighted_sample(..., replacement = FALSE) can also be slightly modified to perform weighted reservoir sampling on a stream of data points (I'll probably look into this at some point; it should be fairly easy)
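The one-pass behavior can be sketched as follows (a Python illustration of the reservoir variant of the exponential-variate technique; the actual sparklyr implementation may differ in its details):

```python
import heapq
import math
import random

def weighted_reservoir_sample(stream, k):
    # One-pass weighted sampling without replacement: keep a
    # max-heap of the k smallest keys -ln(u)/w seen so far,
    # touching each (item, weight) pair exactly once, so the
    # stream never needs a second pass.
    heap = []  # stores (-key, item); heap[0] holds the worst kept key
    for item, weight in stream:
        key = -math.log(1.0 - random.random()) / weight
        if len(heap) < k:
            heapq.heappush(heap, (-key, item))
        elif key < -heap[0][0]:
            heapq.heapreplace(heap, (-key, item))
    return [item for _, item in heap]
```

Per-partition reservoirs of size k can then be merged by key, which is what makes the partition-parallel version possible.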

Example usage:

library(sparklyr)
sc <- spark_connect(master = "local")

# create an R data frame with 100 rows: rows 1-50 have weight 1,
# rows 51-75 weight 2, rows 76-85 weight 4, rows 86-95 weight 8,
# and rows 96-100 weight 16
example_data <- data.frame(
  id = seq(100),
  weight = c(rep(1, 50), rep(2, 25), rep(4, 10), rep(8, 10), rep(16, 5))
)

# copy the example data into a Spark data frame
sdf <- copy_to(sc, example_data)

# obtain a weighted sample set of size 20, without and with replacement
samples_sdf <- sdf %>% sdf_weighted_sample(weight_col = "weight", k = 20, replacement = FALSE)
samples_sdf <- sdf %>% sdf_weighted_sample(weight_col = "weight", k = 20, replacement = TRUE)

@yitao-li yitao-li added this to the 1.4.0 milestone Jul 20, 2020