Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
core : webhook - delete processor's routes on processor stop, handle …
Browse files Browse the repository at this point in the history
…panic
  • Loading branch information
vjeantet committed Jul 25, 2017
1 parent e5b3378 commit d773897
Show file tree
Hide file tree
Showing 20 changed files with 1,722 additions and 27 deletions.
6 changes: 5 additions & 1 deletion core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (a *agent) configure(conf *config.Agent) error {
ctx.dataLocation = filepath.Join(dataLocation, conf.Type)
ctx.configWorkingLocation = conf.Wd
ctx.memory = myStore.Space(conf.Type)
ctx.webHook = newWebHook(conf.Label)
ctx.webHook = newWebHook(conf.PipelineName, conf.Label)

Log().Debugf("data location : %s", ctx.dataLocation)
if _, err := os.Stat(ctx.dataLocation); os.IsNotExist(err) {
Expand Down Expand Up @@ -208,6 +208,10 @@ func (a *agent) stop() {
myScheduler.Remove(a.Label)
Log().Debugf("agent %d schedule job removed", a.ID)

// unregister processor's webhooks URLs
a.processor.B().WebHook.Unregister()
Log().Debugf("agent %d webhook routes unregistered", a.ID)

Log().Debugf("Processor '%s' stopping... - %d in pipe ", a.Label, len(a.packetChan))
close(a.packetChan)
<-a.Done
Expand Down
88 changes: 63 additions & 25 deletions core/webhook.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,89 @@
package core

import (
"fmt"
"net/http"

"golang.org/x/sync/syncmap"

"github.com/gosimple/slug"
"github.com/justinas/alice"
)

var webHookMap = map[string]*webHook{}

type webHook struct {
mux *http.ServeMux
namespace string
uri string
Hooks map[string]*hook
pipelineLabel string
namespace string
Hooks []string
}

type hook struct {
Url string
handler *func(http.ResponseWriter, *http.Request)
var webHookMap = syncmap.Map{}
var httpHookServerMux *http.ServeMux

func newWebHook(pipelineLabel, nameSpace string) *webHook {
return &webHook{pipelineLabel: pipelineLabel, namespace: nameSpace, Hooks: []string{}}
}

func newWebHook(nameSpace string) *webHook {
return &webHook{namespace: nameSpace, mux: httpHookServerMux, Hooks: map[string]*hook{}}
// Add a new route to a given http.HandlerFunc
func (w *webHook) Add(hookName string, hf http.HandlerFunc) {
hUrl := "/" + slug.Make(w.pipelineLabel) + "/" + slug.Make(w.namespace) + "/" + slug.Make(hookName)
w.Hooks = append(w.Hooks, hookName)
webHookMap.Store(hUrl, hf)
Log().Infof("Hook %s => %s", hookName, hUrl)
}

// Add register a new route matcher linked to hh
func (w *webHook) Add(hookName string, hh func(http.ResponseWriter, *http.Request)) {
h := &hook{}
h.Url = slug.Make(w.namespace) + "/" + slug.Make(hookName)
h.handler = &hh
w.Hooks[hookName] = h
w.mux.HandleFunc("/"+h.Url, *h.handler)
Log().Infof("Hook %s => %s", hookName, "/"+h.Url)
// Delete a route
func (w *webHook) Delete(hookName string) {
hUrl := "/" + slug.Make(w.pipelineLabel) + "/" + slug.Make(w.namespace) + "/" + slug.Make(hookName)
webHookMap.Delete(hUrl)
Log().Debugf("WebHook unregisted [%s]", hUrl)
}

func getAgentHooks(agentName string) (hooks map[string]*hook) {
if _, ok := webHookMap[agentName]; ok {
hooks = webHookMap[agentName].Hooks
// Delete all routes belonging to webHook
func (w *webHook) Unregister() {
for _, hookName := range w.Hooks {
w.Delete(hookName)
}
return hooks
}

var httpHookServerMux *http.ServeMux
func routerHandler(w http.ResponseWriter, r *http.Request) {
if hfi, ok := webHookMap.Load(r.URL.Path); ok {
Log().Debugf("Webhook found for %s", r.URL.Path)
hfi.(http.HandlerFunc)(w, r)
} else {
Log().Warnf("Webhook not found for %s", r.URL.Path)
w.WriteHeader(404)
fmt.Fprint(w, "Not Found !")
}
}

func listenAndServeWebHook(addr string) {
httpHookServerMux = http.NewServeMux()
commonHandlers := alice.New(loggingHandler, recoverHandler)
httpHookServerMux.Handle("/", commonHandlers.ThenFunc(routerHandler))
go http.ListenAndServe(addr, httpHookServerMux)

Log().Infof("Agents webHook listening on %s", addr)
go http.ListenAndServe(addr, httpHookServerMux)
}

func loggingHandler(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
Log().Debugf("Webhook [%s] %s", r.Method, r.URL.Path)
}
return http.HandlerFunc(fn)
}

func recoverHandler(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
Log().Errorf("Webhook panic [%s] %s : %+v", r.Method, r.URL.Path, err)
http.Error(w, http.StatusText(500), 500)
}
}()

next.ServeHTTP(w, r)
}

return http.HandlerFunc(fn)
}
5 changes: 5 additions & 0 deletions processors/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type Base struct {
PipelineID int
}

// B returns the Base Processor
func (b *Base) B() *Base {
return b
}

func (b *Base) Doc() *doc.Processor {
return &doc.Processor{}
}
Expand Down
1 change: 1 addition & 0 deletions processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processors
import "github.com/vjeantet/bitfan/processors/doc"

type Processor interface {
B() *Base
Configure(ProcessorContext, map[string]interface{}) error
Start(IPacket) error
Tick(IPacket) error
Expand Down
3 changes: 2 additions & 1 deletion processors/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ package processors
import "net/http"

type WebHook interface {
Add(string, func(http.ResponseWriter, *http.Request))
Add(string, http.HandlerFunc)
Unregister()
}
20 changes: 20 additions & 0 deletions vendor/github.com/justinas/alice/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 94 additions & 0 deletions vendor/github.com/justinas/alice/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

112 changes: 112 additions & 0 deletions vendor/github.com/justinas/alice/chain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d773897

Please sign in to comment.