Concurrently read and write files with parquet4s-fs2 #285

Closed
flipp5b opened this issue Jan 13, 2023 · 9 comments · Fixed by #287

flipp5b (Contributor) commented Jan 13, 2023

I'm facing an issue when I try to concurrently read and write files with parquet4s-fs2.

A simplified reproducer looks as follows:

import cats.effect.{IO, IOApp}
import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path, parquet}
import org.apache.hadoop.conf.Configuration

object App extends IOApp.Simple {
  override def run: IO[Unit] = {
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", "hdfs://hdfs-address")

    // Stream of In records read from a Parquet file on HDFS
    val readIn = parquet
      .fromParquet[IO]
      .as[In]
      .options(ParquetReader.Options(hadoopConf = hadoopConf))
      .read(Path("/path/to/in.parquet"))

    // Pipe that writes Out records into a single Parquet file on HDFS
    val writeOut = parquet
      .writeSingleFile[IO]
      .of[Out]
      .options(ParquetWriter.Options(hadoopConf = hadoopConf))
      .write(Path("/path/to/out.parquet"))

    // Read, transform, and write within a single stream
    readIn.map(in => Out(in.i.toString)).through(writeOut).compile.drain
  }

  case class In(i: Int)

  case class Out(s: String)
}

It fails with a java.nio.channels.ClosedChannelException. I've added some tracing messages and formatted the output for the sake of readability:

io.validateWritePath /path/to/out.parquet
  FileSystem.get /path/to/out.parquet
    FileSystem.get hdfs://hdfs-address
  DistributedFileSystem.close (user (auth:SIMPLE))@hdfs://hdfs-address
    DFSClient.closeAllFilesBeingWritten filesBeingWritten = {}
    DFSClient.closeAllFilesBeingWritten filesBeingWritten = {}

ParquetWriter.internalWriter
  FileSystem.get /path/to/out.parquet
    FileSystem.get hdfs://hdfs-address

io.findPartitionedPaths /path/to/in.parquet
  FileSystem.get /path/to/in.parquet
    FileSystem.get hdfs://hdfs-address
  DistributedFileSystem.close (user (auth:SIMPLE))@hdfs://hdfs-address
    DFSClient.closeAllFilesBeingWritten filesBeingWritten = {287686131=DFSOutputStream:block==null}
      DFSOutputStream.close /path/to/out.parquet
    DFSClient.closeAllFilesBeingWritten filesBeingWritten = {}

reader.parquetIteratorResource
  FileSystem.get /path/to/in.parquet
    FileSystem.get hdfs://hdfs-address

writer.Writer.writeAll
...
writer.Writer.writeAll
ParquetIterator.close
writer.Writer.writeAll
...
writer.Writer.writeAll

writer.Writer.close
  java.nio.channels.ClosedChannelException: null
      at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
      at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:153)
      at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
      at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
      at java.io.DataOutputStream.write(DataOutputStream.java:107)
      at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
      at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.write(HadoopPositionOutputStream.java:45)
      at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
      at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:896)
      at org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:842)
      at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
      at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
      at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
      at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
      at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
      at com.github.mjakubowski84.parquet4s.parquet.writer$Writer.close(writer.scala:90)
      at uncancelable @ fs2.Compiler$Target.uncancelable(Compiler.scala:165)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)
      at modify @ fs2.internal.Scope.close(Scope.scala:262)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at modify @ fs2.internal.Scope.close(Scope.scala:262)
      at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
      at flatMap @ fs2.Pull$.$anonfun$compile$18(Pull.scala:1208)
      at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)

parquet4s-fs2 calls path.getFileSystem(conf) in several places and wraps the resulting FileSystem in a cats.effect.Resource, so the filesystem is closed when the resource is released. The problem is that, by default, path.getFileSystem(conf) returns a cached instance shared across callers, so releasing the resource may close a filesystem that someone else is still using.
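
To make the pattern concrete, here is a minimal sketch of that acquire/release shape (illustrative only, not the actual parquet4s code; the helper name is made up):

import cats.effect.{IO, Resource}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch: path.getFileSystem(conf) returns a cached FileSystem shared by
// every caller with the same scheme/authority/user, so closing it on release also
// closes it for those other callers.
def cachedFileSystem(path: Path, conf: Configuration): Resource[IO, FileSystem] =
  Resource.make(IO.blocking(path.getFileSystem(conf)))(fs => IO.blocking(fs.close()))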

In the trace above, the writer prepares the output file: it obtains a FileSystem, opens a DFSOutputStream, and so on. Then the reader calls findPartitionedPaths, which obtains the same cached FileSystem and closes it, together with the writer's DFSOutputStream.

It's possible to disable the filesystem cache with fs.hdfs.impl.disable.cache, and that solves the problem. However, it may also lead to FileSystem leaks: parquet-mr calls path.getFileSystem(conf) in some places and doesn't appear to close the FileSystem objects it receives.
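
For reference, disabling the cache for the hdfs scheme looks roughly like this (a workaround sketch only, given the leak concern above):

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://hdfs-address")
// With the cache disabled for the "hdfs" scheme, every path.getFileSystem(hadoopConf)
// call returns a fresh FileSystem instance that must be closed explicitly by its owner.
hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true)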

Could you please advise on the proper way to solve this issue? Am I doing something wrong, or would it be better not to close file systems inside parquet4s-fs2 at all?

mjakubowski84 (Owner) commented:

Hi @flipp5b !
Thank you for the heads up! I would say that you found the bug, and parquet4s-fs2 should not close the file system. Feel free to create a PR with a fix.
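
For context, one possible shape of such a fix (a sketch under the assumption that the filesystem lifecycle is left to Hadoop's cache; the actual change is in #287, and the helper name here is made up):

import cats.effect.{Resource, Sync}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: obtain the (possibly cached) FileSystem without closing it on release,
// so other readers and writers sharing the cached instance are unaffected.
def fileSystemResource[F[_]: Sync](path: Path, conf: Configuration): Resource[F, FileSystem] =
  Resource.eval(Sync[F].blocking(path.getFileSystem(conf)))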

flipp5b (Contributor, Author) commented Jan 16, 2023

Hi @mjakubowski84,

Thank you for the great library! Sure, I'll prepare the PR.

flipp5b (Contributor, Author) commented Jan 16, 2023

@mjakubowski84, I'd like to add a test that reproduces the bug, but the issue seems specific to distributed file systems. Would it be acceptable to add a new test suite that uses Testcontainers to run a custom single-node Hadoop cluster image?

mjakubowski84 (Owner) commented:

Sure! Please do!

flipp5b added a commit to flipp5b/parquet4s that referenced this issue Jan 17, 2023
flipp5b added a commit to flipp5b/parquet4s that referenced this issue Jan 17, 2023
mjakubowski84 (Owner) commented:

@flipp5b Thanks for the contribution! Would you also like to add a fix to core & akka, so that the bugfix release solves the problem completely?

mjakubowski84 reopened this Jan 17, 2023
flipp5b (Contributor, Author) commented Jan 17, 2023 via email

mjakubowski84 (Owner) commented:

Thanks, I appreciate it!

mjakubowski84 (Owner) commented Jan 18, 2023

@flipp5b Your fixes have been released as v2.8.0. Thank you for your contribution!
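
For anyone arriving from search: picking up the fix in sbt should look roughly like the snippet below (the coordinates follow parquet4s's usual group and artifact naming; double-check them against the project README):

// build.sbt (assumed coordinates)
libraryDependencies += "com.github.mjakubowski84" %% "parquet4s-fs2" % "2.8.0"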

flipp5b (Contributor, Author) commented Jan 19, 2023

@mjakubowski84, it was a pleasure! Thank you for the instant feedback and the bugfix release!
