
Like mutate, summarise needs to create subqueries when necessary #114


Description

@AjarKeen commented on Jan 8, 2018, 9:44 PM UTC:

cc @javierluraschi since I've only tested this with sparklyr, not with other dbplyr backends.

I'm starting to use `sparklyr` to do grouped time series aggregations, and very quickly ran into this problem. Here's a modest reprex showing the query working locally, the `sparklyr` version that fails, and then a workaround:
library(dplyr)
library(sparklyr)
library(nycflights13)

# local version
flights %>%
  group_by(carrier) %>%
  summarize(count_num = n(), 
            mean_dep_delay = mean(dep_delay, na.rm = TRUE),
            ratio = mean_dep_delay / count_num) %>% 
  arrange(carrier)

#> # A tibble: 16 x 4
#>    carrier count_num mean_dep_delay    ratio
#>    <chr>       <int>          <dbl>    <dbl>
#>  1 9E          18460          16.7  0.000906
#>  2 AA          32729           8.59 0.000262
#>  3 AS            714           5.80 0.00813 
#>  4 B6          54635          13.0  0.000238
#>  5 DL          48110           9.26 0.000193
#>  6 EV          54173          20.0  0.000368
#>  7 F9            685          20.2  0.0295  
#>  8 FL           3260          18.7  0.00574 
#>  9 HA            342           4.90 0.0143  
#> 10 MQ          26397          10.6  0.000400
#> 11 OO             32          12.6  0.393   
#> 12 UA          58665          12.1  0.000206
#> 13 US          20536           3.78 0.000184
#> 14 VX           5162          12.9  0.00249 
#> 15 WN          12275          17.7  0.00144 
#> 16 YV            601          19.0  0.0316

# Spark version
sc <- spark_connect(master = "local")

flights_sdf <- copy_to(sc, flights, "flights")

flights_sdf %>%
  group_by(carrier) %>%
  summarize(count_num = n(), 
            mean_dep_delay = mean(dep_delay),
            ratio = mean_dep_delay / count_num) %>%
  collect()

#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`mean_dep_delay`' given input columns: [dest, dep_delay, distance, dep_time, minute, carrier, origin, sched_arr_time, month, arr_time, day, flight, sched_dep_time, time_hour, arr_delay, air_time, hour, tailnum, year]; line 1 pos 81;
#> 'Aggregate [carrier#38], [carrier#38, count(1) AS count_num#447L, avg(dep_delay#34) AS mean_dep_delay#448, ('mean_dep_delay / 'count_num) AS ratio#449]
#> +- SubqueryAlias flights
#>    +- LogicalRDD [year#29, month#30, day#31, dep_time#32, sched_dep_time#33, dep_delay#34, arr_time#35, sched_arr_time#36, arr_delay#37, carrier#38, flight#39, tailnum#40, origin#41, dest#42, air_time#43, distance#44, hour#45, minute#46, time_hour#47]
#> 
#>  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformUp$1.apply(TreeNode.scala:289)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformUp$1.apply(TreeNode.scala:289)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:306)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$3.apply(TreeNode.scala:286)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:306)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:268)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:289)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1$1.apply(QueryPlan.scala:293)
#>  at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
#>  at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
#>  at scala.collection.immutable.List.foreach(List.scala:381)
#>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
#>  at scala.collection.immutable.List.map(List.scala:285)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$recursiveTransform$1(QueryPlan.scala:293)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$6.apply(QueryPlan.scala:298)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
#>  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
#>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
#>  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
#>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.lang.reflect.Method.invoke(Method.java:498)
#>  at sparklyr.Invoke$.invoke(invoke.scala:102)
#>  at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
#>  at sparklyr.StreamHandler$.read(stream.scala:62)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:52)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
#>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
#>  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
#>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
#>  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
#>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
#>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
#>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
#>  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
#>  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
#>  at java.lang.Thread.run(Thread.java:748)
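
For reference, the translated SQL makes the failure easy to see: dbplyr puts all three summaries into a single SELECT, so `ratio` references the aliases `mean_dep_delay` and `count_num` in the very SELECT list that defines them, which SQL does not allow. You can inspect the translation without executing it via `show_query()`; the SQL shown below is reconstructed from the `Aggregate` node in the error above, not verbatim output (identifier quoting varies by backend):

# Inspect the translated SQL instead of executing the query
flights_sdf %>%
  group_by(carrier) %>%
  summarize(count_num = n(), 
            mean_dep_delay = mean(dep_delay),
            ratio = mean_dep_delay / count_num) %>%
  show_query()

#> <SQL>
#> SELECT carrier, COUNT(*) AS count_num, AVG(dep_delay) AS mean_dep_delay,
#>        mean_dep_delay / count_num AS ratio
#> FROM flights
#> GROUP BY carrier

A fix presumably needs `summarise()` to wrap the aggregates in a subquery and compute `ratio` in an outer SELECT, which is what `mutate()` already does for expressions that reference freshly created columns.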

# Spark workaround
flights_sdf %>%
  group_by(carrier) %>%
  mutate(count_num = n(), 
         mean_dep_delay = mean(dep_delay),
         ratio = mean_dep_delay / count_num) %>%
  summarize(count_num = mean(count_num),
            mean_dep_delay = mean(mean_dep_delay),
            ratio = mean(ratio)) %>%
  arrange(carrier) %>%
  collect()

#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning

#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning

#> Warning: Missing values are always removed in SQL.
#> Use `AVG(x, na.rm = TRUE)` to silence this warning

#> Warning: Missing values are always removed in SQL.
#> Use `avg(x, na.rm = TRUE)` to silence this warning

#> # A tibble: 16 x 4
#>    carrier count_num mean_dep_delay    ratio
#>    <chr>       <dbl>          <dbl>    <dbl>
#>  1 9E        18460            16.7  0.000906
#>  2 AA        32729             8.59 0.000262
#>  3 AS          714             5.80 0.00813 
#>  4 B6        54635            13.0  0.000238
#>  5 DL        48110             9.26 0.000193
#>  6 EV        54173            20.0  0.000368
#>  7 F9          685            20.2  0.0295  
#>  8 FL         3260            18.7  0.00574 
#>  9 HA          342             4.90 0.0143  
#> 10 MQ        26397            10.6  0.000400
#> 11 OO           32.0          12.6  0.393   
#> 12 UA        58665            12.1  0.000206
#> 13 US        20536             3.78 0.000184
#> 14 VX         5162            12.9  0.00249 
#> 15 WN        12275            17.7  0.00144 
#> 16 YV          601            19.0  0.0316

My session info after I run the above:

> devtools::session_info()
Session info ---------------------------------------------------------------------
 setting  value                       
 version  R version 3.4.1 (2017-06-30)
 system   x86_64, linux-gnu           
 ui       RStudio (1.1.383)           
 language (EN)                        
 collate  en_US.UTF-8                 
 tz       <NA>                        
 date     2018-01-08                  

Packages -------------------------------------------------------------------------
 package      * version    date       source                           
 assertthat     0.2.0      2017-04-11 CRAN (R 3.4.1)                   
 backports      1.1.2      2017-12-13 cran (@1.1.2)                    
 base         * 3.4.1      2017-09-07 local                            
 base64enc      0.1-3      2015-07-28 CRAN (R 3.4.1)                   
 bindr          0.1        2016-11-13 CRAN (R 3.4.1)                   
 bindrcpp     * 0.2        2017-06-17 CRAN (R 3.4.1)                   
 broom          0.4.3      2017-11-20 cran (@0.4.3)                    
 callr          1.0.0      2016-06-18 CRAN (R 3.4.1)                   
 cli            1.0.0      2017-11-05 CRAN (R 3.4.1)                   
 clipr          0.4.0      2017-11-03 CRAN (R 3.4.1)                   
 compiler       3.4.1      2017-09-07 local                            
 config         0.2        2016-08-02 CRAN (R 3.4.1)                   
 crayon         1.3.4      2017-09-16 CRAN (R 3.4.1)                   
 datasets     * 3.4.1      2017-09-07 local                            
 DBI            0.7        2017-06-18 CRAN (R 3.4.1)                   
 dbplyr         1.2.0      2018-01-03 cran (@1.2.0)                    
 devtools       1.13.4     2017-11-09 CRAN (R 3.4.1)                   
 digest         0.6.13     2017-12-14 cran (@0.6.13)                   
 dplyr        * 0.7.4      2017-09-28 CRAN (R 3.4.1)                   
 evaluate       0.10.1     2017-06-24 CRAN (R 3.4.1)                   
 foreign        0.8-69     2017-06-22 CRAN (R 3.4.1)                   
 glue           1.2.0      2017-10-29 CRAN (R 3.4.1)                   
 graphics     * 3.4.1      2017-09-07 local                            
 grDevices    * 3.4.1      2017-09-07 local                            
 grid           3.4.1      2017-09-07 local                            
 htmltools      0.3.6      2017-04-28 CRAN (R 3.4.1)                   
 httpuv         1.3.5      2017-07-04 CRAN (R 3.4.1)                   
 httr           1.3.1      2017-08-20 CRAN (R 3.4.1)                   
 jsonlite       1.5        2017-06-01 CRAN (R 3.4.1)                   
 knitr          1.17       2017-08-10 CRAN (R 3.4.1)                   
 lattice        0.20-35    2017-03-25 CRAN (R 3.4.1)                   
 lazyeval       0.2.1      2017-10-29 CRAN (R 3.4.1)                   
 magrittr       1.5        2014-11-22 CRAN (R 3.4.1)                   
 memoise        1.1.0      2017-04-21 CRAN (R 3.4.1)                   
 methods      * 3.4.1      2017-09-07 local                            
 mime           0.5        2016-07-07 CRAN (R 3.4.1)                   
 mnormt         1.5-5      2016-10-15 CRAN (R 3.4.1)                   
 nlme           3.1-131    2017-02-06 CRAN (R 3.4.1)                   
 nycflights13 * 0.2.2      2017-01-27 CRAN (R 3.4.1)                   
 openssl        0.9.9      2017-11-10 cran (@0.9.9)                    
 parallel       3.4.1      2017-09-07 local                            
 pillar         1.0.1      2017-11-27 cran (@1.0.1)                    
 pkgconfig      2.0.1      2017-03-21 CRAN (R 3.4.1)                   
 plyr           1.8.4      2016-06-08 CRAN (R 3.4.1)                   
 psych          1.7.8      2017-09-09 CRAN (R 3.4.1)                   
 purrr          0.2.4      2017-10-18 CRAN (R 3.4.1)                   
 R6             2.2.2      2017-06-17 CRAN (R 3.4.1)                   
 Rcpp           0.12.14    2017-11-23 cran (@0.12.14)                  
 reprex       * 0.1.1      2017-01-13 CRAN (R 3.4.1)                   
 reshape2       1.4.3      2017-12-11 cran (@1.4.3)                    
 rlang          0.1.6      2017-12-21 cran (@0.1.6)                    
 rmarkdown      1.7        2017-11-10 CRAN (R 3.4.1)                   
 rprojroot      1.3-2      2018-01-03 cran (@1.3-2)                    
 rstudioapi     0.7        2017-09-07 CRAN (R 3.4.1)                   
 shiny          1.0.5      2017-08-23 CRAN (R 3.4.1)                   
 sparklyr     * 0.7.0-9106 2018-01-08 Github (rstudio/sparklyr@41d145a)
 stats        * 3.4.1      2017-09-07 local                            
 stringi        1.1.5      2017-04-07 CRAN (R 3.4.1)                   
 stringr        1.2.0      2017-02-18 CRAN (R 3.4.1)                   
 tibble         1.4.1      2017-12-25 cran (@1.4.1)                    
 tidyr          0.7.2      2017-10-16 CRAN (R 3.4.1)                   
 tools          3.4.1      2017-09-07 local                            
 utf8           1.1.3      2018-01-03 cran (@1.1.3)                    
 utils        * 3.4.1      2017-09-07 local                            
 whisker        0.3-2      2013-04-28 CRAN (R 3.4.1)                   
 withr          2.1.1      2017-12-19 cran (@2.1.1)                    
 xtable         1.8-2      2016-02-05 CRAN (R 3.4.1)                   
 yaml           2.1.14     2016-11-12 CRAN (R 3.4.1) 

I'm not sure whether this is related to the similar-sounding issue with mutate / rename that was resolved in dbplyr 1.2.0. I can work around this for now, but the workaround will get clunky fast as I get into more complex aggregation operations with sparklyr.
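
For what it's worth, a less clunky workaround (relying on `mutate()` already knowing how to create subqueries, per this issue's title) is to keep only the aggregates in `summarize()` and derive `ratio` in a follow-up `mutate()`:

# Alternative workaround: aggregate first, then derive the ratio
flights_sdf %>%
  group_by(carrier) %>%
  summarize(count_num = n(), 
            mean_dep_delay = mean(dep_delay)) %>%
  mutate(ratio = mean_dep_delay / count_num) %>%
  arrange(carrier) %>%
  collect()

This avoids the grouped-`mutate()` window functions of the workaround above, though I haven't tested it against every backend.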

This issue was moved by krlmlr from tidyverse/dplyr/issues/3295.

Labels: feature (a feature request or enhancement) · verb trans 🤖 (translation of dplyr verbs to SQL)
