
Decouple request retry logic
regadas committed Apr 9, 2019
1 parent 8c0ebf5 commit 7d3dd1f
Showing 9 changed files with 268 additions and 235 deletions.
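
In short, the separate maxRetries/retryPause write parameters are folded into a single RetryConfig(maxRetries, retryPause), with the pause expressed as a Joda-Time Duration instead of an Int of seconds. A minimal before/after sketch of the user-facing call; the method name saveAsElasticsearch and the values esOptions, docs, and toRequest are assumptions for illustration, not part of this diff:

import com.spotify.scio.elasticsearch.ElasticsearchIO.RetryConfig
import org.joda.time.Duration

// Before this commit: retry settings passed as separate parameters (pause in seconds).
// docs.saveAsElasticsearch(esOptions, maxRetries = 3, retryPause = 5)(toRequest)

// After this commit: a single RetryConfig, with the pause as a Duration.
docs.saveAsElasticsearch(
  esOptions,
  retry = RetryConfig(maxRetries = 3, retryPause = Duration.standardSeconds(5))
)(toRequest)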


@@ -56,8 +56,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci
.withFlushInterval(params.flushInterval)
.withNumOfShard(shards)
.withMaxBulkRequestSize(params.maxBulkRequestSize)
.withMaxRetries(params.maxRetries)
.withRetryPause(params.retryPause)
.withMaxRetries(params.retry.maxRetries)
.withRetryPause(params.retry.retryPause)
.withError(new beam.ThrowingConsumer[BulkExecutionException] {
override def accept(t: BulkExecutionException): Unit =
params.errorFn(t)
@@ -77,7 +77,11 @@ object ElasticsearchIO {
private[elasticsearch] val DefaultNumShards = 0
private[elasticsearch] val DefaultMaxBulkRequestSize = 3000
private[elasticsearch] val DefaultMaxRetries = 3
private[elasticsearch] val DefaultRetryPause = 5
private[elasticsearch] val DefaultRetryPause = Duration.millis(35000)
private[elasticsearch] val DefaultRetryConfig =
RetryConfig(maxRetries = WriteParam.DefaultMaxRetries,
retryPause = WriteParam.DefaultRetryPause)

}

final case class WriteParam[T] private (
@@ -86,6 +90,7 @@ object ElasticsearchIO {
flushInterval: Duration = WriteParam.DefaultFlushInterval,
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
maxRetries: Int = WriteParam.DefaultMaxRetries,
retryPause: Int = WriteParam.DefaultRetryPause)
retry: RetryConfig = WriteParam.DefaultRetryConfig)

final case class RetryConfig(maxRetries: Int, retryPause: Duration)
}
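
Note that retryPause now feeds Beam's FluentBackoff.withInitialBackoff (see the Java changes below), so it is the initial pause of an exponential backoff policy rather than a fixed sleep between attempts. A small sketch of building a custom config; everything outside RetryConfig and Duration is an assumption:

import com.spotify.scio.elasticsearch.ElasticsearchIO.RetryConfig
import org.joda.time.Duration

// Up to 5 retries, starting from a 2-second backoff that FluentBackoff grows between attempts.
val retry = RetryConfig(maxRetries = 5, retryPause = Duration.standardSeconds(2))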
@@ -55,17 +55,9 @@ package object elasticsearch {
flushInterval: Duration = WriteParam.DefaultFlushInterval,
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
maxRetries: Int = WriteParam.DefaultMaxRetries,
retryPause: Int = WriteParam.DefaultRetryPause)(
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
f: T => Iterable[ActionRequest[_]])(implicit coder: Coder[T]): ClosedTap[Nothing] = {
val param = WriteParam(f,
errorFn,
flushInterval,
numOfShards,
maxBulkRequestSize,
maxRetries,
retryPause)
val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize)
self.write(ElasticsearchIO[T](esOptions))(param)
}
}
@@ -37,22 +37,24 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
@@ -143,15 +145,15 @@ public static <T> Bound withMaxRetries(int maxRetries) {
*
* @param retryPause Duration to wait between successive retry attempts.
*/
public static <T> Bound withRetryPause(int retryPause) {
public static <T> Bound withRetryPause(Duration retryPause) {
return new Bound<>().withRetryPause(retryPause);
}

public static class Bound<T> extends PTransform<PCollection<T>, PDone> {

private static final int CHUNK_SIZE = 3000;
private static final int DEFAULT_RETRIES = 3;
private static final int DEFAULT_RETRY_PAUSE = 5;
private static final Duration DEFAULT_RETRY_PAUSE = Duration.millis(35000);

private final String clusterName;
private final InetSocketAddress[] servers;
@@ -160,7 +162,7 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
private final long numOfShard;
private final int maxBulkRequestSize;
private final int maxRetries;
private final int retryPause;
private final Duration retryPause;
private final ThrowingConsumer<BulkExecutionException> error;

private Bound(final String clusterName,
@@ -170,7 +172,7 @@ private Bound(final String clusterName,
final long numOfShard,
final int maxBulkRequestSize,
final int maxRetries,
final int retryPause,
final Duration retryPause,
final ThrowingConsumer<BulkExecutionException> error) {
this.clusterName = clusterName;
this.servers = servers;
@@ -238,7 +240,7 @@ public Bound<T> withMaxRetries(int maxRetries) {
maxRetries, retryPause, error);
}

public Bound<T> withRetryPause(int retryPause) {
public Bound<T> withRetryPause(Duration retryPause) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
@@ -253,7 +255,7 @@ public PDone expand(final PCollection<T> input) {
checkArgument(numOfShard > 0);
checkArgument(maxBulkRequestSize > 0);
checkArgument(maxRetries >= 0);
checkArgument(retryPause >= 0);
checkArgument(retryPause.getMillis() >= 0);
input
.apply("Assign To Shard", ParDo.of(new AssignToShard<>(numOfShard)))
.apply("Re-Window to Global Window", Window.<KV<Long, T>>into(new GlobalWindows())
@@ -291,21 +293,26 @@ public void processElement(ProcessContext c) throws Exception {
private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {

private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
private static final String RETRY_ATTEMPT_LOG =
"Error writing to Elasticsearch. Retry attempt[%d]";
private static final String RETRY_FAILED_LOG =
"Error writing to ES after %d attempt(s). No more attempts allowed";

private final ClientSupplier clientSupplier;
private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
private final ThrowingConsumer<BulkExecutionException> error;
private FluentBackoff backoffConfig;
private final int maxBulkRequestSize;
private final int maxRetries;
private final int retryPause;
private final Duration retryPause;

public ElasticsearchWriter(String clusterName,
InetSocketAddress[] servers,
int maxBulkRequestSize,
SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
ThrowingConsumer<BulkExecutionException> error,
int maxRetries,
int retryPause) {
Duration retryPause) {
this.maxBulkRequestSize = maxBulkRequestSize;
this.clientSupplier = new ClientSupplier(clusterName, servers);
this.toDocWriteRequests = toDocWriteRequests;
@@ -318,7 +325,7 @@ public ElasticsearchWriter(String clusterName,
public void setup() throws Exception {
this.backoffConfig = FluentBackoff.DEFAULT
.withMaxRetries(this.maxRetries)
.withInitialBackoff(Duration.standardSeconds(this.retryPause));
.withInitialBackoff(this.retryPause);
}

@SuppressWarnings("Duplicates")
@@ -341,57 +348,60 @@ public void processElement(ProcessContext c) throws Exception {
final Iterable<List<DocWriteRequest>> chunks =
Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize);

Removed (the old inline retry loop):

      chunks.forEach(chunk -> {
        Exception exception;
        final BackOff backoff = backoffConfig.backoff();
        do {
          try {
            final BulkRequest bulkRequest = new BulkRequest().add(chunk)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
            final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();
            if (bulkItemResponse.hasFailures()) {
              throw new BulkExecutionException(bulkItemResponse);
            } else {
              exception = null;
              break;
            }
          } catch (Exception e) {
            exception = e;

            LOG.error(
                "ElasticsearchWriter: Failed to bulk save chunk of " +
                    Objects.toString(chunk.size()),
                exception);

            // Backoff
            try {
              final long sleepTime = backoff.nextBackOffMillis();
              if (sleepTime == BackOff.STOP) {
                break;
              }
              Thread.sleep(sleepTime);
            } catch (InterruptedException | IOException e1) {
              LOG.error("Interrupt during backoff", e1);
              break;
            }
          }
        } while (true);

        try {
          if (exception != null) {
            if (exception instanceof BulkExecutionException) {
              // This may result in no exception being thrown, depending on callback.
              error.accept((BulkExecutionException) exception);
            } else {
              throw exception;
            }
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });

Added (the bulk request and the retry loop are now separate functions):

      final ProcessFunction<List<DocWriteRequest>, BulkResponse> requestFn =
          request(clientSupplier, error);
      final ProcessFunction<List<DocWriteRequest>, BulkResponse> retryFn =
          retry(requestFn, backoffConfig);

      for (final List<DocWriteRequest> chunk : chunks) {
        try {
          requestFn.apply(chunk);
        } catch (Exception e) {
          retryFn.apply(chunk);
        }
      }
    }

    private static ProcessFunction<List<DocWriteRequest>, BulkResponse> request(
        final ClientSupplier clientSupplier,
        final ThrowingConsumer<BulkExecutionException> bulkErrorHandler) {
      return chunk -> {
        final BulkRequest bulkRequest = new BulkRequest().add(chunk);
        final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();

        if (bulkItemResponse.hasFailures()) {
          bulkErrorHandler.accept(new BulkExecutionException(bulkItemResponse));
        }

        return bulkItemResponse;
      };
    }

    private static ProcessFunction<List<DocWriteRequest>, BulkResponse> retry(
        final ProcessFunction<List<DocWriteRequest>, BulkResponse> requestFn,
        final FluentBackoff backoffConfig) {
      return chunk -> {
        final BackOff backoff = backoffConfig.backoff();
        int attempt = 0;
        BulkResponse response = null;
        Exception exception = null;

        while (response == null && BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
          try {
            response = requestFn.apply(chunk);
            exception = null;
          } catch (Exception e) {
            exception = e;
          }
        }

        if (exception != null) {
          throw new Exception(String.format(RETRY_FAILED_LOG, attempt), exception);
        }

        return response;
      };
}
}

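
The core of the Java change: a single bulk attempt (request) and the backoff loop (retry) are now separate functions, and the retry path only runs after the first plain attempt fails. A rough Scala sketch of the same shape, using Beam's FluentBackoff/BackOffUtils; the generic types and the bulkOnce function are assumptions for illustration, not the library's API:

import org.apache.beam.sdk.util.{BackOffUtils, FluentBackoff, Sleeper}
import org.joda.time.Duration

// Wrap a single-attempt function in a backoff-driven retry loop,
// mirroring the request/retry split introduced by this commit.
def retry[A, B](requestFn: A => B, backoffConfig: FluentBackoff): A => B = { chunk =>
  val backoff = backoffConfig.backoff()
  var attempt = 0
  var result: Option[B] = None
  var lastError: Throwable = null
  // BackOffUtils.next sleeps for the next interval and returns false once retries are exhausted.
  while (result.isEmpty && BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
    attempt += 1
    try result = Some(requestFn(chunk))
    catch { case e: Exception => lastError = e }
  }
  result.getOrElse(
    throw new Exception(s"Error writing to ES after $attempt attempt(s). No more attempts allowed", lastError))
}

// Usage sketch: bulkOnce is an assumed single-attempt bulk call.
// val send = retry(bulkOnce, FluentBackoff.DEFAULT
//   .withMaxRetries(3)
//   .withInitialBackoff(Duration.millis(35000)))
// try bulkOnce(chunk) catch { case _: Exception => send(chunk) }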
@@ -60,8 +60,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci
.withFlushInterval(params.flushInterval)
.withNumOfShard(shards)
.withMaxBulkRequestSize(params.maxBulkRequestSize)
.withMaxRetries(params.maxRetries)
.withRetryPause(params.retryPause)
.withMaxRetries(params.retry.maxRetries)
.withRetryPause(params.retry.retryPause)
.withError(new beam.ThrowingConsumer[BulkExecutionException] {
override def accept(t: BulkExecutionException): Unit =
params.errorFn(t)
@@ -81,7 +81,11 @@ object ElasticsearchIO {
private[elasticsearch] val DefaultNumShards = 0
private[elasticsearch] val DefaultMaxBulkRequestSize = 3000
private[elasticsearch] val DefaultMaxRetries = 3
private[elasticsearch] val DefaultRetryPause = 5
private[elasticsearch] val DefaultRetryPause = Duration.millis(35000)
private[elasticsearch] val DefaultRetryConfig =
RetryConfig(maxRetries = WriteParam.DefaultMaxRetries,
retryPause = WriteParam.DefaultRetryPause)

}

final case class WriteParam[T] private (
Expand All @@ -90,6 +94,7 @@ object ElasticsearchIO {
flushInterval: Duration = WriteParam.DefaultFlushInterval,
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
maxRetries: Int = WriteParam.DefaultMaxRetries,
retryPause: Int = WriteParam.DefaultRetryPause)
retry: RetryConfig = WriteParam.DefaultRetryConfig)

final case class RetryConfig(maxRetries: Int, retryPause: Duration)
}
@@ -22,7 +22,7 @@ import java.net.InetSocketAddress
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.values.SCollection
import com.spotify.scio.coders.Coder
import com.spotify.scio.elasticsearch.ElasticsearchIO.WriteParam
import com.spotify.scio.elasticsearch.ElasticsearchIO.{RetryConfig, WriteParam}
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.BulkExecutionException
import org.elasticsearch.action.DocWriteRequest
import org.joda.time.Duration
@@ -56,16 +56,9 @@ package object elasticsearch {
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
maxRetries: Int = WriteParam.DefaultMaxRetries,
retryPause: Int = WriteParam.DefaultRetryPause)(
retry: RetryConfig = WriteParam.DefaultRetryConfig)(
f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): ClosedTap[Nothing] = {
val param = WriteParam(f,
errorFn,
flushInterval,
numOfShards,
maxBulkRequestSize,
maxRetries,
retryPause)
val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize, retry)
self.write(ElasticsearchIO[T](esOptions))(param)
}
}
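
One consequence visible in the Java changes: a bulk response with item failures is surfaced through the error callback, and a failed chunk is only retried if that callback throws, so an errorFn that merely logs effectively disables retries for partial bulk failures. A hedged usage sketch of this variant's write method; saveAsElasticsearch, esOptions, docs, and toRequest are assumptions, the parameter names come from this diff:

import com.spotify.scio.elasticsearch.ElasticsearchIO.RetryConfig
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.BulkExecutionException
import org.joda.time.Duration

docs.saveAsElasticsearch(
  esOptions,
  errorFn = (e: BulkExecutionException) => throw e, // rethrow so chunks with failures are retried
  retry = RetryConfig(maxRetries = 3, retryPause = Duration.millis(35000))
)(toRequest)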
