spark_apply() with group_by parameter and arrow package returns summary results by batch, not group #3305

Closed
josephd000 opened this issue Dec 16, 2022 · 1 comment

Comments

@josephd000

Summary

It seems that using the arrow package (v9.0.0.1) with sparklyr::spark_apply() (v1.7.5) and its group_by parameter can produce unexpected results because the worker(s) process the data in batches: summary results come back per batch rather than per group. Detaching arrow yields summary results per group, as expected. My apologies if this is expected behavior, has been reported before (I couldn't find it), or belongs under arrow's issues.

Unfortunately, my real problem demands results by group but I receive results by batch even without using arrow. I'll add that reprex if I figure it out.

Observed

There are only two values in the group_by column, "A" and "B"; however, sparklyr::spark_apply() returns four result rows, two for "A" and two for "B". This is consistent with a 10,000-row batch size (see #2243 and/or #2503 for 10,000 as a magic number): since each group has 15,000 rows, the rows appear to be split into two batches per group, and results are returned per batch rather than per group.
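If the 10,000-row batch size is indeed the driver, one mitigation I have not tried would be to raise the batch size above the group size. This is only a sketch and assumes spark_apply()'s Arrow serialization honors Spark's spark.sql.execution.arrow.maxRecordsPerBatch setting (default 10,000); I have not confirmed that it does.

# Untested sketch: raise the Arrow batch size so a 15,000-row group fits in
# a single batch. Assumes spark_apply()'s Arrow path honors
# spark.sql.execution.arrow.maxRecordsPerBatch (Spark default: 10000).
conf <- sparklyr::spark_config()
conf$spark.sql.execution.arrow.maxRecordsPerBatch <- 20000L
sc <- sparklyr::spark_connect(master = "local", version = "3.2.1", config = conf)

The reprex that produced the four batch-level rows follows.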

library(arrow)

        ##### my_func() does the following
        conf <- sparklyr::spark_config()
        conf$`sparklyr.shell.driver-class-path` <- "/msjdbcdir/sqljdbc_8.4/enu/mssql-jdbc-8.4.1.jre8.jar"
        conf$`sparklyr.shell.driver-memory` <- "8G"
        conf$spark.memory.fraction <- 0.9
        library(sparklyr)
        Sys.setenv(SPARK_HOME = "~/spark/spark-3.2.1-bin-hadoop3.2")
        library(rsparkling)
        sc <- sparklyr::spark_connect(
          master = "local", 
          version = "3.2.1", 
          config = conf)
        ##### my_func() ends

##### Detach `rsparkling` so it's not a hidden culprit introduced by `my_func()`
if ("rsparkling" %in% .packages()) detach("package:rsparkling")
data_df <- data.frame(
    Petal_Length = rnorm(30000), 
    Species = rep(c("A", "B"), times = 15000))
data_sf <- sparklyr::copy_to(
    sc, 
    data_df, 
    overwrite = TRUE)

sparklyr::spark_apply(
    data_sf, 
    f = function(x) {mean(x$Petal_Length)}, 
    group_by = "Species")
#> # Source: spark<?> [?? x 2]
#>   Species   result
#>   <chr>      <dbl>
#> 1 B        0.00733
#> 2 B       -0.0187 
#> 3 A        0.0114 
#> 4 A        0.00688

sessionInfo()
#> R version 4.2.1 (2022-06-23)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Ubuntu 20.04.5 LTS
#> 
#> Matrix products: default
#> BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
#> LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/liblapack.so.3
#> 
#> locale:
#>  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
#>  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
#>  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#> [1] sparklyr_1.7.5 arrow_9.0.0.1 
#> 
#> loaded via a namespace (and not attached):
#>  [1] tidyselect_1.1.2          xfun_0.33                
#>  [3] forge_0.2.0               purrr_0.3.4              
#>  [5] h2o_3.38.0.1              vctrs_0.4.1              
#>  [7] generics_0.1.3            htmltools_0.5.3          
#>  [9] yaml_2.3.5                base64enc_0.1-3          
#> [11] utf8_1.2.2                rlang_1.0.6              
#> [13] pillar_1.8.1              glue_1.6.2               
#> [15] withr_2.5.0               DBI_1.1.3                
#> [17] rappdirs_0.3.3            bit64_4.0.5              
#> [19] dbplyr_2.1.1              uuid_1.1-0               
#> [21] lifecycle_1.0.2           stringr_1.4.1            
#> [23] MYPACKAGE_0.0.24.9001       htmlwidgets_1.5.4        
#> [25] evaluate_0.16             knitr_1.40               
#> [27] fastmap_1.1.0             parallel_4.2.1           
#> [29] fansi_1.0.3               highr_0.9                
#> [31] r2d3_0.2.6                openssl_2.0.3            
#> [33] jsonlite_1.8.0            config_0.3.1             
#> [35] fs_1.5.2                  bit_4.0.4                
#> [37] askpass_1.1               digest_0.6.29            
#> [39] stringi_1.7.8             dplyr_1.0.10             
#> [41] rprojroot_2.0.3           cli_3.4.0                
#> [43] tools_4.2.1               bitops_1.0-7             
#> [45] magrittr_2.0.3            RCurl_1.98-1.7           
#> [47] tibble_3.1.8              tidyr_1.2.1              
#> [49] pkgconfig_2.0.3           ellipsis_0.3.2           
#> [51] reprex_2.0.2              assertthat_0.2.1         
#> [53] rmarkdown_2.16            httr_1.4.4               
#> [55] rstudioapi_0.14           R6_2.5.1                 
#> [57] rsparkling_3.38.0.1-1-3.2 compiler_4.2.1

Created on 2022-12-16 with reprex v2.0.2

Expected

There are only two values in the group_by column, "A" and "B", so sparklyr::spark_apply() should return two result rows, one per group. With arrow detached, that is exactly what happens:

if ("arrow" %in% .packages()) detach("package:arrow")

        ##### my_func() does the following
        conf <- sparklyr::spark_config()
        conf$`sparklyr.shell.driver-class-path` <- "/msjdbcdir/sqljdbc_8.4/enu/mssql-jdbc-8.4.1.jre8.jar"
        conf$`sparklyr.shell.driver-memory` <- "8G"
        conf$spark.memory.fraction <- 0.9
        library(sparklyr)
        Sys.setenv(SPARK_HOME = "~/spark/spark-3.2.1-bin-hadoop3.2")
        library(rsparkling)
        sc <- sparklyr::spark_connect(
          master = "local", 
          version = "3.2.1", 
          config = conf)
        ##### my_func() ends

##### Detach `rsparkling` so it's not a hidden culprit introduced by `my_func()`
if ("rsparkling" %in% .packages()) detach("package:rsparkling")
data_df <- data.frame(
    Petal_Length = rnorm(30000), 
    Species = rep(c("A", "B"), times = 15000))
data_sf <- sparklyr::copy_to(
    sc, 
    data_df, 
    overwrite = TRUE)

sparklyr::spark_apply(
    data_sf, 
    f = function(x) {mean(x$Petal_Length)}, 
    group_by = "Species")
#> # Source: spark<?> [?? x 2]
#>   Species  result
#>   <chr>     <dbl>
#> 1 B       0.00453
#> 2 A       0.00383

sessionInfo()
#> R version 4.2.1 (2022-06-23)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Ubuntu 20.04.5 LTS
#> 
#> Matrix products: default
#> BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
#> LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/liblapack.so.3
#> 
#> locale:
#>  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
#>  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
#>  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> other attached packages:
#> [1] sparklyr_1.7.5
#> 
#> loaded via a namespace (and not attached):
#>  [1] compiler_4.2.1            pillar_1.8.1             
#>  [3] dbplyr_2.1.1              highr_0.9                
#>  [5] MYPACKAGE_0.0.24.9001       bitops_1.0-7             
#>  [7] r2d3_0.2.6                base64enc_0.1-3          
#>  [9] tools_4.2.1               uuid_1.1-0               
#> [11] digest_0.6.29             jsonlite_1.8.0           
#> [13] evaluate_0.16             lifecycle_1.0.2          
#> [15] tibble_3.1.8              pkgconfig_2.0.3          
#> [17] rlang_1.0.6               reprex_2.0.2             
#> [19] DBI_1.1.3                 cli_3.4.0                
#> [21] rstudioapi_0.14           parallel_4.2.1           
#> [23] yaml_2.3.5                xfun_0.33                
#> [25] fastmap_1.1.0             withr_2.5.0              
#> [27] dplyr_1.0.10              stringr_1.4.1            
#> [29] httr_1.4.4                knitr_1.40               
#> [31] askpass_1.1               rappdirs_0.3.3           
#> [33] generics_0.1.3            fs_1.5.2                 
#> [35] vctrs_0.4.1               htmlwidgets_1.5.4        
#> [37] rprojroot_2.0.3           tidyselect_1.1.2         
#> [39] glue_1.6.2                forge_0.2.0              
#> [41] R6_2.5.1                  rsparkling_3.38.0.1-1-3.2
#> [43] fansi_1.0.3               rmarkdown_2.16           
#> [45] h2o_3.38.0.1              tidyr_1.2.1              
#> [47] purrr_0.3.4               magrittr_2.0.3           
#> [49] ellipsis_0.3.2            htmltools_0.5.3          
#> [51] assertthat_0.2.1          config_0.3.1             
#> [53] utf8_1.2.2                stringi_1.7.8            
#> [55] openssl_2.0.3             RCurl_1.98-1.7

Created on 2022-12-16 with reprex v2.0.2

@josephd000 (Author)

Ah, sorry. I filed this issue because it is close to my actual real-world problem, but I can now see that this is expected behavior, as described in Chapter 11.9.2 Apache Arrow.

So I'll close this for now and ask my question elsewhere:

  • Is there a way to tell sparklyr: I have 800 groups, 8 workers, and roughly 100 groups per machine, so it's fine to create batches within a partition's 100 groups, as long as no group is split across batches? (A rough workaround sketch I'm considering is below.)
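In the meantime, a workaround I'm considering (an untested sketch, not something from the docs) is to make the per-batch results safe to re-aggregate: have f() return partial sums and counts instead of means, then collapse the batch-level rows by group afterwards with dplyr.

# Untested sketch: return batch-level partial aggregates, then combine them
# by group so batching no longer affects the final answer.
library(dplyr)

partial <- sparklyr::spark_apply(
    data_sf,
    f = function(x) data.frame(n = nrow(x), total = sum(x$Petal_Length)),
    group_by = "Species")

partial %>%
    dplyr::group_by(Species) %>%
    dplyr::summarise(result = sum(total) / sum(n))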
