diff --git a/v3/pom.xml b/v3/pom.xml
index 140ae3c5..57626e12 100644
--- a/v3/pom.xml
+++ b/v3/pom.xml
@@ -11,7 +11,7 @@
skyflow-java
- 3.0.0-beta.9-dev.e40880f
+ 2.0.4-dev.a98436e
jar
${project.groupId}:${project.artifactId}
Skyflow V3 SDK for the Java programming language
diff --git a/v3/src/main/java/com/skyflow/enums/CustomHeaderKey.java b/v3/src/main/java/com/skyflow/enums/CustomHeaderKey.java
new file mode 100644
index 00000000..8e6190c6
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/enums/CustomHeaderKey.java
@@ -0,0 +1,18 @@
+package com.skyflow.enums;
+
+public enum CustomHeaderKey {
+ SkyflowAccountID("x-skyflow-account-id"),
+ SkyflowAccountName("x-skyflow-account-name"),
+ RequestIDHeader("x-request-id");
+
+ private final String value;
+
+ CustomHeaderKey(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return this.value;
+ }
+}
\ No newline at end of file
diff --git a/v3/src/main/java/com/skyflow/utils/Utils.java b/v3/src/main/java/com/skyflow/utils/Utils.java
index 131772c7..cdd0b03b 100644
--- a/v3/src/main/java/com/skyflow/utils/Utils.java
+++ b/v3/src/main/java/com/skyflow/utils/Utils.java
@@ -77,6 +77,10 @@ public static List createDetokenizeBatches(V1FlowDetoke
}
public static ErrorRecord createErrorRecord(Map recordMap, int indexNumber) {
+ return createErrorRecord(recordMap, indexNumber, null);
+ }
+
+ public static ErrorRecord createErrorRecord(Map recordMap, int indexNumber, String requestId) {
ErrorRecord err = null;
if (recordMap != null) {
int code = 500;
@@ -84,7 +88,6 @@ public static ErrorRecord createErrorRecord(Map recordMap, int i
code = (Integer) recordMap.get("http_code");
} else if (recordMap.containsKey("httpCode")) {
code = (Integer) recordMap.get("httpCode");
-
} else {
if (recordMap.containsKey("statusCode")) {
code = (Integer) recordMap.get("statusCode");
@@ -92,11 +95,17 @@ public static ErrorRecord createErrorRecord(Map recordMap, int i
}
String message = recordMap.containsKey("error") ? (String) recordMap.get("error") :
recordMap.containsKey("message") ? (String) recordMap.get("message") : "Unknown error";
- err = new ErrorRecord(indexNumber, message, code);
+ err = new ErrorRecord(indexNumber, message, code, requestId);
}
return err;
}
+ private static String extractRequestId(Map> headers) {
+ if (headers == null) return null;
+ List ids = headers.get(BaseConstants.REQUEST_ID_HEADER_KEY);
+ return (ids == null || ids.isEmpty()) ? null : ids.get(0);
+ }
+
public static List handleBatchException(
Throwable ex, List batch, int batchNumber, int batchSize
) {
@@ -104,7 +113,9 @@ public static List handleBatchException(
Throwable cause = ex.getCause();
if (cause instanceof ApiClientApiException) {
ApiClientApiException apiException = (ApiClientApiException) cause;
- Map responseBody = (Map) apiException.body();
+ String requestId = extractRequestId(apiException.headers());
+ Object rawBody = apiException.body();
+ Map responseBody = (rawBody instanceof Map) ? (Map) rawBody : null;
int indexNumber = batchNumber > 0 ? batchNumber * batchSize : 0;
if (responseBody != null) {
if (responseBody.containsKey("records")) {
@@ -114,21 +125,31 @@ public static List handleBatchException(
for (Object record : recordsList) {
if (record instanceof Map) {
Map recordMap = (Map) record;
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber, requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
} else if (responseBody.containsKey("error")) {
- Map recordMap = (Map) responseBody.get("error");
+ Object errField = responseBody.get("error");
+ Map recordMap = (errField instanceof Map) ? (Map) errField : null;
+ String fallbackMsg = (errField instanceof String) ? (String) errField : null;
for (int j = 0; j < batch.size(); j++) {
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ ErrorRecord err = (recordMap != null)
+ ? Utils.createErrorRecord(recordMap, indexNumber, requestId)
+ : new ErrorRecord(indexNumber, fallbackMsg != null ? fallbackMsg : apiException.getMessage(), apiException.statusCode(), requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
+ if (errorRecords.isEmpty()) {
+ for (int j = 0; j < batch.size(); j++) {
+ errorRecords.add(new ErrorRecord(indexNumber, apiException.getMessage(), apiException.statusCode(), requestId));
+ indexNumber++;
+ }
+ }
} else {
int indexNumber = batchNumber > 0 ? batchNumber * batchSize : 0;
for (int j = 0; j < batch.size(); j++) {
@@ -147,7 +168,9 @@ public static List handleDetokenizeBatchException(
Throwable cause = ex.getCause();
if (cause instanceof ApiClientApiException) {
ApiClientApiException apiException = (ApiClientApiException) cause;
- Map responseBody = (Map) apiException.body();
+ String requestId = extractRequestId(apiException.headers());
+ Object rawBody = apiException.body();
+ Map responseBody = (rawBody instanceof Map) ? (Map) rawBody : null;
int indexNumber = batchNumber * batchSize;
if (responseBody != null) {
if (responseBody.containsKey("response")) {
@@ -157,21 +180,33 @@ public static List handleDetokenizeBatchException(
for (Object record : recordsList) {
if (record instanceof Map) {
Map recordMap = (Map) record;
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber, requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
} else if (responseBody.containsKey("error")) {
- Map recordMap = (Map) responseBody.get("error");
- for (int j = 0; j < batch.getTokens().get().size(); j++) {
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ Object errField = responseBody.get("error");
+ Map recordMap = (errField instanceof Map) ? (Map) errField : null;
+ String fallbackMsg = (errField instanceof String) ? (String) errField : null;
+ int tokenCount = batch.getTokens().isPresent() ? batch.getTokens().get().size() : 0;
+ for (int j = 0; j < tokenCount; j++) {
+ ErrorRecord err = (recordMap != null)
+ ? Utils.createErrorRecord(recordMap, indexNumber, requestId)
+ : new ErrorRecord(indexNumber, fallbackMsg != null ? fallbackMsg : apiException.getMessage(), apiException.statusCode(), requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
+ if (errorRecords.isEmpty()) {
+ int tokenCount = batch.getTokens().isPresent() ? batch.getTokens().get().size() : 0;
+ for (int j = 0; j < tokenCount; j++) {
+ errorRecords.add(new ErrorRecord(indexNumber, apiException.getMessage(), apiException.statusCode(), requestId));
+ indexNumber++;
+ }
+ }
} else {
int indexNumber = batchNumber * batchSize;
for (int j = 0; j < batch.getTokens().get().size(); j++) {
@@ -184,15 +219,20 @@ public static List handleDetokenizeBatchException(
}
public static DetokenizeResponse formatDetokenizeResponse(com.skyflow.generated.rest.types.V1FlowDetokenizeResponse response, int batch, int batchSize) {
- if (response != null) {
+ return formatDetokenizeResponse(response, batch, batchSize, null);
+ }
+
+ public static DetokenizeResponse formatDetokenizeResponse(com.skyflow.generated.rest.types.V1FlowDetokenizeResponse response, int batch, int batchSize, Map> headers) {
+ if (response != null && response.getResponse().isPresent()) {
+ String requestId = extractRequestId(headers);
List record = response.getResponse().get();
List errorRecords = new ArrayList<>();
List successRecords = new ArrayList<>();
int indexNumber = batch * batchSize;
- int recordsSize = response.getResponse().get().size();
+ int recordsSize = record.size();
for (int index = 0; index < recordsSize; index++) {
if (record.get(index).getError().isPresent()) {
- ErrorRecord errorRecord = new ErrorRecord(indexNumber, record.get(index).getError().get(), record.get(index).getHttpCode().get());
+ ErrorRecord errorRecord = new ErrorRecord(indexNumber, record.get(index).getError().get(), record.get(index).getHttpCode().get(), requestId);
errorRecords.add(errorRecord);
} else {
com.skyflow.vault.data.DetokenizeResponseObject success = new com.skyflow.vault.data.DetokenizeResponseObject(indexNumber, record.get(index).getToken().orElse(null), record.get(index).getValue().orElse(null), record.get(index).getTokenGroupName().orElse(null), record.get(index).getError().orElse(null), record.get(index).getMetadata().orElse(null));
@@ -200,23 +240,27 @@ public static DetokenizeResponse formatDetokenizeResponse(com.skyflow.generated.
}
indexNumber++;
}
- DetokenizeResponse formattedResponse = new DetokenizeResponse(successRecords, errorRecords);
- return formattedResponse;
+ return new DetokenizeResponse(successRecords, errorRecords);
}
return null;
}
public static com.skyflow.vault.data.InsertResponse formatResponse(V1InsertResponse response, int batch, int batchSize) {
+ return formatResponse(response, batch, batchSize, null);
+ }
+
+ public static com.skyflow.vault.data.InsertResponse formatResponse(V1InsertResponse response, int batch, int batchSize, Map> headers) {
com.skyflow.vault.data.InsertResponse formattedResponse = null;
List successRecords = new ArrayList<>();
List errorRecords = new ArrayList<>();
if (response != null) {
+ String requestId = extractRequestId(headers);
List record = response.getRecords().get();
int indexNumber = batch * batchSize;
int recordsSize = response.getRecords().get().size();
for (int index = 0; index < recordsSize; index++) {
if (record.get(index).getError().isPresent()) {
- ErrorRecord errorRecord = new ErrorRecord(indexNumber, record.get(index).getError().get(), record.get(index).getHttpCode().get());
+ ErrorRecord errorRecord = new ErrorRecord(indexNumber, record.get(index).getError().get(), record.get(index).getHttpCode().get(), requestId);
errorRecords.add(errorRecord);
} else {
Map> tokensMap = null;
@@ -312,7 +356,9 @@ public static List handleDeleteTokensBatchException(
Throwable cause = ex.getCause();
if (cause instanceof ApiClientApiException) {
ApiClientApiException apiException = (ApiClientApiException) cause;
- Map responseBody = (Map) apiException.body();
+ String requestId = extractRequestId(apiException.headers());
+ Object rawBody = apiException.body();
+ Map responseBody = (rawBody instanceof Map) ? (Map) rawBody : null;
int indexNumber = batchNumber * batchSize;
if (responseBody != null) {
if (responseBody.containsKey("tokens")) {
@@ -322,21 +368,33 @@ public static List handleDeleteTokensBatchException(
for (Object record : recordsList) {
if (record instanceof Map) {
Map recordMap = (Map) record;
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber, requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
} else if (responseBody.containsKey("error")) {
- Map recordMap = (Map) responseBody.get("error");
- for (int j = 0; j < batch.getTokens().get().size(); j++) {
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ Object errField = responseBody.get("error");
+ Map recordMap = (errField instanceof Map) ? (Map) errField : null;
+ String fallbackMsg = (errField instanceof String) ? (String) errField : null;
+ int tokenCount = batch.getTokens().isPresent() ? batch.getTokens().get().size() : 0;
+ for (int j = 0; j < tokenCount; j++) {
+ ErrorRecord err = (recordMap != null)
+ ? Utils.createErrorRecord(recordMap, indexNumber, requestId)
+ : new ErrorRecord(indexNumber, fallbackMsg != null ? fallbackMsg : apiException.getMessage(), apiException.statusCode(), requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
+ if (errorRecords.isEmpty()) {
+ int tokenCount = batch.getTokens().isPresent() ? batch.getTokens().get().size() : 0;
+ for (int j = 0; j < tokenCount; j++) {
+ errorRecords.add(new ErrorRecord(indexNumber, apiException.getMessage(), apiException.statusCode(), requestId));
+ indexNumber++;
+ }
+ }
} else {
int indexNumber = batchNumber * batchSize;
for (int j = 0; j < batch.getTokens().get().size(); j++) {
@@ -350,20 +408,25 @@ public static List handleDeleteTokensBatchException(
public static DeleteTokensResponse formatDeleteTokensResponse(
com.skyflow.generated.rest.types.V1FlowDeleteTokenResponse response, int batch, int batchSize) {
+ return formatDeleteTokensResponse(response, batch, batchSize, null);
+ }
+
+ public static DeleteTokensResponse formatDeleteTokensResponse(
+ com.skyflow.generated.rest.types.V1FlowDeleteTokenResponse response, int batch, int batchSize, Map> headers) {
if (response != null && response.getTokens().isPresent()) {
+ String requestId = extractRequestId(headers);
List records = response.getTokens().get();
List errorRecords = new ArrayList<>();
List successRecords = new ArrayList<>();
int indexNumber = batch * batchSize;
for (com.skyflow.generated.rest.types.V1DeleteTokenResponseObject record : records) {
- // The API returns the token string in "value" field regardless of success or error
String tokenValue = record.getValue().orElse(null);
if (record.getError().isPresent()
&& record.getError().get() != null
&& !record.getError().get().isEmpty()
&& record.getHttpCode().orElse(200) != 200) {
ErrorRecord errorRecord = new ErrorRecord(indexNumber, record.getError().get(),
- record.getHttpCode().orElse(500));
+ record.getHttpCode().orElse(500), requestId);
errorRecords.add(errorRecord);
} else {
DeleteTokensSuccess success = new DeleteTokensSuccess(indexNumber, tokenValue);
@@ -402,7 +465,9 @@ public static List handleTokenizeBatchException(
Throwable cause = ex.getCause();
if (cause instanceof ApiClientApiException) {
ApiClientApiException apiException = (ApiClientApiException) cause;
- Map responseBody = (Map) apiException.body();
+ String requestId = extractRequestId(apiException.headers());
+ Object rawBody = apiException.body();
+ Map responseBody = (rawBody instanceof Map) ? (Map) rawBody : null;
int indexNumber = batchNumber * batchSize;
if (responseBody != null) {
if (responseBody.containsKey("response")) {
@@ -412,22 +477,33 @@ public static List handleTokenizeBatchException(
for (Object record : recordsList) {
if (record instanceof Map) {
Map recordMap = (Map) record;
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber, requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
} else if (responseBody.containsKey("error")) {
- Map recordMap = (Map) responseBody.get("error");
+ Object errField = responseBody.get("error");
+ Map recordMap = (errField instanceof Map) ? (Map) errField : null;
+ String fallbackMsg = (errField instanceof String) ? (String) errField : null;
int batchDataSize = batch.getData().isPresent() ? batch.getData().get().size() : 0;
for (int j = 0; j < batchDataSize; j++) {
- ErrorRecord err = Utils.createErrorRecord(recordMap, indexNumber);
+ ErrorRecord err = (recordMap != null)
+ ? Utils.createErrorRecord(recordMap, indexNumber, requestId)
+ : new ErrorRecord(indexNumber, fallbackMsg != null ? fallbackMsg : apiException.getMessage(), apiException.statusCode(), requestId);
errorRecords.add(err);
indexNumber++;
}
}
}
+ if (errorRecords.isEmpty()) {
+ int batchDataSize = batch.getData().isPresent() ? batch.getData().get().size() : 0;
+ for (int j = 0; j < batchDataSize; j++) {
+ errorRecords.add(new ErrorRecord(indexNumber, apiException.getMessage(), apiException.statusCode(), requestId));
+ indexNumber++;
+ }
+ }
} else {
int indexNumber = batchNumber * batchSize;
int batchDataSize = batch.getData().isPresent() ? batch.getData().get().size() : 0;
@@ -444,7 +520,15 @@ public static TokenizeResponse formatTokenizeResponse(
com.skyflow.generated.rest.types.V1FlowTokenizeResponse response,
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest batchRequest,
int batchNumber, int batchSize) {
+ return formatTokenizeResponse(response, batchRequest, batchNumber, batchSize, null);
+ }
+
+ public static TokenizeResponse formatTokenizeResponse(
+ com.skyflow.generated.rest.types.V1FlowTokenizeResponse response,
+ com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest batchRequest,
+ int batchNumber, int batchSize, Map> headers) {
if (response != null && response.getResponse().isPresent()) {
+ String requestId = extractRequestId(headers);
List flatList =
response.getResponse().get();
List requestData =
@@ -475,7 +559,7 @@ public static TokenizeResponse formatTokenizeResponse(
? ((Number) props.get("httpCode")).intValue() : 200;
if (errorMsg != null) {
- errorRecords.add(new ErrorRecord(inputRecordIndex, errorMsg, httpCode));
+ errorRecords.add(new ErrorRecord(inputRecordIndex, errorMsg, httpCode, requestId));
} else {
if (successEntry == null) {
successEntry = new TokenizeSuccess(inputRecordIndex, value);
diff --git a/v3/src/main/java/com/skyflow/utils/validations/Validations.java b/v3/src/main/java/com/skyflow/utils/validations/Validations.java
index 485b60d5..839c54ce 100644
--- a/v3/src/main/java/com/skyflow/utils/validations/Validations.java
+++ b/v3/src/main/java/com/skyflow/utils/validations/Validations.java
@@ -140,15 +140,15 @@ public static void validateDetokenizeRequest(DetokenizeRequest request) throws S
throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.DetokenizeRequestNull.getMessage());
}
List tokens = request.getTokens();
- if (tokens.size() > 10000) {
- LogUtil.printErrorLog(ErrorLogs.TOKENS_SIZE_EXCEED.getLog());
- throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TokensSizeExceedError.getMessage());
- }
if (tokens == null || tokens.isEmpty()) {
LogUtil.printErrorLog(Utils.parameterizedString(
ErrorLogs.EMPTY_DETOKENIZE_DATA.getLog(), InterfaceName.DETOKENIZE.getName()
- ));
- throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyDetokenizeData.getMessage());
+ ));
+ throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.EmptyDetokenizeData.getMessage());
+ }
+ if (tokens.size() > 10000) {
+ LogUtil.printErrorLog(ErrorLogs.TOKENS_SIZE_EXCEED.getLog());
+ throw new SkyflowException(ErrorCode.INVALID_INPUT.getCode(), ErrorMessage.TokensSizeExceedError.getMessage());
}
for (int index = 0; index < tokens.size(); index++) {
String token = tokens.get(index);
diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java
index 74e6642b..40afd90b 100644
--- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java
+++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java
@@ -1,5 +1,13 @@
package com.skyflow.vault.controller;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
@@ -8,6 +16,7 @@
import com.skyflow.config.VaultConfig;
import com.skyflow.errors.SkyflowException;
import com.skyflow.generated.rest.core.ApiClientApiException;
+import com.skyflow.generated.rest.core.ApiClientHttpResponse;
import com.skyflow.generated.rest.core.RequestOptions;
import com.skyflow.generated.rest.types.V1InsertRecordData;
import com.skyflow.generated.rest.types.V1InsertResponse;
@@ -19,21 +28,29 @@
import com.skyflow.utils.Utils;
import com.skyflow.utils.logger.LogUtil;
import com.skyflow.utils.validations.Validations;
-import com.skyflow.vault.data.*;
+import com.skyflow.vault.data.DeleteTokensOptions;
+import com.skyflow.vault.data.DeleteTokensRequest;
+import com.skyflow.vault.data.DeleteTokensResponse;
+import com.skyflow.vault.data.DeleteTokensSuccess;
+import com.skyflow.vault.data.DetokenizeOptions;
+import com.skyflow.vault.data.DetokenizeRequest;
+import com.skyflow.vault.data.DetokenizeResponse;
+import com.skyflow.vault.data.DetokenizeResponseObject;
+import com.skyflow.vault.data.ErrorRecord;
+import com.skyflow.vault.data.InsertOptions;
+import com.skyflow.vault.data.InsertRecord;
+import com.skyflow.vault.data.InsertRequest;
+import com.skyflow.vault.data.RequestContext;
+import com.skyflow.vault.data.RequestInterceptor;
+import com.skyflow.vault.data.Success;
+import com.skyflow.vault.data.TokenizeOptions;
import com.skyflow.vault.data.TokenizeRequest;
import com.skyflow.vault.data.TokenizeResponse;
import com.skyflow.vault.data.TokenizeSuccess;
+
import io.github.cdimascio.dotenv.Dotenv;
import io.github.cdimascio.dotenv.DotenvException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
public final class VaultController extends VaultClient {
private static final Gson gson = new GsonBuilder().serializeNulls().create();
private JsonObject metrics = Utils.getMetrics();
@@ -58,8 +75,13 @@ public VaultController(VaultConfig vaultConfig, Credentials credentials) throws
this.tokenizeConcurrencyLimit = Constants.TOKENIZE_CONCURRENCY_LIMIT;
}
+ // ── Insert ────────────────────────────────────────────────────────────────
+
public com.skyflow.vault.data.InsertResponse bulkInsert(InsertRequest insertRequest) throws SkyflowException {
- com.skyflow.vault.data.InsertResponse response;
+ return bulkInsert(insertRequest, null);
+ }
+
+ public com.skyflow.vault.data.InsertResponse bulkInsert(InsertRequest insertRequest, InsertOptions options) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.INSERT_TRIGGERED.getLog());
try {
LogUtil.printInfoLog(InfoLogs.VALIDATE_INSERT_REQUEST.getLog());
@@ -68,20 +90,28 @@ public com.skyflow.vault.data.InsertResponse bulkInsert(InsertRequest insertRequ
setBearerToken();
com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig());
-
- response = this.processSync(request, insertRequest.getRecords());
- return response;
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
+ return this.processSync(request, insertRequest.getRecords(), interceptor);
} catch (ApiClientApiException e) {
String bodyString = gson.toJson(e.body());
LogUtil.printErrorLog(ErrorLogs.INSERT_RECORDS_REJECTED.getLog());
throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
- } catch (ExecutionException | InterruptedException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
LogUtil.printErrorLog(ErrorLogs.INSERT_RECORDS_REJECTED.getLog());
throw new SkyflowException(e.getMessage());
+ } catch (ExecutionException e) {
+ LogUtil.printErrorLog(ErrorLogs.INSERT_RECORDS_REJECTED.getLog());
+ Throwable cause = e.getCause();
+ throw new SkyflowException(cause != null && cause.getMessage() != null ? cause.getMessage() : e.getMessage());
}
}
public CompletableFuture bulkInsertAsync(InsertRequest insertRequest) throws SkyflowException {
+ return bulkInsertAsync(insertRequest, null);
+ }
+
+ public CompletableFuture bulkInsertAsync(InsertRequest insertRequest, InsertOptions options) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.INSERT_TRIGGERED.getLog());
try {
LogUtil.printInfoLog(InfoLogs.VALIDATE_INSERT_REQUEST.getLog());
@@ -90,13 +120,13 @@ public CompletableFuture bulkInsertAsync(
setBearerToken();
com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig());
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
List errorRecords = Collections.synchronizedList(new ArrayList<>());
- List> futures = this.insertBatchFutures(request, errorRecords);
+ List> futures = this.insertBatchFutures(request, errorRecords, interceptor);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List successRecords = new ArrayList<>();
-// List errorRecords = new ArrayList<>();
for (CompletableFuture future : futures) {
com.skyflow.vault.data.InsertResponse futureResponse = future.join();
@@ -119,29 +149,39 @@ public CompletableFuture bulkInsertAsync(
}
}
+ // ── Detokenize ────────────────────────────────────────────────────────────
+
public DetokenizeResponse bulkDetokenize(DetokenizeRequest detokenizeRequest) throws SkyflowException {
+ return bulkDetokenize(detokenizeRequest, null);
+ }
+
+ public DetokenizeResponse bulkDetokenize(DetokenizeRequest detokenizeRequest, DetokenizeOptions options) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.DETOKENIZE_TRIGGERED.getLog());
try {
- DetokenizeResponse response;
configureDetokenizeConcurrencyAndBatchSize(detokenizeRequest.getTokens().size());
LogUtil.printInfoLog(InfoLogs.VALIDATE_DETOKENIZE_REQUEST.getLog());
Validations.validateDetokenizeRequest(detokenizeRequest);
setBearerToken();
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDetokenizeRequest request = super.getDetokenizeRequestBody(detokenizeRequest);
-
- response = this.processDetokenizeSync(request, detokenizeRequest.getTokens());
- return response;
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
+ return this.processDetokenizeSync(request, detokenizeRequest.getTokens(), interceptor);
} catch (ApiClientApiException e) {
String bodyString = gson.toJson(e.body());
LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
- } catch (ExecutionException | InterruptedException e) {
- LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SkyflowException(e.getMessage());
+ } catch (ExecutionException e) {
throw new SkyflowException(e.getMessage());
}
}
public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest) throws SkyflowException {
+ return bulkDetokenizeAsync(detokenizeRequest, null);
+ }
+
+ public CompletableFuture bulkDetokenizeAsync(DetokenizeRequest detokenizeRequest, DetokenizeOptions options) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.DETOKENIZE_TRIGGERED.getLog());
ExecutorService executor = Executors.newFixedThreadPool(detokenizeConcurrencyLimit);
try {
@@ -150,16 +190,16 @@ public CompletableFuture bulkDetokenizeAsync(DetokenizeReque
Validations.validateDetokenizeRequest(detokenizeRequest);
setBearerToken();
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDetokenizeRequest request = super.getDetokenizeRequestBody(detokenizeRequest);
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
List errorTokens = Collections.synchronizedList(new ArrayList<>());
List successRecords = new ArrayList<>();
- // Create batches
List batches = Utils.createDetokenizeBatches(request, detokenizeBatchSize);
- List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens);
+ List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens, interceptor);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
for (CompletableFuture future : futures) {
@@ -177,6 +217,13 @@ public CompletableFuture bulkDetokenizeAsync(DetokenizeReque
executor.shutdown();
return new DetokenizeResponse(successRecords, errorTokens, detokenizeRequest.getTokens());
});
+ } catch (ApiClientApiException e) {
+ String bodyString = gson.toJson(e.body());
+ LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
+ throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
+ } catch (SkyflowException e) {
+ LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
+ throw e;
} catch (Exception e) {
LogUtil.printErrorLog(ErrorLogs.DETOKENIZE_REQUEST_REJECTED.getLog());
throw new SkyflowException(e.getMessage());
@@ -185,7 +232,13 @@ public CompletableFuture bulkDetokenizeAsync(DetokenizeReque
}
}
+ // ── Delete Tokens ─────────────────────────────────────────────────────────
+
public DeleteTokensResponse bulkDeleteTokens(DeleteTokensRequest deleteTokensRequest) throws SkyflowException {
+ return bulkDeleteTokens(deleteTokensRequest, null);
+ }
+
+ public DeleteTokensResponse bulkDeleteTokens(DeleteTokensRequest deleteTokensRequest, DeleteTokensOptions options) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.DELETE_TOKENS_TRIGGERED.getLog());
try {
LogUtil.printInfoLog(InfoLogs.VALIDATE_DELETE_TOKENS_REQUEST.getLog());
@@ -194,7 +247,8 @@ public DeleteTokensResponse bulkDeleteTokens(DeleteTokensRequest deleteTokensReq
setBearerToken();
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDeleteTokenRequest request =
super.getDeleteTokensRequestBody(deleteTokensRequest);
- return this.processDeleteTokensSync(request, deleteTokensRequest.getTokens());
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
+ return this.processDeleteTokensSync(request, deleteTokensRequest.getTokens(), interceptor);
} catch (ApiClientApiException e) {
String bodyString = gson.toJson(e.body());
LogUtil.printErrorLog(ErrorLogs.DELETE_REQUEST_REJECTED.getLog());
@@ -206,6 +260,10 @@ public DeleteTokensResponse bulkDeleteTokens(DeleteTokensRequest deleteTokensReq
}
public CompletableFuture bulkDeleteTokensAsync(DeleteTokensRequest deleteTokensRequest) throws SkyflowException {
+ return bulkDeleteTokensAsync(deleteTokensRequest, null);
+ }
+
+ public CompletableFuture bulkDeleteTokensAsync(DeleteTokensRequest deleteTokensRequest, DeleteTokensOptions options) throws SkyflowException {
LogUtil.printInfoLog(InfoLogs.DELETE_TOKENS_TRIGGERED.getLog());
ExecutorService executor = Executors.newFixedThreadPool(deleteTokensConcurrencyLimit);
try {
@@ -215,6 +273,7 @@ public CompletableFuture bulkDeleteTokensAsync(DeleteToken
setBearerToken();
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDeleteTokenRequest request =
super.getDeleteTokensRequestBody(deleteTokensRequest);
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
@@ -225,7 +284,7 @@ public CompletableFuture bulkDeleteTokensAsync(DeleteToken
Utils.createDeleteTokensBatches(request, deleteTokensBatchSize);
List> futures =
- this.deleteTokensBatchFutures(executor, batches);
+ this.deleteTokensBatchFutures(executor, batches, interceptor);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
@@ -259,9 +318,108 @@ public CompletableFuture bulkDeleteTokensAsync(DeleteToken
}
}
+ // ── Tokenize ──────────────────────────────────────────────────────────────
+
+ public TokenizeResponse bulkTokenize(TokenizeRequest tokenizeRequest) throws SkyflowException {
+ return bulkTokenize(tokenizeRequest, null);
+ }
+
+ public TokenizeResponse bulkTokenize(TokenizeRequest tokenizeRequest, TokenizeOptions options) throws SkyflowException {
+ LogUtil.printInfoLog(InfoLogs.TOKENIZE_TRIGGERED.getLog());
+ try {
+ LogUtil.printInfoLog(InfoLogs.VALIDATING_TOKENIZE_REQUEST.getLog());
+ Validations.validateTokenizeRequest(tokenizeRequest);
+ configureTokenizeConcurrencyAndBatchSize(tokenizeRequest.getData().size());
+ setBearerToken();
+ com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest request =
+ super.getTokenizeRequestBody(tokenizeRequest);
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
+ return this.processTokenizeSync(request, tokenizeRequest.getData(), interceptor);
+ } catch (ApiClientApiException e) {
+ String bodyString = gson.toJson(e.body());
+ LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
+ throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
+ } catch (SkyflowException e) {
+ LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
+ throw e;
+ } catch (ExecutionException | InterruptedException e) {
+ LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
+ throw new SkyflowException(e.getMessage());
+ }
+ }
+
+ public CompletableFuture bulkTokenizeAsync(TokenizeRequest tokenizeRequest) throws SkyflowException {
+ return bulkTokenizeAsync(tokenizeRequest, null);
+ }
+
+ public CompletableFuture bulkTokenizeAsync(TokenizeRequest tokenizeRequest, TokenizeOptions options) throws SkyflowException {
+ LogUtil.printInfoLog(InfoLogs.TOKENIZE_TRIGGERED.getLog());
+ ExecutorService executor = Executors.newFixedThreadPool(tokenizeConcurrencyLimit);
+ try {
+ LogUtil.printInfoLog(InfoLogs.VALIDATING_TOKENIZE_REQUEST.getLog());
+ Validations.validateTokenizeRequest(tokenizeRequest);
+ configureTokenizeConcurrencyAndBatchSize(tokenizeRequest.getData().size());
+ setBearerToken();
+ com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest request =
+ super.getTokenizeRequestBody(tokenizeRequest);
+ RequestInterceptor interceptor = options != null ? options.getInterceptor() : null;
+
+ LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
+
+ List errorRecords = Collections.synchronizedList(new ArrayList<>());
+ List successRecords = Collections.synchronizedList(new ArrayList<>());
+
+ List batches =
+ Utils.createTokenizeBatches(request, tokenizeBatchSize);
+
+ List> futures =
+ this.tokenizeBatchFutures(executor, batches, interceptor);
+
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .thenApply(v -> {
+ for (CompletableFuture future : futures) {
+ TokenizeResponse futureResponse = future.join();
+ if (futureResponse != null) {
+ if (futureResponse.getSuccess() != null) {
+ successRecords.addAll(futureResponse.getSuccess());
+ }
+ if (futureResponse.getErrors() != null) {
+ errorRecords.addAll(futureResponse.getErrors());
+ }
+ }
+ }
+ LogUtil.printInfoLog(InfoLogs.TOKENIZE_REQUEST_RESOLVED.getLog());
+ executor.shutdown();
+ return new TokenizeResponse(successRecords, errorRecords, tokenizeRequest.getData());
+ });
+ } catch (ApiClientApiException e) {
+ String bodyString = gson.toJson(e.body());
+ LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
+ throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
+ } catch (SkyflowException e) {
+ LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
+ throw e;
+ } catch (Exception e) {
+ LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
+ throw new SkyflowException(e.getMessage());
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ // ── Private helpers ───────────────────────────────────────────────────────
+
+ private RequestOptions buildRequestOptions(RequestContext context) {
+ RequestOptions.Builder builder = RequestOptions.builder()
+ .addHeader(Constants.SDK_METRICS_HEADER_KEY, metrics.toString());
+ context.getHeaders().forEach((k, v) -> builder.addHeader(k.toString(), v));
+ return builder.build();
+ }
+
private DeleteTokensResponse processDeleteTokensSync(
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDeleteTokenRequest deleteTokensRequest,
- List originalTokens
+ List originalTokens,
+ RequestInterceptor interceptor
) throws ExecutionException, InterruptedException, SkyflowException {
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
List errorRecords = Collections.synchronizedList(new ArrayList<>());
@@ -271,7 +429,7 @@ private DeleteTokensResponse processDeleteTokensSync(
Utils.createDeleteTokensBatches(deleteTokensRequest, deleteTokensBatchSize);
try {
List> futures =
- this.deleteTokensBatchFutures(executor, batches);
+ this.deleteTokensBatchFutures(executor, batches, interceptor);
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (Exception e) {
@@ -297,33 +455,35 @@ private DeleteTokensResponse processDeleteTokensSync(
private List> deleteTokensBatchFutures(
ExecutorService executor,
- List batches) {
+ List batches,
+ RequestInterceptor interceptor) {
List> futures = new ArrayList<>();
if (batches == null) return futures;
+ int totalBatches = batches.size();
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
final int index = batchIndex;
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDeleteTokenRequest batch = batches.get(index);
+ RequestContext ctx = new RequestContext("DELETE_TOKENS", batchIndex, totalBatches);
+ if (interceptor != null) interceptor.intercept(ctx);
CompletableFuture future = CompletableFuture
- .supplyAsync(() -> processDeleteTokensBatch(batch), executor)
+ .supplyAsync(() -> processDeleteTokensBatch(batch, ctx), executor)
.handle((result, ex) -> {
if (ex != null) {
List batchErrors =
Utils.handleDeleteTokensBatchException(ex, batch, index, deleteTokensBatchSize);
return new DeleteTokensResponse(new ArrayList<>(), batchErrors);
}
- return Utils.formatDeleteTokensResponse(result, index, deleteTokensBatchSize);
+ return Utils.formatDeleteTokensResponse(result.body(), index, deleteTokensBatchSize, result.headers());
});
futures.add(future);
}
return futures;
}
- private com.skyflow.generated.rest.types.V1FlowDeleteTokenResponse processDeleteTokensBatch(
- com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDeleteTokenRequest batch) {
- RequestOptions requestOptions = RequestOptions.builder()
- .addHeader(Constants.SDK_METRICS_HEADER_KEY, metrics.toString())
- .build();
- return this.getRecordsApi().deletetoken(batch, requestOptions);
+ private ApiClientHttpResponse processDeleteTokensBatch(
+ com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDeleteTokenRequest batch,
+ RequestContext ctx) {
+ return this.getRecordsApi().withRawResponse().deletetoken(batch, buildRequestOptions(ctx));
}
private void configureDeleteTokensConcurrencyAndBatchSize(int totalRequests) {
@@ -362,7 +522,6 @@ private void configureDeleteTokensConcurrencyAndBatchSize(int totalRequests) {
}
}
- // Max no of threads required to run all batches concurrently at once
int maxConcurrencyNeeded = (totalRequests + this.deleteTokensBatchSize - 1) / this.deleteTokensBatchSize;
if (userProvidedConcurrencyLimit != null) {
@@ -392,86 +551,10 @@ private void configureDeleteTokensConcurrencyAndBatchSize(int totalRequests) {
}
}
- public TokenizeResponse bulkTokenize(TokenizeRequest tokenizeRequest) throws SkyflowException {
- LogUtil.printInfoLog(InfoLogs.TOKENIZE_TRIGGERED.getLog());
- try {
- LogUtil.printInfoLog(InfoLogs.VALIDATING_TOKENIZE_REQUEST.getLog());
- Validations.validateTokenizeRequest(tokenizeRequest);
- configureTokenizeConcurrencyAndBatchSize(tokenizeRequest.getData().size());
- setBearerToken();
- com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest request =
- super.getTokenizeRequestBody(tokenizeRequest);
- return this.processTokenizeSync(request, tokenizeRequest.getData());
- } catch (ApiClientApiException e) {
- String bodyString = gson.toJson(e.body());
- LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
- throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
- } catch (SkyflowException e) {
- LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
- throw e;
- } catch (ExecutionException | InterruptedException e) {
- LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
- throw new SkyflowException(e.getMessage());
- }
- }
-
- public CompletableFuture bulkTokenizeAsync(TokenizeRequest tokenizeRequest) throws SkyflowException {
- LogUtil.printInfoLog(InfoLogs.TOKENIZE_TRIGGERED.getLog());
- ExecutorService executor = Executors.newFixedThreadPool(tokenizeConcurrencyLimit);
- try {
- LogUtil.printInfoLog(InfoLogs.VALIDATING_TOKENIZE_REQUEST.getLog());
- Validations.validateTokenizeRequest(tokenizeRequest);
- configureTokenizeConcurrencyAndBatchSize(tokenizeRequest.getData().size());
- setBearerToken();
- com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest request =
- super.getTokenizeRequestBody(tokenizeRequest);
-
- LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
-
- List errorRecords = Collections.synchronizedList(new ArrayList<>());
- List successRecords = Collections.synchronizedList(new ArrayList<>());
-
- List batches =
- Utils.createTokenizeBatches(request, tokenizeBatchSize);
-
- List> futures =
- this.tokenizeBatchFutures(executor, batches);
-
- return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .thenApply(v -> {
- for (CompletableFuture future : futures) {
- TokenizeResponse futureResponse = future.join();
- if (futureResponse != null) {
- if (futureResponse.getSuccess() != null) {
- successRecords.addAll(futureResponse.getSuccess());
- }
- if (futureResponse.getErrors() != null) {
- errorRecords.addAll(futureResponse.getErrors());
- }
- }
- }
- LogUtil.printInfoLog(InfoLogs.TOKENIZE_REQUEST_RESOLVED.getLog());
- executor.shutdown();
- return new TokenizeResponse(successRecords, errorRecords, tokenizeRequest.getData());
- });
- } catch (ApiClientApiException e) {
- String bodyString = gson.toJson(e.body());
- LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
- throw new SkyflowException(e.statusCode(), e, e.headers(), bodyString);
- } catch (SkyflowException e) {
- LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
- throw e;
- } catch (Exception e) {
- LogUtil.printErrorLog(ErrorLogs.TOKENIZE_REQUEST_REJECTED.getLog());
- throw new SkyflowException(e.getMessage());
- } finally {
- executor.shutdown();
- }
- }
-
private TokenizeResponse processTokenizeSync(
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest tokenizeRequest,
- java.util.ArrayList originalData
+ java.util.ArrayList originalData,
+ RequestInterceptor interceptor
) throws ExecutionException, InterruptedException, SkyflowException {
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
List errorRecords = Collections.synchronizedList(new ArrayList<>());
@@ -481,7 +564,7 @@ private TokenizeResponse processTokenizeSync(
Utils.createTokenizeBatches(tokenizeRequest, tokenizeBatchSize);
try {
List> futures =
- this.tokenizeBatchFutures(executor, batches);
+ this.tokenizeBatchFutures(executor, batches, interceptor);
try {
CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
@@ -512,33 +595,35 @@ private TokenizeResponse processTokenizeSync(
private List> tokenizeBatchFutures(
ExecutorService executor,
- List batches) {
+ List batches,
+ RequestInterceptor interceptor) {
List> futures = new ArrayList<>();
if (batches == null) return futures;
+ int totalBatches = batches.size();
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
final int index = batchIndex;
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest batch = batches.get(index);
+ RequestContext ctx = new RequestContext("TOKENIZE", batchIndex, totalBatches);
+ if (interceptor != null) interceptor.intercept(ctx);
CompletableFuture future = CompletableFuture
- .supplyAsync(() -> processTokenizeBatch(batch), executor)
+ .supplyAsync(() -> processTokenizeBatch(batch, ctx), executor)
.handle((result, ex) -> {
if (ex != null) {
List batchErrors =
Utils.handleTokenizeBatchException(ex, batch, index, tokenizeBatchSize);
return new TokenizeResponse(new ArrayList<>(), batchErrors);
}
- return Utils.formatTokenizeResponse(result, batch, index, tokenizeBatchSize);
+ return Utils.formatTokenizeResponse(result.body(), batch, index, tokenizeBatchSize, result.headers());
});
futures.add(future);
}
return futures;
}
- private com.skyflow.generated.rest.types.V1FlowTokenizeResponse processTokenizeBatch(
- com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest batch) {
- RequestOptions requestOptions = RequestOptions.builder()
- .addHeader(Constants.SDK_METRICS_HEADER_KEY, metrics.toString())
- .build();
- return this.getRecordsApi().tokenize(batch, requestOptions);
+ private ApiClientHttpResponse processTokenizeBatch(
+ com.skyflow.generated.rest.resources.flowservice.requests.V1FlowTokenizeRequest batch,
+ RequestContext ctx) {
+ return this.getRecordsApi().withRawResponse().tokenize(batch, buildRequestOptions(ctx));
}
private void configureTokenizeConcurrencyAndBatchSize(int totalRequests) {
@@ -608,12 +693,13 @@ private void configureTokenizeConcurrencyAndBatchSize(int totalRequests) {
private com.skyflow.vault.data.InsertResponse processSync(
com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest insertRequest,
- ArrayList originalPayload
+ ArrayList originalPayload,
+ RequestInterceptor interceptor
) throws ExecutionException, InterruptedException {
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
List successRecords = new ArrayList<>();
List errorRecords = Collections.synchronizedList(new ArrayList<>());
- List> futures = this.insertBatchFutures(insertRequest, errorRecords);
+ List> futures = this.insertBatchFutures(insertRequest, errorRecords, interceptor);
CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
@@ -636,7 +722,8 @@ private com.skyflow.vault.data.InsertResponse processSync(
private DetokenizeResponse processDetokenizeSync(
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDetokenizeRequest detokenizeRequest,
- List originalTokens
+ List originalTokens,
+ RequestInterceptor interceptor
) throws ExecutionException, InterruptedException, SkyflowException {
LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog());
List errorTokens = Collections.synchronizedList(new ArrayList<>());
@@ -644,12 +731,12 @@ private DetokenizeResponse processDetokenizeSync(
ExecutorService executor = Executors.newFixedThreadPool(detokenizeConcurrencyLimit);
List batches = Utils.createDetokenizeBatches(detokenizeRequest, detokenizeBatchSize);
try {
- List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens);
+ List> futures = this.detokenizeBatchFutures(executor, batches, errorTokens, interceptor);
try {
-
CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
} catch (Exception e) {
+ // individual batch errors are already captured
}
for (CompletableFuture future : futures) {
DetokenizeResponse futureResponse = future.get();
@@ -673,16 +760,22 @@ private DetokenizeResponse processDetokenizeSync(
return response;
}
- private List> detokenizeBatchFutures(ExecutorService executor, List batches, List errorTokens) {
+ private List> detokenizeBatchFutures(
+ ExecutorService executor,
+ List batches,
+ List errorTokens,
+ RequestInterceptor interceptor) {
List> futures = new ArrayList<>();
try {
-
+ int totalBatches = batches.size();
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDetokenizeRequest batch = batches.get(batchIndex);
int batchNumber = batchIndex;
+ RequestContext ctx = new RequestContext("DETOKENIZE", batchIndex, totalBatches);
+ if (interceptor != null) interceptor.intercept(ctx);
CompletableFuture future = CompletableFuture
- .supplyAsync(() -> processDetokenizeBatch(batch), executor)
- .thenApply(response -> Utils.formatDetokenizeResponse(response, batchNumber, detokenizeBatchSize))
+ .supplyAsync(() -> processDetokenizeBatch(batch, ctx), executor)
+ .thenApply(response -> Utils.formatDetokenizeResponse(response.body(), batchNumber, detokenizeBatchSize, response.headers()))
.exceptionally(ex -> {
errorTokens.addAll(Utils.handleDetokenizeBatchException(ex, batch, batchNumber, detokenizeBatchSize));
return null;
@@ -696,31 +789,33 @@ private List> detokenizeBatchFutures(Execu
return futures;
}
- private com.skyflow.generated.rest.types.V1FlowDetokenizeResponse processDetokenizeBatch(com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDetokenizeRequest batch) {
- RequestOptions requestOptions = RequestOptions.builder()
- .addHeader(Constants.SDK_METRICS_HEADER_KEY, metrics.toString())
- .build();
- return this.getRecordsApi().detokenize(batch, requestOptions);
+ private ApiClientHttpResponse processDetokenizeBatch(
+ com.skyflow.generated.rest.resources.flowservice.requests.V1FlowDetokenizeRequest batch,
+ RequestContext ctx) {
+ return this.getRecordsApi().withRawResponse().detokenize(batch, buildRequestOptions(ctx));
}
- private List>
- insertBatchFutures(
+ private List> insertBatchFutures(
com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest insertRequest,
- List errorRecords) {
+ List errorRecords,
+ RequestInterceptor interceptor) {
List records = insertRequest.getRecords().get();
ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit);
List> batches = Utils.createBatches(records, insertBatchSize);
List> futures = new ArrayList<>();
V1Upsert upsert = insertRequest.getUpsert().isPresent() ? insertRequest.getUpsert().get() : null;
+ int totalBatches = batches.size();
try {
for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
List batch = batches.get(batchIndex);
int batchNumber = batchIndex;
+ RequestContext ctx = new RequestContext("INSERT", batchIndex, totalBatches);
+ if (interceptor != null) interceptor.intercept(ctx);
CompletableFuture future = CompletableFuture
- .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().isPresent() ? insertRequest.getTableName().get() : null, upsert), executor)
- .thenApply(response -> Utils.formatResponse(response, batchNumber, insertBatchSize))
+ .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().isPresent() ? insertRequest.getTableName().get() : null, upsert, ctx), executor)
+ .thenApply(response -> Utils.formatResponse(response.body(), batchNumber, insertBatchSize, response.headers()))
.exceptionally(ex -> {
errorRecords.addAll(Utils.handleBatchException(ex, batch, batchNumber, insertBatchSize));
return null;
@@ -733,20 +828,16 @@ private com.skyflow.generated.rest.types.V1FlowDetokenizeResponse processDetoken
return futures;
}
- private V1InsertResponse insertBatch(List batch, String tableName, V1Upsert upsert) {
+ private ApiClientHttpResponse insertBatch(List batch, String tableName, V1Upsert upsert, RequestContext ctx) {
com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest.Builder req = com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest.builder()
.vaultId(this.getVaultConfig().getVaultId())
.records(batch)
.upsert(upsert);
-// .build();
if (tableName != null && !tableName.isEmpty()) {
req.tableName(tableName);
}
com.skyflow.generated.rest.resources.flowservice.requests.V1InsertRequest request = req.build();
- RequestOptions requestOptions = RequestOptions.builder()
- .addHeader(Constants.SDK_METRICS_HEADER_KEY, metrics.toString())
- .build();
- return this.getRecordsApi().insert(request, requestOptions);
+ return this.getRecordsApi().withRawResponse().insert(request, buildRequestOptions(ctx));
}
private void configureInsertConcurrencyAndBatchSize(int totalRequests) {
diff --git a/v3/src/main/java/com/skyflow/vault/data/DeleteTokensOptions.java b/v3/src/main/java/com/skyflow/vault/data/DeleteTokensOptions.java
new file mode 100644
index 00000000..622f531e
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/vault/data/DeleteTokensOptions.java
@@ -0,0 +1,30 @@
+package com.skyflow.vault.data;
+
+public final class DeleteTokensOptions {
+ private final RequestInterceptor interceptor;
+
+ private DeleteTokensOptions(Builder builder) {
+ this.interceptor = builder.interceptor;
+ }
+
+ public RequestInterceptor getInterceptor() {
+ return interceptor;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private RequestInterceptor interceptor;
+
+ public Builder interceptor(RequestInterceptor interceptor) {
+ this.interceptor = interceptor;
+ return this;
+ }
+
+ public DeleteTokensOptions build() {
+ return new DeleteTokensOptions(this);
+ }
+ }
+}
diff --git a/v3/src/main/java/com/skyflow/vault/data/DetokenizeOptions.java b/v3/src/main/java/com/skyflow/vault/data/DetokenizeOptions.java
new file mode 100644
index 00000000..267bd9bd
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/vault/data/DetokenizeOptions.java
@@ -0,0 +1,30 @@
+package com.skyflow.vault.data;
+
+public final class DetokenizeOptions {
+ private final RequestInterceptor interceptor;
+
+ private DetokenizeOptions(Builder builder) {
+ this.interceptor = builder.interceptor;
+ }
+
+ public RequestInterceptor getInterceptor() {
+ return interceptor;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private RequestInterceptor interceptor;
+
+ public Builder interceptor(RequestInterceptor interceptor) {
+ this.interceptor = interceptor;
+ return this;
+ }
+
+ public DetokenizeOptions build() {
+ return new DetokenizeOptions(this);
+ }
+ }
+}
diff --git a/v3/src/main/java/com/skyflow/vault/data/ErrorRecord.java b/v3/src/main/java/com/skyflow/vault/data/ErrorRecord.java
index 9044f189..732952f3 100644
--- a/v3/src/main/java/com/skyflow/vault/data/ErrorRecord.java
+++ b/v3/src/main/java/com/skyflow/vault/data/ErrorRecord.java
@@ -10,14 +10,22 @@ public class ErrorRecord {
private String error;
@Expose(serialize = true)
private int code;
-// public ErrorRecord() {
-// }
+ @Expose(serialize = true)
+ private String requestId;
public ErrorRecord(int index, String error, int code) {
this.index = index;
this.error = error;
this.code = code;
}
+
+ public ErrorRecord(int index, String error, int code, String requestId) {
+ this.index = index;
+ this.error = error;
+ this.code = code;
+ this.requestId = requestId;
+ }
+
public String getError() {
return error;
}
@@ -30,6 +38,10 @@ public int getIndex() {
return index;
}
+ public String getRequestId() {
+ return requestId;
+ }
+
@Override
public String toString() {
diff --git a/v3/src/main/java/com/skyflow/vault/data/InsertOptions.java b/v3/src/main/java/com/skyflow/vault/data/InsertOptions.java
new file mode 100644
index 00000000..73262b98
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/vault/data/InsertOptions.java
@@ -0,0 +1,30 @@
+package com.skyflow.vault.data;
+
+public final class InsertOptions {
+ private final RequestInterceptor interceptor;
+
+ private InsertOptions(Builder builder) {
+ this.interceptor = builder.interceptor;
+ }
+
+ public RequestInterceptor getInterceptor() {
+ return interceptor;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private RequestInterceptor interceptor;
+
+ public Builder interceptor(RequestInterceptor interceptor) {
+ this.interceptor = interceptor;
+ return this;
+ }
+
+ public InsertOptions build() {
+ return new InsertOptions(this);
+ }
+ }
+}
diff --git a/v3/src/main/java/com/skyflow/vault/data/RequestContext.java b/v3/src/main/java/com/skyflow/vault/data/RequestContext.java
new file mode 100644
index 00000000..07d6fda3
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/vault/data/RequestContext.java
@@ -0,0 +1,32 @@
+package com.skyflow.vault.data;
+
+import com.skyflow.enums.CustomHeaderKey;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class RequestContext {
+ private final String operation;
+ private final int batchIndex;
+ private final int totalBatches;
+ private final Map headers = new HashMap<>();
+
+ public RequestContext(String operation, int batchIndex, int totalBatches) {
+ this.operation = operation;
+ this.batchIndex = batchIndex;
+ this.totalBatches = totalBatches;
+ }
+
+ public String getOperation() { return operation; }
+ public int getBatchIndex() { return batchIndex; }
+ public int getTotalBatches() { return totalBatches; }
+
+ public void addHeader(CustomHeaderKey key, String value) {
+ headers.put(key, value);
+ }
+
+ public Map getHeaders() {
+ return Collections.unmodifiableMap(headers);
+ }
+}
diff --git a/v3/src/main/java/com/skyflow/vault/data/RequestInterceptor.java b/v3/src/main/java/com/skyflow/vault/data/RequestInterceptor.java
new file mode 100644
index 00000000..37f5261d
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/vault/data/RequestInterceptor.java
@@ -0,0 +1,6 @@
+package com.skyflow.vault.data;
+
+@FunctionalInterface
+public interface RequestInterceptor {
+ void intercept(RequestContext context);
+}
diff --git a/v3/src/main/java/com/skyflow/vault/data/TokenizeOptions.java b/v3/src/main/java/com/skyflow/vault/data/TokenizeOptions.java
new file mode 100644
index 00000000..2ac5bcfd
--- /dev/null
+++ b/v3/src/main/java/com/skyflow/vault/data/TokenizeOptions.java
@@ -0,0 +1,30 @@
+package com.skyflow.vault.data;
+
+public final class TokenizeOptions {
+ private final RequestInterceptor interceptor;
+
+ private TokenizeOptions(Builder builder) {
+ this.interceptor = builder.interceptor;
+ }
+
+ public RequestInterceptor getInterceptor() {
+ return interceptor;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private RequestInterceptor interceptor;
+
+ public Builder interceptor(RequestInterceptor interceptor) {
+ this.interceptor = interceptor;
+ return this;
+ }
+
+ public TokenizeOptions build() {
+ return new TokenizeOptions(this);
+ }
+ }
+}
diff --git a/v3/src/test/java/com/skyflow/SkyflowTests.java b/v3/src/test/java/com/skyflow/SkyflowTests.java
index 0904461e..2e454207 100644
--- a/v3/src/test/java/com/skyflow/SkyflowTests.java
+++ b/v3/src/test/java/com/skyflow/SkyflowTests.java
@@ -193,6 +193,139 @@ public void testSetLogLevelReturnsBuilder() {
}
}
+ @Test
+ public void testSetLogLevelNullDefaultsToError() {
+ try {
+ VaultConfig config = new VaultConfig();
+ config.setVaultId(vaultID);
+ config.setClusterId(clusterID);
+ config.setEnv(Env.SANDBOX);
+
+ Skyflow skyflow = Skyflow.builder()
+ .setLogLevel(null)
+ .addVaultConfig(config)
+ .build();
+
+ Assert.assertEquals(LogLevel.ERROR, skyflow.getLogLevel());
+ } catch (Exception e) {
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAddSkyflowCredentials_invalidCredentials_throws() {
+ try {
+ VaultConfig config = new VaultConfig();
+ config.setVaultId(vaultID);
+ config.setClusterId(clusterID);
+ config.setEnv(Env.SANDBOX);
+
+ Credentials badCreds = new Credentials();
+ badCreds.setToken(""); // empty token — invalid
+
+ Skyflow.builder()
+ .addVaultConfig(config)
+ .addSkyflowCredentials(badCreds);
+ Assert.fail(EXCEPTION_NOT_THROWN);
+ } catch (SkyflowException e) {
+ Assert.assertEquals(ErrorCode.INVALID_INPUT.getCode(), e.getHttpCode());
+ Assert.assertEquals(ErrorMessage.EmptyToken.getMessage(), e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAddSkyflowCredentials_propagatesToVaultController() {
+ try {
+ VaultConfig config = new VaultConfig();
+ config.setVaultId(vaultID);
+ config.setClusterId(clusterID);
+ config.setEnv(Env.SANDBOX);
+
+ Credentials creds = new Credentials();
+ creds.setToken(token);
+
+ Skyflow skyflow = Skyflow.builder()
+ .addVaultConfig(config)
+ .addSkyflowCredentials(creds)
+ .build();
+
+ VaultController controller = skyflow.vault();
+ Assert.assertNotNull(controller);
+
+ // verify that common credentials were propagated — the controller should
+ // hold the credentials we passed, not null
+ Object builder = getField(skyflow, "builder");
+ Credentials storedCreds = (Credentials) getField(builder.getClass().getSuperclass(), builder, "skyflowCredentials");
+ Assert.assertEquals(token, storedCreds.getToken());
+ } catch (Exception e) {
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAddSkyflowCredentials_returnsBuilderForChaining() {
+ try {
+ VaultConfig config = new VaultConfig();
+ config.setVaultId(vaultID);
+ config.setClusterId(clusterID);
+ config.setEnv(Env.SANDBOX);
+
+ Credentials creds = new Credentials();
+ creds.setToken(token);
+
+ Skyflow.SkyflowClientBuilder builder = Skyflow.builder().addVaultConfig(config);
+ Skyflow.SkyflowClientBuilder returned = builder.addSkyflowCredentials(creds);
+ Assert.assertSame(builder, returned);
+ } catch (Exception e) {
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetVaultConfig_returnsStoredConfig() {
+ try {
+ VaultConfig config = new VaultConfig();
+ config.setVaultId(vaultID);
+ config.setClusterId(clusterID);
+ config.setEnv(Env.SANDBOX);
+
+ Skyflow skyflow = Skyflow.builder().addVaultConfig(config).build();
+ VaultConfig stored = skyflow.getVaultConfig();
+
+ Assert.assertNotNull(stored);
+ Assert.assertEquals(vaultID, stored.getVaultId());
+ Assert.assertEquals(clusterID, stored.getClusterId());
+ Assert.assertEquals(Env.SANDBOX, stored.getEnv());
+ } catch (Exception e) {
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetVaultConfig_returnsCloneNotOriginal() {
+ try {
+ VaultConfig config = new VaultConfig();
+ config.setVaultId(vaultID);
+ config.setClusterId(clusterID);
+ config.setEnv(Env.SANDBOX);
+
+ Skyflow skyflow = Skyflow.builder().addVaultConfig(config).build();
+ VaultConfig stored = skyflow.getVaultConfig();
+
+ // The stored config must be a different object from the one passed in
+ Assert.assertNotSame(config, stored);
+ } catch (Exception e) {
+ Assert.fail("Unexpected exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuilderReturnsNewInstanceEachCall() {
+ Skyflow.SkyflowClientBuilder b1 = Skyflow.builder();
+ Skyflow.SkyflowClientBuilder b2 = Skyflow.builder();
+ Assert.assertNotSame(b1, b2);
+ }
+
private Object getField(Object instance, String fieldName) throws Exception {
Field f = instance.getClass().getDeclaredField(fieldName);
f.setAccessible(true);
diff --git a/v3/src/test/java/com/skyflow/enums/CustomHeaderKeyTests.java b/v3/src/test/java/com/skyflow/enums/CustomHeaderKeyTests.java
new file mode 100644
index 00000000..5ca546df
--- /dev/null
+++ b/v3/src/test/java/com/skyflow/enums/CustomHeaderKeyTests.java
@@ -0,0 +1,34 @@
+package com.skyflow.enums;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CustomHeaderKeyTests {
+
+ @Test
+ public void values_hasExactlyThreeEntries() {
+ Assert.assertEquals(3, CustomHeaderKey.values().length);
+ }
+
+ @Test
+ public void skyflowAccountID_toStringReturnsCorrectHeader() {
+ Assert.assertEquals("x-skyflow-account-id", CustomHeaderKey.SkyflowAccountID.toString());
+ }
+
+ @Test
+ public void skyflowAccountName_toStringReturnsCorrectHeader() {
+ Assert.assertEquals("x-skyflow-account-name", CustomHeaderKey.SkyflowAccountName.toString());
+ }
+
+ @Test
+ public void requestIDHeader_toStringReturnsCorrectHeader() {
+ Assert.assertEquals("x-request-id", CustomHeaderKey.RequestIDHeader.toString());
+ }
+
+ @Test
+ public void valueOf_returnsCorrectConstants() {
+ Assert.assertEquals(CustomHeaderKey.SkyflowAccountID, CustomHeaderKey.valueOf("SkyflowAccountID"));
+ Assert.assertEquals(CustomHeaderKey.SkyflowAccountName, CustomHeaderKey.valueOf("SkyflowAccountName"));
+ Assert.assertEquals(CustomHeaderKey.RequestIDHeader, CustomHeaderKey.valueOf("RequestIDHeader"));
+ }
+}
\ No newline at end of file
diff --git a/v3/src/test/java/com/skyflow/utils/UtilsTests.java b/v3/src/test/java/com/skyflow/utils/UtilsTests.java
index 46d28164..be1456b1 100644
--- a/v3/src/test/java/com/skyflow/utils/UtilsTests.java
+++ b/v3/src/test/java/com/skyflow/utils/UtilsTests.java
@@ -1,5 +1,6 @@
package com.skyflow.utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import com.skyflow.config.Credentials;
import com.skyflow.enums.Env;
@@ -7,11 +8,18 @@
import com.skyflow.errors.ErrorMessage;
import com.skyflow.errors.SkyflowException;
import com.skyflow.generated.auth.rest.core.ApiClientApiException;
+import com.skyflow.generated.rest.types.V1DeleteTokenResponseObject;
+import com.skyflow.generated.rest.types.V1FlowDeleteTokenResponse;
+import com.skyflow.generated.rest.types.V1FlowDetokenizeResponseObject;
+import com.skyflow.generated.rest.types.V1FlowTokenizeResponseObject;
import com.skyflow.generated.rest.types.V1InsertRecordData;
import com.skyflow.generated.rest.types.V1InsertResponse;
import com.skyflow.generated.rest.types.V1RecordResponseObject;
import com.skyflow.utils.validations.Validations;
import com.skyflow.vault.data.*;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -897,6 +905,339 @@ public void testCreateDetokenizeBatchesWithBatchSizeGreaterThanTokens() {
Assert.assertEquals(Arrays.asList("token1"), batches.get(0).getTokens().get());
}
+ // ── handleBatchException — rest.ApiClientApiException paths ──────────────
+
+ @Test
+ public void handleBatchException_restException_nullBody_createsOneErrorPerRecord() {
+ List batch = Arrays.asList(
+ V1InsertRecordData.builder().build(), V1InsertRecordData.builder().build());
+ com.skyflow.generated.rest.core.ApiClientApiException apiEx =
+ new com.skyflow.generated.rest.core.ApiClientApiException("server error", 503, null);
+ Exception wrapper = new Exception("outer", apiEx);
+
+ List errors = Utils.handleBatchException(wrapper, batch, 0, 2);
+
+ Assert.assertEquals(2, errors.size());
+ Assert.assertEquals(0, errors.get(0).getIndex());
+ Assert.assertEquals(1, errors.get(1).getIndex());
+ Assert.assertEquals(503, errors.get(0).getCode());
+ }
+
+ @Test
+ public void handleBatchException_restException_stringBody_doesNotThrowAndCreatesErrors() {
+ List batch = Collections.singletonList(V1InsertRecordData.builder().build());
+ com.skyflow.generated.rest.core.ApiClientApiException apiEx =
+ new com.skyflow.generated.rest.core.ApiClientApiException("Unauthorized", 401, "plain string body");
+ Exception wrapper = new Exception("outer", apiEx);
+
+ List errors = Utils.handleBatchException(wrapper, batch, 0, 1);
+
+ Assert.assertEquals(1, errors.size());
+ Assert.assertEquals(0, errors.get(0).getIndex());
+ Assert.assertEquals(401, errors.get(0).getCode());
+ }
+
+ @Test
+ public void handleBatchException_restException_errorFieldIsString_usesStringAsMessage() {
+ List batch = Arrays.asList(
+ V1InsertRecordData.builder().build(), V1InsertRecordData.builder().build());
+ Map body = new HashMap<>();
+ body.put("error", "Access denied");
+ com.skyflow.generated.rest.core.ApiClientApiException apiEx =
+ new com.skyflow.generated.rest.core.ApiClientApiException("Forbidden", 403, body);
+ Exception wrapper = new Exception("outer", apiEx);
+
+ List errors = Utils.handleBatchException(wrapper, batch, 0, 2);
+
+ Assert.assertEquals(2, errors.size());
+ Assert.assertEquals("Access denied", errors.get(0).getError());
+ Assert.assertEquals(403, errors.get(0).getCode());
+ Assert.assertEquals(0, errors.get(0).getIndex());
+ Assert.assertEquals(1, errors.get(1).getIndex());
+ }
+
+ @Test
+ public void handleBatchException_recordsListWithNonMapEntry_skipsNonMapItem() {
+ List batch = Arrays.asList(
+ V1InsertRecordData.builder().build(), V1InsertRecordData.builder().build());
+ Map rec = new HashMap<>();
+ rec.put("error", "Err");
+ rec.put("http_code", 400);
+ List