unnest() fails on Databricks #3267
Open
tschutte opened this issue Jun 10, 2022 · 5 comments

@tschutte

I have an interesting issue running sparklyr's unnest() on Databricks. The first time I use the function after a cluster restart, it works perfectly. However, if I restart my R session in RStudio, or detach and reattach a notebook, the function fails, claiming it cannot locate a temporary table.

This is reproducible with the default sparklyr 1.7.5 that Databricks provides, and also after upgrading to 1.7.7, on Databricks Runtime 10.4.

I assume the error comes from the creation of the temporary table near the bottom of this function, but it is not apparent to me what would cause it, especially only after a restart of the R session.
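
As a diagnostic, it may help to list which temp views a fresh connection can actually see after a session restart. A minimal sketch, assuming an active connection sc and that sparklyr's internal sparklyr_tmp_* tables are registered as session-scoped temp views:

library(sparklyr)

sc <- spark_connect(method = "databricks")

# List the session-scoped temp views this connection can see; sparklyr's
# sparklyr_tmp_* tables should appear here if they still exist
DBI::dbGetQuery(sc, "SHOW VIEWS")

# dplyr's view of the same catalog
dplyr::src_tbls(sc)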

Here is an easily reproducible example using a simple test case. Let me know if I can provide any other info to help troubleshoot.

library(sparklyr)

sc <- spark_connect(method = "databricks")

# Build a local tibble with list columns, copy it to Spark, and nest two
# groups of columns into struct columns
tbl <- tibble::tibble(
  a = c(1, 1, 2, 2, 3, 3, 3),
  b = lapply(seq(7), function(x) rep(1, x)),
  c = seq(7),
  d = lapply(seq(7), function(x) list(a = x, b = 2 * x, c = -x)),
  e = seq(-7, -1)
)
sdf <- copy_to(sc, tbl, overwrite = TRUE)
sdf.nested <- sdf %>% tidyr::nest(n1 = c(b, c), n2 = c(d, e))

# Fails after an R session restart or notebook detach/reattach
sdf.nested %>%
  tidyr::unnest(c(n1, n2)) %>%
  collect() %>%
  dplyr::arrange(c)

Error


Error : org.apache.spark.sql.AnalysisException: Table or view not found: sparklyr_tmp_650317cf_7698_4caf_a5a8_992cf23e1ac1; line 2 pos 5;
'Project [*]
+- 'Filter (0 = 1)
   +- 'SubqueryAlias q01
      +- 'UnresolvedRelation [sparklyr_tmp_650317cf_7698_4caf_a5a8_992cf23e1ac1], [], false

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$2(CheckAnalysis.scala:137)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$2$adapted(CheckAnalysis.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:358)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:357)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:357)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:357)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:357)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:357)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:357)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:357)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:357)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:357)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:104)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:99)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:99)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:222)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:276)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:331)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:273)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:128)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:151)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:129)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:126)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:118)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:101)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:803)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:798)
	at sun.reflect.GeneratedMethodAccessor227.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sparklyr.Invoke.invoke(invoke.scala:161)
	at sparklyr.StreamHandler.handleMethodCall(stream.scala:141)
	at sparklyr.StreamHandler.read(stream.scala:62)
	at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:60)
	at scala.util.control.Breaks.breakable(Breaks.scala:42)
	at sparklyr.BackendHandler.channelRead0(handler.scala:41)
	at sparklyr.BackendHandler.channelRead0(handler.scala:14)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Session Info

R version 4.1.2 (2021-11-01)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.4 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.9.0
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.9.0

locale:
 [1] LC_CTYPE=C.UTF-8       LC_NUMERIC=C           LC_TIME=C.UTF-8       
 [4] LC_COLLATE=C.UTF-8     LC_MONETARY=C.UTF-8    LC_MESSAGES=C.UTF-8   
 [7] LC_PAPER=C.UTF-8       LC_NAME=C              LC_ADDRESS=C          
[10] LC_TELEPHONE=C         LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C   

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] sparklyr_1.7.7

loaded via a namespace (and not attached):
 [1] pillar_1.7.0       compiler_4.1.2     dbplyr_2.1.1       TeachingDemos_2.10
 [5] r2d3_0.2.5         base64enc_0.1-3    tools_4.1.2        uuid_1.0-3        
 [9] digest_0.6.29      jsonlite_1.8.0     lifecycle_1.0.1    tibble_3.1.6      
[13] pkgconfig_2.0.3    rlang_1.0.1        DBI_1.1.2          cli_3.2.0         
[17] rstudioapi_0.13    yaml_2.3.5         parallel_4.1.2     fastmap_1.1.0     
[21] withr_2.4.3        dplyr_1.0.8        hwriter_1.3.2      httr_1.4.2        
[25] generics_0.1.2     vctrs_0.3.8        htmlwidgets_1.5.4  rprojroot_2.0.2   
[29] tidyselect_1.1.2   glue_1.6.1         forge_0.2.0        R6_2.5.1          
[33] fansi_1.0.2        purrr_0.3.4        tidyr_1.2.0        SparkR_3.2.0      
[37] blob_1.2.2         magrittr_2.0.2     hwriterPlus_1.0-3  ellipsis_0.3.2    
[41] htmltools_0.5.2    assertthat_0.2.1   Rserve_1.8-10      config_0.3.1      
[45] utf8_1.2.2         crayon_1.5.0      
@edgararuiz
Collaborator

Hi, I wonder if, after closing a connection and reconnecting, the ability to create temp views is restricted.

Can you try creating a temp view manually using DBI, as shown below? The test should happen after you disconnect and reconnect.

tbl <- tibble::tibble(
  a = c(1, 1, 2, 2, 3, 3, 3),
  b = lapply(seq(7), function(x) rep(1, x)),
  c = seq(7),
  d = lapply(seq(7), function(x) list(a = x, b = 2 * x, c = -x)),
  e = seq(-7, -1)
)
sdf <- copy_to(sc, tbl, overwrite = TRUE)
sdf.nested <- sdf %>% tidyr::nest(n1 = c(b, c), n2 = c(d, e))

# copy_to() registered the data as the temp view "tbl"; create a second
# temp view from it by hand and confirm it is queryable
DBI::dbGetQuery(sc, "CREATE TEMPORARY VIEW `my_test` AS (SELECT * FROM tbl)")
dplyr::tbl(sc, "my_test")
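
For completeness, a minimal sketch of the disconnect/reconnect cycle described above, assuming a Databricks-hosted session; the view name my_test2 is purely illustrative:

library(sparklyr)

sc <- spark_connect(method = "databricks")
spark_disconnect(sc)

# Reconnect, then repeat the manual temp-view test on the fresh connection
sc <- spark_connect(method = "databricks")
DBI::dbGetQuery(sc, "CREATE TEMPORARY VIEW `my_test2` AS (SELECT 1 AS x)")
dplyr::tbl(sc, "my_test2")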

@tschutte
Author

This does work fine.

I was doing some debugging, and it seems the failure occurs in the call to sdf_register (invoked from sdf_fast_bind_cols), where the invoke of createOrReplaceTempView isn't actually creating the view. Why that is, however, I cannot yet figure out.
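
A minimal sketch of that round trip, for anyone who wants to check it directly (assuming an active connection sc; the view name manual_check is illustrative):

library(sparklyr)

sdf <- copy_to(sc, mtcars, overwrite = TRUE)

# Register a temp view directly on the underlying Spark DataFrame, the same
# call sdf_register() makes internally (per the comment above)
sdf %>%
  spark_dataframe() %>%
  invoke("createOrReplaceTempView", "manual_check")

# If the registration took effect, the view appears in the session catalog
DBI::dbGetQuery(sc, "SHOW VIEWS LIKE 'manual_check'")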

@edgararuiz
Collaborator

Hi @tschutte, is this still giving you problems on later versions of Databricks clusters?

@tschutte
Author

Yes, this is still occurring with sparklyr 1.8.1 and DBR 12.2.

@romangehrn

I have exactly the same issue with DBR 14.3 and sparklyr 1.8.1; it would be really helpful if this worked!
