From 7146ab9effc6cae62ca5dff5f093bb71cce7cc12 Mon Sep 17 00:00:00 2001
From: Josh Whelchel
Date: Wed, 23 Jan 2019 14:50:35 -0500
Subject: [PATCH 1/7] Retry bulk chunk requests to Elasticsearch 2, 5, and 6

Implements retries when saving bulk documents to Elasticsearch. This is
useful for transient connection problems and other failures that are
temporary in nature. The current behavior causes the entire "Save to
Elasticsearch" step to fail and retry, which creates redundant saves to
the cluster, and there is no existing way to retry a single failed chunk.

The behavior is controlled by two new options, withMaxRetries and
withRetryPause, which can be passed in the configuration as "maxRetries"
and "retryPause" respectively.
---
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 132 ++++++++++++++---
 .../scio/elasticsearch/ElasticsearchIO.scala  |   8 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 139 ++++++++++++++----
 .../scio/elasticsearch/ElasticsearchIO.scala  |   8 +-
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 117 +++++++++++----
 .../scio/elasticsearch/ElasticsearchIO.scala  |   8 +-
 6 files changed, 334 insertions(+), 78 deletions(-)

diff --git a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 31135842f2..208ccc5381 100644
--- a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -122,8 +122,29 @@ public static <T> Bound<T> withMaxBulkRequestSize(int maxBulkRequestSize) {
     return new Bound<>().withMaxBulkRequestSize(maxBulkRequestSize);
   }

+  /**
+   * Returns a transform for writing to an Elasticsearch cluster.
+   *
+   * @param maxRetries Maximum number of retries to attempt for saving any single chunk of bulk
+   *                   requests to the Elasticsearch cluster.
+   */
+  public static <T> Bound<T> withMaxRetries(int maxRetries) {
+    return new Bound<>().withMaxRetries(maxRetries);
+  }
+
+  /**
+   * Returns a transform for writing to an Elasticsearch cluster.
+   *
+   * @param retryPause Number of seconds to wait between successive retry attempts.
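+   *                   The pause is applied per chunk, between successive bulk attempts.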
+   */
+  public static <T> Bound<T> withRetryPause(int retryPause) {
+    return new Bound<>().withRetryPause(retryPause);
+  }
+
   public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
     private static final int CHUNK_SIZE = 3000;
+    private static final int DEFAULT_RETRIES = 3;
+    private static final int DEFAULT_RETRY_PAUSE = 5;

     private final String clusterName;
     private final InetSocketAddress[] servers;
@@ -131,54 +152,85 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
     private final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests;
     private final long numOfShard;
     private final int maxBulkRequestSize;
+    private final int maxRetries;
+    private final int retryPause;
     private final ThrowingConsumer<BulkExecutionException> error;

     private Bound(final String clusterName,
-        final InetSocketAddress[] servers,
-        final Duration flushInterval,
-        final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
-        final long numOfShard,
-        final int maxBulkRequestSize,
-        final ThrowingConsumer<BulkExecutionException> error) {
+                  final InetSocketAddress[] servers,
+                  final Duration flushInterval,
+                  final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
+                  final long numOfShard,
+                  final int maxBulkRequestSize,
+                  int maxRetries, int retryPause, final ThrowingConsumer<BulkExecutionException> error) {
       this.clusterName = clusterName;
       this.servers = servers;
       this.flushInterval = flushInterval;
       this.toActionRequests = toActionRequests;
       this.numOfShard = numOfShard;
       this.maxBulkRequestSize = maxBulkRequestSize;
+      this.maxRetries = maxRetries;
+      this.retryPause = retryPause;
       this.error = error;
     }

     Bound() {
-      this(null, null, null, null, 0, CHUNK_SIZE, defaultErrorHandler());
+      this(null, null, null, null, 0, CHUNK_SIZE, DEFAULT_RETRIES, DEFAULT_RETRY_PAUSE,
+          defaultErrorHandler());
     }

     public Bound<T> withClusterName(String clusterName) {
-      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     public Bound<T> withServers(InetSocketAddress[] servers) {
-      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     public Bound<T> withFlushInterval(Duration flushInterval) {
-      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     public Bound<T> withFunction(SerializableFunction<T, Iterable<ActionRequest<?>>> toIndexRequest) {
-      return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     public Bound<T> withNumOfShard(long numOfShard) {
-      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     public Bound<T> withError(ThrowingConsumer<BulkExecutionException> error) {
-      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     public Bound<T> withMaxBulkRequestSize(int maxBulkRequestSize) {
-      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard, maxBulkRequestSize, error);
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
+    }
+
+    public Bound<T> withMaxRetries(int maxRetries) {
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
+    }
+
+    public Bound<T> withRetryPause(int retryPause) {
+      return new Bound<>(clusterName, servers, flushInterval, toActionRequests, numOfShard,
+          maxBulkRequestSize,
+          maxRetries, retryPause, error);
     }

     @Override
@@ -189,6 +241,8 @@ public PDone expand(final PCollection<T> input) {
       checkNotNull(flushInterval);
       checkArgument(numOfShard > 0);
       checkArgument(maxBulkRequestSize > 0);
+      checkArgument(maxRetries >= 0);
+      checkArgument(retryPause >= 0);
       input
           .apply("Assign To Shard", ParDo.of(new AssignToShard<>(numOfShard)))
           .apply("Re-Window to Global Window", Window.<KV<Long, T>>into(new GlobalWindows())
@@ -199,9 +253,10 @@ public PDone expand(final PCollection<T> input) {
               .discardingFiredPanes()
               .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
           .apply(GroupByKey.create())
-          .apply("Write to Elasticesarch",
+          .apply("Write to Elasticsearch",
               ParDo.of(new ElasticsearchWriter<>
-                  (clusterName, servers, maxBulkRequestSize, toActionRequests, error)));
+                  (clusterName, servers, maxBulkRequestSize, toActionRequests, error,
+                      maxRetries, retryPause)));
       return PDone.in(input.getPipeline());
     }
   }
@@ -227,18 +282,24 @@ private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>,
     private final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests;
     private final ThrowingConsumer<BulkExecutionException> error;
     private final int maxBulkRequestSize;
+    private final int maxRetries;
+    private final int retryPause;

     public ElasticsearchWriter(String clusterName,
-        InetSocketAddress[] servers,
-        int maxBulkRequestSize,
-        SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
-        ThrowingConsumer<BulkExecutionException> error) {
+                               InetSocketAddress[] servers,
+                               int maxBulkRequestSize,
+                               SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
+                               ThrowingConsumer<BulkExecutionException> error, int maxRetries, int retryPause) {
       this.maxBulkRequestSize = maxBulkRequestSize;
+      this.maxRetries = maxRetries;
+      this.retryPause = retryPause;
       this.clientSupplier = new ClientSupplier(clusterName, servers);
       this.toActionRequests = toActionRequests;
       this.error = error;
     }

+
+    @SuppressWarnings("Duplicates")
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       final Iterable<T> values = c.element().getValue();
@@ -259,15 +320,38 @@ public void processElement(ProcessContext c) throws Exception {
           Iterables.partition(actionRequests::iterator, maxBulkRequestSize);

       chunks.forEach(chunk -> {
+        int attempts = 0;
         try {
-          final BulkRequest bulkRequest = new BulkRequest().add(chunk);
-          final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();
-          if (bulkItemResponse.hasFailures()) {
-            error.accept(new BulkExecutionException(bulkItemResponse));
+          BulkResponse failedBulkItemResponse = null;
+
+          while (attempts <= maxRetries) {
+            final BulkRequest bulkRequest = new BulkRequest().add(chunk);
+            final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();
+
+            if (bulkItemResponse.hasFailures()) {
+              failedBulkItemResponse = bulkItemResponse;
+              LOG.error(
+                  "ElasticsearchWriter: Failed to bulk save chunk of {} items, "
+                      + "attempts remaining: {}",
+                  chunk.size(),
+                  (maxRetries - attempts));
+              attempts += 1;
+              if (attempts <= maxRetries) {
+                Thread.sleep(retryPause * 1000);
+              }
+            } else {
+              failedBulkItemResponse = null;
+              break;
+            }
+          }
+
+          if (failedBulkItemResponse != null) {
+            error.accept(new BulkExecutionException(failedBulkItemResponse));
           }
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
+      });
     }
   }
diff --git a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala
index 51d5ae97b5..e1bc5ae3f0 100644
--- a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala
+++ b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala
@@ -57,6 +57,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci
         .withFlushInterval(params.flushInterval)
         .withNumOfShard(shards)
         .withMaxBulkRequestSize(params.maxBulkRequestSize)
+        .withMaxRetries(params.maxRetries)
+        .withRetryPause(params.retryPause)
         .withError(new beam.ThrowingConsumer[BulkExecutionException] {
           override def accept(t: BulkExecutionException): Unit =
             params.errorFn(t)
@@ -75,6 +77,8 @@ object ElasticsearchIO {
     private[elasticsearch] val DefaultFlushInterval = Duration.standardSeconds(1)
     private[elasticsearch] val DefaultNumShards = 0
     private[elasticsearch] val DefaultMaxBulkRequestSize = 3000
+    private[elasticsearch] val DefaultMaxRetries = 3
+    private[elasticsearch] val DefaultRetryPause = 5
   }

   final case class WriteParam[T] private (
@@ -82,5 +86,7 @@
       errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
       flushInterval: Duration = WriteParam.DefaultFlushInterval,
       numOfShards: Long = WriteParam.DefaultNumShards,
-      maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize)
+      maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
+      maxRetries: Int = WriteParam.DefaultMaxRetries,
+      retryPause: Int = WriteParam.DefaultRetryPause)
 }
diff --git a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 1c67797c31..43a288bdb6 100644
--- a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -121,12 +121,34 @@ public static <T> Bound<T> withError(ThrowingConsumer<BulkExecutionException> error
     return new Bound<>().withError(error);
   }

-  public static <T> Bound<T> withMaxBulkRequestSize(int maxBulkRequestSize) {
+  public static <T> Bound<T> withMaxBulkRequestSize(int maxBulkRequestSize) {
     return new Bound<>().withMaxBulkRequestSize(maxBulkRequestSize);
   }

+  /**
+   * Returns a transform for writing to an Elasticsearch cluster.
+   *
+   * @param maxRetries Maximum number of retries to attempt for saving any single chunk of bulk
+   *                   requests to the Elasticsearch cluster.
+   */
+  public static <T> Bound<T> withMaxRetries(int maxRetries) {
+    return new Bound<>().withMaxRetries(maxRetries);
+  }
+
+  /**
+   * Returns a transform for writing to an Elasticsearch cluster.
+   *
+   * @param retryPause Number of seconds to wait between successive retry attempts.
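+   *                   The pause is applied per chunk, between successive bulk attempts.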
+ */ + public static Bound withRetryPause(int retryPause) { + return new Bound<>().withRetryPause(retryPause); + } + public static class Bound extends PTransform, PDone> { + private static final int CHUNK_SIZE = 3000; + private static final int DEFAULT_RETRIES = 3; + private static final int DEFAULT_RETRY_PAUSE = 5; private final String clusterName; private final InetSocketAddress[] servers; @@ -134,55 +156,88 @@ public static class Bound extends PTransform, PDone> { private final SerializableFunction>> toDocWriteRequests; private final long numOfShard; private final int maxBulkRequestSize; + private final int maxRetries; + private final int retryPause; private final ThrowingConsumer error; private Bound(final String clusterName, - final InetSocketAddress[] servers, - final Duration flushInterval, - final SerializableFunction>> toDocWriteRequests, - final long numOfShard, - final int maxBulkRequestSize, - final ThrowingConsumer error) { + final InetSocketAddress[] servers, + final Duration flushInterval, + final SerializableFunction>> toDocWriteRequests, + final long numOfShard, + final int maxBulkRequestSize, + final int maxRetries, final int retryPause, + final ThrowingConsumer error) { this.clusterName = clusterName; this.servers = servers; this.flushInterval = flushInterval; this.toDocWriteRequests = toDocWriteRequests; this.numOfShard = numOfShard; this.maxBulkRequestSize = maxBulkRequestSize; + this.maxRetries = maxRetries; + this.retryPause = retryPause; this.error = error; } Bound() { - this(null, null, null, null, 0, CHUNK_SIZE, defaultErrorHandler()); + this(null, null, null, null, 0, + CHUNK_SIZE, DEFAULT_RETRIES, DEFAULT_RETRY_PAUSE, + defaultErrorHandler()); } public Bound withClusterName(String clusterName) { - return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } public Bound withServers(InetSocketAddress[] servers) { - return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } public Bound withFlushInterval(Duration flushInterval) { - return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } public Bound withFunction( SerializableFunction>> toIndexRequest) { - return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } public Bound withNumOfShard(long numOfShard) { - return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } public Bound withError(ThrowingConsumer error) { - return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, 
numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } public Bound withMaxBulkRequestSize(int maxBulkRequestSize) { - return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, maxBulkRequestSize, error); + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); + } + + public Bound withMaxRetries(int maxRetries) { + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); + } + + public Bound withRetryPause(int retryPause) { + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } @Override @@ -193,6 +248,8 @@ public PDone expand(final PCollection input) { checkNotNull(flushInterval); checkArgument(numOfShard > 0); checkArgument(maxBulkRequestSize > 0); + checkArgument(maxRetries >= 0); + checkArgument(retryPause >= 0); input .apply("Assign To Shard", ParDo.of(new AssignToShard<>(numOfShard))) .apply("Re-Window to Global Window", Window.>into(new GlobalWindows()) @@ -203,9 +260,10 @@ public PDone expand(final PCollection input) { .discardingFiredPanes() .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) .apply(GroupByKey.create()) - .apply("Write to Elasticesarch", + .apply("Write to Elasticsearch", ParDo.of(new ElasticsearchWriter<> - (clusterName, servers, maxBulkRequestSize, toDocWriteRequests, error))); + (clusterName, servers, maxBulkRequestSize, toDocWriteRequests, error, + maxRetries, retryPause))); return PDone.in(input.getPipeline()); } } @@ -233,19 +291,25 @@ private static class ElasticsearchWriter extends DoFn>, private final SerializableFunction>> toDocWriteRequests; private final ThrowingConsumer error; private final int maxBulkRequestSize; + private final int maxRetries; + private final int retryPause; public ElasticsearchWriter(String clusterName, - InetSocketAddress[] servers, - int maxBulkRequestSize, - SerializableFunction>> toDocWriteRequests, - ThrowingConsumer error) { + InetSocketAddress[] servers, + int maxBulkRequestSize, + SerializableFunction>> toDocWriteRequests, + ThrowingConsumer error, + int maxRetries, int retryPause) { this.maxBulkRequestSize = maxBulkRequestSize; + this.maxRetries = maxRetries; + this.retryPause = retryPause; this.clientSupplier = new ClientSupplier(clusterName, servers); this.toDocWriteRequests = toDocWriteRequests; this.error = error; } + @SuppressWarnings("Duplicates") @ProcessElement public void processElement(ProcessContext c) throws Exception { final Iterable values = c.element().getValue(); @@ -265,12 +329,35 @@ public void processElement(ProcessContext c) throws Exception { final Iterable> chunks = Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize); chunks.forEach(chunk -> { + int attempts = 0; try { - final BulkRequest bulkRequest = new BulkRequest().add(chunk); - final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); - if (bulkItemResponse.hasFailures()) { - error.accept(new BulkExecutionException(bulkItemResponse)); + BulkResponse failedBulkItemResponse = null; + + while (attempts <= maxRetries) { + final BulkRequest bulkRequest = new BulkRequest().add(chunk); + final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); + + if (bulkItemResponse.hasFailures()) { + failedBulkItemResponse = bulkItemResponse; + LOG.error( + "ElasticsearchWriter: 
Failed to bulk save chunk of {} items, " + + "attempts remaining: {}", + chunk.size(), + (maxRetries - attempts)); + attempts += 1; + if (attempts <= maxRetries) { + Thread.sleep(retryPause * 1000); + } + } else { + failedBulkItemResponse = null; + break; + } } + + if (failedBulkItemResponse != null) { + error.accept(new BulkExecutionException(failedBulkItemResponse)); + } + } catch (Exception e) { throw new RuntimeException(e); } @@ -342,4 +429,4 @@ public Iterable getFailures() { } } } -} +} \ No newline at end of file diff --git a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala index 71b0da764c..07da096b6b 100644 --- a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala +++ b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala @@ -61,6 +61,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci .withFlushInterval(params.flushInterval) .withNumOfShard(shards) .withMaxBulkRequestSize(params.maxBulkRequestSize) + .withMaxRetries(params.maxRetries) + .withRetryPause(params.retryPause) .withError(new beam.ThrowingConsumer[BulkExecutionException] { override def accept(t: BulkExecutionException): Unit = params.errorFn(t) @@ -79,6 +81,8 @@ object ElasticsearchIO { private[elasticsearch] val DefaultFlushInterval = Duration.standardSeconds(1) private[elasticsearch] val DefaultNumShards = 0 private[elasticsearch] val DefaultMaxBulkRequestSize = 3000 + private[elasticsearch] val DefaultMaxRetries = 3 + private[elasticsearch] val DefaultRetryPause = 5 } final case class WriteParam[T] private ( @@ -86,5 +90,7 @@ object ElasticsearchIO { errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn, flushInterval: Duration = WriteParam.DefaultFlushInterval, numOfShards: Long = WriteParam.DefaultNumShards, - maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize) + maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize, + maxRetries: Int = WriteParam.DefaultMaxRetries, + retryPause: Int = WriteParam.DefaultRetryPause) } diff --git a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 513280699e..9caf39a32b 100644 --- a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -125,9 +125,30 @@ public static Bound withMaxBulkRequestSize(int maxBulkRequestSize) { return new Bound<>().withMaxBulkRequestSize(maxBulkRequestSize); } + /** + * Returns a transform for writing to Elasticsearch cluster. + * + * @param maxRetries Maximum number of retries to attempt for saving any single chunk of bulk + * requests to the Elasticsearch cluster. + */ + public static Bound withMaxRetries(int maxRetries) { + return new Bound<>().withMaxRetries(maxRetries); + } + + /** + * Returns a transform for writing to Elasticsearch cluster. + * + * @param retryPause Number of seconds to wait between successive retry attempts. 
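+   *                   The pause is applied per chunk, between successive bulk attempts.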
+ */ + public static Bound withRetryPause(int retryPause) { + return new Bound<>().withRetryPause(retryPause); + } + public static class Bound extends PTransform, PDone> { private static final int CHUNK_SIZE = 3000; + private static final int DEFAULT_RETRIES = 3; + private static final int DEFAULT_RETRY_PAUSE = 5; private final String clusterName; private final InetSocketAddress[] servers; @@ -135,62 +156,78 @@ public static class Bound extends PTransform, PDone> { private final SerializableFunction>> toDocWriteRequests; private final long numOfShard; private final int maxBulkRequestSize; + private final int maxRetries; + private final int retryPause; private final ThrowingConsumer error; private Bound(final String clusterName, - final InetSocketAddress[] servers, - final Duration flushInterval, - final SerializableFunction>> toDocWriteRequests, - final long numOfShard, - final int maxBulkRequestSize, - final ThrowingConsumer error) { + final InetSocketAddress[] servers, + final Duration flushInterval, + final SerializableFunction>> toDocWriteRequests, + final long numOfShard, + final int maxBulkRequestSize, + int maxRetries, int retryPause, final ThrowingConsumer error) { this.clusterName = clusterName; this.servers = servers; this.flushInterval = flushInterval; this.toDocWriteRequests = toDocWriteRequests; this.numOfShard = numOfShard; this.maxBulkRequestSize = maxBulkRequestSize; + this.maxRetries = maxRetries; + this.retryPause = retryPause; this.error = error; } Bound() { - this(null, null, null, null, 0, CHUNK_SIZE, defaultErrorHandler()); + this(null, null, null, null, 0, CHUNK_SIZE, DEFAULT_RETRIES, DEFAULT_RETRY_PAUSE, defaultErrorHandler()); } public Bound withClusterName(String clusterName) { return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); } public Bound withServers(InetSocketAddress[] servers) { return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); } public Bound withFlushInterval(Duration flushInterval) { return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); } public Bound withFunction( SerializableFunction>> toIndexRequest) { return new Bound<>(clusterName, servers, flushInterval, toIndexRequest, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); } public Bound withNumOfShard(long numOfShard) { return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); } public Bound withError(ThrowingConsumer error) { return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); } public Bound withMaxBulkRequestSize(int maxBulkRequestSize) { return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, - maxBulkRequestSize, error); + maxBulkRequestSize, maxRetries, retryPause, error); + } + + public Bound withMaxRetries(int maxRetries) { + return new Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); + } + + public Bound withRetryPause(int retryPause) { + return new 
Bound<>(clusterName, servers, flushInterval, toDocWriteRequests, numOfShard, + maxBulkRequestSize, + maxRetries, retryPause, error); } @Override @@ -201,6 +238,8 @@ public PDone expand(final PCollection input) { checkNotNull(flushInterval); checkArgument(numOfShard > 0); checkArgument(maxBulkRequestSize > 0); + checkArgument(maxRetries >= 0); + checkArgument(retryPause >= 0); input .apply("Assign To Shard", ParDo.of(new AssignToShard<>(numOfShard))) .apply("Re-Window to Global Window", Window.>into(new GlobalWindows()) @@ -211,10 +250,10 @@ public PDone expand(final PCollection input) { .discardingFiredPanes() .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) .apply(GroupByKey.create()) - .apply("Write to Elasticesarch", + .apply("Write to Elasticsearch", ParDo.of(new ElasticsearchWriter<> (clusterName, servers, maxBulkRequestSize, toDocWriteRequests, - error))); + error, maxRetries, retryPause))); return PDone.in(input.getPipeline()); } } @@ -242,19 +281,24 @@ private static class ElasticsearchWriter extends DoFn>, private final SerializableFunction>> toDocWriteRequests; private final ThrowingConsumer error; private final int maxBulkRequestSize; + private final int maxRetries; + private final int retryPause; public ElasticsearchWriter(String clusterName, - InetSocketAddress[] servers, - int maxBulkRequestSize, - SerializableFunction>> toDocWriteRequests, - ThrowingConsumer error) { + InetSocketAddress[] servers, + int maxBulkRequestSize, + SerializableFunction>> toDocWriteRequests, + ThrowingConsumer error, int maxRetries, int retryPause) { this.maxBulkRequestSize = maxBulkRequestSize; + this.maxRetries = maxRetries; + this.retryPause = retryPause; this.clientSupplier = new ClientSupplier(clusterName, servers); this.toDocWriteRequests = toDocWriteRequests; this.error = error; } + @SuppressWarnings("Duplicates") @ProcessElement public void processElement(ProcessContext c) throws Exception { final Iterable values = c.element().getValue(); @@ -274,13 +318,36 @@ public void processElement(ProcessContext c) throws Exception { final Iterable> chunks = Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize); chunks.forEach(chunk -> { + int attempts = 0; try { - final BulkRequest bulkRequest = new BulkRequest().add(chunk.toArray( - new DocWriteRequest[0])); - final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); - if (bulkItemResponse.hasFailures()) { - error.accept(new BulkExecutionException(bulkItemResponse)); + BulkResponse failedBulkItemResponse = null; + + while (attempts <= maxRetries) { + final BulkRequest bulkRequest = new BulkRequest().add(chunk.toArray( + new DocWriteRequest[0])); + final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); + + if (bulkItemResponse.hasFailures()) { + failedBulkItemResponse = bulkItemResponse; + LOG.error( + "ElasticsearchWriter: Failed to bulk save chunk of {} items, " + + "attempts remaining: {}", + chunk.size(), + (maxRetries - attempts)); + attempts += 1; + if (attempts <= maxRetries) { + Thread.sleep(retryPause * 1000); + } + } else { + failedBulkItemResponse = null; + break; + } } + + if (failedBulkItemResponse != null) { + error.accept(new BulkExecutionException(failedBulkItemResponse)); + } + } catch (Exception e) { throw new RuntimeException(e); } diff --git a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala index e7719f44be..8336d337f5 
100644 --- a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala +++ b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/ElasticsearchIO.scala @@ -61,6 +61,8 @@ final case class ElasticsearchIO[T](esOptions: ElasticsearchOptions) extends Sci .withFlushInterval(params.flushInterval) .withNumOfShard(shards) .withMaxBulkRequestSize(params.maxBulkRequestSize) + .withMaxRetries(params.maxRetries) + .withRetryPause(params.retryPause) .withError(new beam.ThrowingConsumer[BulkExecutionException] { override def accept(t: BulkExecutionException): Unit = params.errorFn(t) @@ -79,6 +81,8 @@ object ElasticsearchIO { private[elasticsearch] val DefaultFlushInterval = Duration.standardSeconds(1) private[elasticsearch] val DefaultNumShards = 0 private[elasticsearch] val DefaultMaxBulkRequestSize = 3000 + private[elasticsearch] val DefaultMaxRetries = 3 + private[elasticsearch] val DefaultRetryPause = 5 } final case class WriteParam[T] private ( @@ -86,5 +90,7 @@ object ElasticsearchIO { errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn, flushInterval: Duration = WriteParam.DefaultFlushInterval, numOfShards: Long = WriteParam.DefaultNumShards, - maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize) + maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize, + maxRetries: Int = WriteParam.DefaultMaxRetries, + retryPause: Int = WriteParam.DefaultRetryPause) } From 960584276464f0dbc5853f4f3a0584d1373989db Mon Sep 17 00:00:00 2001 From: Josh Whelchel Date: Wed, 23 Jan 2019 16:04:42 -0500 Subject: [PATCH 2/7] Catch all exceptions within the chunks.forEach block. Some of the exceptions thrown are not caught within the Client bulk() method. --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 50 ++++++++++------- .../sdk/io/elasticsearch/ElasticsearchIO.java | 53 ++++++++++++------- .../sdk/io/elasticsearch/ElasticsearchIO.java | 53 ++++++++++++------- 3 files changed, 99 insertions(+), 57 deletions(-) diff --git a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 208ccc5381..4e8f71c8cd 100644 --- a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -321,37 +321,49 @@ public void processElement(ProcessContext c) throws Exception { chunks.forEach(chunk -> { int attempts = 0; - try { - BulkResponse failedBulkItemResponse = null; + Exception exception = null; - while (attempts <= maxRetries) { - final BulkRequest bulkRequest = new BulkRequest().add(chunk); - final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); + while (attempts <= maxRetries) { + try { + if (attempts > 0) { + // Sleep on subsequent attempts. 
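+                // retryPause is in seconds; Thread.sleep expects milliseconds.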
+ Thread.sleep(retryPause * 1000); + } + final BulkRequest bulkRequest = new BulkRequest().add(chunk).refresh(false); + final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); if (bulkItemResponse.hasFailures()) { - failedBulkItemResponse = bulkItemResponse; - LOG.error( - "ElasticsearchWriter: Failed to bulk save chunk of {} items, " + - "attempts remaining: {}", - chunk.size(), - (maxRetries - attempts)); - attempts += 1; - if (attempts <= maxRetries) { - Thread.sleep(retryPause * 1000); - } + exception = new BulkExecutionException(bulkItemResponse); } else { - failedBulkItemResponse = null; + exception = null; break; } + } catch (Exception e) { + exception = e; + } finally { + if (exception != null) { + LOG.error( + "ElasticsearchWriter: Failed to bulk save chunk of " + + Objects.toString(chunk.size()) + " items, attempts remaining: " + + Objects.toString(maxRetries - attempts), + exception); + } + attempts += 1; } + } - if (failedBulkItemResponse != null) { - error.accept(new BulkExecutionException(failedBulkItemResponse)); + try { + if (exception != null) { + if (exception instanceof BulkExecutionException) { + // This may result in no exception being thrown, depending on callback. + error.accept((BulkExecutionException) exception); + } else { + throw exception; + } } } catch (Exception e) { throw new RuntimeException(e); } - }); } } diff --git a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 43a288bdb6..9dcd32f63e 100644 --- a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -50,6 +50,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -328,36 +329,50 @@ public void processElement(ProcessContext c) throws Exception { final Iterable> chunks = Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize); + chunks.forEach(chunk -> { int attempts = 0; - try { - BulkResponse failedBulkItemResponse = null; + Exception exception = null; - while (attempts <= maxRetries) { - final BulkRequest bulkRequest = new BulkRequest().add(chunk); - final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); + while (attempts <= maxRetries) { + try { + if (attempts > 0) { + // Sleep on subsequent attempts. 
+ Thread.sleep(retryPause * 1000); + } + final BulkRequest bulkRequest = new BulkRequest().add(chunk) + .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); if (bulkItemResponse.hasFailures()) { - failedBulkItemResponse = bulkItemResponse; - LOG.error( - "ElasticsearchWriter: Failed to bulk save chunk of {} items, " + - "attempts remaining: {}", - chunk.size(), - (maxRetries - attempts)); - attempts += 1; - if (attempts <= maxRetries) { - Thread.sleep(retryPause * 1000); - } + exception = new BulkExecutionException(bulkItemResponse); } else { - failedBulkItemResponse = null; + exception = null; break; } + } catch (Exception e) { + exception = e; + } finally { + if (exception != null) { + LOG.error( + "ElasticsearchWriter: Failed to bulk save chunk of " + + Objects.toString(chunk.size()) + " items, attempts remaining: " + + Objects.toString(maxRetries - attempts), + exception); + } + attempts += 1; } + } - if (failedBulkItemResponse != null) { - error.accept(new BulkExecutionException(failedBulkItemResponse)); + try { + if (exception != null) { + if (exception instanceof BulkExecutionException) { + // This may result in no exception being thrown, depending on callback. + error.accept((BulkExecutionException) exception); + } else { + throw exception; + } } - } catch (Exception e) { throw new RuntimeException(e); } diff --git a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 9caf39a32b..bfc6ebbec6 100644 --- a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -50,6 +50,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -317,37 +318,51 @@ public void processElement(ProcessContext c) throws Exception { final Iterable> chunks = Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize); + chunks.forEach(chunk -> { int attempts = 0; - try { - BulkResponse failedBulkItemResponse = null; + Exception exception = null; + + while (attempts <= maxRetries) { + try { + if (attempts > 0) { + // Sleep on subsequent attempts. 
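+                // Fixed pause between attempts; patch 5 of this series replaces it with FluentBackoff-based backoff.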
+ Thread.sleep(retryPause * 1000); + } - while (attempts <= maxRetries) { final BulkRequest bulkRequest = new BulkRequest().add(chunk.toArray( - new DocWriteRequest[0])); + new DocWriteRequest[0])) + .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); - if (bulkItemResponse.hasFailures()) { - failedBulkItemResponse = bulkItemResponse; - LOG.error( - "ElasticsearchWriter: Failed to bulk save chunk of {} items, " + - "attempts remaining: {}", - chunk.size(), - (maxRetries - attempts)); - attempts += 1; - if (attempts <= maxRetries) { - Thread.sleep(retryPause * 1000); - } + exception = new BulkExecutionException(bulkItemResponse); } else { - failedBulkItemResponse = null; + exception = null; break; } + } catch (Exception e) { + exception = e; + } finally { + if (exception != null) { + LOG.error( + "ElasticsearchWriter: Failed to bulk save chunk of " + + Objects.toString(chunk.size()) + " items, attempts remaining: " + + Objects.toString(maxRetries - attempts), + exception); + } + attempts += 1; } + } - if (failedBulkItemResponse != null) { - error.accept(new BulkExecutionException(failedBulkItemResponse)); + try { + if (exception != null) { + if (exception instanceof BulkExecutionException) { + // This may result in no exception being thrown, depending on callback. + error.accept((BulkExecutionException) exception); + } else { + throw exception; + } } - } catch (Exception e) { throw new RuntimeException(e); } From 00c299fbc2b5fbd823c697fc95e2bc0cc3f57cd9 Mon Sep 17 00:00:00 2001 From: Josh Whelchel Date: Wed, 23 Jan 2019 16:11:23 -0500 Subject: [PATCH 3/7] Adds new params maxRetries and retryPause to saveAsElasticsearch method --- .../com/spotify/scio/elasticsearch/package.scala | 14 ++++++++++++-- .../com/spotify/scio/elasticsearch/package.scala | 14 ++++++++++++-- .../com/spotify/scio/elasticsearch/package.scala | 14 ++++++++++++-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala index 04fc2cb2c5..9966989954 100644 --- a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala +++ b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala @@ -52,14 +52,24 @@ package object elasticsearch { * @param numOfShards number of parallel writes to be performed, recommended setting is the * number of pipeline workers * @param errorFn function to handle error when performing Elasticsearch bulk writes + * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions + * @param retryPause Number of seconds to pause between subsequent attempts */ def saveAsElasticsearch(esOptions: ElasticsearchOptions, flushInterval: Duration = WriteParam.DefaultFlushInterval, numOfShards: Long = WriteParam.DefaultNumShards, maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize, - errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)( + errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn, + maxRetries: Int = WriteParam.DefaultMaxRetries, + retryPause: Int = WriteParam.DefaultRetryPause)( f: T => Iterable[ActionRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = { - val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize) + val param = WriteParam(f, + errorFn, + flushInterval, + numOfShards, + 
maxBulkRequestSize, + maxRetries, + retryPause) self.write(ElasticsearchIO[T](esOptions))(param) } } diff --git a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala index 150bf28e22..4436129e8b 100644 --- a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala +++ b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala @@ -52,14 +52,24 @@ package object elasticsearch { * @param numOfShards number of parallel writes to be performed, recommended setting is the * number of pipeline workers * @param errorFn function to handle error when performing Elasticsearch bulk writes + * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions + * @param retryPause Number of seconds to pause between subsequent attempts */ def saveAsElasticsearch(esOptions: ElasticsearchOptions, flushInterval: Duration = WriteParam.DefaultFlushInterval, numOfShards: Long = WriteParam.DefaultNumShards, maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize, - errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)( + errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn, + maxRetries: Int = WriteParam.DefaultMaxRetries, + retryPause: Int = WriteParam.DefaultRetryPause)( f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = { - val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize) + val param = WriteParam(f, + errorFn, + flushInterval, + numOfShards, + maxBulkRequestSize, + maxRetries, + retryPause) self.write(ElasticsearchIO[T](esOptions))(param) } } diff --git a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala index ab082ab28e..a754884316 100644 --- a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala +++ b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala @@ -52,14 +52,24 @@ package object elasticsearch { * @param numOfShards number of parallel writes to be performed, recommended setting is the * number of pipeline workers * @param errorFn function to handle error when performing Elasticsearch bulk writes + * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions + * @param retryPause Number of seconds to pause between subsequent attempts */ def saveAsElasticsearch(esOptions: ElasticsearchOptions, flushInterval: Duration = WriteParam.DefaultFlushInterval, numOfShards: Long = WriteParam.DefaultNumShards, maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize, - errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)( + errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn, + maxRetries: Int = WriteParam.DefaultMaxRetries, + retryPause: Int = WriteParam.DefaultRetryPause)( f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = { - val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize) + val param = WriteParam(f, + errorFn, + flushInterval, + numOfShards, + maxBulkRequestSize, + maxRetries, + retryPause) self.write(ElasticsearchIO[T](esOptions))(param) } } From b4d95d1127f34dcc64e7a947934a1a9a703823b0 Mon Sep 17 00:00:00 2001 From: Josh Whelchel Date: Wed, 23 Jan 2019 16:15:19 -0500 Subject: [PATCH 4/7] Revert "Adds 
new params maxRetries and retryPause to saveAsElasticsearch method"

This exceeds the maximum number of parameters allowed by Scalastyle, so
rely on these values being supplied as command-line arguments instead.

This reverts commit 00c299fbc2b5fbd823c697fc95e2bc0cc3f57cd9.
---
 .../com/spotify/scio/elasticsearch/package.scala | 14 ++------------
 .../com/spotify/scio/elasticsearch/package.scala | 14 ++------------
 .../com/spotify/scio/elasticsearch/package.scala | 14 ++------------
 3 files changed, 6 insertions(+), 36 deletions(-)

diff --git a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala
index 9966989954..04fc2cb2c5 100644
--- a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala
+++ b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala
@@ -52,24 +52,14 @@ package object elasticsearch {
    * @param numOfShards number of parallel writes to be performed, recommended setting is the
    *                    number of pipeline workers
    * @param errorFn function to handle error when performing Elasticsearch bulk writes
-   * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions
-   * @param retryPause Number of seconds to pause between subsequent attempts
    */
   def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                           flushInterval: Duration = WriteParam.DefaultFlushInterval,
                           numOfShards: Long = WriteParam.DefaultNumShards,
                           maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
-                          errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
-                          maxRetries: Int = WriteParam.DefaultMaxRetries,
-                          retryPause: Int = WriteParam.DefaultRetryPause)(
+                          errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
     f: T => Iterable[ActionRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
-    val param = WriteParam(f,
-                           errorFn,
-                           flushInterval,
-                           numOfShards,
-                           maxBulkRequestSize,
-                           maxRetries,
-                           retryPause)
+    val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize)
     self.write(ElasticsearchIO[T](esOptions))(param)
   }
 }
diff --git a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala
index 4436129e8b..150bf28e22 100644
--- a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala
+++ b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala
@@ -52,24 +52,14 @@ package object elasticsearch {
    * @param numOfShards number of parallel writes to be performed, recommended setting is the
    *                    number of pipeline workers
    * @param errorFn function to handle error when performing Elasticsearch bulk writes
-   * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions
-   * @param retryPause Number of seconds to pause between subsequent attempts
    */
   def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                           flushInterval: Duration = WriteParam.DefaultFlushInterval,
                           numOfShards: Long = WriteParam.DefaultNumShards,
                           maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
-                          errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
-                          maxRetries: Int = WriteParam.DefaultMaxRetries,
-                          retryPause: Int = WriteParam.DefaultRetryPause)(
+                          errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
     f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
-    val param = WriteParam(f,
-                           errorFn,
-
flushInterval, - numOfShards, - maxBulkRequestSize, - maxRetries, - retryPause) + val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize) self.write(ElasticsearchIO[T](esOptions))(param) } } diff --git a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala index a754884316..ab082ab28e 100644 --- a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala +++ b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala @@ -52,24 +52,14 @@ package object elasticsearch { * @param numOfShards number of parallel writes to be performed, recommended setting is the * number of pipeline workers * @param errorFn function to handle error when performing Elasticsearch bulk writes - * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions - * @param retryPause Number of seconds to pause between subsequent attempts */ def saveAsElasticsearch(esOptions: ElasticsearchOptions, flushInterval: Duration = WriteParam.DefaultFlushInterval, numOfShards: Long = WriteParam.DefaultNumShards, maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize, - errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn, - maxRetries: Int = WriteParam.DefaultMaxRetries, - retryPause: Int = WriteParam.DefaultRetryPause)( + errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)( f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = { - val param = WriteParam(f, - errorFn, - flushInterval, - numOfShards, - maxBulkRequestSize, - maxRetries, - retryPause) + val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize) self.write(ElasticsearchIO[T](esOptions))(param) } } From 2bd54923dd70e802b7c6fcf45b4e2bb82804918f Mon Sep 17 00:00:00 2001 From: Josh Whelchel Date: Tue, 19 Feb 2019 14:32:00 -0500 Subject: [PATCH 5/7] Implements FluentBackoff --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 51 +++++++++++-------- .../sdk/io/elasticsearch/ElasticsearchIO.java | 51 +++++++++++-------- .../sdk/io/elasticsearch/ElasticsearchIO.java | 51 +++++++++++-------- 3 files changed, 87 insertions(+), 66 deletions(-) diff --git a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 4e8f71c8cd..ecebde6bc1 100644 --- a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -282,8 +284,7 @@ private static class ElasticsearchWriter extends DoFn>, private final SerializableFunction>> toActionRequests; private final ThrowingConsumer error; private final int maxBulkRequestSize; - private final int maxRetries; - private final int retryPause; + private final FluentBackoff backoffConfig; public ElasticsearchWriter(String clusterName, InetSocketAddress[] 
servers, @@ -291,11 +292,13 @@ public ElasticsearchWriter(String clusterName, SerializableFunction>> toActionRequests, ThrowingConsumer error, int maxRetries, int retryPause) { this.maxBulkRequestSize = maxBulkRequestSize; - this.maxRetries = maxRetries; - this.retryPause = retryPause; this.clientSupplier = new ClientSupplier(clusterName, servers); this.toActionRequests = toActionRequests; this.error = error; + + this.backoffConfig = FluentBackoff.DEFAULT + .withMaxRetries(maxRetries) + .withInitialBackoff(Duration.standardSeconds(retryPause)); } @@ -320,37 +323,41 @@ public void processElement(ProcessContext c) throws Exception { Iterables.partition(actionRequests::iterator, maxBulkRequestSize); chunks.forEach(chunk -> { - int attempts = 0; - Exception exception = null; + Exception exception; - while (attempts <= maxRetries) { - try { - if (attempts > 0) { - // Sleep on subsequent attempts. - Thread.sleep(retryPause * 1000); - } + final BackOff backoff = backoffConfig.backoff(); + do { + try { final BulkRequest bulkRequest = new BulkRequest().add(chunk).refresh(false); final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); if (bulkItemResponse.hasFailures()) { - exception = new BulkExecutionException(bulkItemResponse); + throw new BulkExecutionException(bulkItemResponse); } else { exception = null; break; } } catch (Exception e) { exception = e; - } finally { - if (exception != null) { - LOG.error( - "ElasticsearchWriter: Failed to bulk save chunk of " + - Objects.toString(chunk.size()) + " items, attempts remaining: " + - Objects.toString(maxRetries - attempts), - exception); + + LOG.error( + "ElasticsearchWriter: Failed to bulk save chunk of " + + Objects.toString(chunk.size()), + exception); + + // Backoff + try { + final long sleepTime = backoff.nextBackOffMillis(); + if (sleepTime == BackOff.STOP) { + break; + } + Thread.sleep(sleepTime); + } catch (InterruptedException | IOException e1) { + LOG.error("Interrupt during backoff", e1); + break; } - attempts += 1; } - } + } while (true); try { if (exception != null) { diff --git a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 9dcd32f63e..f9105514e9 100644 --- a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -292,8 +294,7 @@ private static class ElasticsearchWriter extends DoFn>, private final SerializableFunction>> toDocWriteRequests; private final ThrowingConsumer error; private final int maxBulkRequestSize; - private final int maxRetries; - private final int retryPause; + private final FluentBackoff backoffConfig; public ElasticsearchWriter(String clusterName, InetSocketAddress[] servers, @@ -302,11 +303,13 @@ public ElasticsearchWriter(String clusterName, ThrowingConsumer error, int maxRetries, int retryPause) { this.maxBulkRequestSize = maxBulkRequestSize; - this.maxRetries = maxRetries; - 
this.retryPause = retryPause; this.clientSupplier = new ClientSupplier(clusterName, servers); this.toDocWriteRequests = toDocWriteRequests; this.error = error; + + this.backoffConfig = FluentBackoff.DEFAULT + .withMaxRetries(maxRetries) + .withInitialBackoff(Duration.standardSeconds(retryPause)); } @@ -331,38 +334,42 @@ public void processElement(ProcessContext c) throws Exception { Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize); chunks.forEach(chunk -> { - int attempts = 0; - Exception exception = null; + Exception exception; - while (attempts <= maxRetries) { - try { - if (attempts > 0) { - // Sleep on subsequent attempts. - Thread.sleep(retryPause * 1000); - } + final BackOff backoff = backoffConfig.backoff(); + do { + try { final BulkRequest bulkRequest = new BulkRequest().add(chunk) .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get(); if (bulkItemResponse.hasFailures()) { - exception = new BulkExecutionException(bulkItemResponse); + throw new BulkExecutionException(bulkItemResponse); } else { exception = null; break; } } catch (Exception e) { exception = e; - } finally { - if (exception != null) { - LOG.error( - "ElasticsearchWriter: Failed to bulk save chunk of " + - Objects.toString(chunk.size()) + " items, attempts remaining: " + - Objects.toString(maxRetries - attempts), - exception); + + LOG.error( + "ElasticsearchWriter: Failed to bulk save chunk of " + + Objects.toString(chunk.size()), + exception); + + // Backoff + try { + final long sleepTime = backoff.nextBackOffMillis(); + if (sleepTime == BackOff.STOP) { + break; + } + Thread.sleep(sleepTime); + } catch (InterruptedException | IOException e1) { + LOG.error("Interrupt during backoff", e1); + break; } - attempts += 1; } - } + } while (true); try { if (exception != null) { diff --git a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index bfc6ebbec6..918fb4f7b0 100644 --- a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -282,8 +284,7 @@ private static class ElasticsearchWriter extends DoFn>, private final SerializableFunction>> toDocWriteRequests; private final ThrowingConsumer error; private final int maxBulkRequestSize; - private final int maxRetries; - private final int retryPause; + private final FluentBackoff backoffConfig; public ElasticsearchWriter(String clusterName, InetSocketAddress[] servers, @@ -291,11 +292,13 @@ public ElasticsearchWriter(String clusterName, SerializableFunction>> toDocWriteRequests, ThrowingConsumer error, int maxRetries, int retryPause) { this.maxBulkRequestSize = maxBulkRequestSize; - this.maxRetries = maxRetries; - this.retryPause = retryPause; this.clientSupplier = new ClientSupplier(clusterName, servers); this.toDocWriteRequests = toDocWriteRequests; this.error = error; + + 
diff --git a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index bfc6ebbec6..918fb4f7b0 100644
--- a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -43,6 +43,8 @@
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -282,8 +284,7 @@ private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {
     private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
     private final ThrowingConsumer<BulkExecutionException> error;
     private final int maxBulkRequestSize;
-    private final int maxRetries;
-    private final int retryPause;
+    private final FluentBackoff backoffConfig;
 
     public ElasticsearchWriter(String clusterName,
                                InetSocketAddress[] servers,
@@ -291,11 +292,13 @@ public ElasticsearchWriter(String clusterName,
                                SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
                                ThrowingConsumer<BulkExecutionException> error,
                                int maxRetries, int retryPause) {
       this.maxBulkRequestSize = maxBulkRequestSize;
-      this.maxRetries = maxRetries;
-      this.retryPause = retryPause;
       this.clientSupplier = new ClientSupplier(clusterName, servers);
       this.toDocWriteRequests = toDocWriteRequests;
       this.error = error;
+
+      this.backoffConfig = FluentBackoff.DEFAULT
+          .withMaxRetries(maxRetries)
+          .withInitialBackoff(Duration.standardSeconds(retryPause));
     }
@@ -320,39 +323,43 @@ public void processElement(ProcessContext c) throws Exception {
           Iterables.partition(docWriteRequests::iterator, maxBulkRequestSize);
 
       chunks.forEach(chunk -> {
-        int attempts = 0;
-        Exception exception = null;
+        Exception exception;
 
-        while (attempts <= maxRetries) {
-          try {
-            if (attempts > 0) {
-              // Sleep on subsequent attempts.
-              Thread.sleep(retryPause * 1000);
-            }
+        final BackOff backoff = backoffConfig.backoff();
+        do {
+          try {
             final BulkRequest bulkRequest = new BulkRequest().add(chunk.toArray(
                 new DocWriteRequest[0]))
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
             final BulkResponse bulkItemResponse = clientSupplier.get().bulk(bulkRequest).get();
 
             if (bulkItemResponse.hasFailures()) {
-              exception = new BulkExecutionException(bulkItemResponse);
+              throw new BulkExecutionException(bulkItemResponse);
             } else {
               exception = null;
               break;
             }
           } catch (Exception e) {
             exception = e;
-          } finally {
-            if (exception != null) {
-              LOG.error(
-                  "ElasticsearchWriter: Failed to bulk save chunk of " +
-                      Objects.toString(chunk.size()) + " items, attempts remaining: " +
-                      Objects.toString(maxRetries - attempts),
-                  exception);
+
+            LOG.error(
+                "ElasticsearchWriter: Failed to bulk save chunk of " +
+                    Objects.toString(chunk.size()),
+                exception);
+
+            // Backoff
+            try {
+              final long sleepTime = backoff.nextBackOffMillis();
+              if (sleepTime == BackOff.STOP) {
+                break;
+              }
+              Thread.sleep(sleepTime);
+            } catch (InterruptedException | IOException e1) {
+              LOG.error("Interrupt during backoff", e1);
+              break;
             }
-            attempts += 1;
           }
-        }
+        } while (true);
 
         try {
           if (exception != null) {
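Two things stand out before the follow-up patches. First, the catch around the backoff logs "Interrupt during backoff" even when nextBackOffMillis() throws an IOException rather than an interrupt, and it swallows the interrupt without re-asserting the thread's interrupted status. Second, the retry skeleton is now duplicated verbatim across the es2, es5 and es6 writers, differing only in how the BulkRequest is built; the @SuppressWarnings("Duplicates") churn in the next patch is a symptom. A sketch of how the shared skeleton could be factored out; the helper and its names are invented, not part of this series:

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import org.apache.beam.sdk.util.BackOff;
    import org.apache.beam.sdk.util.FluentBackoff;

    final class RetryingBulkSaver {
      // Runs bulkCall until it succeeds or the backoff is exhausted;
      // returns null on success, otherwise the last failure.
      static Exception saveWithRetries(Callable<?> bulkCall, FluentBackoff backoffConfig) {
        final BackOff backoff = backoffConfig.backoff();
        Exception exception;
        do {
          try {
            bulkCall.call();  // expected to throw on transport or bulk failures
            return null;
          } catch (Exception e) {
            exception = e;
          }
          try {
            final long sleepMillis = backoff.nextBackOffMillis();
            if (sleepMillis == BackOff.STOP) {
              break;  // retries exhausted
            }
            Thread.sleep(sleepMillis);
          } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();  // preserve interrupted status
            break;
          } catch (IOException e1) {
            break;
          }
        } while (true);
        return exception;
      }
    }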

From 87e780553bd7249103af0999bf51b87fdf033b33 Mon Sep 17 00:00:00 2001
From: Josh Whelchel
Date: Thu, 21 Feb 2019 14:06:17 -0500
Subject: [PATCH 6/7] Implements @Setup for ElasticsearchWriters, restores formatting

---
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 24 ++++++++++++------
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 25 ++++++++++++-------
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 24 ++++++++++++------
 3 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index ecebde6bc1..c86d6cde8b 100644
--- a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -283,25 +283,33 @@ private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {
     private final ClientSupplier clientSupplier;
     private final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests;
     private final ThrowingConsumer<BulkExecutionException> error;
+    private FluentBackoff backoffConfig;
     private final int maxBulkRequestSize;
-    private final FluentBackoff backoffConfig;
+    private final int maxRetries;
+    private final int retryPause;
 
     public ElasticsearchWriter(String clusterName,
-        InetSocketAddress[] servers,
-        int maxBulkRequestSize,
-        SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
-        ThrowingConsumer<BulkExecutionException> error, int maxRetries, int retryPause) {
+                               InetSocketAddress[] servers,
+                               int maxBulkRequestSize,
+                               SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
+                               ThrowingConsumer<BulkExecutionException> error,
+                               int maxRetries,
+                               int retryPause) {
       this.maxBulkRequestSize = maxBulkRequestSize;
       this.clientSupplier = new ClientSupplier(clusterName, servers);
       this.toActionRequests = toActionRequests;
       this.error = error;
+      this.maxRetries = maxRetries;
+      this.retryPause = retryPause;
+    }
 
+    @Setup
+    public void setup() throws Exception {
       this.backoffConfig = FluentBackoff.DEFAULT
-          .withMaxRetries(maxRetries)
-          .withInitialBackoff(Duration.standardSeconds(retryPause));
+          .withMaxRetries(this.maxRetries)
+          .withInitialBackoff(Duration.standardSeconds(this.retryPause));
     }
 
-    @SuppressWarnings("Duplicates")
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
diff --git a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index f9105514e9..c0b3a990b1 100644
--- a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -293,26 +293,33 @@ private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {
     private final ClientSupplier clientSupplier;
     private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
     private final ThrowingConsumer<BulkExecutionException> error;
+    private FluentBackoff backoffConfig;
     private final int maxBulkRequestSize;
-    private final FluentBackoff backoffConfig;
+    private final int maxRetries;
+    private final int retryPause;
 
     public ElasticsearchWriter(String clusterName,
-        InetSocketAddress[] servers,
-        int maxBulkRequestSize,
-        SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
-        ThrowingConsumer<BulkExecutionException> error,
-        int maxRetries, int retryPause) {
+                               InetSocketAddress[] servers,
+                               int maxBulkRequestSize,
+                               SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
+                               ThrowingConsumer<BulkExecutionException> error,
+                               int maxRetries,
+                               int retryPause) {
       this.maxBulkRequestSize = maxBulkRequestSize;
       this.clientSupplier = new ClientSupplier(clusterName, servers);
       this.toDocWriteRequests = toDocWriteRequests;
       this.error = error;
+      this.maxRetries = maxRetries;
+      this.retryPause = retryPause;
+    }
 
+    @Setup
+    public void setup() throws Exception {
       this.backoffConfig = FluentBackoff.DEFAULT
-          .withMaxRetries(maxRetries)
-          .withInitialBackoff(Duration.standardSeconds(retryPause));
+          .withMaxRetries(this.maxRetries)
+          .withInitialBackoff(Duration.standardSeconds(this.retryPause));
     }
 
-    @SuppressWarnings("Duplicates")
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
diff --git a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 918fb4f7b0..82003afc4c 100644
--- a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -283,25 +283,33 @@ private static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {
     private final ClientSupplier clientSupplier;
     private final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests;
     private final ThrowingConsumer<BulkExecutionException> error;
+    private FluentBackoff backoffConfig;
     private final int maxBulkRequestSize;
-    private final FluentBackoff backoffConfig;
+    private final int maxRetries;
+    private final int retryPause;
 
     public ElasticsearchWriter(String clusterName,
-        InetSocketAddress[] servers,
-        int maxBulkRequestSize,
-        SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
-        ThrowingConsumer<BulkExecutionException> error, int maxRetries, int retryPause) {
+                               InetSocketAddress[] servers,
+                               int maxBulkRequestSize,
+                               SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
+                               ThrowingConsumer<BulkExecutionException> error,
+                               int maxRetries,
+                               int retryPause) {
       this.maxBulkRequestSize = maxBulkRequestSize;
       this.clientSupplier = new ClientSupplier(clusterName, servers);
       this.toDocWriteRequests = toDocWriteRequests;
       this.error = error;
+      this.maxRetries = maxRetries;
+      this.retryPause = retryPause;
+    }
 
+    @Setup
+    public void setup() throws Exception {
       this.backoffConfig = FluentBackoff.DEFAULT
-          .withMaxRetries(maxRetries)
-          .withInitialBackoff(Duration.standardSeconds(retryPause));
+          .withMaxRetries(this.maxRetries)
+          .withInitialBackoff(Duration.standardSeconds(this.retryPause));
     }
 
-    @SuppressWarnings("Duplicates")
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
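Why this refactor: a DoFn is serialized at pipeline construction and deserialized onto workers, so keeping only the two int fields and building the FluentBackoff in @Setup, which Beam calls once per deserialized instance before any bundle, keeps derived state out of the serialized object. A minimal sketch of the lifecycle pattern these hunks adopt (the class and its body are illustrative; marking the derived field transient is this sketch's choice, not the patch's):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.util.FluentBackoff;
    import org.joda.time.Duration;

    class LifecycleSketch extends DoFn<String, String> {
      private final int maxRetries;   // primitive, serialized with the DoFn
      private final int retryPause;   // primitive, serialized with the DoFn
      private transient FluentBackoff backoffConfig;  // rebuilt on each worker

      LifecycleSketch(int maxRetries, int retryPause) {
        this.maxRetries = maxRetries;
        this.retryPause = retryPause;
      }

      @Setup
      public void setup() {
        // Runs once per DoFn instance, after deserialization.
        backoffConfig = FluentBackoff.DEFAULT
            .withMaxRetries(maxRetries)
            .withInitialBackoff(Duration.standardSeconds(retryPause));
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element());  // real work would consult backoffConfig here
      }
    }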

From d8c1724761a2a63939993005424c8053bb1737f5 Mon Sep 17 00:00:00 2001
From: Josh Whelchel
Date: Thu, 21 Feb 2019 16:16:28 -0500
Subject: [PATCH 7/7] Restore indentation formatting on ES Bound constructors

---
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 14 ++++++++------
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 15 ++++++++-------
 .../sdk/io/elasticsearch/ElasticsearchIO.java | 14 ++++++++------
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index c86d6cde8b..08e51c0b1f 100644
--- a/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es2/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -159,12 +159,14 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
     private final ThrowingConsumer<BulkExecutionException> error;
 
     private Bound(final String clusterName,
-        final InetSocketAddress[] servers,
-        final Duration flushInterval,
-        final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
-        final long numOfShard,
-        final int maxBulkRequestSize,
-        int maxRetries, int retryPause, final ThrowingConsumer<BulkExecutionException> error) {
+                  final InetSocketAddress[] servers,
+                  final Duration flushInterval,
+                  final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests,
+                  final long numOfShard,
+                  final int maxBulkRequestSize,
+                  int maxRetries,
+                  int retryPause,
+                  final ThrowingConsumer<BulkExecutionException> error) {
       this.clusterName = clusterName;
       this.servers = servers;
       this.flushInterval = flushInterval;
diff --git a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index c0b3a990b1..4800f82388 100644
--- a/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es5/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -164,13 +164,14 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
     private final ThrowingConsumer<BulkExecutionException> error;
 
     private Bound(final String clusterName,
-        final InetSocketAddress[] servers,
-        final Duration flushInterval,
-        final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
-        final long numOfShard,
-        final int maxBulkRequestSize,
-        final int maxRetries, final int retryPause,
-        final ThrowingConsumer<BulkExecutionException> error) {
+                  final InetSocketAddress[] servers,
+                  final Duration flushInterval,
+                  final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
+                  final long numOfShard,
+                  final int maxBulkRequestSize,
+                  final int maxRetries,
+                  final int retryPause,
+                  final ThrowingConsumer<BulkExecutionException> error) {
       this.clusterName = clusterName;
       this.servers = servers;
       this.flushInterval = flushInterval;
diff --git a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 82003afc4c..ed349fbfb8 100644
--- a/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/scio-elasticsearch/es6/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -164,12 +164,14 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
     private final ThrowingConsumer<BulkExecutionException> error;
 
     private Bound(final String clusterName,
-        final InetSocketAddress[] servers,
-        final Duration flushInterval,
-        final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
-        final long numOfShard,
-        final int maxBulkRequestSize,
-        int maxRetries, int retryPause, final ThrowingConsumer<BulkExecutionException> error) {
+                  final InetSocketAddress[] servers,
+                  final Duration flushInterval,
+                  final SerializableFunction<T, Iterable<DocWriteRequest<?>>> toDocWriteRequests,
+                  final long numOfShard,
+                  final int maxBulkRequestSize,
+                  int maxRetries,
+                  int retryPause,
+                  final ThrowingConsumer<BulkExecutionException> error) {
       this.clusterName = clusterName;
       this.servers = servers;
       this.flushInterval = flushInterval;
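Taken together, the series bounds the cost of a transient cluster failure: each chunk is attempted at most 1 + maxRetries times, with exponentially growing pauses, before the error handler sees the BulkExecutionException. A back-of-envelope check under the defaults used in these diffs (three retries, five-second initial pause), assuming FluentBackoff's default 1.5x growth and ignoring jitter:

    public class WorstCaseDelay {
      public static void main(String[] args) {
        final int maxRetries = 3;      // default retry count in these diffs
        double pauseSeconds = 5.0;     // default initial pause in these diffs
        double totalSeconds = 0.0;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
          totalSeconds += pauseSeconds;
          pauseSeconds *= 1.5;         // assumed FluentBackoff exponent
        }
        // 5 + 7.5 + 11.25 = 23.75 seconds of worst-case added latency per chunk
        System.out.printf("worst-case pause: %.2f s%n", totalSeconds);
      }
    }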