diff --git a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala
index 04fc2cb2c5..9966989954 100644
--- a/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala
+++ b/scio-elasticsearch/es2/src/main/scala/com/spotify/scio/elasticsearch/package.scala
@@ -52,14 +52,24 @@ package object elasticsearch {
      * @param numOfShards number of parallel writes to be performed, recommended setting is the
      * number of pipeline workers
      * @param errorFn function to handle error when performing Elasticsearch bulk writes
+     * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions
+     * @param retryPause Number of seconds to pause between subsequent attempts
      */
     def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                             flushInterval: Duration = WriteParam.DefaultFlushInterval,
                             numOfShards: Long = WriteParam.DefaultNumShards,
                             maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
-                            errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
+                            errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
+                            maxRetries: Int = WriteParam.DefaultMaxRetries,
+                            retryPause: Int = WriteParam.DefaultRetryPause)(
       f: T => Iterable[ActionRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
-      val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize)
+      val param = WriteParam(f,
+                             errorFn,
+                             flushInterval,
+                             numOfShards,
+                             maxBulkRequestSize,
+                             maxRetries,
+                             retryPause)
       self.write(ElasticsearchIO[T](esOptions))(param)
     }
   }
diff --git a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala
index 150bf28e22..4436129e8b 100644
--- a/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala
+++ b/scio-elasticsearch/es5/src/main/scala/com/spotify/scio/elasticsearch/package.scala
@@ -52,14 +52,24 @@ package object elasticsearch {
      * @param numOfShards number of parallel writes to be performed, recommended setting is the
      * number of pipeline workers
      * @param errorFn function to handle error when performing Elasticsearch bulk writes
+     * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions
+     * @param retryPause Number of seconds to pause between subsequent attempts
      */
     def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                             flushInterval: Duration = WriteParam.DefaultFlushInterval,
                             numOfShards: Long = WriteParam.DefaultNumShards,
                             maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
-                            errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
+                            errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
+                            maxRetries: Int = WriteParam.DefaultMaxRetries,
+                            retryPause: Int = WriteParam.DefaultRetryPause)(
       f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
-      val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize)
+      val param = WriteParam(f,
+                             errorFn,
+                             flushInterval,
+                             numOfShards,
+                             maxBulkRequestSize,
+                             maxRetries,
+                             retryPause)
       self.write(ElasticsearchIO[T](esOptions))(param)
     }
   }
diff --git a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala
index ab082ab28e..a754884316 100644
--- a/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala
+++ b/scio-elasticsearch/es6/src/main/scala/com/spotify/scio/elasticsearch/package.scala
@@ -52,14 +52,24 @@ package object elasticsearch {
      * @param numOfShards number of parallel writes to be performed, recommended setting is the
      * number of pipeline workers
      * @param errorFn function to handle error when performing Elasticsearch bulk writes
+     * @param maxRetries Maximum number of retries for each bulk save operation upon Exceptions
+     * @param retryPause Number of seconds to pause between subsequent attempts
      */
     def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                             flushInterval: Duration = WriteParam.DefaultFlushInterval,
                             numOfShards: Long = WriteParam.DefaultNumShards,
                             maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
-                            errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn)(
+                            errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
+                            maxRetries: Int = WriteParam.DefaultMaxRetries,
+                            retryPause: Int = WriteParam.DefaultRetryPause)(
       f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
-      val param = WriteParam(f, errorFn, flushInterval, numOfShards, maxBulkRequestSize)
+      val param = WriteParam(f,
+                             errorFn,
+                             flushInterval,
+                             numOfShards,
+                             maxBulkRequestSize,
+                             maxRetries,
+                             retryPause)
       self.write(ElasticsearchIO[T](esOptions))(param)
     }
   }
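For reference, here is a minimal usage sketch of the new retry knobs against the es6 module. It is an assumption-laden example, not part of the patch: the cluster name, index name, and field values are hypothetical, and it assumes `ElasticsearchOptions` takes a cluster name plus transport addresses as in the existing es6 API.

```scala
import java.net.InetSocketAddress

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.elasticsearch._
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.index.IndexRequest

object SaveWithRetriesExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    // Hypothetical cluster coordinates; assumes the es6 ElasticsearchOptions
    // case class with a cluster name and transport addresses.
    val esOptions = ElasticsearchOptions(
      clusterName = "my-cluster",
      servers = Seq(new InetSocketAddress("localhost", 9300))
    )

    sc.parallelize(Seq("id1", "id2", "id3"))
      .saveAsElasticsearch(
        esOptions,
        maxRetries = 3, // retry a failed bulk request up to 3 times before failing the bundle
        retryPause = 5  // wait 5 seconds between attempts
      ) { id =>
        // One index request per element, keyed by the element itself.
        Seq[DocWriteRequest[_]](new IndexRequest("my-index", "doc", id).source("id", id))
      }

    sc.close()
  }
}
```

With the defaults (`WriteParam.DefaultMaxRetries` / `WriteParam.DefaultRetryPause`) the behavior is unchanged; callers only opt into retries by passing the new named arguments, so existing call sites keep compiling.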