Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Mar 29, 2024
1 parent e6a7a01 commit 12c14fa
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public void writeCore(CosmosAsyncContainer container, List<SinkOperation> sinkOp
private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation) {
executeWithRetry(
(operation) -> {
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
CosmosItemRequestOptions cosmosItemRequestOptions = this.getCosmosItemRequestOptions();
return container.upsertItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then();
},
(throwable) -> false, // no exceptions should be ignored
Expand All @@ -82,8 +81,7 @@ private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation) {
executeWithRetry(
(operation) -> {
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
CosmosItemRequestOptions cosmosItemRequestOptions = this.getCosmosItemRequestOptions();
return container.createItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then();
},
(throwable) -> KafkaCosmosExceptionsHelper.isResourceExistsException(throwable),
Expand All @@ -94,9 +92,8 @@ private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation, String etag) {
executeWithRetry(
(operation) -> {
CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions();
CosmosItemRequestOptions itemRequestOptions = this.getCosmosItemRequestOptions();
itemRequestOptions.setIfMatchETag(etag);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);

return ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
Expand All @@ -121,16 +118,14 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO
private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation, boolean onlyIfModified) {
executeWithRetry(
(operation) -> {
CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions();
CosmosItemRequestOptions itemRequestOptions = this.getCosmosItemRequestOptions();
if (onlyIfModified) {
String etag = this.getEtag(operation.getSinkRecord().value());
if (StringUtils.isNotEmpty(etag)) {
itemRequestOptions.setIfMatchETag(etag);
}
}

CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);

return ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
.getCosmosAsyncContainerAccessor()
Expand Down Expand Up @@ -198,5 +193,11 @@ private void executeWithRetry(
.subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC)
.block();
}

private CosmosItemRequestOptions getCosmosItemRequestOptions() {
CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions();
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
return itemRequestOptions;
}
}

0 comments on commit 12c14fa

Please sign in to comment.