Skip to content

Commit

Permalink
Merge pull request #28 from vinted/fix/retry-aborted-appends
Browse files Browse the repository at this point in the history
fix: retry when stream writer is closed
  • Loading branch information
gintarasm committed Apr 8, 2024
2 parents 04101dc + 0b79ed2 commit 8dc88ce
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ protected void submitRequestEntries(List<StreamRequest> list, Consumer<List<Stre
logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage());
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case ABORTED: {
if (isStreamWriterClosed(t)) {
logger.info("Trace-id {} steam writer closed. Retrying: {}", traceId, t.getMessage());
return retry(t, traceId, request);
} else {
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
throw new AsyncWriterException(traceId, status.getCode(), t);
}
}
case UNAVAILABLE: {
this.recreateStreamWriter(traceId, request.getStream(), writer.getWriterId(), request.getTable());
return retry(t, traceId, request);
Expand Down Expand Up @@ -194,6 +203,20 @@ protected void submitRequestEntries(List<StreamRequest> list, Consumer<List<Stre

}

private boolean isStreamWriterClosed(Throwable t) {
if (t == null) {
return false;
}

for (Throwable cause = t; cause != null; cause = cause.getCause()) {
if (cause instanceof Exceptions.StreamWriterClosedException) {
return true;
}
}

return false;
}

private boolean isMaximumRequestCallbackWaitTimeExceededException(Throwable t) {
if (t == null) {
return false;
Expand Down
17 changes: 17 additions & 0 deletions src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTim
assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3);
}

@Test
public void shouldRetryFailingWithAbortedStatus(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenStreamWriterClosed();
mockClientProvider.givenRetryCount(2);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ public void givenFailingAppendWithStatus(Status status) throws Descriptors.Descr
.thenReturn(createAppendRowsResponseError(new StatusException(status)));
}

public void givenStreamWriterClosed() throws Descriptors.DescriptorValidationException, IOException {
var response = createAppendRowsResponseError(
new StatusException(Status.ABORTED.withCause(createStreamWriterClosedException()))
);
Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any()))
.thenReturn(response);
}

public void givenTimeoutForAppend() throws Descriptors.DescriptorValidationException, IOException {
Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any()))
.thenReturn(createTimeoutAppendRowsResponse());
Expand Down Expand Up @@ -148,6 +156,14 @@ private static Exceptions.OffsetAlreadyExists createOffsetAlreadyExistsException
return offsetMock;
}

private static Exceptions.StreamWriterClosedException createStreamWriterClosedException() {
var offsetMock = Mockito.mock(Exceptions.StreamWriterClosedException.class);
Mockito.when(offsetMock.getStatus()).thenReturn(Status.ABORTED);
Mockito.when(offsetMock.getStreamName()).thenReturn("stream");
Mockito.when(offsetMock.getCause()).thenReturn(new RuntimeException());
return offsetMock;
}

private static SettableApiFuture<AppendRowsResponse> createAppendRowsResponse() {
SettableApiFuture<AppendRowsResponse> result = SettableApiFuture.create();
result.set(AppendRowsResponse.newBuilder().buildPartial());
Expand Down

0 comments on commit 8dc88ce

Please sign in to comment.