Fix udfs #99
During testing, I've discovered a bug with |
@kanterov man this is amazing but so complex. It will take me hours to review :). Did you figure out the issue with collect?
I'm totally unfamiliar with Spark's `udf` and the `CodeGen` APIs used in this PR, so I cannot do more than a superficial review...
Question: is (or could) the code compiled for evaluation be cached?
}

check(forAll(prop[Int, Int, Int] _))
check(forAll(prop[String, Int, Int] _))
check(forAll(prop[Option[Int], X2[Double, Long], Int] _))
Why remove these checks? It looks like you don't have any `X2` in the new ones.
According to implementation details, there shouldn't be a difference between `X1` and `X2` because they are both encoded using the same type of encoder. I wanted to simplify things a bit.
    new TypedColumn[T, R](scalaUdf)
  }
}

case class FramelessUdf[T, R](
Should this be `private[functions]`?
I don't think we should stop people from accessing internals if they want to. What if we document this as internal?
    new TypedColumn[T, R](scalaUdf)
  }
}

case class FramelessUdf[T, R](
  function: AnyRef,
I don't understand where `function` is used inside of `FramelessUdf`.
That's tricky, it's used in generated code:
ctx.addMutableState(funcClassName, funcTerm,
s"this.$funcTerm = ($funcClassName)((($framelessUdfClassName)references" +
s"[$funcExpressionIdx]).function());")
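To make the snippet above a bit more concrete, here is a minimal plain-Scala sketch of what the generated initializer appears to do: look the expression up in a `references` array by index, cast it, and read its `function` field. `UdfExpr`, `ReferencesSketch` and `lookup` are hypothetical stand-ins for illustration only, not Spark or frameless classes.

```scala
object ReferencesSketch {
  // Hypothetical stand-in for the udf expression; it only carries the user function.
  final case class UdfExpr(function: AnyRef)

  // Mirrors the generated line: index into `references`, cast to the
  // expression class, read `function`, then cast to the function type.
  def lookup(references: Array[Any], idx: Int): Int => Int =
    references(idx).asInstanceOf[UdfExpr].function.asInstanceOf[Int => Int]

  val inc: Int => Int = _ + 1

  // The expression sits at index 1, next to an unrelated reference.
  val references: Array[Any] = Array("unrelated", UdfExpr(inc))

  val f: Int => Int = lookup(references, 1)
}
```

So `function` is never called from handwritten Scala inside the case class; it is only reached through the accessor that the generated code invokes.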
@kanterov can you run this test with your changes?
test("x") {
  val f = TypedDataset.create((1,Vector(2,2,3)) :: (1,Vector(2,4)) :: Nil)
  val ooo = f.makeUDF( (v: Vector[Int]) => v.sum )
  f.select( f('_1), ooo(f('_2))).show().run()
}
@imarios yeah, it works now
@OlivierBlanvillain I've addressed your comment for marking |
Codecov Report
@@            Coverage Diff             @@
##           master      #99      +/-   ##
==========================================
+ Coverage   90.06%   90.67%   +0.61%
==========================================
  Files          25       25
  Lines         473      504      +31
  Branches        7        8       +1
==========================================
+ Hits          426      457      +31
  Misses         47       47
1d6a172 to 6cf5d07
We didn't use our encoders before, instead we used `ScalaUDF` that uses default ones that aren't compatible with `TypedEncoder`. We introduce `FramelessUdf` that uses `TypedEncoder` to support udfs.

There are 2 special cases to mention:
1. udf is called within codegen
2. udf is directly evaluated within local projection

`TypedEncoder` doesn't support runtime evaluation, only codegen, that's why we handle (2) by generating code from (1), compiling and executing it.
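As a rough illustration of handling case (2) through case (1), the toy sketch below gives an expression that only knows how to emit code, and implements direct evaluation by generating that code, compiling it once, and running the result. Everything here is hypothetical: `compile` is a tiny stand-in that understands a single source shape, whereas the PR compiles the code produced by Spark's codegen.

```scala
object EvalViaCodegenSketch {
  // Hypothetical toy "compiler": accepts only sources of the form "input + <int>".
  def compile(source: String): Int => Int = {
    val Pattern = """input \+ (-?\d+)""".r
    source match {
      case Pattern(n) => (input: Int) => input + n.toInt
      case other      => throw new IllegalArgumentException(s"cannot compile: $other")
    }
  }

  final case class AddConst(n: Int) {
    // Case (1): the expression emits code for the codegen path.
    def genCode: String = s"input + $n"

    // Case (2): direct evaluation reuses case (1). The lazy val means the
    // generated code is compiled at most once per expression instance.
    private lazy val compiled: Int => Int = compile(genCode)

    def eval(input: Int): Int = compiled(input)
  }
}
```

With this shape, `EvalViaCodegenSketch.AddConst(2).eval(40)` goes through `genCode` and `compile` rather than a separate interpreter, so both cases share one implementation of the logic.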
@kanterov we still run into the case where we call a method that exists in "Vector" but doesn't exist on "Arrays", right?
@imarios we shouldn't, current implementation would take |
@kanterov sounds good! I'm not sure if I can help further on this one. Unless someone else wants to give it a shot, I would say we can merge :)
I think we should merge too. I will try to add a LOT more unit tests to cover different cases. I will open another PR for the new test additions.
Awesome, thanks for the review, merging.