
[Question] Chapter 3 - Use the CROSS JOIN syntax to allow cartesian products between these relations #136

Closed
necronet opened this issue Dec 2, 2019 · 4 comments


necronet commented Dec 2, 2019

When trying to run this piece of code from Chapter 3:

```scala
model.transform(toRecommend).select("artist","prediction").orderBy($"prediction".desc).limit(howMany)
```

I get:

```
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1124)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1121)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
```

This gets fixed if I use:

```scala
toRecommend.cache()
```

I'm not sure why this is the case, though.
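
For readers without the book at hand, here is a sketch of the context, assuming the surrounding code matches the chapter's `makeRecommendations` helper (the constant `2093760` in the plans below is consistent with a fixed target user ID), with the workaround inserted:

```scala
import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import spark.implicits._  // assumes an active SparkSession named `spark`

def makeRecommendations(model: ALSModel, userID: Int, howMany: Int): DataFrame = {
  // Pair every artist ID from the item factors with the one target user.
  val toRecommend = model.itemFactors.
    select($"id".as("artist")).
    withColumn("user", lit(userID))
  // Reported workaround: materialize toRecommend before transform so the
  // optimizer no longer flags the join as a cartesian product. Why caching
  // changes the plan is left open in this thread.
  toRecommend.cache()
  model.transform(toRecommend).
    select("artist", "prediction").
    orderBy($"prediction".desc).
    limit(howMany)
}
```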

srowen (Collaborator) commented Dec 2, 2019

Hm, that's weird. I don't recall ever seeing it, though it's been a long while since I ran the code. There aren't any other modifications?

There isn't a join in that line, of course. Are you sure it's not triggering elsewhere, or does it definitely happen only as soon as you execute this line?

Or, what happens if you execute that line with `.explain()` at the end?
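
That is, appending `.explain()` to the failing expression, which prints the plan rather than running the job:

```scala
model.transform(toRecommend).
  select("artist", "prediction").
  orderBy($"prediction".desc).
  limit(howMany).
  explain()
```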

necronet (Author) commented Dec 7, 2019

Sorry for the delay; I did not realize that this issue had a response. This is the `explain()` output:

```
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for LEFT OUTER join between logical plans
Project [_1#153 AS artist#160]
+- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#153, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#154]
   +- ExternalRDD [obj#152]
and
Project [_2#144 AS features#147]
+- Filter (UDF(2093760) = _1#143)
   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#143, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#144]
      +- ExternalRDD [obj#142]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
```

And once I add `toRecommend.cache()`:

```
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[prediction#295 DESC NULLS LAST], output=[artist#250,prediction#295])
+- *(9) Project [artist#250, UDF(features#237, features#247) AS prediction#295]
   +- SortMergeJoin [UDF(artist#250)], [id#246], LeftOuter
      :- *(6) Sort [UDF(artist#250) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(UDF(artist#250), 200)
      :     +- *(5) Project [artist#250, features#237]
      :        +- SortMergeJoin [UDF(user#252)], [id#236], LeftOuter
      :           :- *(2) Sort [UDF(user#252) ASC NULLS FIRST], false, 0
      :           :  +- Exchange hashpartitioning(UDF(user#252), 200)
      :           :     +- *(1) InMemoryTableScan [artist#250, user#252]
      :           :           +- InMemoryRelation [artist#250, user#252], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      :           :                 +- *(1) Project [_1#243 AS artist#250, 2093760 AS user#252]
      :           :                    +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#243, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#244]
      :           :                       +- Scan ExternalRDDScan[obj#242]
      :           +- *(4) Sort [id#236 ASC NULLS FIRST], false, 0
      :              +- Exchange hashpartitioning(id#236, 200)
      :                 +- *(3) Project [_1#233 AS id#236, _2#234 AS features#237]
      :                    +- *(3) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#233, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#234]
      :                       +- Scan ExternalRDDScan[obj#232]
      +- *(8) Sort [id#246 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#246, 200)
            +- *(7) Project [_1#243 AS id#246, _2#244 AS features#247]
               +- *(7) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#243, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#244]
                   +- Scan ExternalRDDScan[obj#242]
```

srowen (Collaborator) commented Dec 7, 2019

Hm, I also have no idea. If I can reproduce it later, maybe I'll just make the change you suggest.

jesadrperez commented

I got the same error and got it to work with the suggested addition of `toRecommend.cache()`.
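
For what it's worth, the error message itself points at another escape hatch, not used in this thread, so treat it as an assumption rather than the settled fix: Spark 2.x can be told to allow cartesian products globally, at the cost of also silencing the check for genuinely accidental cross joins:

```scala
// Alternative workaround (not what this thread settled on):
// disable Spark 2.x's cartesian-product check session-wide.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
```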

srowen closed this as completed Jul 12, 2020