
sdf_pivot returning AnalysisException due to ambiguous column name #1063

Closed
samurainate opened this issue Oct 10, 2017 · 3 comments

@samurainate

samurainate commented Oct 10, 2017

I am trying to pivot a table with sdf_pivot. It has worked before, but now I am running into an analysis exception from catalyst over an ambiguous column name. Two things are different: I had to raise spark.sql.pivotMaxValues to accommodate a larger vocabulary, and I changed the metric that I am summing in the pivot from one numeric column to another. Maybe different code triggered by the larger pivotMaxValues? The vocabulary size is 11,310. I have run with over 9000 values before, similarly formatted as 'Key:Value' strings. Current limit is set to 15000. I am running Spark 2.0.0, on Hadoop 2.7 (HDP 2.5.3) in cluster mode.

UPDATE: I have tested incrementally increasing the vocabulary size from 1,000 to 10,000 with the default pivotMaxValues. It appears that only specific values in the pivot column lead to this exception. I can work around it by filtering those values out prior to calling sdf_pivot, e.g.

filter(! word %in% c('Subject:Civil_War','Subject:Space_Travel','Setting:Air_Force_base'))

I have no idea why these values cause problems. All values contain colons, and most contain underscores where the original value contained a space.
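
One rough way to check whether any of the pivot values collide, for example values that differ only in letter case, is a sketch along these lines (it assumes the same corpus table and word column used below; tolower() is translated to Spark SQL's lower()):

corpus %>%
    distinct(word) %>%
    mutate(word_norm = tolower(word)) %>%  # normalize case before comparing
    group_by(word_norm) %>%
    summarise(n_variants = n()) %>%
    filter(n_variants > 1) %>%
    collect()  # any rows returned are values that would produce duplicate pivot columns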

Code

agg.corpus <- corpus %>%
    select(document, tfidf, word) %>%
    sdf_pivot(document ~ word, fun.aggregate=list(tfidf="sum")) %>% # pivot and sum weight per word
    select(-document) %>%
    na.replace(0)

Error

Error: org.apache.spark.sql.AnalysisException: Reference 'Setting:Air_Force_base' is ambiguous, could be: Setting:Air_Force_base#74356, Setting:Air_Force_base#74358.; line 1 pos 281395
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:148)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:604)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:604)
	at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:604)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:600)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:191)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:201)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:205)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:205)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$5.apply(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:600)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:542)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:542)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:479)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sparklyr.Invoke$.invoke(invoke.scala:102)
	at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
	at sparklyr.StreamHandler$.read(stream.scala:62)
	at sparklyr.BackendHandler.channelRead0(handler.scala:52)
	at sparklyr.BackendHandler.channelRead0(handler.scala:14)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)
trace:
withr::with_options(list(warning.length = 8000), {
    if (nzchar(msg)) {
        core_handle_known_errors(sc, msg)
        stop(msg, call. = FALSE)
    }
    else {
        msg <- core_read_spark_log_error(sc)
        stop(msg, call. = FALSE)
    }
})
core_invoke_method(sc, static, object, method, ...)
invoke_method.spark_shell_connection(spark_connection(jobj), 
    FALSE, jobj, method, ...)
invoke_method(spark_connection(jobj), FALSE, jobj, method, ...)
invoke.shell_jobj(hive, "sql", sql)
invoke(hive, "sql", sql)
spark_dataframe.tbl_spark(object)
spark_dataframe(object)
na.replace(spark_dataframe(object), ...)
na.replace.tbl_spark(., 0)
na.replace(., 0)
function_list[[k]](value)
withVisible(function_list[[k]](value))
freduce(value, `_function_list`)
`_fseq`(`_lhs`)
eval(expr, envir, enclos)
eval(quote(`_fseq`(`_lhs`)), env, env)
withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
corpus %>% select(document, tfidf, word) %>% sdf_pivot(document ~ 
    word, fun.aggregate = list(tfidf = "sum")) %>% select(-document) %>% 
    na.replace(0)
@hhch

hhch commented Nov 23, 2017

I encountered a similar problem.

@tarunparmar

Not sure if this will help. I don't remember the exact error, but I had column names containing hyphens (-), and replacing them with underscores (_) helped in my case.
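
A sketch of that kind of cleanup applied to the pivot column itself, reusing the corpus and word columns from the example above (regexp_replace() is passed through to Spark SQL):

corpus %>%
    mutate(word = regexp_replace(word, "-", "_")) %>%  # replace hyphens with underscores
    sdf_pivot(document ~ word, fun.aggregate = list(tfidf = "sum"))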

@harishrvt

I had the same issue and realized that it's due to two (or more) of the columns having the same name. In your case, Setting:Air_Force_base appears twice, perhaps with different upper/lower-case values. I appended a unique row number and it worked fine for me.
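
If the ambiguity really does come from case variants of the same value, one possible fix (just a sketch reusing the columns from the example above, not tested on this dataset) is to normalize the pivot column before pivoting so the variants collapse into a single column:

corpus %>%
    select(document, tfidf, word) %>%
    mutate(word = tolower(word)) %>%  # collapse case variants into one pivot value
    sdf_pivot(document ~ word, fun.aggregate = list(tfidf = "sum"))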
