Notifications
Joins with boolean column expressions, 2nd attempt #175
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##           master     #175      +/-   ##
==========================================
- Coverage   96.99%   96.56%   -0.44%
==========================================
  Files          52       51       -1
  Lines         866      874       +8
  Branches       10       11       +1
==========================================
+ Hits          840      844       +4
- Misses         26       30       +4
```
Continue to review full report at Codecov.
|
|
@kanterov Would you have time for a review? |
|
@OlivierBlanvillain looking into it now! :) |
|
👍🏻 on explicit casts. |
|
I tried to play with the implicit tricks we discussed in #162, and everything seems to break inference in IntelliJ. The only thing that worked properly was:

```scala
/** Lifts typed columns in join expressions */
trait Join[L, R]

object Join {
  def apply[L, R](): Join[L, R] = new Join[L, R] {}
}

implicit class LeftSyntax[L, U](c: TypedColumn[L, U]) {
  def left[R](scope: Join[L, R]): TypedColumn[Join[L, R], U] = new TypedColumn(c.expr)(c.uencoder)
}

implicit class RightSyntax[R, U](c: TypedColumn[R, U]) {
  def right[L](scope: Join[L, R]): TypedColumn[Join[L, R], U] = new TypedColumn(c.expr)(c.uencoder)
}
```

With:

```scala
def joinInner[U](other: TypedDataset[U])(condition: Join[T, U] => TypedColumn[Join[T, U], Boolean])
```

And user code:

```scala
leftDs
  .joinLeft(rightDs)(s => leftDs.col('a).left(s) === rightDs.col('a).right(s))
```

As you can see, it's super explicit lifting of everything. It doesn't even work in ….

I feel we can solve any syntactic problem by investing in two macros:

```scala
def expr(f: A => B): TypedColumn[A, B]
def joinExpr(f: (A, B) => C): TypedColumn[Join[A, B], C]
```

And having a macro-free way that doesn't necessarily have nice syntax, but must have nice type inference. |
|
One more thing: it's important to distinguish a column that comes from the left dataset from one that comes from the right dataset. Consider the expression:

```scala
ds.joinInner(ds)(s => ds.col('a).left(s) === ds.col('a).right(s))
```

vs.

```scala
ds.joinInner(ds)(s => ds.col('a).left(s) === ds.col('a).left(s))
```

Now, Spark fixed this only for equality (see …).

In the vanilla Spark API, in the case where the joined datasets aren't exactly the same, such ambiguity can be resolved by explicitly building expressions from the left and right datasets, like:

```scala
val a: TypedDataset[X1[A]]
val b: TypedDataset[X1[A]]
a.joinInner(b, a.col('a) === b.col('a))
```

But this is very fragile to any transformations, for instance:

```scala
a.joinInner(b.filter(...), a.col('a) === b.col('a))
```

In this sense, being able to determine whether a column belongs to the right or left part would allow us to resolve this problem. Unfortunately, the approach with … |
|
@kanterov thanks for the quick feedback :)
Do you have some example I could play with? The test suite compiled fine after these changes, I thought that was enough ;)
I didn't follow you here. Do we need this distinction at the type level? The proposed API mirrors vanilla Spark; it's possible to do …. I feel I'm missing something, maybe an executable example would help me better understand the issue. |
|
@OlivierBlanvillain , @kanterov do you guys want to wait for this to close before cutting 0.4? |
|
ping @kanterov |
|
@OlivierBlanvillain sorry for the delay, I'm on vacation and have limited access to my machine. The problem comes from the fact that, for column expressions like …, the Scala compiler infers that …. Another problem is the way Spark resolves columns. I've prepared this code snippet to illustrate the different kinds of problems: https://gist.github.com/kanterov/49014d462b265eaf54353f09f0ac517a. Please let me know what you think. |
|
I did a quick experiment, and I think something like the following works:

```scala
trait Col[T, U]
trait TA
trait TB

val t1: Col[TA, Boolean] = null
val t2: Col[TA, Boolean] = null
val t3: Col[TB, Boolean] = null

// That won't be free in terms of compile time, but I think it would still be
// better than a Coproduct representation.
trait With[A, B] { type Out }

trait LowPrioWith {
  implicit def identity[T]: With[T, T] { type Out = T } = null
}

object With extends LowPrioWith {
  implicit def combine[A, B]: With[A, B] { type Out = A with B } = null
}

def ++[A, B, U](a: Col[A, U], b: Col[B, U])(implicit w: With[A, B]): Col[w.Out, U] = null
```

```scala
scala> ++(t1, t3)
res4: Col[TA with TB,Boolean] = null

scala> ++(t1, t2)
res5: Col[TA,Boolean] = null
```

Thanks for the gist, I will have a closer look later! |
|
@OlivierBlanvillain is this PR still in progress? Can we merge parts of it? |
|
No I don't think it's partially mergeable... |
|
@kanterov I think I finally addressed your review, it only took 6 months 😅 The overall diff might be getting too large, so you might want to look at commits instead. Here are the two interesting ones: |
|
@OlivierBlanvillain This looks amazing ✨. Have been trying to break this with funky joins on options, collections, etc. Works great so far. |
```diff
  * apache/spark
  */
-def =!=(other: TypedColumn[T, U]): TypedColumn[T, Boolean] = withExpr {
+def =!=[TT, W](other: TypedColumn[TT, U])(implicit w: With.Aux[T, TT, W]): TypedColumn[W, Boolean] = withExpr {
```
Can we go a bit over the `With` type class? Why do we need to do this?
I answered in the main thread to not lose the message on rebase
| } | ||
|
|
||
```scala
/** Left hand side disambiguation of `col` for join expressions.
  * To be used when writting self-joins, noop in other circonstances.
```
French : )? circonstances
🇫🇷
So, suppose you want to write a funky join: …, where `dsA` and `dsB` have unrelated types `A` and `B`. The signature of …. You might wonder, why not just write …? |
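The phantom-type trick under discussion can be sketched without any Spark dependency. The snippet below is a minimal, hypothetical stand-in (`Col` is not the real `TypedColumn`, and the names are illustrative): equal phantom types collapse to a single type via the high-priority `identical` instance, while different ones intersect via the low-priority `combine` instance, mirroring the behavior described for `With`.

```scala
trait With[A, B] { type Out }

trait LowPrioWith {
  // Different phantom types: keep both, as an intersection type.
  implicit def combine[A, B]: With[A, B] { type Out = A with B } =
    new With[A, B] { type Out = A with B }
}

object With extends LowPrioWith {
  // Equal phantom types: collapse A with A down to plain A.
  implicit def identical[T]: With[T, T] { type Out = T } =
    new With[T, T] { type Out = T }
}

// Stand-in for TypedColumn[T, U]: T is the phantom "source dataset" type.
final case class Col[T, U](expr: String) {
  def ===[B, W](other: Col[B, U])(implicit w: With[T, B] { type Out = W }): Col[W, Boolean] =
    Col(s"(${expr} = ${other.expr})")
}

object PhantomJoinDemo {
  trait DsA
  trait DsB

  val aId     = Col[DsA, Int]("a.id")
  val aParent = Col[DsA, Int]("a.parent")
  val bId     = Col[DsB, Int]("b.id")

  // Cross-dataset comparison: phantom type is the intersection DsA with DsB.
  val cross: Col[DsA with DsB, Boolean] = aId === bId

  // Same-dataset comparison: stays DsA rather than DsA with DsA.
  val same: Col[DsA, Boolean] = aId === aParent
}
```

The explicit type ascriptions on `cross` and `same` are the point: they only compile because the implicit picks the intended `Out`, which is the same Aux-style inference the `=!=` signature in the diff above relies on.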
|
Hey, @OlivierBlanvillain. While testing I found this:

```scala
scala> t.joinFull(t2)(t2('_1) === t2('_1)).show().run
org.apache.spark.sql.AnalysisException: Detected cartesian product for FULL OUTER join between logical plans
LocalRelation [_1#2, _2#3]
and
LocalRelation [_1#13, _2#14]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
``` |
|
@imarios This is the expected behavior. The join condition you wrote is trivially true, so it's equivalent to writing ….

If you want to write a self-join, you need to use the explicit `colLeft`/`colRight` syntax:

```scala
t2.joinFull(t2)(t2.colLeft('_1) === t2.colRight('_1)).show().run
```

But there is not much we can do about trivial join conditions; for example, you can also explicitly write:

```scala
t2.joinFull(t2)(t2.colLeft('_1) === t2.colLeft('_1)).show().run
``` |
|
@OlivierBlanvillain I see that the condition doesn't make sense, but this will probably be the only Frameless op that leads to a run-time error where we say it's expected behavior. Currently, anything that would cause a runtime exception gives a compilation error, which is pretty much our selling point. The only current exception that we know of is explode(), which we already classified as a bug. I feel that any join that doesn't involve a cross-dataset comparison should not compile. Can't we just enforce that the join condition (Boolean column) is of type A with B or B with A, but not A with A, B with B, A with C, etc.?

```scala
scala> t.joinFull(t2)(t2('_1) === t2('_2).cast[String]).show().run
org.apache.spark.sql.AnalysisException: Detected cartesian product for FULL OUTER join between logical plans
LocalRelation [_1#110, _2#111]
and
LocalRelation [_1#121, _2#122]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
``` |
|
I think all my above examples hit a corner case where we try to join two datasets with the same schema.
It's more complicated than that, for instance expressions like ….

```scala
conf.set("spark.sql.selfJoinAutoResolveAmbiguity", "true")
conf.set("spark.sql.crossJoin.enabled", "true")
```

There is no way to enforce that at compile time, so I'm not sure what would be the best option to enforce such configuration... (we already have the first requirement, see here) |
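For reference, a minimal sketch of where those two `conf.set` flags would typically be applied: set once when the `SparkSession` is built. This is illustrative only (it assumes a Spark dependency on the classpath; the app name is made up) and is not part of the frameless API.

```scala
import org.apache.spark.sql.SparkSession

// Both flags from the comment above, applied at session-construction time.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("frameless-self-joins") // hypothetical app name
  .config("spark.sql.selfJoinAutoResolveAmbiguity", "true") // auto-resolve ambiguous self-join columns
  .config("spark.sql.crossJoin.enabled", "true")            // allow trivial/cartesian join conditions
  .getOrCreate()
```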
|
Fair enough, I guess we can always try to improve on this in a separate PR. Do you think we can do better in terms of test coverage? |
|
Sure, I know what's missing. I'm just waiting for #153 to basically redo everything on |
|
Hey @OlivierBlanvillain this is the last code related PR for 0.5! I know you must be busy, so whenever you have the time. Thank you! |
Implicit conversions don't compose; I propose we simply cast everything explicitly.
`With` computes the intersection of two types while leaving less noise than `A with B`:

- With[A, A] = A
- With[A, B] = A with B (when A != B)

This type function is needed to prevent IDEs from inferring large types with the shape `A with A with ... with A`. These types could be confusing for both end users and IDEs' type checkers.
This commit introduces a disambiguation mechanism for self-joins. The idea is
to use ds.colLeft('a) and ds.colRight('a) instead of ds.col('a) when writing
join expressions, where the Left and Right suffixes are used to tag the column
as being on the left hand side or the right hand side of the join expression.
The implementation adds two new nodes to the Spark IR, DisambiguateLeft and
DisambiguateRight, which behave as dummies (they just forward to the underlying
expression) but are intercepted at runtime when building the join plans to
actually implement the disambiguation.
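That interception idea can be sketched without Spark. The ADT below is a simplified, hypothetical stand-in (these are not the actual Catalyst nodes): the `Disambiguate*` wrappers carry no behavior of their own and only record which side of the join a column reference should resolve against, until the planner strips them.

```scala
// Simplified expression tree. DisambiguateLeft/Right are no-op wrappers whose
// only job is to tag which side of the join a column should resolve against.
sealed trait Expr
final case class ColRef(name: String) extends Expr
final case class Eq(left: Expr, right: Expr) extends Expr
final case class DisambiguateLeft(child: Expr) extends Expr   // dummy: forwards to child
final case class DisambiguateRight(child: Expr) extends Expr  // dummy: forwards to child

object JoinPlanner {
  // When the join plan is built, intercept the markers: qualify the wrapped
  // column with the chosen side, then strip the wrapper.
  def resolve(e: Expr): Expr = e match {
    case DisambiguateLeft(ColRef(n))  => ColRef(s"left.$n")
    case DisambiguateRight(ColRef(n)) => ColRef(s"right.$n")
    case DisambiguateLeft(other)      => resolve(other)
    case DisambiguateRight(other)     => resolve(other)
    case Eq(l, r)                     => Eq(resolve(l), resolve(r))
    case c: ColRef                    => c
  }
}
```

With this, the two occurrences of the same column in a self-join condition stay distinguishable: resolving `Eq(DisambiguateLeft(ColRef("_1")), DisambiguateRight(ColRef("_1")))` yields `Eq(ColRef("left._1"), ColRef("right._1"))` rather than a trivially-true comparison of one side with itself.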
|
@imarios It's rebased and should hopefully be green 😄 |
|
Looks good @OlivierBlanvillain! Do you mind the code coverage concerns from Travis? |
|
The two lines not covered are these two; I'm not sure how to trigger genCode, but here I simply forward to the underlying expression. |
This PR replaces #162 with a nicer solution that is both safer than the original approach (columns keep their "source dataset" phantom type) and simpler than the implicit `CanAccess` approach.

The trick is to abuse the Scala type system to model "a boolean column coming from either table A or B" as `TypedColumn[A with B, Boolean]`. The only changes that make this possible are on the `TypedColumn` operations, which now take an extra type parameter for the second argument of binary operations and combine the phantom types as expected. From a user's perspective nothing changes.

The implicit conversions used for the widen import broke with this change. I propose to remove this mechanism altogether and require users to explicitly cast when they convert columns from one type to another. Implicit conversions do not compose and break type inference left and right; these changes are a good illustration of that.