Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
processors: webhook - add error handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
vjeantet committed Mar 18, 2018
1 parent cf6d350 commit e69ec8c
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 50 deletions.
8 changes: 4 additions & 4 deletions core/webhook/webhook.go
Expand Up @@ -54,14 +54,14 @@ func (w *webHook) buildURL(hookName string) string {
return strings.ToLower("/h/" + slug.Make(w.pipelineLabel) + "/" + slug.Make(hookName))
}

func (w *webHook) buildNamedURL(hookName string) string {
func (w *webHook) buildShortURL(hookName string) string {
return strings.ToLower("/_/" + hookName)
}

// Add a new route to a given http.HandlerFunc
func (w *webHook) AddNamed(hookName string, hf http.HandlerFunc) {
func (w *webHook) AddShort(hookName string, hf http.HandlerFunc) {

hUrl := w.buildNamedURL(hookName)
hUrl := w.buildShortURL(hookName)
w.Hooks = append(w.Hooks, hookName)

webHookMap.Store(hUrl, &Hook{
Expand Down Expand Up @@ -96,7 +96,7 @@ func (w *webHook) Delete(hookName string) {
webHookMap.Delete(hUrl)
Log.Debugf("WebHook unregisted [%s]", hUrl)

hUrl = w.buildNamedURL(hookName)
hUrl = w.buildShortURL(hookName)
webHookMap.Delete(hUrl)
Log.Debugf("WebHook unregisted [%s]", hUrl)
}
Expand Down
12 changes: 6 additions & 6 deletions docs/data/processors/webfan.json
Expand Up @@ -18,9 +18,9 @@
},
{
"Alias": "",
"DefaultValue": "\"json\"",
"Doc": "The codec used for input data. Input codecs are a convenient method for decoding\nyour data before it enters the input, without needing a separate filter in your bitfan pipeline\n\nDefault decode http request as json, response is json encoded.\nSet multiple codec with role to customize",
"ExampleLS": "",
"DefaultValue": null,
"Doc": "The codec used for posted data. Input codecs are a convenient method for decoding\nyour data before it enters the pipeline, without needing a separate filter in your bitfan pipeline\n\nDefault decode http request as plain text, response is json encoded.\nSet multiple codec with role to customize",
"ExampleLS": "codec =\u003e plain { role=\u003e\"encoder\"} codec =\u003e json { role=\u003e\"decoder\"}",
"Name": "Codec",
"PossibleValues": null,
"Required": false,
Expand All @@ -37,19 +37,19 @@
"Type": "string"
},
{
"Alias": "conf",
"Alias": "pipeline",
"DefaultValue": null,
"Doc": "Path to pipeline's configuration to execute on request\nThis configuration should contains only a filter section an a output like ```output{pass{}}```",
"ExampleLS": "",
"Name": "Conf",
"Name": "Pipeline",
"PossibleValues": null,
"Required": true,
"Type": "string"
},
{
"Alias": "headers",
"DefaultValue": null,
"Doc": "Headers to send back into each outgoing response",
"Doc": "Headers to send back into outgoing response",
"ExampleLS": "{\"X-Processor\" =\u003e \"bitfan\"}",
"Name": "Headers",
"PossibleValues": null,
Expand Down
12 changes: 6 additions & 6 deletions processors/webfan/docdoc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions processors/webfan/readme.md
Expand Up @@ -4,24 +4,24 @@
## Synopsys


| SETTING | TYPE | REQUIRED | DEFAULT VALUE |
|---------|--------|----------|---------------|
| Codec | codec | false | "json" |
| uri | string | true | "" |
| conf | string | true | "" |
| headers | hash | false | {} |
| SETTING | TYPE | REQUIRED | DEFAULT VALUE |
|----------|--------|----------|---------------|
| Codec | codec | false | ? |
| uri | string | true | "" |
| pipeline | string | true | "" |
| headers | hash | false | {} |


## Details

### Codec
* Value type is codec
* Default value is `"json"`
* Default value is `?`

The codec used for input data. Input codecs are a convenient method for decoding
your data before it enters the input, without needing a separate filter in your bitfan pipeline
The codec used for posted data. Input codecs are a convenient method for decoding
your data before it enters the pipeline, without needing a separate filter in your bitfan pipeline

Default decode http request as json, response is json encoded.
Default decode http request as plain text, response is json encoded.
Set multiple codec with role to customize

### uri
Expand All @@ -31,7 +31,7 @@ Set multiple codec with role to customize

URI path /_/path

### conf
### pipeline
* This is a required setting.
* Value type is string
* Default value is `""`
Expand All @@ -43,17 +43,17 @@ This configuration should contains only a filter section an a output like ```out
* Value type is hash
* Default value is `{}`

Headers to send back into each outgoing response
Headers to send back into outgoing response



## Configuration blueprint

```
webfan{
codec => "json"
codec => plain { role=>"encoder"} codec => json { role=>"decoder"}
uri => ""
conf => ""
pipeline => ""
{"X-Processor" => "bitfan"}
}
```
42 changes: 23 additions & 19 deletions processors/webfan/webfan.go
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"net/http"
"net/http/httputil"
"os"
"sync"

uuid "github.com/nu7hatch/gouuid"
Expand All @@ -22,12 +21,12 @@ func New() processors.Processor {
type options struct {
processors.CommonOptions `mapstructure:",squash"`

// The codec used for input data. Input codecs are a convenient method for decoding
// your data before it enters the input, without needing a separate filter in your bitfan pipeline
// The codec used for posted data. Input codecs are a convenient method for decoding
// your data before it enters the pipeline, without needing a separate filter in your bitfan pipeline
//
// Default decode http request as json, response is json encoded.
// Default decode http request as plain text, response is json encoded.
// Set multiple codec with role to customize
// @Default "json"
// @ExampleLS codec => plain { role=>"encoder"} codec => json { role=>"decoder"}
// @Type codec
Codec codecs.CodecCollection

Expand All @@ -36,9 +35,9 @@ type options struct {

// Path to pipeline's configuration to execute on request
// This configuration should contains only a filter section an a output like ```output{pass{}}```
Conf string `mapstructure:"conf" validate:"required"`
Pipeline string `mapstructure:"pipeline" validate:"required"`

// Headers to send back into each outgoing response
// Headers to send back into outgoing response
// @ExampleLS {"X-Processor" => "bitfan"}
Headers map[string]string `mapstructure:"headers"`
}
Expand All @@ -47,10 +46,9 @@ type options struct {
type processor struct {
processors.Base

opt *options
wg *sync.WaitGroup
host string
ep *entrypoint.Entrypoint
opt *options
wg *sync.WaitGroup
ep *entrypoint.Entrypoint
}

func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]interface{}) error {
Expand All @@ -61,10 +59,6 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i
return err
}

if p.host, err = os.Hostname(); err != nil {
p.Logger.Warnf("can not get hostname : %v", err)
}

if p.opt.Codec.Enc == nil {
p.opt.Codec.Enc = codecs.New("json", nil, ctx.Log(), ctx.ConfigWorkingLocation())
}
Expand All @@ -77,12 +71,12 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i
}
func (p *processor) Start(e processors.IPacket) error {
p.wg = &sync.WaitGroup{}
p.WebHook.AddNamed(p.opt.Uri, p.HttpHandler)
p.WebHook.AddShort(p.opt.Uri, p.HttpHandler)

var err error
p.ep, err = entrypoint.New(p.opt.Conf, p.ConfigWorkingLocation, entrypoint.CONTENT_REF)
p.ep, err = entrypoint.New(p.opt.Pipeline, p.ConfigWorkingLocation, entrypoint.CONTENT_REF)
if err != nil {
p.Logger.Errorf("Error with entrypoint %s", p.opt.Conf)
p.Logger.Errorf("Error with entrypoint %s", p.opt.Pipeline)
}

return err
Expand All @@ -98,13 +92,24 @@ func (p *processor) HttpHandler(w http.ResponseWriter, r *http.Request) {
ppl, err := p.ep.Pipeline()
if err != nil {
p.Logger.Errorf("%s", err)
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte(err.Error()))
return
}
// pp.Println("ppl-->", ppl)
orderedAgentConfList := core.Sort(ppl.Agents(), core.SortInputsFirst)

// Find Last Agent
lastAgent := orderedAgentConfList[len(orderedAgentConfList)-1]
if lastAgent.Label != "pass" {
p.Logger.Errorf("Add an `output{pass{}}` into pipeline configuration")
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("Add an `output{pass{}}` into pipeline configuration"))
return
}
// pp.Println("lastAgent-->", lastAgent)
// Connect its Recipient to this Receive

// Find First agent
Expand Down Expand Up @@ -159,7 +164,6 @@ func (p *processor) HttpHandler(w http.ResponseWriter, r *http.Request) {
if dec, err = p.opt.Codec.NewDecoder(r.Body); err != nil {
p.Logger.Errorln("decoder error : ", err.Error())
close(back)
close(back)
return
}
headersBytes, _ := httputil.DumpRequest(r, false)
Expand Down
2 changes: 1 addition & 1 deletion processors/webhook.go
Expand Up @@ -4,6 +4,6 @@ import "net/http"

type WebHook interface {
Add(string, http.HandlerFunc)
AddNamed(string, http.HandlerFunc)
AddShort(string, http.HandlerFunc)
Unregister()
}

0 comments on commit e69ec8c

Please sign in to comment.