Skip to content

Commit

Permalink
Merge pull request #190 from sphereio/task-repeater
Browse files Browse the repository at this point in the history
Add default task repeater
  • Loading branch information
PhilippSpo committed Dec 19, 2016
2 parents 918a791 + 11304f2 commit 971e04e
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 11 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"istanbul": "0.3.x",
"jasmine-node": "1.14.5",
"sphere-coffeelint": "git://github.com/sphereio/sphere-coffeelint.git#master",
"sphere-node-utils": "^0.8.4",
"underscore": "1.8.x",
"underscore-mixins": "0.1.x",
"validate-commit-msg": "2.0.0"
Expand Down
18 changes: 13 additions & 5 deletions src/coffee/client.coffee
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
_ = require 'underscore'
Rest = require './connect/rest'
TaskQueue = require './task-queue'
_ = require 'underscore'
Rest = require './connect/rest'
TaskQueue = require './task-queue'
RepeaterTaskQueue = require '../lib/repeater-task-queue'
CartDiscountService = require './services/cart-discounts'
CartService = require './services/carts'
CategoryService = require './services/categories'
Expand Down Expand Up @@ -76,6 +77,11 @@ ALL_SERVICES = [
# Every request is internally pushed in a queue which automatically starts resolving promises (requests)
# and will process concurrently some of them based on the `maxParallel` parameter. You can set this parameter by calling {::setMaxParallel}.
#
# ### RepeaterTaskQueue
# To repeat request on case of some errors. It's possible to use custom repeater by passing it to constructor.
# Also it's possible to turn off by passing `enableRepeater = false` flag.
# There is a default repeater if constructor parameter is empty.
#
# ```coffeescript
# client = new SphereClient # a TaskQueue is internally initialized at this point with maxParallel of 20
# client.setMaxParallel 5
Expand Down Expand Up @@ -220,9 +226,11 @@ class SphereClient
#
# options - An {Object} to configure the client
constructor: (options = {}) ->

# Private: instance of a {TaskQueue}
@_task = options.task or new TaskQueue
if options.enableRepeater? and !options.enableRepeater
@_task = new TaskQueue
else
@_task = options.task or new RepeaterTaskQueue {}, {}

# Private: instance of a {Rest}
@_rest = options.rest or new Rest _.defaults options, {user_agent: 'sphere-node-sdk'}
Expand Down
2 changes: 1 addition & 1 deletion src/coffee/connect/oauth2.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class OAuth2
host: host
protocol: protocol
accessTokenUrl: opts.accessTokenUrl or '/oauth/token'
timeout: opts.timeout or 20000
timeout: opts.timeout or 60000
rejectUnauthorized: rejectUnauthorized
userAgent: userAgent

Expand Down
77 changes: 77 additions & 0 deletions src/coffee/repeater-task-queue.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{ Repeater } = require 'sphere-node-utils'
Promise = require 'bluebird'
TaskQueue = require './task-queue'
_ = require 'underscore'

# Response messages which will be handled by `RepeaterTaskQueue`
retryKeywords = [
'ETIMEDOUT'
'socket hang up'
'write EPROTO'
'Retry later'
'I am the LoadBalancer of'
'Gateway Timeout'
'Bad Gateway'
'EAI_AGAIN'
'ESOCKETTIMEDOUT'
'Oops. This shouldn\'t happen'
'InternalServerError: Undefined'
'Temporary overloading'
'read ECONNRESET'
'getaddrinfo ENOTFOUND'
'Cannot commit on stream id'
]

# Public: A `RepeaterTaskQueue` adds request repeater on particular response errors
#
# `RepeaterTaskQueue` receives two objects as parameter.
# First object overriding `maxParallel` value of `TaskQueue`
# Second object contains information about the count of attempts, timeout, timeout type and retry keywords.
# There are 2 types of `timeoutType`:
# - `c`: constant delay
# - `v`: variable delay (grows with attempts count with a random component)
# It's possible to customize default list of handled error messages. To do this just pass new array to `retryKeywords`
#
# Examples
#
# task = new RepeaterTaskQueue { maxParallel: 30 }, { attempts: 50, timeout: 200, timeoutType: 'v', retryKeywords: ['test1', 'test2'] }
class RepeaterTaskQueue extends TaskQueue


constructor: (options, repeaterOptions) ->
super options
repeaterOptions = _.defaults repeaterOptions,
attempts: 50
timeout: 200
timeoutType: 'v'
retryKeywords: retryKeywords
@repeaterOptions = repeaterOptions


_startTask: (task) =>
@_activeCount += 1
repeater = new Repeater(@repeaterOptions)
toRepeat = repeater.execute task.fn, (err) =>
if @_shouldRetry(err)
return Promise.resolve()
else
Promise.reject err
toRepeat.then (res) ->
task.resolve res
return
.catch (err) ->
task.reject err
return
.finally =>
@_activeCount -= 1
@_maybeExecute()
.done()


_shouldRetry: (error) ->
return error?.code?.toString().startsWith('5') or
error?.statusCode?.toString().startsWith('5') or
@repeaterOptions.retryKeywords.some (keyword) ->
JSON.stringify(error).toUpperCase().includes(keyword.toUpperCase())

module.exports = RepeaterTaskQueue
20 changes: 20 additions & 0 deletions src/spec/client/client.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ describe 'SphereClient', ->
expect(=> @client.setMaxParallel(0)).toThrow new Error 'MaxParallel must be a number between 1 and 100'
expect(=> @client.setMaxParallel(101)).toThrow new Error 'MaxParallel must be a number between 1 and 100'

it 'should not repeat request on error if repeater is disabled',(done) ->
client = new SphereClient {config: Config, enableRepeater: false}
callsMap = {
0: { statusCode: 500, message: 'ETIMEDOUT' }
1: { statusCode: 500, message: 'ETIMEDOUT' }
2: { statusCode: 200, message: 'success' }
}
callCount = 0
spyOn(client._rest, 'GET').andCallFake (resource, callback) ->
currentCall = callsMap[callCount]
callCount++
statusCode = currentCall.statusCode
message = currentCall.message
callback(null, { statusCode }, { message })

client.products.fetch()
.catch (err) ->
expect(client._rest.GET.calls.length).toEqual 1
expect(err.body.message).toEqual 'ETIMEDOUT'
done()

_.each [
{name: 'cartDiscounts', className: 'CartDiscountService', blacklist: []}
Expand Down
2 changes: 1 addition & 1 deletion src/spec/connect/oauth2.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe 'OAuth2', ->
expect(oa._options.host).toBe 'auth.sphere.io'
expect(oa._options.protocol).toBe 'https'
expect(oa._options.accessTokenUrl).toBe '/oauth/token'
expect(oa._options.timeout).toBe 20000
expect(oa._options.timeout).toBe 60000
expect(oa._options.rejectUnauthorized).toBe true

it 'should throw error if no credentials are given', ->
Expand Down
1 change: 1 addition & 0 deletions src/spec/integration/inventories-sync.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ describe 'Integration Inventories Sync', ->
expect(result.body.expectedDelivery).not.toBeDefined()
done()
.catch (error) -> done(_.prettify(error))
, 60000

describe 'custom type and field handling', (done) ->
customType = undefined
Expand Down
7 changes: 6 additions & 1 deletion src/spec/integration/orders-sync.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ describe 'Integration Orders Sync', ->
expect(orderUpdated.shipmentState).toBe orderNew.shipmentState
done()
.catch (error) -> done(_.prettify(error))
, 60000


it 'should sync returnInfo', (done) ->
orderNew = _.deepClone @order
Expand All @@ -87,6 +89,7 @@ describe 'Integration Orders Sync', ->
expect(orderUpdated.returnInfo[0].id).toBe orderNew.returnInfo[0].id
done()
.catch (error) -> done(_.prettify(error))
, 60000

it 'should sync returnInfo (status)', (done) ->
orderNew = _.deepClone @order
Expand Down Expand Up @@ -125,6 +128,7 @@ describe 'Integration Orders Sync', ->
expect(orderUpdated2.returnInfo[0].items[0].paymentState).toEqual @orderNew2.returnInfo[0].items[0].paymentState
done()
.catch (error) -> done(_.prettify(error))
, 60000

it 'should sync delivery items', (done) ->

Expand All @@ -148,6 +152,7 @@ describe 'Integration Orders Sync', ->
expect(orderUpdated.shippingInfo.deliveries.length).toBe 1
done()
.catch (error) -> done(_.prettify(error))
, 60000

it 'should sync parcel items of a delivery', (done) ->
orderNew = _.deepClone @order
Expand Down Expand Up @@ -211,7 +216,7 @@ describe 'Integration Orders Sync', ->
expect(parcels.length).toBe 2
done()
.catch (error) -> done(_.prettify(error))

, 60000
###
helper methods
###
Expand Down
4 changes: 2 additions & 2 deletions src/spec/integration/products-sync.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ describe 'Integration Products Sync', ->
expect(updated.variants[1].prices[0].country).toBe 'FR'
done()
.catch (error) -> done(_.prettify(error))
, 10000 # 10sec
, 30000 # 30sec

describe 'State update actions', ->

Expand Down Expand Up @@ -696,6 +696,6 @@ describe 'Integration Products Sync', ->
expect(updated.masterVariant.prices[0].custom.fields.kiloPrice.centAmount).toBe 4948
done()
.catch (error) -> done(_.prettify(error))

, 60000


2 changes: 1 addition & 1 deletion src/spec/integration/products.spec.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ describe 'Integration Products', ->
expect(result.body.productType.hasOwnProperty('obj')).toBe(true)
done()
.catch (error) -> done(_.prettify(error))
, 30000 # 30sec
, 60000 # 60sec

# it 'should search for suggestions', (done) ->
# debug 'Creating products with search keywords'
Expand Down
114 changes: 114 additions & 0 deletions src/spec/repeater-task-queue.spec.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
SphereClient = require '../lib/client'
RepeaterTaskQueue = require '../lib/repeater-task-queue'
Config = require('../config').config
util = require('util')
_ = require 'underscore'
describe 'RepeaterTaskQueue', ->

repeaterOptions = {
attempts: 5,
timeout: 200
}

beforeEach ->
task = new RepeaterTaskQueue { maxParallel: 30 }, { attempts: 5, timeout: 200, timeoutType: 'v' }
sphereConfig = { config: Config, task }
@client = new SphereClient sphereConfig


it 'should finally resolve after four tries', (done) ->

callsMap = {
0: { statusCode: 500, message: 'ETIMEDOUT' }
1: { code: 500, message: 'ETIMEDOUT' }
2: { code: 789, message: 'ETIMEDOUT' }
3: { statusCode: 200, message: 'success' }
}
callCount = 0
spyOn(@client._rest, 'GET').andCallFake (resource, callback) ->
currentCall = callsMap[callCount]
callCount++
statusCode = currentCall.statusCode
message = currentCall.message
callback(null, { statusCode }, { message })

@client.products.fetch()
.then (res) =>
expect(@client._rest.GET.calls.length).toEqual 4
expect(res.body.message).toEqual 'success'
done()
.catch (err) -> done(err)


it 'should repeat requests several times for 5xx error codes', (done) ->
spyOn(@client._rest, 'GET').andCallFake (resource, callback) ->
callback(null, { statusCode: 500 }, { code: 500 })

@client.products.fetch()
.then ->
done 'Error expected'
.catch =>
expect(@client._rest.GET.calls.length).toEqual repeaterOptions.attempts
done()


it 'should repeat requests several times for certain error messages', (done) ->
spyOn(@client._rest, 'GET').andCallFake (resource, callback) ->
callback(null, { statusCode: 500 }, { message: 'ETIMEDOUT' })

@client.products.fetch()
.then ->
done 'Error expected'
.catch =>
expect(@client._rest.GET.calls.length).toEqual repeaterOptions.attempts
done()


it 'should not repeat requests for non-5xx errors', (done) ->
spyOn(@client._rest, 'GET').andCallFake (resource, callback) ->
callback(null, { statusCode: 400 }, { message: 'something' })

@client.products.fetch()
.then ->
done 'Error expected'
.catch (err) =>
expect(err.message).toEqual 'something'
expect(@client._rest.GET.calls.length).toEqual 1
done()

it 'should set default repeater options on empty object', (done) ->
retryKeywords = [
'ETIMEDOUT'
'socket hang up'
'write EPROTO'
'Retry later'
'I am the LoadBalancer of'
'Gateway Timeout'
'Bad Gateway'
'EAI_AGAIN'
'ESOCKETTIMEDOUT'
'Oops. This shouldn\'t happen'
'InternalServerError: Undefined'
'Temporary overloading'
'read ECONNRESET'
'getaddrinfo ENOTFOUND'
'Cannot commit on stream id'
]
task = new RepeaterTaskQueue {}, {}
expect(_.isEqual(task.repeaterOptions.retryKeywords, retryKeywords)).toEqual true
expect(task.repeaterOptions.attempts).toEqual 50
expect(task.repeaterOptions.timeout).toEqual 200
expect(task.repeaterOptions.timeoutType).toEqual 'v'
done()

it 'should set custom repeater options', (done) ->
retryKeywords = [
'test1'
'test2'
]
task = new RepeaterTaskQueue {}, { attempts: 7, timeout: 211, timeoutType: 'c', retryKeywords: retryKeywords}
expect(_.isEqual(task.repeaterOptions.retryKeywords, retryKeywords)).toEqual true
expect(task.repeaterOptions.attempts).toEqual 7
expect(task.repeaterOptions.timeout).toEqual 211
expect(task.repeaterOptions.timeoutType).toEqual 'c'
done()

0 comments on commit 971e04e

Please sign in to comment.