Skip to content

Commit

Permalink
Return CommitStateUnknownException in case of an internal server erro…
Browse files Browse the repository at this point in the history
…r to prevent commit abort leading to data loss. (#63)

* Fix error handling to prevent commit abort from causing data loss

* Update to capture responseBody as part of CommitStateUnknownException

* Empty commit

* Fix typo
  • Loading branch information
maluchari committed Apr 26, 2024
1 parent 8ec3429 commit 52abbc5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void testCreateUpdateTableErrorHandle() {
when(base.snapshots()).thenReturn(snapshotList);

// Ensure tableApi throw expected exception

when(mockTableApi.updateTableV1(anyString(), anyString(), any()))
.thenReturn(Mono.error(mock(WebClientResponseException.ServiceUnavailable.class)));
Assertions.assertThrows(
Expand All @@ -99,6 +100,10 @@ public void testCreateUpdateTableErrorHandle() {
.thenReturn(Mono.error(mock(WebClientResponseException.NotFound.class)));
Assertions.assertThrows(
NoSuchTableException.class, () -> openHouseTableOperations.doCommit(base, metadata));
when(mockTableApi.updateTableV1(anyString(), anyString(), any()))
.thenReturn(Mono.error(mock(WebClientResponseException.InternalServerError.class)));
Assertions.assertThrows(
CommitStateUnknownException.class, () -> openHouseTableOperations.doCommit(base, metadata));
when(mockTableApi.updateTableV1(anyString(), anyString(), any()))
.thenReturn(Mono.error(mock(WebClientRequestException.class)));
Assertions.assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,16 @@ static Mono<GetTableResponseBody> handleCreateUpdateHttpError(
return Mono.error(
new CommitFailedException(
casted, casted.getStatusCode().value() + " , " + casted.getResponseBodyAsString()));

} else if (e instanceof WebClientResponseException.GatewayTimeout
| e instanceof WebClientResponseException.ServiceUnavailable) {
return Mono.error(new CommitStateUnknownException(e));
|| e instanceof WebClientResponseException.ServiceUnavailable
|| e instanceof WebClientResponseException.InternalServerError) {
/**
* This is done to avoid any data loss that could occur when a commit aborts at the caller
* leads to deletion of iceberg metadata files.
*/
WebClientResponseException casted = (WebClientResponseException) e;
return Mono.error(new CommitStateUnknownException(casted.getResponseBodyAsString(), casted));
} else if (e instanceof WebClientResponseException.BadRequest) {
WebClientResponseException casted = (WebClientResponseException) e;
return Mono.error(
Expand All @@ -288,9 +295,10 @@ static Mono<GetTableResponseBody> handleCreateUpdateHttpError(
return Mono.error(new WebClientResponseWithMessageException((WebClientResponseException) e));
} else {
/**
* This serves as a catch-all for any other exceptions that are not
* WebClientResponseException. It helps in skipping any unexpected cleanup that could occur
* when a commit aborts at the caller, thus avoiding any potential data loss.
* This serves as a catch-all for any unexpected exceptions that could occur during doCommit,
* (i.e) exceptions that are not WebClientResponseException. This is a conservative approach
* to skip any unexpected cleanup that could occur when a commit aborts at the caller, thus
* avoiding any potential data loss.
*/
return Mono.error(new CommitStateUnknownException(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static com.linkedin.openhouse.spark.SparkTestBase.*;

import com.linkedin.openhouse.javaclient.OpenHouseTableOperations;
import com.linkedin.openhouse.relocated.org.springframework.web.reactive.function.client.WebClientResponseException;
import com.linkedin.openhouse.spark.SparkTestBase;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -133,7 +132,7 @@ public void testCommitFailedException() {
@Test
public void testSurfaceRestExceptions() {
mockTableService.enqueue(mockResponse(500, "{\"message\":\"Internal Server Error\"}"));
Assertions.assertThrows(WebClientResponseException.class, () -> ops.doCommit(null, base));
Assertions.assertThrows(CommitStateUnknownException.class, () -> ops.doCommit(null, base));
}

@Test
Expand Down

0 comments on commit 52abbc5

Please sign in to comment.