Skip to content

Commit

Permalink
Adds new params maxRetries and retryPause to saveAsElasticsearch method
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Whelchel committed Jan 23, 2019
1 parent 9605842 commit 00c299f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
Expand Up @@ -52,14 +52,24 @@ package object elasticsearch {
* @param numOfShards number of parallel writes to be performed, recommended setting is the
* number of pipeline workers
 * @param maxBulkRequestSize maximum number of requests in a single Elasticsearch bulk write
 * @param errorFn function to handle error when performing Elasticsearch bulk writes
 * @param maxRetries maximum number of retries for each bulk save operation when an exception occurs
 * @param retryPause number of seconds to pause between successive retry attempts
*/
def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                        flushInterval: Duration = WriteParam.DefaultFlushInterval,
                        numOfShards: Long = WriteParam.DefaultNumShards,
                        maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
                        errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
                        maxRetries: Int = WriteParam.DefaultMaxRetries,
                        retryPause: Int = WriteParam.DefaultRetryPause)(
    f: T => Iterable[ActionRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
  // Bundle every write setting — including the retry controls maxRetries and
  // retryPause — into a single WriteParam so ElasticsearchIO receives them all.
  // NOTE(review): the diff residue previously left a stale 5-argument
  // WriteParam(...) call and a duplicate errorFn parameter line here; both removed.
  val param = WriteParam(f,
                         errorFn,
                         flushInterval,
                         numOfShards,
                         maxBulkRequestSize,
                         maxRetries,
                         retryPause)
  self.write(ElasticsearchIO[T](esOptions))(param)
}
}
Expand Down
Expand Up @@ -52,14 +52,24 @@ package object elasticsearch {
* @param numOfShards number of parallel writes to be performed, recommended setting is the
* number of pipeline workers
 * @param maxBulkRequestSize maximum number of requests in a single Elasticsearch bulk write
 * @param errorFn function to handle error when performing Elasticsearch bulk writes
 * @param maxRetries maximum number of retries for each bulk save operation when an exception occurs
 * @param retryPause number of seconds to pause between successive retry attempts
*/
def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                        flushInterval: Duration = WriteParam.DefaultFlushInterval,
                        numOfShards: Long = WriteParam.DefaultNumShards,
                        maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
                        errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
                        maxRetries: Int = WriteParam.DefaultMaxRetries,
                        retryPause: Int = WriteParam.DefaultRetryPause)(
    f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
  // Bundle every write setting — including the retry controls maxRetries and
  // retryPause — into a single WriteParam so ElasticsearchIO receives them all.
  // NOTE(review): the diff residue previously left a stale 5-argument
  // WriteParam(...) call and a duplicate errorFn parameter line here; both removed.
  val param = WriteParam(f,
                         errorFn,
                         flushInterval,
                         numOfShards,
                         maxBulkRequestSize,
                         maxRetries,
                         retryPause)
  self.write(ElasticsearchIO[T](esOptions))(param)
}
}
Expand Down
Expand Up @@ -52,14 +52,24 @@ package object elasticsearch {
* @param numOfShards number of parallel writes to be performed, recommended setting is the
* number of pipeline workers
 * @param maxBulkRequestSize maximum number of requests in a single Elasticsearch bulk write
 * @param errorFn function to handle error when performing Elasticsearch bulk writes
 * @param maxRetries maximum number of retries for each bulk save operation when an exception occurs
 * @param retryPause number of seconds to pause between successive retry attempts
*/
def saveAsElasticsearch(esOptions: ElasticsearchOptions,
                        flushInterval: Duration = WriteParam.DefaultFlushInterval,
                        numOfShards: Long = WriteParam.DefaultNumShards,
                        maxBulkRequestSize: Int = WriteParam.DefaultMaxBulkRequestSize,
                        errorFn: BulkExecutionException => Unit = WriteParam.DefaultErrorFn,
                        maxRetries: Int = WriteParam.DefaultMaxRetries,
                        retryPause: Int = WriteParam.DefaultRetryPause)(
    f: T => Iterable[DocWriteRequest[_]])(implicit coder: Coder[T]): Future[Tap[Nothing]] = {
  // Bundle every write setting — including the retry controls maxRetries and
  // retryPause — into a single WriteParam so ElasticsearchIO receives them all.
  // NOTE(review): the diff residue previously left a stale 5-argument
  // WriteParam(...) call and a duplicate errorFn parameter line here; both removed.
  val param = WriteParam(f,
                         errorFn,
                         flushInterval,
                         numOfShards,
                         maxBulkRequestSize,
                         maxRetries,
                         retryPause)
  self.write(ElasticsearchIO[T](esOptions))(param)
}
}
Expand Down

0 comments on commit 00c299f

Please sign in to comment.