diff --git a/core/src/it/scala/com/github/mjakubowski84/parquet4s/IOOpsITSpec.scala b/core/src/it/scala/com/github/mjakubowski84/parquet4s/IOOpsITSpec.scala
index 3d23629a..758beb92 100644
--- a/core/src/it/scala/com/github/mjakubowski84/parquet4s/IOOpsITSpec.scala
+++ b/core/src/it/scala/com/github/mjakubowski84/parquet4s/IOOpsITSpec.scala
@@ -2,10 +2,12 @@ package com.github.mjakubowski84.parquet4s
 
 import org.apache.hadoop.fs.FileAlreadyExistsException
 import org.apache.parquet.hadoop.ParquetFileWriter.Mode
+import org.scalatest.BeforeAndAfter
+import org.scalatest.EitherValues
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
-import org.scalatest.{BeforeAndAfter, EitherValues}
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 class IOOpsITSpec extends AnyFlatSpec with Matchers with IOOps with TestUtils with BeforeAndAfter with EitherValues {
 
@@ -139,6 +141,26 @@ class IOOpsITSpec extends AnyFlatSpec with Matchers with IOOps with TestUtils with BeforeAndAfter with EitherValues {
     dir.schema should be(Col("x") :: Nil)
   }
 
+  it should "ignore paths that do not match the pathFilter predicate" in {
+    val path_X1 = tempPath.append("_x=1").append("file.parquet")
+    val pathX1 = tempPath.append("x=1").append("file.parquet")
+    fileSystem.createNewFile(path_X1.toHadoop)
+    fileSystem.createNewFile(pathX1.toHadoop)
+
+    val dir = findPartitionedPaths(tempPath, configuration, !_.getName().startsWith("_")).value
+    dir.paths should be(List(PartitionedPath(pathX1, configuration, (Col("x") -> "1") :: Nil)))
+    dir.schema should be(List(Col("x")))
+  }
+
+  it should "accept hadoop default hidden paths when pathFilter always returns true" in {
+    val pathX1 = tempPath.append("_x=1/.y=1").append("file.parquet")
+    fileSystem.createNewFile(pathX1.toHadoop)
+
+    val dir = findPartitionedPaths(tempPath, configuration, _ => true).value
+    dir.paths should be(List(PartitionedPath(pathX1, configuration, List(Col("_x") -> "1", Col(".y") -> "1"))))
+    dir.schema should be(List(Col("_x"), Col(".y")))
+  }
+
   it should "return a partitioned directory with single file of no partitions values" in {
     val pathX1 = tempPath.append("x=1").append("file.parquet")
     fileSystem.createNewFile(pathX1.toHadoop)
@@ -186,4 +208,15 @@ class IOOpsITSpec extends AnyFlatSpec with Matchers with IOOps with TestUtils with BeforeAndAfter with EitherValues {
     findPartitionedPaths(tempPath, configuration) should be a Symbol("Left")
   }
 
+  it should "fail to create partitions from inconsistent directory [case3]" in {
+    val path1 = tempPath.append("x=1/file.parquet")
+    val path2 = tempPath.append("_x=1/file.parquet")
+    fileSystem.createNewFile(path1.toHadoop)
+    fileSystem.createNewFile(path2.toHadoop)
+    // This case ignores `_x=1`, so there's no inconsistent directory.
+    findPartitionedPaths(tempPath, configuration) should be a Symbol("Right")
+    // This case regards `_x=1` as an ordinary directory, so it conflicts with `x=1`.
+    findPartitionedPaths(tempPath, configuration, _ => true) should be a Symbol("Left")
+  }
+
 }
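For reference, the default filter these tests override is Parquet's `HiddenFileFilter`, which accepts an entry only if its name does not begin with `_` or `.` — which is why `_x=1` and `.y=1` are invisible to partition discovery unless a permissive predicate is supplied. A rough, illustrative Scala equivalent of that default predicate (not part of this change), using the same SAM conversion to `PathFilter` as the tests above:

```scala
import org.apache.hadoop.fs.{Path => HadoopPath, PathFilter}

// Sketch of HiddenFileFilter's behaviour: accept a path only if its last
// component does not start with '_' or '.'.
val hiddenFileFilter: PathFilter = (p: HadoopPath) => {
  val name = p.getName
  !name.startsWith("_") && !name.startsWith(".")
}
```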
diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala
index 14baf0e5..8117c365 100644
--- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala
+++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/IOOps.scala
@@ -1,12 +1,17 @@
 package com.github.mjakubowski84.parquet4s
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, FileSystem, RemoteIterator}
+import org.apache.hadoop.fs.FileAlreadyExistsException
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.PathFilter
+import org.apache.hadoop.fs.RemoteIterator
 import org.apache.parquet.hadoop.ParquetFileWriter
 import org.apache.parquet.hadoop.util.HiddenFileFilter
 import org.slf4j.Logger
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
 import scala.util.matching.Regex
 
 private[parquet4s] object IOOps {
@@ -67,12 +72,18 @@
     }
   }
 
+  /** @param path
+    *   a location in a file tree from which `findPartitionedPaths` collects descendant paths recursively
+    * @param pathFilter
+    *   `findPartitionedPaths` traverses paths that match this predicate
+    */
   protected def findPartitionedPaths(
       path: Path,
-      configuration: Configuration
+      configuration: Configuration,
+      pathFilter: PathFilter = HiddenFileFilter.INSTANCE
   ): Either[Exception, PartitionedDirectory] = {
     val fs = path.toHadoop.getFileSystem(configuration)
-    findPartitionedPaths(fs, configuration, path, List.empty).fold(
+    findPartitionedPaths(fs, configuration, path, pathFilter, List.empty).fold(
       PartitionedDirectory.failed,
       PartitionedDirectory.apply
     )
@@ -82,10 +93,11 @@
       fs: FileSystem,
       configuration: Configuration,
       path: Path,
+      pathFilter: PathFilter,
       partitions: List[Partition]
   ): Either[List[Path], List[PartitionedPath]] = {
     val (dirs, files) = fs
-      .listStatus(path.toHadoop, HiddenFileFilter.INSTANCE)
+      .listStatus(path.toHadoop, pathFilter)
       .toList
       .partition(_.isDirectory)
     if (dirs.nonEmpty && files.nonEmpty)
@@ -96,11 +108,11 @@
         Right(List.empty) // empty leaf dir
       else if (partitionedDirs.isEmpty) // leaf files
-        Right(files.map(fileStatus => PartitionedPath(fileStatus, configuration, partitions)))
+        Right(files.map(PartitionedPath(_, configuration, partitions)))
       else
         partitionedDirs
           .map { case (subPath, partition) =>
-            findPartitionedPaths(fs, configuration, subPath, partitions :+ partition)
+            findPartitionedPaths(fs, configuration, subPath, pathFilter, partitions :+ partition)
           }
          .foldLeft[Either[List[Path], List[PartitionedPath]]](Right(List.empty)) {
            case (Left(invalidPaths), Left(moreInvalidPaths)) =>
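Because the predicate is applied to every `listStatus` call on the way down, a rejected directory prunes its entire subtree from partition discovery. A minimal sketch of how a caller might use the new parameter — the `PartitionScan` object, its placement in the `parquet4s` package, and the `logger` wiring are all hypothetical, not part of this change:

```scala
package com.github.mjakubowski84.parquet4s

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.PathFilter
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical caller: mixes in IOOps to reach the protected method.
object PartitionScan extends IOOps {
  protected val logger: Logger = LoggerFactory.getLogger(getClass)

  def scan(root: Path, conf: Configuration): Either[Exception, PartitionedDirectory] = {
    // Skip only underscore-prefixed entries; dot-prefixed ones are now
    // traversed, unlike with the HiddenFileFilter default.
    val noUnderscore: PathFilter = !_.getName.startsWith("_")
    findPartitionedPaths(root, conf, noUnderscore)
  }
}
```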
diff --git a/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala b/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala
index 2786226b..d50c769a 100644
--- a/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala
+++ b/core/src/main/scala/com/github/mjakubowski84/parquet4s/ParquetReader.scala
@@ -2,11 +2,15 @@ package com.github.mjakubowski84.parquet4s
 
 import com.github.mjakubowski84.parquet4s.etl.CompoundParquetIterable
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.PathFilter
 import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.hadoop
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.io.InputFile
-import org.apache.parquet.schema.{MessageType, Type}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 import java.io.Closeable
 import java.util.TimeZone
@@ -42,6 +46,12 @@
       */
     def filter(filter: Filter): Builder[T]
 
+    /** @param pathFilter
+      *   optional path filter; ParquetReader traverses paths that match this predicate to resolve partitions. It uses
+      *   org.apache.parquet.hadoop.util.HiddenFileFilter by default.
+      */
+    def pathFilter(filter: PathFilter): Builder[T]
+
     /** Attempt to read data as partitioned. Partition names must follow Hive format. Partition values will be set in
       * read records to corresponding fields.
       */
@@ -77,7 +87,8 @@
       options: ParquetReader.Options = ParquetReader.Options(),
       filter: Filter = Filter.noopFilter,
       projectedSchemaResolverOpt: Option[ParquetSchemaResolver[T]] = None,
-      columnProjections: Seq[ColumnProjection] = Seq.empty
+      columnProjections: Seq[ColumnProjection] = Seq.empty,
+      pathFilter: PathFilter = hadoop.util.HiddenFileFilter.INSTANCE
   ) extends Builder[T] {
 
     override def options(options: ParquetReader.Options): Builder[T] = this.copy(options = options)
@@ -85,6 +96,8 @@
     override def filter(filter: Filter): Builder[T] = this.copy(filter = filter)
 
+    override def pathFilter(filter: PathFilter): Builder[T] = this.copy(pathFilter = filter)
+
     override def partitioned: Builder[T] = this
 
     override def read(path: Path)(implicit decoder: ParquetRecordDecoder[T]): ParquetIterable[T] =
@@ -96,7 +109,7 @@
       inputFile match {
         case hadoopInputFile: HadoopInputFile =>
-          partitionedIterable(Path(hadoopInputFile.getPath), valueCodecConfiguration, hadoopConf)
+          partitionedIterable(Path(hadoopInputFile.getPath), valueCodecConfiguration, hadoopConf, pathFilter)
         case _ =>
           singleIterable(
             inputFile = inputFile,
@@ -112,9 +125,10 @@
     private def partitionedIterable(
         path: Path,
         valueCodecConfiguration: ValueCodecConfiguration,
-        hadoopConf: Configuration
+        hadoopConf: Configuration,
+        pathFilter: PathFilter
     )(implicit decoder: ParquetRecordDecoder[T]): ParquetIterable[T] =
-      findPartitionedPaths(path, hadoopConf) match {
+      findPartitionedPaths(path, hadoopConf, pathFilter) match {
         case Left(exception) => throw exception
         case Right(partitionedDirectory) =>
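At the public API level, the new builder method makes the filter configurable per read. A minimal usage sketch, assuming a partitioned data set under a hypothetical path and a matching case class:

```scala
import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}

// Hypothetical schema: `x` is the Hive-style partition column.
case class Data(i: Long, x: String)

// Traverse everything, including `_`- and `.`-prefixed directories,
// overriding the HiddenFileFilter default.
val iterable = ParquetReader
  .as[Data]
  .pathFilter(_ => true)
  .read(Path("/tmp/data"))

try iterable.foreach(println)
finally iterable.close()
```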