Skip to content

Commit

Permalink
add flag for configuring number of workers (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed Nov 14, 2017
1 parent 935302b commit 52b52ff
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ cache:
directories:
- vendor
install:
- go get -u github.com/hashicorp/go-plugin
- go get -u github.com/hashicorp/go-hclog
- go get -u github.com/hashicorp/{go-plugin,go-hclog}
- go get -u golang.org/x/net/{context,http2,trace}
- go get -u github.com/golang/dep/cmd/dep
- dep ensure
Expand Down
3 changes: 2 additions & 1 deletion cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func main() {
eventsPort := flag.Uint("events-port", 4000, "Port to serve events API on.")
eventsTLSCrt := flag.String("events-tls-cert", "", "Path to events API TLS certificate file.")
eventsTLSKey := flag.String("events-tls-key", "", "Path to events API TLS key file.")
workersNumber := flag.Uint("workers", 100, "Number of workers processing incoming events.")
plugins := paths{}
flag.Var(&plugins, "plugin", "Path to a plugin to load.")
flag.Parse()
Expand Down Expand Up @@ -83,7 +84,7 @@ func main() {
}

targetCache := cache.NewTarget("/serverless-event-gateway", kv, log)
router := router.New(targetCache, pluginManager, log)
router := router.New(*workersNumber, targetCache, pluginManager, log)
router.StartWorkers()

api.StartEventsAPI(httpapi.Config{
Expand Down
2 changes: 1 addition & 1 deletion router/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func get(url string) (int, http.Header, string) {
func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) {
targetCache := cache.NewTarget("/serverless-event-gateway", kvstore, log)

router := New(targetCache, plugin.NewManager([]string{}, log), log)
router := New(10, targetCache, plugin.NewManager([]string{}, log), log)
return router, httptest.NewServer(router)
}

Expand Down
21 changes: 11 additions & 10 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ type Router struct {
targetCache Targeter
plugins *plugin.Manager
log *zap.Logger
workerNumber uint
workersNumber uint
drain chan struct{}
drainWaitGroup sync.WaitGroup
active bool
backlog chan backlogEvent
}

// New instantiates a new Router
func New(targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router {
func New(workersNumber uint, targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router {
return &Router{
targetCache: targetCache,
plugins: plugins,
log: log,
workerNumber: 20,
drain: make(chan struct{}),
backlog: nil,
targetCache: targetCache,
plugins: plugins,
log: log,
workersNumber: workersNumber,
drain: make(chan struct{}),
backlog: nil,
}
}

Expand Down Expand Up @@ -101,10 +101,11 @@ func (router *Router) StartWorkers() {
router.active = true

if router.backlog == nil {
router.backlog = make(chan backlogEvent, router.workerNumber*2)
router.backlog = make(chan backlogEvent, router.workersNumber*2)
}

for i := 0; i < int(router.workerNumber); i++ {
router.log.Debug("Starting processing workers.", zap.Uint("workers", router.workersNumber))
for i := 0; i < int(router.workersNumber); i++ {
router.drainWaitGroup.Add(1)
go router.loop()
}
Expand Down
2 changes: 1 addition & 1 deletion router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestRouterServeHTTP_AllowCORSPreflightForCustomEvents(t *testing.T) {
func testrouter(target Targeter) *Router {
log := zap.NewNop()
plugins := plugin.NewManager([]string{}, log)
router := New(target, plugins, log)
router := New(10, target, plugins, log)
router.StartWorkers()
return router
}

0 comments on commit 52b52ff

Please sign in to comment.