
Question: Is it possible to restart reading a stream at a specified index? #523

Closed · andye2004 opened this issue Apr 13, 2021 · 7 comments

@andye2004 (Contributor)

I'm looking to read the content InputStream in a Spring Batch reader, but due to the way the transaction boundaries work I either run into deadlock scenarios or I have to read the entire InputStream into a buffer. Ideally I'd like to be able to open/start reading the stream at a specified index. Would this be possible?

I can provide a detailed explanation of what is actually happening, with an indication of the interplay between Spring Batch and Spring Content, but it is pretty complex and likely to be lengthy, hence the simplistic question above. That said, I'm sure there are lots of occasions where we might want to start reading content at a specified location.

Thanks in advance, Andy.

@paulcwarren (Owner)

Hi @andye2004,

I think this is possible. How is Spring Batch consuming your Spring Content storage?

If you are using Spring Content REST on top of a Store then you have an endpoint that supports byte ranges, so you can make a request with a Range header.
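
For illustration, that request could look something like this (a sketch; the endpoint path and content id here are made up):

import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RangeRequestExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/files/1234"))
                .header("Range", "bytes=1024-") // start at byte 1024
                .build();
        // the response body contains only the requested byte range
        HttpResponse<InputStream> response =
                client.send(request, HttpResponse.BodyHandlers.ofInputStream());
        try (InputStream in = response.body()) {
            // read from byte 1024 onwards...
        }
    }
}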

If you are using the Store Java API directly then after getting the InputStream can you skip forward to the point in the stream where you want to start from?
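i.e. something along these lines (a sketch; contentStore and contentId are placeholders):

// note that skip() on many streams just reads and discards under the covers
try (InputStream in = contentStore.getResource(contentId).getInputStream()) {
    in.skipNBytes(1024); // Java 12+; on older JDKs, loop on skip() instead
    // ... read from byte 1024 onwards
}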

Or am I misunderstanding your question?

@andye2004 (Contributor, Author)

Hi @paulcwarren, thanks for a super fast follow-up!

For background, and I'm sure you already know how Spring Batch works, but...

When executing a step (not a tasklet) in a job, and the reader batches are smaller than the data available (I know, the whole point of batching), the transaction boundary is roughly as follows, and apologies for the terrible pseudo code:-

while (reader.hasMore()) {
  begin trans
  reader.read (n times)
  processor.process (n times)
  writer.write() (once)
  reader.update() (once)
  end trans
}

Under the covers Spring Batch uses a countdown semaphore, and when it hits 0 processing blocks until the semaphore is released by other processing threads. The semaphore lock is only released when the transaction is committed, which would normally happen as above. However, when I inject the contentStore into the equation it automatically uses the same transaction context as the batch process, so when the transaction is committed I lose the stream from that point.

I tried the opposite and began the transaction boundary when calling contentStore.getResource(), but that is what triggered the deadlock: it held the transaction open when the batch framework hit the semaphore and had run out of leases. Remember, the leases are only released when the transaction boundary is crossed.

This is what led me to ask the question. I had thought about skipping etc., BUT I'm reading 40MB+ files at whatever the configured Postgres blob size is defined as, say 1024 bytes? I might end up reading the first 1024 bytes thousands of times and simply discarding them, which is going to be a pretty costly thing to do and, I reckon, simply an infeasible approach to take.
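
(To put rough numbers on that, assuming a re-open and skip-to-offset after every 1024-byte chunk of a 40MB file: that is roughly 40,000 re-opens with an average of ~20MB read and discarded per re-open, so on the order of 800GB pulled from the database just to be thrown away.)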

After thinking on this some more, I think I need to find a way of using a different session for the Spring Content InputStream, as opposed to having it use the same session/transaction as the current context. That way, when the batch transaction is committed the content session is untouched and I can continue to read from the DB without having to re-open the stream and discard n bytes each time.

Hopefully that all made sense.

@paulcwarren (Owner) commented Apr 14, 2021

Hmmm...interesting problem. Thanks for sharing the context. The behavior you describe makes sense to me, I think. I wrote the JPA storage module to participate in the current transaction because folks asked for basic transaction support, and the use case for that was more like a developer building a Boot app with Spring Data and Spring Content, where a service does some work on one or more entities, then does some content operation on content associated with those entities, then perhaps does some more work on those entities. If anything fails, roll it all back.

But it sounds like, when using Spring Batch (definitely not an expert), the processing steps start and stop the transaction. However, since a single step run might not process the entire content, you want to maintain an open cursor into that content stream.

I can think of some options:

  1. Spring's transaction model is tied to the current thread (as I understand it), so you could wrap the stream reader in a thread. BUT that assumes you don't have multiple steps processing from the stream at the same time.
  2. Use different storage. All the other storage modules are free of transactions and would alleviate this problem, I think, BUT I assume you are tied to JPA blob storage?

@andye2004 (Contributor, Author) commented Apr 14, 2021

Hi @paulcwarren, I did actually spend a bit of time on this problem again yesterday, exploring some options, and realised I could use a fairly simple approach: wrapping the call to the content store in an @Async method returning a CompletableFuture, e.g.

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncStreamingService {

    private final FileContentStore contentStore;

    @Autowired
    public AsyncStreamingService(
            @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") FileContentStore contentStore) {

        this.contentStore = contentStore;
    }

    // runs on the async executor thread, so the session/transaction opened
    // by getInputStream() below is bound to that thread, not the batch thread
    @Async
    public CompletableFuture<InputStream> getContent(String contentId) {
        CompletableFuture<InputStream> future = new CompletableFuture<>();
        try {
            future.complete(contentStore.getResource(contentId).getInputStream());
        } catch (IOException iox) {
            future.completeExceptionally(iox);
        }
        return future;
    }
}

Then in the reader I can just call service.getContent(contentId).get() and I have an input stream that works with the reader as it currently stands. This works because the transaction is opened, via getInputStream(), on a different thread from the batch processing thread, meaning the contentStore transaction is not associated with the batch thread at that point.
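
In the reader that looks roughly like this (sketch; exception handling elided):

// e.g. in the batch reader's open() method:
// get() blocks the batch thread, but the stream (and its transaction)
// was opened on the @Async executor thread
InputStream content = service.getContent(contentId).get();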

This is clearly not a solution to the original question about returning an input stream starting at a specific offset, but it does solve the transactional issue described in my previous post, at least for the majority of situations. However, I've now run into another issue. The input stream returned from the content store wraps the inputStream.close() method, for obvious good reasons, which means that as soon as close() is called the content store closes/cleans up the transaction. In the above CompletableFuture solution, inputStream.close() is called from the batch reader thread (unlike when the stream was opened), meaning the session associated with that thread is removed as part of the content store's reliance on Spring's transaction clean-up.

This is probably sufficient for most cases, as you would normally call inputStream.close() from the reader.close() method, which gets called by the batch framework only at the very end of the batch process, so everything should be fine. I think it is worth pointing this out in case anyone else ends up here looking for a solution to a similar problem.

However, even this doesn't solve my current issue. I need to wrap the input stream in a reader that I have no control over, which (in)conveniently calls close() on the wrapped input stream as soon as it has read the last byte. This is before my reader has had a chance to return the last item and complete the last batch cycle, so the resulting transaction clean-up means I lose the content of the last batch cycle.

More thought required methinks.

P.S. DB storage is mandatory unfortunately.

@andye2004 (Contributor, Author)

Sorry, I should've said: feel free to close this off. I'm happy the original question has been answered. I'll update the thread if I find a solution to my last remaining problem.

Thanks again for providing such an excellent library and the speedy response to my question. Apologies for drivelling on in such long posts :), bad habit.

@andye2004 (Contributor, Author)

OK, so this was easier than I thought. The final two problems were:

  • prevent the underlying reader from closing the input stream until I wanted it to be closed
  • ensure that closing the input stream did not happen on the batch processing thread

I thought about wrapping the input stream returned from the content store, but then realised someone else must've done this already, and they have: there are loads of implementations out there. Then it was just a case of not calling inputStream.close() directly in the batch reader.close() method and we'd be good.

Final service and non-closing InputStream wrapper:-

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AsyncStreamingService {

    private final FileContentStore contentStore;

    @Autowired
    public AsyncStreamingService(
            @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") FileContentStore contentStore) {

        this.contentStore = contentStore;
    }

    @SuppressWarnings("java:S3242")
    public void closeContentStream(InputStream is) {
        CompletableFuture.runAsync(() -> {
            try {
                is.close();
            } catch (IOException iox) {
                // ignore
            }
        });
    }

    // open the stream (and its backing transaction) on a different thread
    public CompletableFuture<InputStream> getContentStream(String contentId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return contentStore.getResource(contentId).getInputStream();
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        });
    }
}

@SuppressWarnings("java:S4929")
public class NonClosingInputStream extends FilterInputStream {
    protected NonClosingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public void close() throws IOException {
        // prevents the wrapped input stream from being closed;
        // we close the wrapped instance ourselves when reader.close()
        // is called. We need this because the XMLStreamReader calls
        // close automatically when it gets to the end of the stream.
    }
}

With all that, all I need to do is call the getContentStream() method from reader.open() and wrap the result in the NonClosingInputStream. When we're all done and the batch framework calls reader.close(), I just call closeContentStream().
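
For anyone landing here later, a minimal sketch of how the reader side fits together (the class name, item type, and underlying parsing are hypothetical):

import java.io.InputStream;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;

public class ContentItemReader implements ItemStreamReader<String> {

    private final AsyncStreamingService service;
    private final String contentId;
    private InputStream content; // the real stream; closed via closeContentStream()

    public ContentItemReader(AsyncStreamingService service, String contentId) {
        this.service = service;
        this.contentId = contentId;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // stream (and its transaction) is opened on a different thread
        content = service.getContentStream(contentId).join();
        // hand the underlying reader a wrapper whose close() is a no-op,
        // e.g. an XMLStreamReader over new NonClosingInputStream(content)
    }

    @Override
    public String read() {
        // delegate to the underlying reader; return null when exhausted
        return null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // no restart state kept in this sketch
    }

    @Override
    public void close() {
        // close off the batch thread so the transaction clean-up happens
        // on a different thread, as described above
        service.closeContentStream(content);
    }
}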

Works like a dream.

Thanks again for all your help!

@paulcwarren (Owner)

Thanks for the very detailed write-up @andye2004. As I try to build a community around Spring Content, it is super helpful to have these kinds of write-ups on issues, allowing others to advance faster and Spring Content to gain more usage, so I really appreciate the time you took to do that.

And thanks also for going the extra mile to figure it out. That's a clever use of the CompletableFuture API: leveraging its ability to run arbitrary code on a different thread led to a simple, clean solution, I think. A good outcome.

I'll go ahead and close this issue, but if you ever spot a better implementation alternative for the blob resource that serves both the basic transaction support use case and these sorts of batch use cases then please don't be shy. I'd be all ears.
