
When to support Spark 2.1.x #11

Closed
MichaelXucf opened this issue Jul 20, 2017 · 12 comments

@MichaelXucf

MichaelXucf commented Jul 20, 2017

Hi @vruusmann,
I ran into a problem when using jpmml-evaluator-spark with Spark 2.1.1:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.CreateStruct.<init>(Lscala/collection/Seq;)V
at org.jpmml.evaluator.spark.PMMLTransformer.transform(PMMLTransformer.java:151)
at com.michaelxu.spark.TestPipeLine.testJPMML(TestPipeLine.scala:312)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

The version of jpmml-evaluator-spark I am using is 1.1-SNAPSHOT, which supports Spark 2.0.0 through 2.0.2.
In Spark 2.0.2, CreateStruct is defined as a case class:

 /**
 * Returns a Row containing the evaluation of all children expressions.
 */
@ExpressionDescription(
  usage = "_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.")
case class CreateStruct(children: Seq[Expression]) extends Expression {

  override def foldable: Boolean = children.forall(_.foldable)

  override lazy val dataType: StructType = {
    val fields = children.zipWithIndex.map { case (child, idx) =>
      child match {
        case ne: NamedExpression =>
          StructField(ne.name, ne.dataType, ne.nullable, ne.metadata)
        case _ =>
          StructField(s"col${idx + 1}", child.dataType, child.nullable, Metadata.empty)
      }
    }
    StructType(fields)
  }
......

But in Spark 2.1.1, CreateStruct is defined as an object:

/**
 * Returns a Row containing the evaluation of all children expressions.
 */
object CreateStruct extends FunctionBuilder {
  def apply(children: Seq[Expression]): CreateNamedStruct = {
    CreateNamedStruct(children.zipWithIndex.flatMap {
      case (e: NamedExpression, _) if e.resolved => Seq(Literal(e.name), e)
      case (e: NamedExpression, _) => Seq(NamePlaceholder, e)
      case (e, index) => Seq(Literal(s"col${index + 1}"), e)
    })
  }

  /**
   * Entry to use in the function registry.
   */
  val registryEntry: (String, (ExpressionInfo, FunctionBuilder)) = {
    val info: ExpressionInfo = new ExpressionInfo(
      "org.apache.spark.sql.catalyst.expressions.NamedStruct",
      null,
      "struct",
      "_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.",
      "")
    ("struct", (info, this))
  }
}

I'm not sure how to change the code in PMMLTransformer; can you give me some suggestions?
Thank you.

The relevant line in PMMLTransformer.transform is:

Expression evaluateExpression = new ScalaUDF(
		evaluatorFunction,
		getOutputSchema(),
		ScalaUtil.<Expression>singletonSeq(new CreateStruct(ScalaUtil.<Expression>toSeq(activeExpressions))),
		ScalaUtil.<DataType>emptySeq()
		);
@vruusmann
Member

For the moment, see @sidfeiner's adaptation:
sidfeiner@cf897ed

The removal of the CreateStruct function is a breaking change between the Apache Spark 2.0 and 2.1 APIs. It would be wasteful to start maintaining two JPMML-Evaluator-Spark development branches (e.g. 1.1-SNAPSHOT and 1.2-SNAPSHOT) only because of that.

The solution would be to introduce my own CreateStruct function, which works with all Apache Spark 2.X versions (including the latest 2.2.0 version).
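
Roughly, such a helper could be a small Scala utility (a sketch only, not the library's actual code) that builds a CreateNamedStruct directly, since CreateNamedStruct keeps the same case-class shape across Spark 2.0.x, 2.1.x and 2.2.x. The pairing of field names with child expressions below mirrors what CreateStruct.apply does in Spark 2.1+:

import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, Literal, NamedExpression}

// Version-agnostic stand-in for CreateStruct: interleaves a field-name literal
// with every child expression, then hands the flat sequence to CreateNamedStruct.
object StructExpressionUtil {

  def createStruct(children: Seq[Expression]): Expression = {
    CreateNamedStruct(children.zipWithIndex.flatMap {
      case (ne: NamedExpression, _) => Seq(Literal(ne.name), ne)
      case (e, index) => Seq(Literal(s"col${index + 1}"), e)
    })
  }
}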

@sidfeiner

Hey, I've made another commit fixing this issue. CreateStruct may not be a case class anymore, but it still has an apply function, so that's what I use.

@MichaelXucf
Author

@vruusmann @sidfeiner Thank you, I have resolved my problem by using sidfeiner/jpmml-spark's approach.
By the way, there is a small problem when jpmml-spark works with Spark 2.1.1: the constructor of ScalaUDF is slightly different between Spark 2.1.0 and 2.1.1.

In Spark 2.1.0

Expression evaluateExpression = new ScalaUDF(
		evaluatorFunction,
		getOutputSchema(),
		ScalaUtil.<Expression>singletonSeq(CreateStruct.apply(ScalaUtil.<Expression>toSeq(activeExpressions))),
		ScalaUtil.<DataType>emptySeq()
		);

In Spark 2.1.1, ScalaUDF needs an additional Option parameter.

Expression evaluateExpression = new ScalaUDF(
		evaluatorFunction,
		getOutputSchema(),
		ScalaUtil.<Expression>singletonSeq(CreateStruct.apply(ScalaUtil.<Expression>toSeq(activeExpressions))),
		ScalaUtil.<DataType>emptySeq(),
		None$.<String>empty());

@vruusmann
Member

It would be really bad news if Apache Spark APIs break like this already between minor versions.

As mentioned earlier, JPMML-Evaluator-Spark should be source and binary compatible with the widest range of Apache Spark 2.X versions (ideally all 2.0.X, 2.1.X and 2.2.X release versions). I hope this can be achieved without having to write my own ScalaUDF function (in addition to a CreateStruct function).

If my IDE autocomplete is not mistaken, the CreateStruct#apply(..) method is not available in the Apache Spark 2.0.0 version?

@MichaelXucf
Author

I get an exception when I use sidfeiner/jpmml-spark's branch with Spark 2.1.1.
sidfeiner/jpmml-spark is compiled against Spark 2.1.0, and my project depends on Spark 2.1.1.

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(Ljava/lang/Object;Lorg/apache/spark/sql/types/DataType;Lscala/collection/Seq;Lscala/collection/Seq;)V

	at org.jpmml.spark.PMMLTransformer.transform(PMMLTransformer.java:154)
	at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
	at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
	at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
	at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
	at com.michaelxu.spark.TestPipeLine.testJPMML(TestPipeLine.scala:306)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	......

In Spark 2.1.0 ScalaUDF is defined as

case class ScalaUDF(
    function: AnyRef,
    dataType: DataType,
    children: Seq[Expression],
    inputTypes: Seq[DataType] = Nil)

In Spark 2.1.1 ScalaUDF is defined as

case class ScalaUDF(
    function: AnyRef,
    dataType: DataType,
    children: Seq[Expression],
    inputTypes: Seq[DataType] = Nil,
    udfName: Option[String] = None)

When using ScalaUDF from Java with Spark 2.1.1, ScalaUDF must receive 5 parameters. This may be the problem.

The CreateStruct#apply(..) method is not available in the Apache Spark 2.0.2 version; it only has a CreateStruct#unapply(..) method.

@sidfeiner

The fact that they added new parameters to the ScalaUDF constructor shouldn't break APIs written in Scala, because the new parameters have default values. Our problem is that the evaluator is written in Java, which doesn't let us use the default parameters. We could fill them in ourselves, and we could also rewrite PMMLTransformer in Scala, as that may avoid future cases of Spark adding parameters with default values to their APIs.

And CreateStruct was a case class in Spark 2.0, and all case classes have an apply function. Does your IDE/compiler throw errors if you try to call its apply function?

@vruusmann
Member

That's a good point that selected Java classes (such as org.jpmml.evaluator.spark.PMMLTransformer) should be translated to Scala.

I haven't set up any Scala projects so far. But based on what I've seen elsewhere, it seems pretty difficult to make a mixed Java/Scala project compile and build nicely. I dread the idea that I would need to switch from an Apache Maven build to an SBT build, etc.

@sidfeiner

I have had a few mixed projects; if you want, I can help you with it. You won't even have to switch to SBT, because there's a Scala plugin for Maven.

@samhendley

Sorry to resurrect an old thread, but I believe that translating classes into Scala won't fix the default argument issues. The Scala compiler resolves the default values at the call site during compilation. If you swap out the compiled library you would still get a failure, but it would look like a NoSuchMethodError. This is my biggest gripe with Scala: it is much more difficult to reason about what is or isn't a breaking binary change, and the "Scala way" promotes binary incompatibility by preferring defaults over explicit overloading.
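
To illustrate the point (a sketch against the Spark 2.1.x signatures quoted above; the helper and parameter names here are made up):

import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
import org.apache.spark.sql.types.DataType

// Source-compatible with Spark 2.1.0 and 2.1.1, because udfName is defaulted away...
def makeUdf(function: AnyRef, dataType: DataType, children: Seq[Expression], inputTypes: Seq[DataType]): ScalaUDF =
  ScalaUDF(function, dataType, children, inputTypes)

// ...but when compiled against 2.1.1, the emitted bytecode is roughly
//   ScalaUDF.apply(function, dataType, children, inputTypes, ScalaUDF.apply$default$5())
// i.e. the call site is hard-wired to the five-argument apply, which does not exist in
// 2.1.0. Running that jar on Spark 2.1.0 therefore still fails with a NoSuchMethodError.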

@vruusmann
Member

Thanks @samhendley - great insight into the workings of the Scala compiler. So, the conclusion is that it's virtually impossible to write a unitary Java/Scala codebase that could be compiled, packaged and then run on a variety of different Apache Spark versions (in the current case, all 2.0.X, 2.1.X and 2.2.X versions). There must be some variation point in the Java/Scala codebase that detects the running Apache Spark version and then chooses the appropriate parameterization for the ScalaUDF function?

Anyway, the starting point would be to extract the "variable" part of the Java code from the TransformerBuilder#transform(Dataset) method into a separate Scala utility method (that the Scala compiler could see and understand, and apply its workarounds to).
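
One possible shape for that utility (a sketch only, under the assumption that newer Spark versions merely append trailing Option-typed parameters such as udfName after inputTypes) would be to pick the ScalaUDF constructor reflectively and pad the missing trailing arguments:

import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
import org.apache.spark.sql.types.DataType

object ScalaUDFUtil {

  // Uses whatever ScalaUDF constructor the Spark version on the classpath provides,
  // padding the trailing parameters (e.g. udfName in 2.1.1+) with None, so that the
  // same jar can run against Spark 2.0.x, 2.1.0 and 2.1.1+.
  def createScalaUDF(function: AnyRef, dataType: DataType, children: Seq[Expression], inputTypes: Seq[DataType]): ScalaUDF = {
    val constructor = classOf[ScalaUDF].getConstructors.head
    val fixedArgs: Seq[AnyRef] = Seq(function, dataType, children, inputTypes)
    val padding = Seq.fill[AnyRef](constructor.getParameterCount - fixedArgs.size)(None)
    constructor.newInstance((fixedArgs ++ padding): _*).asInstanceOf[ScalaUDF]
  }
}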

@valdo404

valdo404 commented Oct 25, 2017

Hi, I just pushed a pull request about this (#12), have a look. It works with Spark 2.2 and may work with Spark 2.1.

@vladimir-lu

vladimir-lu commented Nov 23, 2017

@vruusmann I've only done this with sbt projects - but essentially you can remain source-compatible with Spark by compiling the same source folder against different library versions (similarly for Scala versions, though you can additionally have a scala-2.10/scala-2.11/scala-2.12 folder for when things are really not compatible). This is still somewhat hacky to do in sbt, but at least it's doable; in Maven I guess you would need a custom plugin.
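
For illustration, a hypothetical build.sbt fragment for the per-version source folder idea (the setting names are made up):

// build.sbt (sketch): compile the common sources plus a version-specific shim folder.
val sparkVersion = settingKey[String]("Apache Spark version to compile against")

sparkVersion := sys.props.getOrElse("spark.version", "2.1.1")

libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided"

unmanagedSourceDirectories in Compile += {
  val shim = if (sparkVersion.value.startsWith("2.0")) "spark-2.0" else "spark-2.1"
  baseDirectory.value / "src" / "main" / shim
}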

+1 to a PR converting this project to Scala. While it doesn't solve all the problems, wrapping Scala libraries is usually best done in Scala (and there's actually no compatibility need here to provide a Java interface, as far as I can see).

@samhendley Scala binary compatibility is non-trivial - this is why tools like MiMa are there to ensure it.
