[SPARK-27619][SQL] MapType should be prohibited in hash expressions
### What changes were proposed in this pull request?
With this change, `hash()` and `xxhash64()` can no longer be used on elements of `MapType`. A new configuration, `spark.sql.legacy.useHashOnMapType`, is introduced to allow users to restore the previous behaviour.

When `spark.sql.legacy.useHashOnMapType` is set to false:

```
scala> spark.sql("select hash(map())");
org.apache.spark.sql.AnalysisException: cannot resolve 'hash(map())' due to data type mismatch: input to function hash cannot contain elements of MapType; line 1 pos 7;
'Project [unresolvedalias(hash(map(), 42), None)]
+- OneRowRelation
```

When `spark.sql.legacy.useHashOnMapType` is set to true:

```
scala> spark.sql("set spark.sql.legacy.useHashOnMapType=true");
res3: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("select hash(map())").first()
res4: org.apache.spark.sql.Row = [42]

```
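
The same check is enforced through the Dataset API. A minimal sketch (assuming a running `spark` session, e.g. spark-shell, and mirroring the unit test added in this PR); the `AnalysisException` is thrown as soon as `selectExpr` analyzes the expression:

```
import org.apache.spark.sql.AnalysisException
import spark.implicits._ // provides the encoder for Map[Int, Int]

// With spark.sql.legacy.useHashOnMapType at its default (false), hashing a
// map-typed column fails analysis for both hash and xxhash64.
val ds = spark.createDataset(Map(1 -> 10, 2 -> 20) :: Nil)
Seq("hash", "xxhash64").foreach { fn =>
  try {
    ds.selectExpr(s"$fn(*)")
  } catch {
    case e: AnalysisException => println(s"$fn rejected: ${e.getMessage}")
  }
}
```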

### Why are the changes needed?

As discussed in the JIRA ticket, Spark SQL's map hash codes depend on insertion order, which is inconsistent with normal Scala behaviour and might confuse users.
Code snippet from the JIRA ticket:
```
import spark.implicits._ // needed for the Map encoder

val a = spark.createDataset(Map(1->1, 2->2) :: Nil)
val b = spark.createDataset(Map(2->2, 1->1) :: Nil)

// Demonstration of how Scala Map equality is unaffected by insertion order:
assert(Map(1->1, 2->2).hashCode() == Map(2->2, 1->1).hashCode())
assert(Map(1->1, 2->2) == Map(2->2, 1->1))
assert(a.first() == b.first())

// In contrast, this will print two different hashcodes:
println(Seq(a, b).map(_.selectExpr("hash(*)").first()))
```
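
The order dependence comes from how `HashExpression` folds over a map: the running result is fed back in as the seed for each entry (see the doc comment removed in `hash.scala` below). A toy illustration of why such a fold is order-sensitive; the mixing function here is a deliberately simple stand-in, not Spark's actual Murmur3:

```
// Fold each (key, value) into a running result that doubles as the next seed.
def foldHash(entries: Seq[(Int, Int)], seed: Int): Int =
  entries.foldLeft(seed) { case (acc, (k, v)) =>
    31 * (31 * acc + k.hashCode) + v.hashCode // stand-in mixer, not Murmur3
  }

// The same map contents, visited in different orders, hash differently:
assert(foldHash(Seq(1 -> 1, 2 -> 2), 42) != foldHash(Seq(2 -> 2, 1 -> 1), 42))
```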

Also, `MapType` is already prohibited in aggregations, joins, and equality comparisons (apache#7819) as well as in set operations (apache#17236).

### Does this PR introduce any user-facing change?
Yes. Users can no longer use hash functions on elements of `MapType`. To restore the previous behaviour, set `spark.sql.legacy.useHashOnMapType` to true.
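
The flag can also be flipped programmatically on an existing session; a sketch equivalent to the SQL `SET` shown above:

```
// Restore the pre-3.0 behaviour for the current session only.
spark.conf.set("spark.sql.legacy.useHashOnMapType", "true")
spark.sql("SELECT hash(map())").show() // yields 42, the default seed, as shown above
```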

### How was this patch tested?
UT added.

Closes apache#27580 from iRakson/SPARK-27619.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
iRakson authored and Seongjin Cho committed Apr 14, 2020
1 parent 300c867 commit d8b6f4e
Showing 5 changed files with 39 additions and 17 deletions.
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -224,6 +224,8 @@ license: |
 
 - Since Spark 3.0, when casting string value to integral types(tinyint, smallint, int and bigint), datetime types(date, timestamp and interval) and boolean type, the leading and trailing whitespaces (<= ASCII 32) will be trimmed before being converted to these type values, e.g. `cast(' 1\t' as int)` results `1`, `cast(' 1\t' as boolean)` results `true`, `cast('2019-10-10\t' as date)` results the date value `2019-10-10`. In Spark version 2.4 and earlier, while casting string to integrals and booleans, it will not trim the whitespaces from both ends, the foregoing results will be `null`, while to datetimes, only the trailing spaces (= ASCII 32) will be removed.
 
+- Since Spark 3.0, an analysis exception will be thrown when hash expressions are applied on elements of MapType. To restore the behavior before Spark 3.0, set `spark.sql.legacy.useHashOnMapType` to true.
+
 - Since Spark 3.0, numbers written in scientific notation(e.g. `1E2`) would be parsed as Double. In Spark version 2.4 and earlier, they're parsed as Decimal. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.exponentLiteralAsDecimal.enabled` to `true`.
 
 - Since Spark 3.0, we pad decimal numbers with trailing zeros to the scale of the column for `spark-sql` interface, for example:
15 changes: 12 additions & 3 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -232,9 +233,6 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
  *  - array: The `result` starts with seed, then use `result` as seed, recursively
  *           calculate hash value for each element, and assign the element hash value
  *           to `result`.
- *  - map: The `result` starts with seed, then use `result` as seed, recursively
- *         calculate hash value for each key-value, and assign the key-value hash
- *         value to `result`.
  *  - struct: The `result` starts with seed, then use `result` as seed, recursively
  *            calculate hash value for each field, and assign the field hash value to
  *            `result`.
@@ -249,10 +247,21 @@ abstract class HashExpression[E] extends Expression {
 
   override def nullable: Boolean = false
 
+  private def hasMapType(dt: DataType): Boolean = {
+    dt.existsRecursively(_.isInstanceOf[MapType])
+  }
+
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length < 1) {
       TypeCheckResult.TypeCheckFailure(
         s"input to function $prettyName requires at least one argument")
+    } else if (children.exists(child => hasMapType(child.dataType)) &&
+        !SQLConf.get.getConf(SQLConf.LEGACY_USE_HASH_ON_MAPTYPE)) {
+      TypeCheckResult.TypeCheckFailure(
+        s"input to function $prettyName cannot contain elements of MapType. In Spark, same maps " +
+          "may have different hashcode, thus hash expressions are prohibited on MapType " +
+          s"elements. To restore previous behavior set ${SQLConf.LEGACY_USE_HASH_ON_MAPTYPE.key} " +
+          "to true.")
     } else {
       TypeCheckResult.TypeCheckSuccess
     }
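
Since `hasMapType` walks the type tree with `existsRecursively`, the check rejects maps nested anywhere inside an argument's type, not just top-level map columns. A hedged example of queries that now fail analysis under the default setting:

```
// Both fail with the TypeCheckFailure above when the legacy flag is off,
// because a MapType is found recursively inside the argument's data type.
spark.sql("SELECT hash(array(map(1, 2)))")             // map nested in an array
spark.sql("SELECT hash(named_struct('m', map(1, 2)))") // map nested in a struct
```
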
6 changes: 6 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2200,6 +2200,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_USE_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.useHashOnMapType")
+    .doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " +
+      "an analysis exception will be thrown.")
+    .booleanConf
+    .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
14 changes: 0 additions & 14 deletions sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -554,28 +554,14 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       .add("arrayOfString", arrayOfString)
       .add("arrayOfArrayOfString", ArrayType(arrayOfString))
       .add("arrayOfArrayOfInt", ArrayType(ArrayType(IntegerType)))
-      .add("arrayOfMap", ArrayType(mapOfString))
       .add("arrayOfStruct", ArrayType(structOfString))
       .add("arrayOfUDT", arrayOfUDT))
 
-  testHash(
-    new StructType()
-      .add("mapOfIntAndString", MapType(IntegerType, StringType))
-      .add("mapOfStringAndArray", MapType(StringType, arrayOfString))
-      .add("mapOfArrayAndInt", MapType(arrayOfString, IntegerType))
-      .add("mapOfArray", MapType(arrayOfString, arrayOfString))
-      .add("mapOfStringAndStruct", MapType(StringType, structOfString))
-      .add("mapOfStructAndString", MapType(structOfString, StringType))
-      .add("mapOfStruct", MapType(structOfString, structOfString)))
-
   testHash(
     new StructType()
       .add("structOfString", structOfString)
       .add("structOfStructOfString", new StructType().add("struct", structOfString))
       .add("structOfArray", new StructType().add("array", arrayOfString))
-      .add("structOfMap", new StructType().add("map", mapOfString))
-      .add("structOfArrayAndMap",
-        new StructType().add("array", arrayOfString).add("map", mapOfString))
       .add("structOfUDT", structOfUDT))
 
   test("hive-hash for decimal") {
19 changes: 19 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2120,6 +2120,25 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("SPARK-27619: Throw analysis exception when hash and xxhash64 is used on MapType") {
+    Seq("hash", "xxhash64").foreach {
+      case hashExpression =>
+        intercept[AnalysisException] {
+          spark.createDataset(Map(1 -> 10, 2 -> 20) :: Nil).selectExpr(s"$hashExpression(*)")
+        }
+    }
+  }
+
+  test("SPARK-27619: when spark.sql.legacy.useHashOnMapType is true, hash can be used on Maptype") {
+    Seq("hash", "xxhash64").foreach {
+      case hashExpression =>
+        withSQLConf(SQLConf.LEGACY_USE_HASH_ON_MAPTYPE.key -> "true") {
+          val df = spark.createDataset(Map() :: Nil)
+          checkAnswer(df.selectExpr(s"$hashExpression(*)"), sql(s"SELECT $hashExpression(map())"))
+        }
+    }
+  }
+
   test("xxhash64 function") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
     withTempView("tbl") {
