Skip to content

Commit

Permalink
[SPARK-8138] [SQL] Improves error message when conflicting partition …
Browse files Browse the repository at this point in the history
…columns are found

This PR improves the error message shown when conflicting partition column names are detected.  This can be particularly annoying and confusing when there are a large number of partitions while a handful of them happened to contain unexpected temporary file(s).  Now all suspicious directories are listed as below:

```
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:

        Partition column name list #0: b, c, d
        Partition column name list #1: b, c
        Partition column name list #2: b

For partitioned table directories, data files should only live in leaf directories. Please check the following directories for unexpected files:

        file:/tmp/foo/b=0
        file:/tmp/foo/b=1
        file:/tmp/foo/b=1/c=1
        file:/tmp/foo/b=0/c=0
```

Author: Cheng Lian <lian@databricks.com>

Closes apache#6610 from liancheng/part-errmsg and squashes the following commits:

7d05f2c [Cheng Lian] Fixes Scala style issue
a149250 [Cheng Lian] Adds test case for the error message
6b74dd8 [Cheng Lian] Also lists suspicious non-leaf partition directories
a935eb8 [Cheng Lian] Improves error message when conflicting partition columns are found
  • Loading branch information
liancheng committed Jun 24, 2015
1 parent 09fcf96 commit cc465fd
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)

// Creates the StructType which represents the partition columns.
val fields = {
Expand Down Expand Up @@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
* StringType
* }}}
*/
private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
// Column names of all partitions must match
val distinctPartitionsColNames = values.map(_.columnNames).distinct

if (distinctPartitionsColNames.isEmpty) {
private[sql] def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
assert(distinctPartitionsColNames.size == 1, {
val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
s"Conflicting partition column names detected:\n$list"
})
val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
assert(
distinctPartColNames.size == 1,
listConflictingPartitionColumns(pathsWithPartitionValues))

// Resolves possible type conflicts for each column
val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
Expand All @@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
}
}

private[sql] def listConflictingPartitionColumns(
pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct

def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })

val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
case (path, partValues) => partValues.columnNames -> path
})

val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
case (names, index) =>
s"Partition column name list #$index: $names"
}

// Lists out those non-leaf partition directories that also contain files
val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)

s"Conflicting partition column names detected:\n" +
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
"For partitioned table directories, data files should only live in leaf directories.\n" +
"And directories at the same level should have the same partition column name.\n" +
"Please check the following directories for unexpected files or " +
"inconsistent partition column names:\n" +
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
}

/**
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
* [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
}
}

test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
s"\tPartition column name list #$index: $list"
}.mkString("\n", "\n", "\n")

// scalastyle:off
s"""Conflicting partition column names detected:
|$conflictingColNameLists
|For partitioned table directories, data files should only live in leaf directories.
|And directories at the same level should have the same partition column name.
|Please check the following directories for unexpected files or inconsistent partition column names:
|${paths.map("\t" + _).mkString("\n", "\n", "")}
""".stripMargin.trim
// scalastyle:on
}

assert(
listConflictingPartitionColumns(
Seq(
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
(new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))

assert(
listConflictingPartitionColumns(
Seq(
(new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
makeExpectedMessage(
Seq("a"),
Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))

assert(
listConflictingPartitionColumns(
Seq(
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(Literal(1)))),
(new Path("file:/tmp/foo/a=1/b=foo"),
PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
makeExpectedMessage(
Seq("a", "a, b"),
Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
}
}

0 comments on commit cc465fd

Please sign in to comment.