
Stream can hang if there is an error reading a source file from S3 #414

Closed
blissd opened this issue Jul 15, 2020 · 3 comments

Comments

@blissd

blissd commented Jul 15, 2020

Problem

We have an fs2 job that reads a JSON file from S3 and then processes it with 30 parallel streams to extract different sets of data and write them to 30 parquet files. We find that if there is an error reading the input file, such as the file key being incorrect or the S3 permissions being incorrect, then the job hangs, prints no errors, and produces no output.

However, we also find that if we configure the job to produce fewer output files (which results in fewer parallel streams), then the job doesn't hang and an appropriate exception is produced and shows up in the logs.
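For context, the shape of the job is roughly the following. This is only a sketch: sourceFromS3 stands in for the fs2-aws read, the pipes are placeholders for our real extract-and-write-parquet logic, and broadcastThrough is just one way to express the 30-way fan-out, not necessarily our exact code.

import cats.effect.{ContextShift, IO}
import fs2.{Pipe, Stream}

import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Placeholder for the stream of bytes read from the S3 object.
def sourceFromS3: Stream[IO, Byte] = Stream.empty

// Stand-ins for the 30 pipes that each extract one data set and write one parquet file.
val extractPipes: List[Pipe[IO, Byte, Unit]] =
  List.fill(30)((in: Stream[IO, Byte]) => in.drain)

// When sourceFromS3 fails while feeding this many parallel pipes,
// the job hangs instead of surfacing the error.
val job: IO[Unit] =
  sourceFromS3.broadcastThrough(extractPipes: _*).compile.drain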

Work Around

The problem appears to be that the initial read of the object from S3 does not use the Blocker that it should. To work around this we no longer use readS3File, but instead use our own version, which is as follows:

  import cats.effect.{Blocker, ContextShift, IO, Sync}
  import com.amazonaws.services.s3.AmazonS3ClientBuilder
  import com.amazonaws.services.s3.model.GetObjectRequest

  type ByteStream = fs2.Stream[IO, Byte] // assumed alias for the byte stream we return

  def readFromS3(bucket: String, key: String, blocker: Blocker, chunkSize: Int)(implicit sync: Sync[IO], shift: ContextShift[IO]): ByteStream = {
    fs2.io.readInputStream[IO](
      // Fetch the object's InputStream on the blocking pool, so a failure surfaces as a stream error
      blocker.blockOn(IO(AmazonS3ClientBuilder.defaultClient().getObject(new GetObjectRequest(bucket, key)).getObjectContent)),
      chunkSize = chunkSize,
      blocker = blocker,
      closeAfterUse = true)
  }

The first difference from the fs2-aws API is that this readFromS3 method takes a Blocker instead of an ExecutionContext, which seems more correct as we already have a Blocker in scope to use for this. The second difference from the fs2-aws API is that blocker.blockOn(...) is used to wrap the IO[InputStream], which is what ultimately appears to fix the hanging problem.
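For reference, this is roughly how we call it; a sketch only, with made-up bucket and key names, and with Blocker[IO] as just one way to obtain a Blocker:

import cats.effect.{Blocker, ContextShift, IO}

import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Read the object through readFromS3; with blockOn in place, a bad key or missing
// permission fails the stream instead of hanging the job.
val readAll: IO[Unit] =
  Blocker[IO].use { blocker =>
    readFromS3("my-bucket", "input.json", blocker, chunkSize = 4096)
      .compile
      .drain
  }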

@semenodm
Member

Thank you for using this library. It would be great if you could create a PR so we can merge this change into the mainline, or I can revisit it a little later; this problem is worth fixing. Thank you again for creating the issue, it makes this library better.

@blissd
Author

blissd commented Jul 30, 2020

I've experimented with the v3 API and the problem persists. The workaround for v3 is:

fs2.io.readInputStream(
  // Fetch the object on the blocking pool, so a failure surfaces as a stream error
  blocker.blockOn(IO(
    S3Client.create().getObject(
      GetObjectRequest
        .builder()
        .bucket(bucket)
        .key(key)
        .build()
    )
  )),
  chunkSize = 4096,
  closeAfterUse = true,
  blocker = blocker
)

@austinwarrren

As far as I can tell, this issue has been resolved. I tested today and found that running s3.readFile (the method name appears to have changed since this issue was created) on a key which does not exist in S3 results in a NoSuchKeyException being raised, rather than the process hanging as described above. If that is not the case, is there any additional context surrounding the issue which may be useful in resolving it? Or, if it has been resolved, may this issue be closed? Steps I performed in reaching this conclusion (a sketch of the check itself follows the list):

  1. Set up an S3 environment via localstack.
  2. Create a test in S3Suite which should fail under the conditions described above, using a file key which did not exist.
  3. The test fails, but with an exception (NoSuchKeyException) rather than an empty stream.
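For anyone following along, the check boils down to something like this sketch. I'm assuming an s3.readFile call that returns a Stream[IO, Byte]; the exact parameter types depend on the fs2-aws version in use, so missingKeyBytes below just stands for the stream returned for a key that does not exist.

import cats.effect.IO
import software.amazon.awssdk.services.s3.model.NoSuchKeyException

// missingKeyBytes stands for s3.readFile(...) called on a non-existent key.
def failsWithNoSuchKey(missingKeyBytes: fs2.Stream[IO, Byte]): IO[Boolean] =
  missingKeyBytes.compile.drain.attempt.map {
    case Left(_: NoSuchKeyException) => true  // the error surfaces instead of the stream hanging
    case _                           => false
  }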
