HTTP-122 Add lookup retries #129

Closed
wants to merge 6 commits into from
Changes from 1 commit
HTTP-122 Add lookup retries
Grzegorz Kołakowski committed Oct 14, 2024
commit ece37a7bc7ef98d0701d57019bdfa636b471f706
13 changes: 9 additions & 4 deletions README.md
@@ -449,16 +449,21 @@ be requested if the current time is later than the cached token expiry time minu
| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
-| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
+| lookup.retry-strategy.type | optional | Defines the retry strategy to use in case of lookup failures. Accepted values are: `none` (default), `fixed-delay` and `exponential-delay`. |
+| lookup.retry-strategy.fixed-delay.attempts | optional | The number of times that connector retries lookup execution before connector returns empty result. |
Member Author

> returns empty result

In the current implementation, an empty result is returned when the server responds with an error status code or when an exception is thrown. To be honest, I'm strongly against this approach. Errors should not be suppressed, at least not by default. If error suppression is needed, the user should have to configure it explicitly. WDYT?

+| lookup.retry-strategy.fixed-delay.delay | optional | Delay between two consecutive retry attempts. |
+| lookup.retry-strategy.exponential-delay.attempts | optional | The number of times that connector retries lookup execution before connector returns empty result. |
+| lookup.retry-strategy.exponential-delay.initial-delay | optional | Initial delay between two consecutive retry attempts. |
+| lookup.retry-strategy.exponential-delay.max-delay | optional | The highest possible duration between two consecutive retry attempts. |
| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
-| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
-| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
-| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
+| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
+| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
+| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
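
To make the `fixed-delay` and `exponential-delay` options in the table above more concrete, here is a minimal Java sketch of how the two delay schedules could be computed. `RetryDelaySketch` and its methods are hypothetical and are not taken from this PR. Doubling the delay on every retry is an assumption: the README only names an initial delay and a maximum delay, not a multiplier.

```java
import java.time.Duration;

/** Hypothetical helper, not part of the connector: computes the wait before a retry. */
final class RetryDelaySketch {

    /** Fixed delay: every retry waits the same configured duration. */
    static Duration fixedDelay(Duration delay) {
        return delay;
    }

    /**
     * Exponential delay: starts at initialDelay and grows per retry, capped at maxDelay.
     * ASSUMPTION: the delay doubles each time; the source does not state the multiplier.
     *
     * @param retry zero-based retry index (0 is the first retry)
     */
    static Duration exponentialDelay(Duration initialDelay, Duration maxDelay, int retry) {
        long maxMs = maxDelay.toMillis();
        long delayMs = Math.min(initialDelay.toMillis(), maxMs);
        for (int i = 0; i < retry && delayMs < maxMs; i++) {
            // Jump straight to the cap when doubling would exceed it (also avoids overflow).
            delayMs = (delayMs > maxMs / 2) ? maxMs : delayMs * 2;
        }
        return Duration.ofMillis(delayMs);
    }

    public static void main(String[] args) {
        Duration initial = Duration.ofMillis(500);
        Duration max = Duration.ofSeconds(3);
        for (int retry = 0; retry < 5; retry++) {
            long waitMs = exponentialDelay(initial, max, retry).toMillis();
            System.out.println("retry " + retry + " waits " + waitMs + " ms");
        }
    }
}
```

Under these assumptions, an initial delay of 500 ms and a maximum of 3 s give waits of 500, 1000, 2000, 3000 and 3000 ms for the first five retries.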
@@ -1,13 +1,15 @@
 package com.getindata.connectors.http.internal;

 import java.util.List;
+import java.util.stream.Collectors;

 import lombok.Data;
 import lombok.NonNull;
 import lombok.ToString;

 import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
 import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
+import com.getindata.connectors.http.internal.status.HttpResponseStatus;

 /**
  * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
@@ -18,14 +20,36 @@
 public class SinkHttpClientResponse {

     /**
-     * A list of successfully written requests.
+     * A list of requests along with write status.
      */
     @NonNull
-    private final List<HttpRequest> successfulRequests;
+    private final List<ResponseItem> requests;

-    /**
-     * A list of requests that {@link SinkHttpClient} failed to write.
-     */
-    @NonNull
-    private final List<HttpRequest> failedRequests;
+    public List<HttpRequest> getSuccessfulRequests() {
+        return requests.stream()
+            .filter(i -> i.getStatus() == HttpResponseStatus.SUCCESS)
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    public List<HttpRequest> getFailedRetryableRequests() {
+        return requests.stream()
+            .filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_RETRYABLE)
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    public List<HttpRequest> getFailedNotRetryableRequests() {
+        return requests.stream()
+            .filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE)
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    @Data
+    @ToString
+    public static class ResponseItem {
+        private final HttpRequest request;
+        private final HttpResponseStatus status;
+    }
 }
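
Each of the three getters above makes its own pass over `requests`. A caller that needs all three lists at once could instead group the items by status in a single pass. The sketch below illustrates that idea only; `ResponseGroupingSketch`, `Status` and `Item` are simplified, hypothetical stand-ins (a `String` replaces `HttpRequest`) and are not classes from this PR.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; the types below are stand-ins, not the connector's classes. */
final class ResponseGroupingSketch {

    /** Mirrors the three statuses referenced in the diff above. */
    enum Status { SUCCESS, FAILURE_RETRYABLE, FAILURE_NOT_RETRYABLE }

    /** Stand-in for ResponseItem; a String replaces HttpRequest to stay self-contained. */
    static final class Item {
        final String request;
        final Status status;

        Item(String request, Status status) {
            this.request = request;
            this.status = status;
        }
    }

    /** Groups requests by status in one pass; every status maps to a (possibly empty) list. */
    static Map<Status, List<String>> groupByStatus(List<Item> items) {
        Map<Status, List<String>> grouped = new EnumMap<>(Status.class);
        for (Status status : Status.values()) {
            grouped.put(status, new ArrayList<>());
        }
        for (Item item : items) {
            grouped.get(item.status).add(item.request);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
            new Item("req-1", Status.SUCCESS),
            new Item("req-2", Status.FAILURE_RETRYABLE),
            new Item("req-3", Status.SUCCESS));
        System.out.println(groupByStatus(items));
    }
}
```

Whether this is worth doing depends on batch sizes; for small batches the three separate stream passes in the PR are simpler to read.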
@@ -28,13 +28,13 @@ public final class HttpConnectorConfigConstants {
+ "source.lookup.header.";

 public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
-    + "security.oidc.token.request";
+    + "security.oidc.token.request";

 public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP
-    + "security.oidc.token.endpoint.url";
+    + "security.oidc.token.endpoint.url";

 public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP
-    + "security.oidc.token.expiry.reduction";
+    + "security.oidc.token.expiry.reduction";
/**
* Whether to use the raw value of the Authorization header. If set, it prevents
* the special treatment of the header for Basic Authentication, thus preserving the passed
@@ -54,6 +54,12 @@ public final class HttpConnectorConfigConstants {

 public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
     GID_CONNECTOR_HTTP + "source.lookup.error.code";
+
+public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST =
+    GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code.exclude";
+
+public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST =
+    GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code";
 // -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
@@ -0,0 +1,31 @@
+package com.getindata.connectors.http.internal.config;
+
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+public enum RetryStrategyType implements DescribedEnum {
+
+    NONE("none", text("None")),
+    FIXED_DELAY("fixed-delay", text("Fixed delay strategy")),
+    EXPONENTIAL_DELAY("exponential-delay", text("Exponential delay strategy"));
+
+    private final String value;
+    private final InlineElement inlineElement;
+
+    RetryStrategyType(String value, InlineElement inlineElement) {
+        this.value = value;
+        this.inlineElement = inlineElement;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+
+    @Override
+    public InlineElement getDescription() {
+        return inlineElement;
+    }
+
+}
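
Because `toString()` returns the hyphenated option value (`none`, `fixed-delay`, `exponential-delay`), a raw configuration string can be matched against the enum constants by comparing it with `toString()`. The sketch below shows that idea with a plain enum and no Flink dependency. `RetryStrategyLookupSketch` and `fromValue` are hypothetical, and case-insensitive matching is an assumption; how Flink itself resolves the option is not shown in this PR.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical sketch: resolves an option string to a strategy constant. */
final class RetryStrategyLookupSketch {

    /** Simplified copy of the enum above, without the Flink DescribedEnum interface. */
    enum RetryStrategyType {
        NONE("none"),
        FIXED_DELAY("fixed-delay"),
        EXPONENTIAL_DELAY("exponential-delay");

        private final String value;

        RetryStrategyType(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /**
     * Returns the constant whose option value equals the raw string.
     * ASSUMPTION: matching ignores case and surrounding whitespace.
     */
    static Optional<RetryStrategyType> fromValue(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        String trimmed = raw.trim();
        return Arrays.stream(RetryStrategyType.values())
            .filter(type -> type.toString().equalsIgnoreCase(trimmed))
            .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(fromValue("fixed-delay"));
        System.out.println(fromValue("unknown"));
    }
}
```

Returning an `Optional` leaves the decision about unknown values to the caller, which fits the review comment's point that errors should not be silently suppressed.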
@@ -46,18 +46,18 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ
private final Counter numRecordsSendErrorsCounter;

public HttpSinkWriter(
-    ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
-    Sink.InitContext context,
-    int maxBatchSize,
-    int maxInFlightRequests,
-    int maxBufferedRequests,
-    long maxBatchSizeInBytes,
-    long maxTimeInBufferMS,
-    long maxRecordSizeInBytes,
-    String endpointUrl,
-    SinkHttpClient sinkHttpClient,
-    Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
-    Properties properties) {
+    ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
+    Sink.InitContext context,
+    int maxBatchSize,
+    int maxInFlightRequests,
+    int maxBufferedRequests,
+    long maxBatchSizeInBytes,
+    long maxTimeInBufferMS,
+    long maxRecordSizeInBytes,
+    String endpointUrl,
+    SinkHttpClient sinkHttpClient,
+    Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
+    Properties properties) {

super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
@@ -82,8 +82,8 @@ public HttpSinkWriter(
// TODO: Reintroduce retries by adding backoff policy
@Override
protected void submitRequestEntries(
-    List<HttpSinkRequestEntry> requestEntries,
-    Consumer<List<HttpSinkRequestEntry>> requestResult) {
+    List<HttpSinkRequestEntry> requestEntries,
+    Consumer<List<HttpSinkRequestEntry>> requestResult) {
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
future.whenCompleteAsync((response, err) -> {
if (err != null) {
@@ -98,8 +98,10 @@ protected void submitRequestEntries(
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
//requestResult.accept(requestEntries);
-} else if (response.getFailedRequests().size() > 0) {
-int failedRequestsNumber = response.getFailedRequests().size();
+} else if (response.getFailedNotRetryableRequests().size()
++ response.getFailedRetryableRequests().size() > 0) {
+int failedRequestsNumber = response.getFailedNotRetryableRequests().size()
++ response.getFailedRetryableRequests().size();
log.error("Http Sink failed to write and will retry {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);