Skip to content

Commit

Permalink
Make fetching tokens on segment reader creation async (#6622)
Browse files Browse the repository at this point in the history
Make fetching tokens on segment reader creation async
Signed-off-by: Tom Kaitchuck <tom.kaitchuck@emc.com>
  • Loading branch information
tkaitchuck committed Mar 7, 2022
1 parent 136e14e commit 34e4342
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ public ByteStreamReader createByteStreamReader(String streamName) {
}

private ByteStreamReader createByteStreamReaders(Segment segment) {
String delegationToken = Futures.getAndHandleExceptions(controller.getOrRefreshDelegationTokenFor(segment.getScope(),
segment.getStream().getStreamName(), AccessOperation.READ), RuntimeException::new);

DelegationTokenProvider tokenProvider = DelegationTokenProviderFactory.create(delegationToken, controller, segment, AccessOperation.READ);
DelegationTokenProvider tokenProvider = DelegationTokenProviderFactory.create(controller, segment, AccessOperation.READ);
tokenProvider.retrieveToken();
SegmentMetadataClient metaClient = metaStreamFactory.createSegmentMetadataClient(segment, tokenProvider);
long startOffset = Futures.getThrowingException(metaClient.getSegmentInfo()).getStartingOffset();
return new ByteStreamReaderImpl(inputStreamFactory.createInputStreamForSegment(segment, tokenProvider, startOffset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

import com.google.common.annotations.VisibleForTesting;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.control.impl.Controller;
import io.pravega.common.MathHelpers;
import io.pravega.common.concurrent.Futures;
import java.util.concurrent.Semaphore;

import io.pravega.shared.security.auth.AccessOperation;
import java.util.concurrent.Semaphore;
import lombok.RequiredArgsConstructor;

@VisibleForTesting
Expand Down Expand Up @@ -55,10 +53,9 @@ public EventSegmentReader createEventReaderForSegment(Segment segment, int buffe
}

private EventSegmentReader getEventSegmentReader(Segment segment, Semaphore hasData, long startOffset, long endOffset, int bufferSize) {
String delegationToken = Futures.getAndHandleExceptions(controller.getOrRefreshDelegationTokenFor(segment.getScope(),
segment.getStream().getStreamName(), AccessOperation.READ), RuntimeException::new);
AsyncSegmentInputStreamImpl async = new AsyncSegmentInputStreamImpl(controller, cp, segment,
DelegationTokenProviderFactory.create(delegationToken, controller, segment, AccessOperation.READ), hasData);
DelegationTokenProvider tokenProvider = DelegationTokenProviderFactory.create(controller, segment, AccessOperation.READ);
tokenProvider.retrieveToken();
AsyncSegmentInputStreamImpl async = new AsyncSegmentInputStreamImpl(controller, cp, segment, tokenProvider, hasData);
async.getConnection(); //Sanity enforcement
bufferSize = MathHelpers.minMax(bufferSize, SegmentInputStreamImpl.MIN_BUFFER_SIZE, SegmentInputStreamImpl.MAX_BUFFER_SIZE);
return getEventSegmentReader(async, startOffset, endOffset, bufferSize);
Expand Down

0 comments on commit 34e4342

Please sign in to comment.