diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index 3c1741ff29547..8b2a45d8e970a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -187,32 +187,9 @@ private[sql] object PartitioningUtils { Seq.empty } else { val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct - - def listConflictingPartitionColumns: String = { - def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = - seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) - - val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map { - case (path, partValues) => partValues.columnNames -> path - }) - - val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { - case (names, index) => - s"Partition column name list #$index: $names" - } - - // Lists out those non-leaf partition directories that also contain files - val suspiciousPaths = - distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths) - - s"Conflicting partition column names detected:\n" + - distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + - "For partitioned table directories, data files should only live in leaf directories. " + - "Please check the following directories for unexpected files:\n" + - suspiciousPaths.mkString("\n\t", "\n\t", "\n") - } - - assert(distinctPartColNames.size == 1, listConflictingPartitionColumns) + assert( + distinctPartColNames.size == 1, + listConflictingPartitionColumns(pathsWithPartitionValues)) // Resolves possible type conflicts for each column val values = pathsWithPartitionValues.map(_._2) @@ -228,6 +205,34 @@ private[sql] object PartitioningUtils { } } + private[sql] def listConflictingPartitionColumns( + pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = { + val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct + + def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = + seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) + + val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { + case (path, partValues) => partValues.columnNames -> path + }) + + val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { + case (names, index) => + s"Partition column name list #$index: $names" + } + + // Lists out those non-leaf partition directories that also contain files + val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths) + + s"Conflicting partition column names detected:\n" + + distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + + "For partitioned table directories, data files should only live in leaf directories.\n" + + "And directories at the same level should have the same partition column name.\n" + + "Please check the following directories for unexpected files or " + + "inconsistent partition column names:\n" + + suspiciousPaths.map("\t" + _).mkString("\n", "\n", "") + } + /** * Converts a string to a [[Literal]] with automatic type inference. Currently only supports * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 01df189d1f3be..971765bab9508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df) } } + + test("listConflictingPartitionColumns") { + def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]) = { + val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => + s"\tPartition column name list #$index: $list" + }.mkString("\n", "\n", "\n") + + // scalastyle:off + s"""Conflicting partition column names detected: + |$conflictingColNameLists + |For partitioned table directories, data files should only live in leaf directories. + |And directories at the same level should have the same partition column name. + |Please check the following directories for unexpected files or inconsistent partition column names: + |${paths.map("\t" + _).mkString("\n", "\n", "")} + """.stripMargin.trim + // scalastyle:on + } + + assert( + listConflictingPartitionColumns( + Seq( + (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))), + (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim === + makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1"))) + + assert( + listConflictingPartitionColumns( + Seq( + (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))), + (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim === + makeExpectedMessage( + Seq("a"), + Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1"))) + + assert( + listConflictingPartitionColumns( + Seq( + (new Path("file:/tmp/foo/a=1"), + PartitionValues(Seq("a"), Seq(Literal(1)))), + (new Path("file:/tmp/foo/a=1/b=foo"), + PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim === + makeExpectedMessage( + Seq("a", "a, b"), + Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo"))) + } }