Add fluent backoff support for ES (#1826)
regadas committed Apr 10, 2019
1 parent f216963 commit 408e98c
Showing 8 changed files with 487 additions and 83 deletions.

@@ -56,6 +56,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci
.withFlushInterval(params.flushInterval)
.withNumOfShard(shards)
.withMaxBulkRequestSize(params.maxBulkRequestSize)
.withMaxRetries(params.retry.maxRetries)
.withRetryPause(params.retry.retryPause)
.withError(new beam.ThrowingConsumer[BulkExecutionException] {
override def accept(t: BulkExecutionException): Unit =
params.errorFn(t)
@@ -74,12 +76,21 @@ object ElasticsearchIO {
private[elasticsearch] val DefaultFlushInterval = Duration.standardSeconds(1)
private[elasticsearch] val DefaultNumShards = 0
private[elasticsearch] val DefaultMaxBulkRequestSize = 3000
private[elasticsearch] val DefaultMaxRetries = 3
private[elasticsearch] val DefaultRetryPause = Duration.millis(35000)
private[elasticsearch] val DefaultRetryConfig =
RetryConfig(maxRetries = WriteParam.DefaultMaxRetries,
retryPause = WriteParam.DefaultRetryPause)

}

final case class WriteParam[T] private (
f: T => Iterable[ActionRequest[_]],
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
flushInterval: Duration = WriteParam.DefaultFlushInterval,
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize)
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
retry: RetryConfig = WriteParam.DefaultRetryConfig)

final case class RetryConfig(maxRetries: Int, retryPause: Duration)
}
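
On the Scala side the new settings travel as a single `RetryConfig` value on `WriteParam` rather than two loose parameters. As a minimal illustration (the case class is redeclared here only for the sketch; the real definition is the one added above), the commit's defaults and a caller override look like this:

```scala
import org.joda.time.Duration

// Redeclared for illustration only; mirrors the case class added in the diff above.
final case class RetryConfig(maxRetries: Int, retryPause: Duration)

object RetryDefaults {
  // The commit's defaults: up to 3 retries, starting from a 35-second pause.
  val Default: RetryConfig =
    RetryConfig(maxRetries = 3, retryPause = Duration.millis(35000))

  // A hypothetical caller override: more retries with a shorter initial pause.
  val Custom: RetryConfig =
    RetryConfig(maxRetries = 5, retryPause = Duration.standardSeconds(5))
}
```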
@@ -37,12 +37,17 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -121,19 +126,43 @@ public static <T> Bound withError(ThrowingConsumer<BulkExecutionException> error
return new Bound<>().withError(error);
}

public static<T> Bound withMaxBulkRequestSize(int maxBulkRequestSize) {
public static <T> Bound withMaxBulkRequestSize(int maxBulkRequestSize) {
return new Bound<>().withMaxBulkRequestSize(maxBulkRequestSize);
}

/**
* Returns a transform for writing to an Elasticsearch cluster.
*
* @param maxRetries Maximum number of retries to attempt for saving any single chunk of bulk
* requests to the Elasticsearch cluster.
*/
public static <T> Bound withMaxRetries(int maxRetries) {
return new Bound<>().withMaxRetries(maxRetries);
}

/**
* Returns a transform for writing to an Elasticsearch cluster.
*
* @param retryPause Duration to wait between successive retry attempts.
*/
public static <T> Bound withRetryPause(Duration retryPause) {
return new Bound<>().withRetryPause(retryPause);
}

public static class Bound<T> extends PTransform<PCollection<T>, PDone> {

private static final int CHUNK_SIZE = 3000;
private static final int DEFAULT_RETRIES = 3;
private static final Duration DEFAULT_RETRY_PAUSE = Duration.millis(35000);

private final String clusterName;
private final InetSocketAddress[] servers;
private final Duration flushInterval;
private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
private final long numOfShard;
private final int maxBulkRequestSize;
private final int maxRetries;
private final Duration retryPause;
private final ThrowingConsumer<BulkExecutionException> error;

private Bound(final String clusterName,
@@ -142,47 +171,79 @@ private Bound(final String clusterName,
final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
final long numOfShard,
final int maxBulkRequestSize,
final int maxRetries,
final Duration retryPause,
final ThrowingConsumer<BulkExecutionException> error) {
this.clusterName = clusterName;
this.servers = servers;
this.flushInterval = flushInterval;
this.toDocWriteRequests = toDocWriteRequests;
this.numOfShard = numOfShard;
this.maxBulkRequestSize = maxBulkRequestSize;
this.maxRetries = maxRetries;
this.retryPause = retryPause;
this.error = error;
}

Bound() {
this(null, null, null, null, 0, CHUNK_SIZE, defaultErrorHandler());
this(null, null, null, null, 0,
CHUNK_SIZE, DEFAULT_RETRIES, DEFAULT_RETRY_PAUSE,
defaultErrorHandler());
}

public Bound<T> withClusterName(String clusterName) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withServers(InetSocketAddress[] servers) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withFlushInterval(Duration flushInterval) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withFunction(
SerializableFunction<T, Iterable<DocWriteRequest<?>>> toIndexRequest) {
return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withNumOfShard(long numOfShard) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withError(ThrowingConsumer<BulkExecutionException> error) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withMaxBulkRequestSize(int maxBulkRequestSize) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error);
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withMaxRetries(int maxRetries) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

public Bound<T> withRetryPause(Duration retryPause) {
return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard,
maxBulkRequestSize,
maxRetries, retryPause, error);
}

@Override
@@ -193,6 +254,8 @@ public PDone expand(final PCollection<T> input) {
checkNotNull(flushInterval);
checkArgument(numOfShard > 0);
checkArgument(maxBulkRequestSize > 0);
checkArgument(maxRetries >= 0);
checkArgument(retryPause.getMillis() >= 0);
input
.apply("Assign To Shard", ParDo.of(new AssignToShard<>(numOfShard)))
.apply("Re-Window to Global Window", Window.<KV<Long, T>>into(new GlobalWindows())
@@ -203,9 +266,10 @@ public PDone expand(final PCollection<T> input) {
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(GroupByKey.create())
.apply("Write to Elasticesarch",
.apply("Write to Elasticsearch",
ParDo.of(new ElasticsearchWriter<>
(clusterName, servers, maxBulkRequestSize, toDocWriteRequests, error)));
(clusterName, servers, maxBulkRequestSize, toDocWriteRequests, error,
maxRetries, retryPause)));
return PDone.in(input.getPipeline());
}
}
@@ -229,23 +293,42 @@ public void processElement(ProcessContext c) throws Exception {
private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {

private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
private static final String RETRY_ATTEMPT_LOG =
"Error writing to Elasticsearch. Retry attempt[%d]";
private static final String RETRY_FAILED_LOG =
"Error writing to ES after %d attempt(s). No more attempts allowed";

private final ClientSupplier clientSupplier;
private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
private final ThrowingConsumer<BulkExecutionException> error;
private FluentBackoff backoffConfig;
private final int maxBulkRequestSize;
private final int maxRetries;
private final Duration retryPause;

public ElasticsearchWriter(String clusterName,
InetSocketAddress[] servers,
int maxBulkRequestSize,
SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
ThrowingConsumer<BulkExecutionException> error) {
InetSocketAddress[] servers,
int maxBulkRequestSize,
SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
ThrowingConsumer<BulkExecutionException> error,
int maxRetries,
Duration retryPause) {
this.maxBulkRequestSize = maxBulkRequestSize;
this.clientSupplier = new ClientSupplier(clusterName, servers);
this.toDocWriteRequests = toDocWriteRequests;
this.error = error;
this.maxRetries = maxRetries;
this.retryPause = retryPause;
}

@Setup
public void setup() throws Exception {
this.backoffConfig = FluentBackoff.DEFAULT
.withMaxRetries(this.maxRetries)
.withInitialBackoff(this.retryPause);
}
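
The writer assembles its backoff policy once per worker in `@Setup`. Beam's `FluentBackoff` grows the wait (with jitter) from the initial value, so `retryPause` is the first pause rather than a fixed interval between attempts. A small standalone sketch of what the configured policy yields, assuming `beam-sdks-java-core` and `joda-time` on the classpath:

```scala
import org.apache.beam.sdk.util.{BackOff, FluentBackoff}
import org.joda.time.Duration

object BackoffSketch {
  def main(args: Array[String]): Unit = {
    // The same configuration setup() builds: up to 3 retries, 35s initial pause.
    val backoff: BackOff = FluentBackoff.DEFAULT
      .withMaxRetries(3)
      .withInitialBackoff(Duration.millis(35000))
      .backoff()

    // nextBackOffMillis() returns the next wait, and BackOff.STOP (-1) once the
    // retry budget is exhausted, so this prints roughly three growing pauses.
    Iterator
      .continually(backoff.nextBackOffMillis())
      .takeWhile(_ != BackOff.STOP)
      .foreach(ms => println(s"would sleep ${ms}ms before the next retry"))
  }
}
```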

@SuppressWarnings("Duplicates")
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
final Iterable<T> values = c.element().getValue();
@@ -264,17 +347,61 @@ public void processElement(ProcessContext c) throws Exception {

final Iterable<List<DocWriteRequest>> chunks =
Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize);
chunks.forEach(chunk -> {

final ProcessFunction<List<DocWriteRequest>, BulkResponse> requestFn =
request(clientSupplier, error);
final ProcessFunction<List<DocWriteRequest>, BulkResponse> retryFn =
retry(requestFn, backoffConfig);

for (final List<DocWriteRequest> chunk : chunks) {
try {
final BulkRequest bulkRequest = new BulkRequest().add(chunk);
final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();
if (bulkItemResponse.hasFailures()) {
error.accept(new BulkExecutionException(bulkItemResponse));
}
requestFn.apply(chunk);
} catch (Exception e) {
throw new RuntimeException(e);
retryFn.apply(chunk);
}
}
}

private static ProcessFunction<List<DocWriteRequest>, BulkResponse> request(
final ClientSupplier clientSupplier,
final ThrowingConsumer<BulkExecutionException> bulkErrorHandler) {
return chunk -> {
final BulkRequest bulkRequest = new BulkRequest().add(chunk);
final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();

if (bulkItemResponse.hasFailures()) {
bulkErrorHandler.accept(new BulkExecutionException(bulkItemResponse));
}
});

return bulkItemResponse;
};
}

private static ProcessFunction<List<DocWriteRequest>, BulkResponse> retry(
final ProcessFunction<List<DocWriteRequest>, BulkResponse> requestFn,
final FluentBackoff backoffConfig) {
return chunk -> {
final BackOff backoff = backoffConfig.backoff();
int attempt = 0;
BulkResponse response = null;
Exception exception = null;

while (response == null && BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
try {
response = requestFn.apply(chunk);
exception = null;
} catch (Exception e) {
exception = e;
}
}

if (exception != null) {
throw new Exception(String.format(RETRY_FAILED_LOG, attempt), exception);
}

return response;
};
}
}
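
Putting the two helpers together: `processElement` issues the first bulk request directly and only falls into the backoff-driven loop when it throws, and `retry` keeps re-issuing the chunk for as long as `BackOffUtils.next` is willing to sleep. A hedged, generic sketch of the same pattern (the `withRetries` name and structure are illustrative, not this module's code):

```scala
import org.apache.beam.sdk.util.{BackOff, BackOffUtils, FluentBackoff, Sleeper}
import org.joda.time.Duration

object RetryLoopSketch {
  // Generic version of the retry() helper above: run `action`, and on failure
  // keep retrying for as long as BackOffUtils.next is willing to sleep.
  def withRetries[A](maxRetries: Int, pause: Duration)(action: => A): A = {
    val backoff: BackOff = FluentBackoff.DEFAULT
      .withMaxRetries(maxRetries)
      .withInitialBackoff(pause)
      .backoff()

    var attempt = 0
    var result: Option[A] = None
    var failure: Option[Exception] = None

    // First attempt, mirroring processElement(): only enter the backoff loop
    // when the initial request throws.
    try result = Some(action)
    catch { case e: Exception => failure = Some(e) }

    // BackOffUtils.next sleeps for the next interval and returns false once
    // the retry budget (maxRetries) is exhausted.
    while (result.isEmpty && BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
      attempt += 1
      try {
        result = Some(action)
        failure = None
      } catch {
        case e: Exception => failure = Some(e)
      }
    }

    failure.foreach(e =>
      throw new RuntimeException(s"failed after $attempt retry attempt(s)", e))
    result.get
  }
}
```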

@@ -342,4 +469,4 @@ public Iterable<Throwable> getFailures() {
}
}
}
}
}
@@ -60,6 +60,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci
.withFlushInterval(params.flushInterval)
.withNumOfShard(shards)
.withMaxBulkRequestSize(params.maxBulkRequestSize)
.withMaxRetries(params.retry.maxRetries)
.withRetryPause(params.retry.retryPause)
.withError(new beam.ThrowingConsumer[BulkExecutionException] {
override def accept(t: BulkExecutionException): Unit =
params.errorFn(t)
@@ -78,12 +80,21 @@ object ElasticsearchIO {
private[elasticsearch] val DefaultFlushInterval = Duration.standardSeconds(1)
private[elasticsearch] val DefaultNumShards = 0
private[elasticsearch] val DefaultMaxBulkRequestSize = 3000
private[elasticsearch] val DefaultMaxRetries = 3
private[elasticsearch] val DefaultRetryPause = Duration.millis(35000)
private[elasticsearch] val DefaultRetryConfig =
RetryConfig(maxRetries = WriteParam.DefaultMaxRetries,
retryPause = WriteParam.DefaultRetryPause)

}

final case class WriteParam[T] private (
f: T => Iterable[DocWriteRequest[_]],
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
flushInterval: Duration = WriteParam.DefaultFlushInterval,
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize)
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
retry: RetryConfig = WriteParam.DefaultRetryConfig)

final case class RetryConfig(maxRetries: Int, retryPause: Duration)
}
@@ -22,7 +22,7 @@ import java.net.InetSocketAddress
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.values.SCollection
import com.spotify.scio.coders.Coder
import com.spotify.scio.elasticsearch.ElasticsearchIO.WriteParam
import com.spotify.scio.elasticsearch.ElasticsearchIO.{RetryConfig, WriteParam}
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.BulkExecutionException
import org.elasticsearch.action.DocWriteRequest
import org.joda.time.Duration
@@ -55,9 +55,10 @@ package object elasticsearch {
flushInterval: Duration = WriteParam.DefaultFlushInterval,
numOfShards: Long = WriteParam.DefaultNumShards,
maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
retry: RetryConfig = WriteParam.DefaultRetryConfig)(
f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): ClosedTap[Nothing] = {
val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize)
val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize, retry)
self.write(ElasticsearchIO[T](esOptions))(param)
}
}
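
End to end, a pipeline author opts into different retry behaviour through the new `retry` parameter on `saveAsElasticsearch`. A hedged usage sketch follows; the `Doc` case class, index name, and document type are placeholders rather than anything defined by this commit:

```scala
import com.spotify.scio.elasticsearch._
import com.spotify.scio.elasticsearch.ElasticsearchIO.RetryConfig
import com.spotify.scio.values.SCollection
import org.elasticsearch.action.index.IndexRequest
import org.joda.time.Duration

object SaveWithRetryExample {
  // Placeholder document type; any case class with a derivable Coder works.
  final case class Doc(id: String, body: String)

  def save(docs: SCollection[Doc], esOptions: ElasticsearchOptions): Unit =
    docs.saveAsElasticsearch(
      esOptions,
      // New in this commit: override the default 3 retries / 35s pause.
      retry = RetryConfig(maxRetries = 5, retryPause = Duration.standardSeconds(10))
    ) { doc =>
      // Map each element to one or more DocWriteRequests.
      Seq(new IndexRequest("my-index", "_doc", doc.id).source("body", doc.body))
    }
}
```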
