diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 0bc893224026e..b18887abe317a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -711,7 +711,7 @@ object HiveTypeCoercion {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
-      case e: ExpectsInputTypes =>
+      case e: ExpectsInputTypes if e.inputTypes.nonEmpty =>
         val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
           implicitCast(in, expected)
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index caf021b016a41..fa782a3ce83cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -24,13 +24,20 @@ import org.apache.spark.sql.types.DataType
  * User-defined function.
  * @param dataType Return type of function.
+ * @param expectedInputTypes Types the arguments are implicitly cast to during analysis;
+ *                           when empty (the default) no input coercion is applied.
  */
-case class ScalaUDF(function: AnyRef, dataType: DataType, children: Seq[Expression])
-  extends Expression {
+case class ScalaUDF(
+    function: AnyRef,
+    dataType: DataType,
+    children: Seq[Expression],
+    expectedInputTypes: Seq[DataType] = Nil) extends Expression with ExpectsInputTypes {
 
   override def nullable: Boolean = true
 
   override def toString: String = s"UDF(${children.mkString(",")})"
 
+  override def inputTypes: Seq[DataType] = expectedInputTypes
+
   // scalastyle:off
 
   /** This method has been generated by this script
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 03dc37aa73f0c..a4bcf78a18e44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -126,7 +126,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = Seq[DataType]()
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -138,7 +139,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -150,7 +152,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -162,7 +165,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -174,7 +178,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -186,7 +191,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -198,7 +204,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -210,7 +217,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -222,7 +230,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -234,7 +243,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -246,7 +256,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -258,7 +269,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -270,7 +282,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -282,7 +295,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -294,7 +308,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -306,7 +321,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -318,7 +334,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -330,7 +347,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType, ScalaReflection.schemaFor[A17].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -342,7 +360,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType, ScalaReflection.schemaFor[A17].dataType, ScalaReflection.schemaFor[A18].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -354,7 +373,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType, ScalaReflection.schemaFor[A17].dataType, ScalaReflection.schemaFor[A18].dataType, ScalaReflection.schemaFor[A19].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -366,7 +386,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType, ScalaReflection.schemaFor[A17].dataType, ScalaReflection.schemaFor[A18].dataType, ScalaReflection.schemaFor[A19].dataType, ScalaReflection.schemaFor[A20].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -378,7 +399,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType, ScalaReflection.schemaFor[A17].dataType, ScalaReflection.schemaFor[A18].dataType, ScalaReflection.schemaFor[A19].dataType, ScalaReflection.schemaFor[A20].dataType, ScalaReflection.schemaFor[A21].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
@@ -390,7 +412,8 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = {
     val dataType = ScalaReflection.schemaFor[RT].dataType
-    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType, ScalaReflection.schemaFor[A11].dataType, ScalaReflection.schemaFor[A12].dataType, ScalaReflection.schemaFor[A13].dataType, ScalaReflection.schemaFor[A14].dataType, ScalaReflection.schemaFor[A15].dataType, ScalaReflection.schemaFor[A16].dataType, ScalaReflection.schemaFor[A17].dataType, ScalaReflection.schemaFor[A18].dataType, ScalaReflection.schemaFor[A19].dataType, ScalaReflection.schemaFor[A20].dataType, ScalaReflection.schemaFor[A21].dataType, ScalaReflection.schemaFor[A22].dataType)).getOrElse(Nil)
+    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
     functionRegistry.registerFunction(name, builder)
-    UserDefinedFunction(func, dataType)
+    UserDefinedFunction(func, dataType, inputTypes)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
index 831eb7eb0fae9..b14e00ab9b163 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala
@@ -41,10 +41,13 @@ import org.apache.spark.sql.types.DataType
  * @since 1.3.0
  */
 @Experimental
-case class UserDefinedFunction protected[sql] (f: AnyRef, dataType: DataType) {
+case class UserDefinedFunction protected[sql] (
+    f: AnyRef,
+    dataType: DataType,
+    inputTypes: Seq[DataType] = Nil) {
 
   def apply(exprs: Column*): Column = {
-    Column(ScalaUDF(f, dataType, exprs.map(_.expr)))
+    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4ee1fb8374b07..c3cfcaed1ee27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1589,7 +1589,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = Seq[DataType]()
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1600,7 +1601,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1611,7 +1613,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1622,7 +1625,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1633,7 +1637,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1644,7 +1649,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1655,7 +1661,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1666,7 +1673,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1677,7 +1685,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1688,7 +1697,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   /**
@@ -1699,7 +1709,8 @@ object functions {
    * @since 1.3.0
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
+    val inputTypes = scala.util.Try(Seq[DataType](ScalaReflection.schemaFor[A1].dataType, ScalaReflection.schemaFor[A2].dataType, ScalaReflection.schemaFor[A3].dataType, ScalaReflection.schemaFor[A4].dataType, ScalaReflection.schemaFor[A5].dataType, ScalaReflection.schemaFor[A6].dataType, ScalaReflection.schemaFor[A7].dataType, ScalaReflection.schemaFor[A8].dataType, ScalaReflection.schemaFor[A9].dataType, ScalaReflection.schemaFor[A10].dataType)).getOrElse(Nil)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
   }
 
   //////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 703a34c47ec20..b8d3e38d99ebf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -96,4 +96,10 @@ class UDFSuite extends QueryTest {
     // 1 + 1 is constant folded causing a transformation.
     assert(ctx.sql("SELECT makeStruct(1 + 1, 2)").first().getAs[Row](0) === Row(2, 2))
   }
+
+  test("type coercion for udf inputs") {
+    ctx.udf.register("intExpected", (x: Int) => x)
+    // pass a decimal to intExpected.
+    assert(ctx.sql("SELECT intExpected(1.0)").head().getInt(0) === 1)
+  }
 }