Leading periods in the suffix argument of dplyr::inner_join cause Spark error on column conflict #2648

Closed
wkdavis opened this issue Aug 11, 2020 · 11 comments · Fixed by #2651

wkdavis (Contributor) commented Aug 11, 2020

When joining multiple tbl_spark tables with dplyr::inner_join(), leading periods (.) in the suffix argument (which is the case with the default, c(".x", ".y")) cause an error.

suppressPackageStartupMessages(library("dplyr"))
#> Warning: package 'dplyr' was built under R version 3.6.2
packageVersion("dplyr")
#> [1] '1.0.0'
library("sparklyr")
#> Warning: package 'sparklyr' was built under R version 3.6.2
packageVersion("sparklyr")
#> [1] '1.3.0'

conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "8G"  
conf$spark.memory.fraction <- 0.8
sc <- spark_connect(master = "local", config = conf, version = "3.0.0")

df1 <- data.frame(group = sample(c("A", "B"), size = 10, replace = T),
                  value1 = rnorm(10),
                  conflict = "df1")
df2 <- data.frame(group = c("A","B"),
                  lookupA = c("aA","aB"),
                  conflict = "df2")
df3 <- data.frame(group = c("A","B"),
                  lookupB = c("bA","bB"),
                  conflict = "df3")
dt1 <- copy_to(sc, df1, "test1", overwrite = TRUE)
dt2 <- copy_to(sc, df2, "test2", overwrite = TRUE)
dt3 <- copy_to(sc, df3, "test3", overwrite = TRUE)


inner_join(
  tbl(sc, "test1"),
  tbl(sc, "test2"),
  by = "group",
  suffix = c(".table1", ".table2")
) %>%
  inner_join(tbl(sc, "test3"),
             by = "group",
             suffix = c(".join1", ".table3")) -> x
head(x)
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`LHS.conflict.table1`' given input columns: [RHS.conflict, LHS.conflict.table1, LHS.conflict.table2, RHS.group, LHS.group, LHS.lookupA, RHS.lookupB, LHS.value1]; line 2 pos 67;
#> 'GlobalLimit 6
#> +- 'LocalLimit 6
#>    +- 'Project [*]
#>       +- 'SubqueryAlias dbplyr_001
#>          +- 'Project [group#648 AS group#653, value1#649 AS value1#654, 'LHS.conflict.table1 AS conflict.table1#655, lookupA#651 AS lookupA#656, 'LHS.conflict.table2 AS conflict.table2#657, lookupB#453 AS lookupB#658, conflict#454 AS conflict#659]
#>             +- Join Inner, (group#648 = group#452)
#>                :- SubqueryAlias LHS
#>                :  +- Project [group#24 AS group#648, value1#25 AS value1#649, conflict#26 AS conflict.table1#650, lookupA#239 AS lookupA#651, conflict#240 AS conflict.table2#652]
#>                :     +- Join Inner, (group#24 = group#238)
#>                :        :- SubqueryAlias LHS
#>                :        :  +- SubqueryAlias test1
#>                :        :     +- LogicalRDD [group#24, value1#25, conflict#26], false
#>                :        +- SubqueryAlias RHS
#>                :           +- SubqueryAlias test2
#>                :              +- LogicalRDD [group#238, lookupA#239, conflict#240], false
#>                +- SubqueryAlias RHS
#>                   +- SubqueryAlias test3
#>                      +- LogicalRDD [group#452, lookupB#453, conflict#454], false
#> 
#>  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:143)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:134)
#>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
#>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
#>  at scala.collection.immutable.List.map(List.scala:298)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:134)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:176)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:176)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:176)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:176)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:89)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:130)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:156)
#>  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
#>  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
#>  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
#>  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
#>  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
#>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
#>  at sparklyr.Invoke.invoke(invoke.scala:147)
#>  at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
#>  at sparklyr.StreamHandler.read(stream.scala:61)
#>  at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
#>  at scala.util.control.Breaks.breakable(Breaks.scala:42)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:39)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerC

Removing the . from the suffixes allows the join to run successfully.

inner_join(
  tbl(sc, "test1"),
  tbl(sc, "test2"),
  by = "group",
  suffix = c("table1", "table2")
) %>%
  inner_join(tbl(sc, "test3"),
             by = "group",
             suffix = c("join1", "table3")) -> x
head(x)
#> # Source: spark<?> [?? x 7]
#>   group  value1 conflicttable1 lookupA conflicttable2 lookupB conflict
#>   <chr>   <dbl> <chr>          <chr>   <chr>          <chr>   <chr>   
#> 1 B      0.554  df1            aB      df2            bB      df3     
#> 2 B      0.168  df1            aB      df2            bB      df3     
#> 3 B     -0.877  df1            aB      df2            bB      df3     
#> 4 B     -0.0242 df1            aB      df2            bB      df3     
#> 5 B      0.127  df1            aB      df2            bB      df3     
#> 6 A     -0.951  df1            aA      df2            bA      df3

Created on 2020-08-11 by the reprex package (v0.3.0)

wkdavis (Contributor, Author) commented Aug 11, 2020

Could the solution be as simple as writing join methods for tbl_spark that substitute underscores (_) for the periods in the suffix argument?

inner_join.tbl_spark <-
  function(x,
           y,
           by = NULL,
           copy = FALSE,
           suffix = c("_x", "_y"),
           auto_index = FALSE,
           ...,
           sql_on = NULL) {
    
    # Replace any '.' in the suffixes with '_' before dispatching to the
    # default method, since dotted column names trip up the generated SQL.
    if (any(grepl("\\.", suffix))) {
      suffix <- gsub("\\.", "_", suffix)
      message("Replacing '.' with '_' in suffixes. New suffixes: ",
              paste(suffix, collapse = ", "))
    }
    
    NextMethod()
  }

inner_join(
  tbl(sc, "test1"),
  tbl(sc, "test2"),
  by = "group",
  suffix = c(".table1", ".table2")
) %>%
  inner_join(tbl(sc, "test3"),
             by = "group",
             suffix = c(".join1", ".table3")) -> x
#> Replacing '.' with '_' in suffixes. New suffixes: _table1, _table2
#> Replacing '.' with '_' in suffixes. New suffixes: _join1, _table3

head(x)
#> # Source: spark<?> [?? x 7]
#>   group   value1 conflict_table1 lookupA conflict_table2 lookupB conflict
#>   <chr>    <dbl> <chr>           <chr>   <chr>           <chr>   <chr>   
#> 1 B     -0.632   df1             aB      df2             bB      df3     
#> 2 B     -2.68    df1             aB      df2             bB      df3     
#> 3 A      1.13    df1             aA      df2             bA      df3     
#> 4 B     -0.966   df1             aB      df2             bB      df3     
#> 5 A      1.87    df1             aA      df2             bA      df3     
#> 6 A     -0.00613 df1             aA      df2             bA      df3

Created on 2020-08-11 by the reprex package (v0.3.0)

yitao-li (Contributor) commented Aug 11, 2020

@wkdavis Good call!

Let me know if you want to send a pull request for this fix. Alternatively, I can create a pull request for it and list you as co-author.

I think substituting '.' with '_' is the only viable solution here, given the restrictions on which characters are permitted in valid column/table names in Spark SQL.

yitao-li self-assigned this Aug 11, 2020
yitao-li added this to the 1.4.0 milestone Aug 11, 2020

wkdavis (Contributor, Author) commented Aug 11, 2020

@yl790 Thanks, that sounds good! I'll go ahead and create the PR.

wkdavis (Contributor, Author) commented Aug 12, 2020

@yl790 I was writing tests for my solution and came across something interesting. It looks like the problem only occurs on subsequent joins where a previous join had a conflict that resulted in the period suffix being used.

suppressPackageStartupMessages(library("dplyr"))
#> Warning: package 'dplyr' was built under R version 3.6.2
packageVersion("dplyr")
#> [1] '1.0.0'
library("sparklyr")
#> Warning: package 'sparklyr' was built under R version 3.6.2
packageVersion("sparklyr")
#> [1] '1.3.0'

conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "8G"  
conf$spark.memory.fraction <- 0.8
sc <- spark_connect(master = "local", config = conf, version = "3.0.0")

s1 <- data.frame(group = sample(c("A", "B"),size = 10, replace = T),
                 value1 = rnorm(10),
                 conflict = "df1")
s2 <- data.frame(group = c("A","B"),
                 conflict = "df2")
s3 <- data.frame(group = c("A","B"),
                 conflict = "df3")

d1 <- copy_to(sc, s1, "test1", overwrite = TRUE)
d2 <- copy_to(sc, s2, "test2", overwrite = TRUE)
d3 <- copy_to(sc, s3, "test3", overwrite = TRUE)

left_join(d1, d2, by = c("group")) -> j1
head(j1)
#> # Source: spark<?> [?? x 4]
#>   group  value1 conflict.x conflict.y
#>   <chr>   <dbl> <chr>      <chr>     
#> 1 B     -0.423  df1        df2       
#> 2 A      0.0395 df1        df2       
#> 3 B     -0.292  df1        df2       
#> 4 B     -0.454  df1        df2       
#> 5 A      0.408  df1        df2       
#> 6 A     -1.17   df1        df2

d1 %>%
  left_join(d2, by = c("group")) -> j2
head(j2)
#> # Source: spark<?> [?? x 4]
#>   group  value1 conflict.x conflict.y
#>   <chr>   <dbl> <chr>      <chr>     
#> 1 B     -0.423  df1        df2       
#> 2 A      0.0395 df1        df2       
#> 3 B     -0.292  df1        df2       
#> 4 B     -0.454  df1        df2       
#> 5 A      0.408  df1        df2       
#> 6 A     -1.17   df1        df2

Both joins above succeeded despite the duplicate column names - the . suffixes were appended successfully. The problem only shows up in subsequent joins.

left_join(j1, d3, by = "group")
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`LHS.conflict.x`' given input columns: [RHS.conflict, LHS.conflict.x, LHS.conflict.y, RHS.group, LHS.group, LHS.value1]; line 1 pos 61;
#> 'Project [group#941 AS group#945, value1#942 AS value1#946, 'LHS.conflict.x AS conflict.x#947, 'LHS.conflict.y AS conflict.y#948, conflict#395 AS conflict#949]
#> +- Join LeftOuter, (group#941 = group#394)
#>    :- SubqueryAlias LHS
#>    :  +- Project [group#24 AS group#941, value1#25 AS value1#942, conflict#26 AS conflict.x#943, conflict#238 AS conflict.y#944]
#>    :     +- Join LeftOuter, (group#24 = group#237)
#>    :        :- SubqueryAlias LHS
#>    :        :  +- SubqueryAlias test1
#>    :        :     +- LogicalRDD [group#24, value1#25, conflict#26], false
#>    :        +- SubqueryAlias RHS
#>    :           +- SubqueryAlias test2
#>    :              +- LogicalRDD [group#237, conflict#238], false
#>    +- SubqueryAlias RHS
#>       +- SubqueryAlias test3
#>          +- LogicalRDD [group#394, conflict#395], false
#> 
#>  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:143)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:134)
#>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
#>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
#>  at scala.collection.immutable.List.map(List.scala:298)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:134)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:89)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:130)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:156)
#>  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
#>  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
#>  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
#>  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
#>  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
#>  at jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
#>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
#>  at sparklyr.Invoke.invoke(invoke.scala:147)
#>  at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
#>  at sparklyr.StreamHandler.read(stream.scala:61)
#>  at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
#>  at scala.util.control.Breaks.breakable(Breaks.scala:42)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:39)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
#>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
#>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:6

left_join(j2, d3, by = "group")
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`LHS.conflict.x`' given input columns: [RHS.conflict, LHS.conflict.x, LHS.conflict.y, RHS.group, LHS.group, LHS.value1]; line 1 pos 61;
#> 'Project [group#950 AS group#954, value1#951 AS value1#955, 'LHS.conflict.x AS conflict.x#956, 'LHS.conflict.y AS conflict.y#957, conflict#395 AS conflict#958]
#> +- Join LeftOuter, (group#950 = group#394)
#>    :- SubqueryAlias LHS
#>    :  +- Project [group#24 AS group#950, value1#25 AS value1#951, conflict#26 AS conflict.x#952, conflict#238 AS conflict.y#953]
#>    :     +- Join LeftOuter, (group#24 = group#237)
#>    :        :- SubqueryAlias LHS
#>    :        :  +- SubqueryAlias test1
#>    :        :     +- LogicalRDD [group#24, value1#25, conflict#26], false
#>    :        +- SubqueryAlias RHS
#>    :           +- SubqueryAlias test2
#>    :              +- LogicalRDD [group#237, conflict#238], false
#>    +- SubqueryAlias RHS
#>       +- SubqueryAlias test3
#>          +- LogicalRDD [group#394, conflict#395], false
#> 
#>  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:143)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:134)
#>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
#>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
#>  at scala.collection.immutable.List.map(List.scala:298)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:134)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:89)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:130)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:156)
#>  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
#>  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
#>  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
#>  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
#>  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
#>  at jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
#>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
#>  at sparklyr.Invoke.invoke(invoke.scala:147)
#>  at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
#>  at sparklyr.StreamHandler.read(stream.scala:61)
#>  at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
#>  at scala.util.control.Breaks.breakable(Breaks.scala:42)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:39)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
#>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
#>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:6

Created on 2020-08-12 by the reprex package (v0.3.0)

The error message is interesting to me:

cannot resolve '`LHS.conflict.x`' given input columns: [RHS.conflict, LHS.conflict.x, LHS.conflict.y, RHS.group, LHS.group, LHS.value1]

It says it can't resolve the column LHS.conflict.x, yet that column clearly appears in the list of input columns. I'm not sure switching to underscores is the right solution anymore. It almost seems like more of an escaping issue, perhaps in dbplyr? If you still support using underscores, I can submit that PR. I was ready to submit it until I found that this part runs without error:

left_join(d1, d2, by = c("group")) -> j1
head(j1)
#> # Source: spark<?> [?? x 4]
#>   group  value1 conflict.x conflict.y
#>   <chr>   <dbl> <chr>      <chr>     
#> 1 B     -0.423  df1        df2       
#> 2 A      0.0395 df1        df2       
#> 3 B     -0.292  df1        df2       
#> 4 B     -0.454  df1        df2       
#> 5 A      0.408  df1        df2       
#> 6 A     -1.17   df1        df2

and then I began to question whether underscores were the right solution.

yitao-li (Contributor) commented Aug 12, 2020

@wkdavis That's interesting!!

It definitely looks like a quoting issue with dbplyr, or a combination of a quoting issue and other interop issues with Spark. I guess the best way to debug this is to run dplyr::left_join(...) %>% dplyr::show_query() to see what's quoted with backticks and what's not.

To debug something like this further, you might also need to run spark_df %>% dbplyr::remote_name() to show the name of the Spark temporary view of spark_df, and if it's NULL, then spark_df <- spark_df %>% dplyr::compute() to create a temp view containing some intermediate query results, and finally, DBI::dbGetQuery(sc, "<sql>") to run some debugging Spark SQL on a temporary view directly.
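
Something like this (an untested sketch, using the sc/d1/d2 objects from your reprex above):

j1 <- dplyr::left_join(d1, d2, by = "group")

# 1. Inspect the SQL dbplyr generates and which identifiers get backticks.
j1 %>% dplyr::show_query()

# 2. Check whether the result is backed by a named temporary view;
#    NULL means it is still just a lazy query.
dbplyr::remote_name(j1)

# 3. Force evaluation of the whole query into a temp view.
j1 <- j1 %>% dplyr::compute()

# 4. Run ad hoc Spark SQL against a temp view directly.
DBI::dbGetQuery(sc, "SELECT * FROM test1 LIMIT 5")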

Also, if I remember correctly, Spark SQL actually allows dots in column names when the whole name is backtick-quoted (e.g., one can run select `a.b` from table_c).
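
For example (untested, against the test1 view from the reprex above):

# A backtick-quoted alias containing a dot is accepted by Spark SQL as a
# single identifier rather than a table.column reference.
DBI::dbGetQuery(sc, "SELECT conflict AS `conflict.x` FROM test1 LIMIT 2")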

wkdavis (Contributor, Author) commented Aug 12, 2020

@yl790 I think we're on to it. I was not familiar with dbplyr::remote_name() or dplyr::compute(). Running compute() (below) seems to point to dbplyr.

suppressPackageStartupMessages(library("dplyr"))
#> Warning: package 'dplyr' was built under R version 3.6.2
packageVersion("dplyr")
#> [1] '1.0.0'
library("sparklyr")
#> Warning: package 'sparklyr' was built under R version 3.6.2
packageVersion("sparklyr")
#> [1] '1.3.0'

conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "8G"  
conf$spark.memory.fraction <- 0.8
sc <- spark_connect(master = "local", config = conf, version = "3.0.0")

df1 <- data.frame(group = sample(c("A", "B"), size = 10, replace = T),
                  value1 = rnorm(10),
                  conflict = "df1")
df2 <- data.frame(group = c("A","B"),
                  conflict = "df2")

dt1 <- copy_to(sc, df1, "test1", overwrite = TRUE)
dt2 <- copy_to(sc, df2, "test2", overwrite = TRUE)

inner_join(
  dt1,
  dt2,
  by = "group"
) %>% compute()
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`conflict.x`' given input columns: [dbplyr_001.conflict.x, dbplyr_001.conflict.y, dbplyr_001.group, dbplyr_001.value1]; line 1 pos 26;
#> 'Project [group#525, value1#526, 'conflict.x AS conflict.x#529, 'conflict.y AS conflict.y#530]
#> +- SubqueryAlias dbplyr_001
#>    +- Project [group#24 AS group#525, value1#25 AS value1#526, conflict#26 AS conflict.x#527, conflict#238 AS conflict.y#528]
#>       +- Join Inner, (group#24 = group#237)
#>          :- SubqueryAlias LHS
#>          :  +- SubqueryAlias test1
#>          :     +- LogicalRDD [group#24, value1#25, conflict#26], false
#>          +- SubqueryAlias RHS
#>             +- SubqueryAlias test2
#>                +- LogicalRDD [group#237, conflict#238], false
#> 
#>  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:143)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:134)
#>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
#>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
#>  at scala.collection.immutable.List.map(List.scala:298)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:134)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:106)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:140)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:92)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:89)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:130)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:156)
#>  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
#>  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
#>  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
#>  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
#>  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
#>  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
#>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
#>  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
#>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
#>  at sparklyr.Invoke.invoke(invoke.scala:147)
#>  at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
#>  at sparklyr.StreamHandler.read(stream.scala:61)
#>  at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
#>  at scala.util.control.Breaks.breakable(Breaks.scala:42)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:39)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
#>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
#>  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
#>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
#>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
#>  at io.netty.channel.nio.NioEventLoop.run(NioEv

Which is interesting, given that this prints without a problem:

left_join(d1, d2, by = c("group")) -> j1
head(j1)
#> # Source: spark<?> [?? x 4]
#>   group  value1 conflict.x conflict.y
#>   <chr>   <dbl> <chr>      <chr>     
#> 1 B     -0.423  df1        df2       
#> 2 A      0.0395 df1        df2       
#> 3 B     -0.292  df1        df2       
#> 4 B     -0.454  df1        df2       
#> 5 A      0.408  df1        df2       
#> 6 A     -1.17   df1        df2

The lazy nature of the "local" object seems to be misrepresenting the actual state of the "remote" object.

Created on 2020-08-12 by the reprex package (v0.3.0)

yitao-li (Contributor) commented Aug 12, 2020

@wkdavis The only use for dplyr::left_join(...) %>% dplyr::compute() is to force the generated Spark SQL query to be evaluated (otherwise everything is evaluated lazily). For simpler queries, though, one can also just eyeball the output of dplyr::left_join(...) %>% dplyr::show_query() and see what went wrong without even asking Spark to evaluate the entire query.

wkdavis (Contributor, Author) commented Aug 12, 2020

@yl790 Based on the output above, it looks like dbplyr gets tripped up by column names that contain a period, since dbplyr uses the backtick-quoted `table`.`column` convention to separate table and column names. You can see the problem in this line:

SELECT `LHS`.`group` AS `group`, `LHS`.`value1` AS `value1`, `LHS`.`conflict`.`x` AS `conflict.x`

Because dbplyr splits on the ., conflict.x is no longer referenced as a single column on the LHS - it becomes `LHS`.`conflict`.`x`.

inner_join(
  dt1,
  dt2,
  by = "group"
)  %>% dplyr::show_query()
#> <SQL>
#> SELECT `LHS`.`group` AS `group`, `LHS`.`value1` AS `value1`, `LHS`.`conflict` AS `conflict.x`, `RHS`.`conflict` AS `conflict.y`
#> FROM `test1` AS `LHS`
#> INNER JOIN `test2` AS `RHS`
#> ON (`LHS`.`group` = `RHS`.`group`)

inner_join(
  dt1,
  dt2,
  by = "group"
) %>%
  inner_join(dt3, by = "group") %>% dplyr::show_query()
#> <SQL>
#> SELECT `LHS`.`group` AS `group`, `LHS`.`value1` AS `value1`, `LHS`.`conflict`.`x` AS `conflict.x`, `LHS`.`conflict`.`y` AS `conflict.y`, `RHS`.`conflict` AS `conflict`
#> FROM (SELECT `LHS`.`group` AS `group`, `LHS`.`value1` AS `value1`, `LHS`.`conflict` AS `conflict.x`, `RHS`.`conflict` AS `conflict.y`
#> FROM `test1` AS `LHS`
#> INNER JOIN `test2` AS `RHS`
#> ON (`LHS`.`group` = `RHS`.`group`)
#> ) `LHS`
#> INNER JOIN `test3` AS `RHS`
#> ON (`LHS`.`group` = `RHS`.`group`)

Created on 2020-08-12 by the reprex package (v0.3.0)

wkdavis (Contributor, Author) commented Aug 12, 2020

It looks like copy_to() already performs this kind of name change: any column name containing a . has it replaced with _. So, to be consistent with that, the solution would be to use _ in the join suffixes.

df4 <- data.frame(group.name = c("A","B"),
                  conflict = "df4")
dt4 <- copy_to(sc, df4, "test4", overwrite = TRUE)
dt4
#> # Source: spark<test4> [?? x 2]
#>   group_name conflict
#>   <chr>      <chr>   
#> 1 A          df4     
#> 2 B          df4

Created on 2020-08-12 by the reprex package (v0.3.0)

yitao-li (Contributor) commented Aug 12, 2020

@wkdavis Yep -- completely agree. I just tried a bunch of examples myself, and there really is no way for dbplyr to work with column names containing a dot without getting confused.
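
In the meantime, a workaround sketch (untested; assumes the d1/d2/d3 tables from the reprex above) is to rename the conflicting columns before joining, so that no dot-suffixed name is ever generated:

d1 %>%
  dplyr::left_join(dplyr::rename(d2, conflict_df2 = conflict), by = "group") %>%
  dplyr::left_join(dplyr::rename(d3, conflict_df3 = conflict), by = "group")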

wkdavis (Contributor, Author) commented Aug 12, 2020

PR: #2651

yitao-li linked a pull request Aug 12, 2020 that will close this issue