Skip to content

Commit

Permalink
Adds test case for the error message
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Jun 24, 2015
1 parent 6b74dd8 commit a149250
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,32 +187,9 @@ private[sql] object PartitioningUtils {
Seq.empty
} else {
val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct

def listConflictingPartitionColumns: String = {
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })

val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map {
case (path, partValues) => partValues.columnNames -> path
})

val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
case (names, index) =>
s"Partition column name list #$index: $names"
}

// Lists out those non-leaf partition directories that also contain files
val suspiciousPaths =
distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths)

s"Conflicting partition column names detected:\n" +
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
"For partitioned table directories, data files should only live in leaf directories. " +
"Please check the following directories for unexpected files:\n" +
suspiciousPaths.mkString("\n\t", "\n\t", "\n")
}

assert(distinctPartColNames.size == 1, listConflictingPartitionColumns)
assert(
distinctPartColNames.size == 1,
listConflictingPartitionColumns(pathsWithPartitionValues))

// Resolves possible type conflicts for each column
val values = pathsWithPartitionValues.map(_._2)
Expand All @@ -228,6 +205,34 @@ private[sql] object PartitioningUtils {
}
}

private[sql] def listConflictingPartitionColumns(
pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct

def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })

val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
case (path, partValues) => partValues.columnNames -> path
})

val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
case (names, index) =>
s"Partition column name list #$index: $names"
}

// Lists out those non-leaf partition directories that also contain files
val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)

s"Conflicting partition column names detected:\n" +
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
"For partitioned table directories, data files should only live in leaf directories.\n" +
"And directories at the same level should have the same partition column name.\n" +
"Please check the following directories for unexpected files or " +
"inconsistent partition column names:\n" +
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
}

/**
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
* [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
}
}

test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]) = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
s"\tPartition column name list #$index: $list"
}.mkString("\n", "\n", "\n")

// scalastyle:off
s"""Conflicting partition column names detected:
|$conflictingColNameLists
|For partitioned table directories, data files should only live in leaf directories.
|And directories at the same level should have the same partition column name.
|Please check the following directories for unexpected files or inconsistent partition column names:
|${paths.map("\t" + _).mkString("\n", "\n", "")}
""".stripMargin.trim
// scalastyle:on
}

assert(
listConflictingPartitionColumns(
Seq(
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
(new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))

assert(
listConflictingPartitionColumns(
Seq(
(new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
(new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
makeExpectedMessage(
Seq("a"),
Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))

assert(
listConflictingPartitionColumns(
Seq(
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(Literal(1)))),
(new Path("file:/tmp/foo/a=1/b=foo"),
PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
makeExpectedMessage(
Seq("a", "a, b"),
Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
}
}

0 comments on commit a149250

Please sign in to comment.