diff --git a/admin/api/plugins.go b/admin/api/plugins.go index 5114ff76..713963e5 100644 --- a/admin/api/plugins.go +++ b/admin/api/plugins.go @@ -44,10 +44,7 @@ func (api *API) CreatePlugin(w http.ResponseWriter, r *http.Request) { return } - p, err := model.Plugin() - api.assert(err) - model.Config = utils.Must(p.MarshalConfig()) - err = api.db.PluginsWS.Insert(r.Context(), &model) + err := api.db.PluginsWS.Insert(r.Context(), &model) api.assert(err) api.json(201, w, model) @@ -73,11 +70,6 @@ func (api *API) UpdatePlugin(w http.ResponseWriter, r *http.Request) { return } - p, err := model.Plugin() - api.assert(err) - - model.Config = utils.Must(p.MarshalConfig()) - model.ID = id err = api.db.PluginsWS.Update(r.Context(), model) api.assert(err) diff --git a/db/entities/plugin.go b/db/entities/plugin.go index f43ceb41..c9d7d4be 100644 --- a/db/entities/plugin.go +++ b/db/entities/plugin.go @@ -1,6 +1,7 @@ package entities import ( + "database/sql/driver" "encoding/json" "errors" "fmt" @@ -44,11 +45,11 @@ func (m *Plugin) Validate() error { } // validate plugin configuration - p, err := m.Plugin() + p, err := m.ToPlugin() if err != nil { return err } - if err = p.ValidateConfig(); err != nil { + if err = p.ValidateConfig(m.Config); err != nil { if e, ok := err.(*errs.ValidateError); ok { e.Fields = map[string]interface{}{ "config": e.Fields, @@ -57,6 +58,13 @@ func (m *Plugin) Validate() error { } return err } + + err = p.Init(m.Config) + if err != nil { + return err + } + m.Config = p.GetConfig() + return nil } @@ -69,44 +77,32 @@ func (m *Plugin) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, (*alias)(m)) } -func (m *Plugin) Plugin() (plugin.Plugin, error) { - r := plugin.GetRegistration(m.Name) - if r == nil { +func (m *Plugin) ToPlugin() (plugin.Plugin, error) { + executor, ok := plugin.New(m.Name) + if !ok { return nil, fmt.Errorf("unknown plugin name: '%s'", m.Name) } - - executor, err := r.New(m.Config) - if err != nil { - return nil, err - } return executor, nil } -type PluginConfiguration json.RawMessage +type PluginConfiguration map[string]interface{} -func (m PluginConfiguration) MarshalYAML() (interface{}, error) { - if len(m) == 0 { - return nil, nil - } - data := make(map[string]interface{}) - err := json.Unmarshal(m, &data) - if err != nil { - return nil, err - } - return data, nil +func (m *PluginConfiguration) Scan(src interface{}) error { + return json.Unmarshal(src.([]byte), m) } -func (m PluginConfiguration) MarshalJSON() ([]byte, error) { +func (m PluginConfiguration) Value() (driver.Value, error) { if m == nil { - return []byte("null"), nil + return []byte(`{}`), nil } - return m, nil + return json.Marshal(m) } func (m *PluginConfiguration) UnmarshalJSON(data []byte) error { - if m == nil { - return errors.New("json.RawMessage: UnmarshalJSON on nil pointer") + v := make(map[string]interface{}) + if err := json.Unmarshal(data, &v); err != nil { + return err } - *m = append((*m)[0:0], data...) + *m = v return nil } diff --git a/go.mod b/go.mod index 80eace55..db616eab 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect diff --git a/go.sum b/go.sum index 3ada4b99..a26907f3 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,8 @@ github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mfridman/tparse v0.18.0 h1:wh6dzOKaIwkUGyKgOntDW4liXSo37qg5AXbIhkMV3vE= github.com/mfridman/tparse v0.18.0/go.mod h1:gEvqZTuCgEhPbYk/2lS3Kcxg1GmTxxU7kTC8DvP0i/A= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= diff --git a/openapi.yml b/openapi.yml index 42693ad2..6a2963d4 100644 --- a/openapi.yml +++ b/openapi.yml @@ -936,7 +936,7 @@ components: nullable: true config: type: object - nullable: true + default: { } metadata: $ref: "#/components/schemas/Metadata" created_at: @@ -988,3 +988,64 @@ components: required: - quota - period + + + WasmPluginConfiguration: + description: "The wasm plugin configuration" + type: object + properties: + file: + description: "The file path to the wasm file." + type: string + envs: + description: "Environments that can be used in wasm." + type: object + additionalProperties: + type: string + default: { } + required: + - file + + FunctionPluginConfiguration: + description: "The function plugin configuration" + type: object + properties: + function: + description: "The function content." + type: string + maxLength: 1048576 + required: + - function + + WebhookxSignaturePluginConfiguration: + description: "The webhookx-signature plugin configuration" + type: object + properties: + signing_secret: + description: "The signature secret." + type: string + minLength: 1 + + JsonschemaValidatorPluginConfiguration: + description: "The jsonschema-validator plugin configuration" + type: object + properties: + draft: + type: string + enum: ["6"] + default: "6" + default_schema: + description: "The default schema" + type: string + format: jsonschema + maxLength: 1048576 + schemas: + description: "Defines event's schema." + type: object + additionalProperties: + type: object + properties: + schema: + type: string + format: jsonschema + maxLength: 1048576 diff --git a/pkg/declarative/types.go b/pkg/declarative/types.go index bdaac09b..f71ef611 100644 --- a/pkg/declarative/types.go +++ b/pkg/declarative/types.go @@ -54,11 +54,6 @@ func (cfg *Configuration) Validate() error { if err := model.Validate(); err != nil { return err } - p, err := model.Plugin() - if err != nil { - return err - } - model.Config = utils.Must(p.MarshalConfig()) } } @@ -67,11 +62,6 @@ func (cfg *Configuration) Validate() error { if err := model.Validate(); err != nil { return err } - p, err := model.Plugin() - if err != nil { - return err - } - model.Config = utils.Must(p.MarshalConfig()) } } diff --git a/pkg/openapi/openapi.go b/pkg/openapi/openapi.go index 1a1cb292..b623e475 100644 --- a/pkg/openapi/openapi.go +++ b/pkg/openapi/openapi.go @@ -1,6 +1,7 @@ package openapi import ( + "context" "encoding/json" "errors" "github.com/getkin/kin-openapi/openapi3" @@ -8,6 +9,23 @@ import ( "strconv" ) +type FormatValidatorFunc[T any] func(T) error + +func (fn FormatValidatorFunc[T]) Validate(value T) error { return fn(value) } + +func init() { + openapi3.DefineStringFormatValidator("jsonschema", FormatValidatorFunc[string](func(s string) error { + schema := &openapi3.Schema{} + if err := schema.UnmarshalJSON([]byte(s)); err != nil { + return err + } + if err := schema.Validate(context.TODO(), openapi3.EnableSchemaFormatValidation()); err != nil { + return err + } + return nil + })) +} + func SetDefaults(schema *openapi3.Schema, defaults map[string]interface{}) error { data := make(map[string]interface{}) _ = schema.VisitJSON(data, @@ -122,13 +140,9 @@ func insertError(current map[string]interface{}, i int, paths []string, err *ope if isIndex { ensureArray(current, "", index) arr := current[""].([]interface{}) - if err.Origin == nil { - arr[index] = formatError(err) - } + arr[index] = formatError(err) } else { - if err.Origin == nil { - current[key] = formatError(err) - } + current[key] = formatError(err) } return } diff --git a/pkg/plugin/base.go b/pkg/plugin/base.go new file mode 100644 index 00000000..2433aa3d --- /dev/null +++ b/pkg/plugin/base.go @@ -0,0 +1,53 @@ +package plugin + +import ( + "context" + "github.com/getkin/kin-openapi/openapi3" + "github.com/mitchellh/mapstructure" + "github.com/webhookx-io/webhookx/pkg/openapi" + "github.com/webhookx-io/webhookx/utils" +) + +// Configuration plugin configuration +type Configuration interface { + Schema() *openapi3.Schema +} + +type BasePlugin[T Configuration] struct { + Config T +} + +func (p *BasePlugin[T]) Init(config map[string]interface{}) error { + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + TagName: "json", + Result: &p.Config, + }) + if err != nil { + return err + } + return decoder.Decode(config) +} + +func (p *BasePlugin[T]) GetConfig() map[string]interface{} { + m, err := utils.StructToMap(p.Config) + if err != nil { + panic(err) + } + return m +} + +func (p *BasePlugin[T]) ValidateConfig(config map[string]interface{}) error { + err := openapi.Validate(p.Config.Schema(), config) + if err != nil { + return err + } + return nil +} + +func (p *BasePlugin[T]) ExecuteOutbound(ctx context.Context, outbound *Outbound) error { + panic("not implemented") +} + +func (p *BasePlugin[T]) ExecuteInbound(ctx context.Context, inbound *Inbound) (InboundResult, error) { + panic("not implemented") +} diff --git a/pkg/plugin/base_test.go b/pkg/plugin/base_test.go new file mode 100644 index 00000000..210fe93e --- /dev/null +++ b/pkg/plugin/base_test.go @@ -0,0 +1,29 @@ +package plugin + +import ( + "context" + "github.com/getkin/kin-openapi/openapi3" + "github.com/stretchr/testify/assert" + "testing" +) + +type config struct { +} + +func (c config) Schema() *openapi3.Schema { + return &openapi3.Schema{} +} + +type MyPlugin struct { + BasePlugin[config] +} + +func (m MyPlugin) Name() string { + panic("my-plugin") +} + +func Test(t *testing.T) { + myPlugin := &MyPlugin{} + assert.PanicsWithValue(t, "not implemented", func() { myPlugin.ExecuteInbound(context.TODO(), nil) }) + assert.PanicsWithValue(t, "not implemented", func() { myPlugin.ExecuteOutbound(context.TODO(), nil) }) +} diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 18bac841..acd0c252 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -1,36 +1,36 @@ package plugin import ( - "encoding/json" + "context" "net/http" ) type Plugin interface { - ExecuteOutbound(outbound *Outbound, context *Context) error - ExecuteInbound(inbound *Inbound) (InboundResult, error) - ValidateConfig() error - MarshalConfig() ([]byte, error) -} + // Name returns plugin's name + Name() string -type BasePlugin[T any] struct { - Name string - Config T -} + // Init inits plugin with configuration + Init(config map[string]interface{}) error -func (p *BasePlugin[T]) UnmarshalConfig(data []byte) error { - return json.Unmarshal(data, &p.Config) -} + // GetConfig returns plugin's configuration + GetConfig() map[string]interface{} -func (p *BasePlugin[T]) MarshalConfig() ([]byte, error) { - return json.Marshal(p.Config) -} + // ValidateConfig validates plugin's configuration + ValidateConfig(config map[string]interface{}) error -func (p *BasePlugin[T]) ExecuteOutbound(outbound *Outbound, context *Context) error { - panic("not implemented") + // ExecuteInbound executes inbound + ExecuteInbound(ctx context.Context, inbound *Inbound) (InboundResult, error) + + // ExecuteOutbound executes outbound + ExecuteOutbound(ctx context.Context, outbound *Outbound) error } -func (p *BasePlugin[T]) ExecuteInbound(inbound *Inbound) (InboundResult, error) { - panic("not implemented") +func New(name string) (Plugin, bool) { + r := GetRegistration(name) + if r == nil { + return nil, false + } + return r.Factory(), true } type Outbound struct { @@ -46,10 +46,6 @@ type Inbound struct { RawBody []byte } -type Context struct { - //Workspace *entities.Workspace -} - type InboundResult struct { Terminated bool Payload []byte diff --git a/pkg/plugin/registry.go b/pkg/plugin/registry.go index ec746b7a..4001dac1 100644 --- a/pkg/plugin/registry.go +++ b/pkg/plugin/registry.go @@ -12,17 +12,15 @@ const ( TypeOutbound Type = "outbound" ) -type NewPluginFunc func(config []byte) (Plugin, error) - type Registration struct { - Type Type - New NewPluginFunc + Type Type + Factory func() Plugin } var mux sync.RWMutex -var registry = make(map[string]*Registration) +var registry = map[string]*Registration{} -func RegisterPlugin(typ Type, name string, fn NewPluginFunc) { +func RegisterPlugin(typ Type, name string, fn func() Plugin) { mux.Lock() defer mux.Unlock() if _, ok := registry[name]; ok { @@ -30,8 +28,8 @@ func RegisterPlugin(typ Type, name string, fn NewPluginFunc) { } registry[name] = &Registration{ - Type: typ, - New: fn, + Type: typ, + Factory: fn, } } diff --git a/plugins/function/plugin.go b/plugins/function/plugin.go index cee28b5e..879b6ad9 100644 --- a/plugins/function/plugin.go +++ b/plugins/function/plugin.go @@ -1,38 +1,31 @@ package function import ( + "context" + "github.com/getkin/kin-openapi/openapi3" + "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/pkg/plugin" "github.com/webhookx-io/webhookx/plugins/function/function" "github.com/webhookx-io/webhookx/plugins/function/sdk" - "github.com/webhookx-io/webhookx/utils" ) type Config struct { - Function string `json:"function" validate:"required,max=1048576"` + Function string `json:"function"` } -type FunctionPlugin struct { - plugin.BasePlugin[Config] +func (c Config) Schema() *openapi3.Schema { + return entities.LookupSchema("FunctionPluginConfiguration") } -func New(config []byte) (plugin.Plugin, error) { - p := &FunctionPlugin{} - p.Name = "function" - - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } - } - - return p, nil +type FunctionPlugin struct { + plugin.BasePlugin[Config] } -func (p *FunctionPlugin) ValidateConfig() error { - return utils.Validate(p.Config) +func (p *FunctionPlugin) Name() string { + return "function" } -func (p *FunctionPlugin) ExecuteInbound(inbound *plugin.Inbound) (result plugin.InboundResult, err error) { +func (p *FunctionPlugin) ExecuteInbound(ctx context.Context, inbound *plugin.Inbound) (result plugin.InboundResult, err error) { fn := function.New("javascript", p.Config.Function) req := sdk.HTTPRequest{ diff --git a/plugins/jsonschema_validator/plugin.go b/plugins/jsonschema_validator/plugin.go index dbb20d95..873519d1 100644 --- a/plugins/jsonschema_validator/plugin.go +++ b/plugins/jsonschema_validator/plugin.go @@ -3,99 +3,37 @@ package jsonschema_validator import ( "context" "encoding/json" - "errors" - "fmt" "github.com/getkin/kin-openapi/openapi3" - "github.com/webhookx-io/webhookx/pkg/errs" + "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/pkg/http/response" "github.com/webhookx-io/webhookx/pkg/plugin" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/plugins/jsonschema_validator/jsonschema" - "github.com/webhookx-io/webhookx/utils" ) type Config struct { - Draft string `json:"draft" validate:"required,oneof=6 default:6"` - DefaultSchema string `json:"default_schema" validate:"omitempty,json,max=1048576"` - Schemas map[string]*Schema `json:"schemas" validate:"dive"` + Draft string `json:"draft"` + DefaultSchema string `json:"default_schema"` + Schemas map[string]*Schema `json:"schemas"` +} + +func (c Config) Schema() *openapi3.Schema { + return entities.LookupSchema("JsonschemaValidatorPluginConfiguration") } type Schema struct { - Schema string `json:"schema" validate:"omitempty,json,max=1048576"` + Schema string `json:"schema"` } type SchemaValidatorPlugin struct { plugin.BasePlugin[Config] } -func New(config []byte) (plugin.Plugin, error) { - p := &SchemaValidatorPlugin{} - p.Name = "jsonschema-validator" - - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } - } - return p, nil -} - -func unmarshalAndValidateSchema(schema string) (*openapi3.Schema, error) { - openapiSchema := &openapi3.Schema{} - err := openapiSchema.UnmarshalJSON([]byte(schema)) - if err != nil { - return nil, fmt.Errorf("value must be a valid jsonschema") - } - err = openapiSchema.Validate(context.Background(), openapi3.EnableSchemaFormatValidation()) - if err != nil { - return openapiSchema, err - } - return openapiSchema, nil -} - -func (p *SchemaValidatorPlugin) ValidateConfig() error { - err := utils.Validate(p.Config) - if err != nil { - return err - } - - e := errs.NewValidateError(errors.New("request validation")) - - var defaultErr error - if p.Config.DefaultSchema != "" { - _, err := unmarshalAndValidateSchema(p.Config.DefaultSchema) - if err != nil { - defaultErr = err - e.Fields = map[string]interface{}{ - "default_schema": err.Error(), - } - } - } - - for event, schema := range p.Config.Schemas { - field := fmt.Sprintf("schemas[%s]", event) - if schema == nil || schema.Schema == "" { - if defaultErr != nil { - e.Fields[field] = map[string]string{ - "schema": "invalid due to reusing the default_schema definition", - } - } - } else { - _, err = unmarshalAndValidateSchema(schema.Schema) - if err != nil { - e.Fields[field] = map[string]string{ - "schema": err.Error(), - } - } - } - } - if len(e.Fields) > 0 { - return e - } - return nil +func (p *SchemaValidatorPlugin) Name() string { + return "jsonschema-validator" } -func (p *SchemaValidatorPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plugin.InboundResult, err error) { +func (p *SchemaValidatorPlugin) ExecuteInbound(ctx context.Context, inbound *plugin.Inbound) (res plugin.InboundResult, err error) { var event map[string]any body := inbound.RawBody if err = json.Unmarshal(body, &event); err != nil { diff --git a/plugins/plugins.go b/plugins/plugins.go index 7bddbab2..38d5ea2a 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -9,8 +9,16 @@ import ( ) func LoadPlugins() { - plugin.RegisterPlugin(plugin.TypeInbound, "function", function.New) - plugin.RegisterPlugin(plugin.TypeInbound, "jsonschema-validator", jsonschema_validator.New) - plugin.RegisterPlugin(plugin.TypeOutbound, "wasm", wasm.New) - plugin.RegisterPlugin(plugin.TypeOutbound, "webhookx-signature", webhookx_signature.New) + plugin.RegisterPlugin(plugin.TypeInbound, "function", func() plugin.Plugin { + return &function.FunctionPlugin{} + }) + plugin.RegisterPlugin(plugin.TypeOutbound, "wasm", func() plugin.Plugin { + return &wasm.WasmPlugin{} + }) + plugin.RegisterPlugin(plugin.TypeOutbound, "webhookx-signature", func() plugin.Plugin { + return &webhookx_signature.SignaturePlugin{} + }) + plugin.RegisterPlugin(plugin.TypeInbound, "jsonschema-validator", func() plugin.Plugin { + return &jsonschema_validator.SchemaValidatorPlugin{} + }) } diff --git a/plugins/wasm/plugin.go b/plugins/wasm/plugin.go index d94e9a9e..70150824 100644 --- a/plugins/wasm/plugin.go +++ b/plugins/wasm/plugin.go @@ -3,45 +3,37 @@ package wasm import ( "context" "fmt" + "github.com/getkin/kin-openapi/openapi3" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/pkg/plugin" - "github.com/webhookx-io/webhookx/utils" "os" ) type Config struct { - File string `json:"file" validate:"required"` + File string `json:"file"` Envs map[string]string `json:"envs"` } +func (c Config) Schema() *openapi3.Schema { + return entities.LookupSchema("WasmPluginConfiguration") +} + type WasmPlugin struct { plugin.BasePlugin[Config] } -func New(config []byte) (plugin.Plugin, error) { - p := &WasmPlugin{} - p.Name = "wasm" - - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } - } - - return p, nil -} -func (p *WasmPlugin) ValidateConfig() error { - return utils.Validate(p.Config) +func (p *WasmPlugin) Name() string { + return "wasm" } -func (p *WasmPlugin) ExecuteOutbound(outbound *plugin.Outbound, _ *plugin.Context) error { +func (p *WasmPlugin) ExecuteOutbound(ctx context.Context, outbound *plugin.Outbound) error { source, err := os.ReadFile(p.Config.File) if err != nil { return err } - ctx := context.Background() runtime := wazero.NewRuntime(ctx) defer func() { _ = runtime.Close(ctx) }() diff --git a/plugins/wasm/plugin_test.go b/plugins/wasm/plugin_test.go index 5987aa10..21656583 100644 --- a/plugins/wasm/plugin_test.go +++ b/plugins/wasm/plugin_test.go @@ -2,6 +2,7 @@ package wasm import ( "bytes" + "context" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" @@ -39,9 +40,8 @@ var _ = Describe("wasm", Ordered, func() { for language, filename := range languages { It(language, func() { buf := initLogger() - p, err := New(nil) - assert.Nil(GinkgoT(), err) - p.(*WasmPlugin).Config.File = filename + p := new(WasmPlugin) + p.Config.File = filename pluginReq := &plugin.Outbound{ URL: "https://example.com", @@ -49,7 +49,7 @@ var _ = Describe("wasm", Ordered, func() { Headers: make(map[string]string), Payload: "", } - err = p.ExecuteOutbound(pluginReq, nil) + err := p.ExecuteOutbound(context.TODO(), pluginReq) assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), "https://httpbin.org/anything", pluginReq.URL) assert.Equal(GinkgoT(), "POST", pluginReq.Method) @@ -65,28 +65,25 @@ var _ = Describe("wasm", Ordered, func() { Context("errors", func() { It("file not found", func() { - p, err := New(nil) - assert.Nil(GinkgoT(), err) - p.(*WasmPlugin).Config.File = "notfound.wasm" - err = p.ExecuteOutbound(nil, nil) + p := new(WasmPlugin) + p.Config.File = "notfound.wasm" + err := p.ExecuteOutbound(context.TODO(), nil) assert.Error(GinkgoT(), err) assert.Equal(GinkgoT(), "open notfound.wasm: no such file or directory", err.Error()) }) It("transform not defined", func() { - p, err := New(nil) - assert.Nil(GinkgoT(), err) - p.(*WasmPlugin).Config.File = "./testdata/no_transform.wasm" - err = p.ExecuteOutbound(nil, nil) + p := new(WasmPlugin) + p.Config.File = "./testdata/no_transform.wasm" + err := p.ExecuteOutbound(context.TODO(), nil) assert.Error(GinkgoT(), err) assert.Equal(GinkgoT(), "exported function 'transform' is not defined in module", err.Error()) }) It("transform return does not return 0", func() { - p, err := New(nil) - assert.Nil(GinkgoT(), err) - p.(*WasmPlugin).Config.File = "./testdata/transform_return_1.wasm" - err = p.ExecuteOutbound(nil, nil) + p := new(WasmPlugin) + p.Config.File = "./testdata/transform_return_1.wasm" + err := p.ExecuteOutbound(context.TODO(), nil) assert.Error(GinkgoT(), err) assert.Equal(GinkgoT(), "transform failed with value 0", err.Error()) }) diff --git a/plugins/webhookx_signature/plugin.go b/plugins/webhookx_signature/plugin.go index adcf6535..e765c832 100644 --- a/plugins/webhookx_signature/plugin.go +++ b/plugins/webhookx_signature/plugin.go @@ -1,9 +1,12 @@ package webhookx_signature import ( + "context" "crypto/hmac" "crypto/sha256" "encoding/hex" + "github.com/getkin/kin-openapi/openapi3" + "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/pkg/plugin" "github.com/webhookx-io/webhookx/utils" "strconv" @@ -11,7 +14,11 @@ import ( ) type Config struct { - SigningSecret string `json:"signing_secret" validate:"required"` + SigningSecret string `json:"signing_secret"` +} + +func (c Config) Schema() *openapi3.Schema { + return entities.LookupSchema("WebhookxSignaturePluginConfiguration") } type SignaturePlugin struct { @@ -20,23 +27,17 @@ type SignaturePlugin struct { ts time.Time // used in testing } -func New(config []byte) (plugin.Plugin, error) { - p := &SignaturePlugin{} - p.Name = "webhookx-signature" - - p.Config.SigningSecret = utils.RandomString(32) +func (p *SignaturePlugin) Name() string { + return "webhookx-signature" +} - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } +// TODO +func (p *SignaturePlugin) ValidateConfig(config map[string]interface{}) error { + if _, ok := config["signing_secret"]; !ok { + config["signing_secret"] = utils.RandomString(32) } - return p, nil -} - -func (p *SignaturePlugin) ValidateConfig() error { - return utils.Validate(p.Config) + return p.BasePlugin.ValidateConfig(config) } func computeSignature(ts time.Time, payload []byte, secret string) []byte { @@ -47,7 +48,7 @@ func computeSignature(ts time.Time, payload []byte, secret string) []byte { return mac.Sum(nil) } -func (p *SignaturePlugin) ExecuteOutbound(outbound *plugin.Outbound, _ *plugin.Context) error { +func (p *SignaturePlugin) ExecuteOutbound(ctx context.Context, outbound *plugin.Outbound) error { ts := p.ts if ts.IsZero() { ts = time.Now() diff --git a/plugins/webhookx_signature/plugin_test.go b/plugins/webhookx_signature/plugin_test.go index f7513564..6b9446bd 100644 --- a/plugins/webhookx_signature/plugin_test.go +++ b/plugins/webhookx_signature/plugin_test.go @@ -1,6 +1,7 @@ package webhookx_signature import ( + "context" "github.com/stretchr/testify/assert" "github.com/webhookx-io/webhookx/pkg/plugin" "testing" @@ -8,10 +9,9 @@ import ( ) func TestExecute(t *testing.T) { - p, err := New(nil) - assert.Nil(t, err) - p.(*SignaturePlugin).ts = time.Unix(1726285679, 0) - p.(*SignaturePlugin).Config.SigningSecret = "QGvaZ0uPwA9nYi7jr31JtZn1EKK4pJpK" + p := new(SignaturePlugin) + p.ts = time.Unix(1726285679, 0) + p.Config.SigningSecret = "QGvaZ0uPwA9nYi7jr31JtZn1EKK4pJpK" pluginReq := &plugin.Outbound{ URL: "https://example.com", @@ -19,7 +19,7 @@ func TestExecute(t *testing.T) { Headers: make(map[string]string), Payload: "foo", } - p.ExecuteOutbound(pluginReq, nil) + p.ExecuteOutbound(context.TODO(), pluginReq) assert.Equal(t, "https://example.com", pluginReq.URL) assert.Equal(t, "POST", pluginReq.Method) diff --git a/proxy/gateway.go b/proxy/gateway.go index 691b06c1..8f5d8cb7 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -245,18 +245,22 @@ func (gw *Gateway) handle(w http.ResponseWriter, r *http.Request) bool { } for _, p := range plugins { - executor, err := p.Plugin() + executor, err := p.ToPlugin() if err != nil { response.JSON(w, 500, types.ErrorResponse{Message: "internal error"}) return false } - result, err := executor.ExecuteInbound(&plugin.Inbound{ + err = executor.Init(p.Config) + if err != nil { + response.JSON(w, 500, types.ErrorResponse{Message: "internal error"}) + } + result, err := executor.ExecuteInbound(context.TODO(), &plugin.Inbound{ Request: r, Response: w, RawBody: body, }) if err != nil { - gw.log.Errorf("failed to execute plugin: %v", err) + gw.log.Errorf("failed to execute %s plugin: %v", executor.Name(), err) response.JSON(w, 500, types.ErrorResponse{Message: "internal error"}) return false } diff --git a/test/admin/plugins_test.go b/test/admin/plugins_test.go index 9426e02c..3625eed6 100644 --- a/test/admin/plugins_test.go +++ b/test/admin/plugins_test.go @@ -2,7 +2,6 @@ package admin import ( "context" - "encoding/json" "github.com/go-resty/resty/v2" . "github.com/onsi/ginkgo/v2" "github.com/stretchr/testify/assert" @@ -22,9 +21,15 @@ import ( var _ = Describe("/plugins", Ordered, func() { - plugin.RegisterPlugin(plugin.TypeInbound, "inbound", inbound.New) - plugin.RegisterPlugin(plugin.TypeOutbound, "outbound", outbound.New) - plugin.RegisterPlugin(plugin.TypeOutbound, "hello", hello.New) + plugin.RegisterPlugin(plugin.TypeInbound, "inbound", func() plugin.Plugin { + return &inbound.InboundPlugin{} + }) + plugin.RegisterPlugin(plugin.TypeOutbound, "outbound", func() plugin.Plugin { + return &outbound.OutboundPlugin{} + }) + plugin.RegisterPlugin(plugin.TypeOutbound, "hello", func() plugin.Plugin { + return &hello.HelloPlugin{} + }) var adminClient *resty.Client var app *app.Application @@ -55,7 +60,6 @@ var _ = Describe("/plugins", Ordered, func() { EndpointId: utils.Pointer(endpoint.ID), Name: "webhookx-signature", Enabled: true, - Config: entities.PluginConfiguration("{}"), } plugin.WorkspaceId = ws.ID assert.NoError(GinkgoT(), db.Plugins.Insert(context.TODO(), &plugin)) @@ -99,7 +103,7 @@ var _ = Describe("/plugins", Ordered, func() { Context("POST", func() { Context("webhookx-signature plugin", func() { - It("creates a plugin with missing config", func() { + It("creates plugin with missing config", func() { endpoint := factory.EndpointP() assert.Nil(GinkgoT(), db.Endpoints.Insert(context.TODO(), endpoint)) resp, err := adminClient.R(). @@ -118,16 +122,14 @@ var _ = Describe("/plugins", Ordered, func() { assert.Equal(GinkgoT(), endpoint.ID, *result.EndpointId) assert.Equal(GinkgoT(), "webhookx-signature", result.Name) assert.Equal(GinkgoT(), true, result.Enabled) - data := make(map[string]string) - json.Unmarshal(result.Config, &data) - assert.Equal(GinkgoT(), 32, len(data["signing_secret"])) + assert.Equal(GinkgoT(), 32, len(result.Config["signing_secret"].(string))) e, err := db.Plugins.Get(context.TODO(), result.ID) assert.Nil(GinkgoT(), err) assert.NotNil(GinkgoT(), e) }) - It("creates a plugin with plugin config", func() { + It("creates plugin with plugin config", func() { endpoint := factory.EndpointP() assert.Nil(GinkgoT(), db.Endpoints.Insert(context.TODO(), endpoint)) resp, err := adminClient.R(). @@ -149,15 +151,43 @@ var _ = Describe("/plugins", Ordered, func() { assert.Equal(GinkgoT(), endpoint.ID, *result.EndpointId) assert.Equal(GinkgoT(), "webhookx-signature", result.Name) assert.Equal(GinkgoT(), true, result.Enabled) - data := make(map[string]string) - json.Unmarshal(result.Config, &data) - assert.Equal(GinkgoT(), "abcde", data["signing_secret"]) + assert.Equal(GinkgoT(), "abcde", result.Config["signing_secret"].(string)) e, err := db.Plugins.Get(context.TODO(), result.ID) assert.Nil(GinkgoT(), err) assert.NotNil(GinkgoT(), e) }) + It("creates plugin with unknown properties", func() { + endpoint := factory.EndpointP() + assert.Nil(GinkgoT(), db.Endpoints.Insert(context.TODO(), endpoint)) + resp, err := adminClient.R(). + SetBody(map[string]interface{}{ + "name": "webhookx-signature", + "endpoint_id": endpoint.ID, + "config": map[string]string{ + "signing_secret": "abcde", + "unknown": "unknown", + }, + }). + SetResult(entities.Plugin{}). + Post("/workspaces/default/plugins") + assert.Nil(GinkgoT(), err) + + assert.Equal(GinkgoT(), 201, resp.StatusCode()) + + result := resp.Result().(*entities.Plugin) + assert.NotNil(GinkgoT(), result.ID) + assert.Equal(GinkgoT(), endpoint.ID, *result.EndpointId) + assert.Equal(GinkgoT(), "webhookx-signature", result.Name) + assert.Equal(GinkgoT(), true, result.Enabled) + // the unknown property should be removed + assert.EqualValues(GinkgoT(), map[string]interface{}{"signing_secret": "abcde"}, result.Config) + + e, err := db.Plugins.Get(context.TODO(), result.ID) + assert.Nil(GinkgoT(), err) + assert.NotNil(GinkgoT(), e) + }) }) Context("function plugin", func() { @@ -178,7 +208,7 @@ var _ = Describe("/plugins", Ordered, func() { assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"function":"length must be at most 1048576"}}}}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"function":"maximum string length is 1048576"}}}}`, string(resp.Body())) }) }) @@ -260,7 +290,7 @@ var _ = Describe("/plugins", Ordered, func() { assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"json: cannot unmarshal number into Go struct field Config.message of type string"}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"message":"value must be a string"}}}}`, string(resp.Body())) }) @@ -279,7 +309,7 @@ var _ = Describe("/plugins", Ordered, func() { EndpointId: utils.Pointer(entitiesConfig.Endpoints[0].ID), Name: "webhookx-signature", Enabled: true, - Config: entities.PluginConfiguration(`{"signing_secret":"abcde"}`), + Config: map[string]interface{}{"signing_secret": "abcde"}, } entitiesConfig.Plugins = []*entities.Plugin{entity} @@ -297,7 +327,7 @@ var _ = Describe("/plugins", Ordered, func() { assert.Equal(GinkgoT(), entity.EndpointId, result.EndpointId) assert.Equal(GinkgoT(), entity.Name, result.Name) assert.Equal(GinkgoT(), entity.Enabled, result.Enabled) - assert.Equal(GinkgoT(), `{"signing_secret": "abcde"}`, string(entity.Config)) + assert.EqualValues(GinkgoT(), map[string]interface{}{"signing_secret": "abcde"}, entity.Config) }) Context("errors", func() { @@ -330,7 +360,6 @@ var _ = Describe("/plugins", Ordered, func() { ID: utils.KSUID(), Name: "webhookx-signature", Enabled: true, - Config: entities.PluginConfiguration("{}"), EndpointId: utils.Pointer(endpoint.ID), } plugin.WorkspaceId = ws.ID @@ -358,7 +387,7 @@ var _ = Describe("/plugins", Ordered, func() { }) Context("errors", func() { - It("should return HTTP 400 for unkown plugin name", func() { + It("return HTTP 400 when configuration filed type does not match", func() { resp, err := adminClient.R(). SetBody(map[string]interface{}{ "config": map[string]interface{}{ @@ -370,7 +399,7 @@ var _ = Describe("/plugins", Ordered, func() { assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - "{\"message\":\"json: cannot unmarshal number into Go struct field Config.signing_secret of type string\"}", + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"signing_secret":"value must be a string"}}}}`, string(resp.Body())) }) @@ -414,7 +443,6 @@ var _ = Describe("/plugins", Ordered, func() { ID: utils.KSUID(), Name: "webhookx-signature", Enabled: true, - Config: entities.PluginConfiguration("{}"), EndpointId: utils.Pointer(endpoint.ID), } entity.WorkspaceId = ws.ID diff --git a/test/cmd/admin_test.go b/test/cmd/admin_test.go index 7f94a123..b4fb78fc 100644 --- a/test/cmd/admin_test.go +++ b/test/cmd/admin_test.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "encoding/json" "fmt" . "github.com/onsi/ginkgo/v2" "github.com/stretchr/testify/assert" @@ -97,17 +98,19 @@ var _ = Describe("admin", Ordered, func() { assert.Equal(GinkgoT(), 1, len(plugins)) assert.Equal(GinkgoT(), "webhookx-signature", plugins[0].Name) assert.Equal(GinkgoT(), true, plugins[0].Enabled) - assert.Equal(GinkgoT(), `{"signing_secret": "foo"}`, string(plugins[0].Config)) + assert.EqualValues(GinkgoT(), map[string]interface{}{"signing_secret": "foo"}, plugins[0].Config) plugins, err = db.Plugins.ListSourcePlugin(context.TODO(), source.ID) assert.NoError(GinkgoT(), err) assert.Equal(GinkgoT(), 2, len(plugins)) assert.Equal(GinkgoT(), "function", plugins[0].Name) assert.Equal(GinkgoT(), true, plugins[0].Enabled) - assert.Equal(GinkgoT(), `{"function": "function handle() {}"}`, string(plugins[0].Config)) + assert.EqualValues(GinkgoT(), map[string]interface{}{"function": "function handle() {}"}, plugins[0].Config) assert.Equal(GinkgoT(), `jsonschema-validator`, plugins[1].Name) assert.Equal(GinkgoT(), true, plugins[1].Enabled) - assert.Equal(GinkgoT(), `{"draft": "6", "schemas": {"charge.succeeded": {"schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}, "default_schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" }\n },\n \"required\": [\"id\"]\n}\n"}`, string(plugins[1].Config)) + assert.JSONEq(GinkgoT(), + `{"draft": "6", "schemas": {"charge.succeeded": {"schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}, "default_schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" }\n },\n \"required\": [\"id\"]\n}\n"}`, + string(utils.Must(json.Marshal(plugins[1].Config)))) }) It("entities not defined in the declarative configuration should be deleted", func() { diff --git a/test/declarative/declarative_test.go b/test/declarative/declarative_test.go index 8179b6e1..9d5ee2c0 100644 --- a/test/declarative/declarative_test.go +++ b/test/declarative/declarative_test.go @@ -146,7 +146,7 @@ var _ = Describe("Declarative", Ordered, func() { assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"value must be a valid json string","draft":"required field missing","schemas[charge.succeed]":{"schema":"value must be a valid json string"}}}}}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"string doesn't match the format \"jsonschema\" (invalid character 'i' looking for beginning of value)","schemas":{"charge.succeed":{"schema":"string doesn't match the format \"jsonschema\" (invalid character 'i' looking for beginning of value)"}}}}}}`, string(resp.Body())) }) @@ -159,7 +159,7 @@ var _ = Describe("Declarative", Ordered, func() { assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"unsupported 'type' value \"invlidObject\"","schemas[charge.succeed]":{"schema":"unsupported 'format' value \"invalid\""},"schemas[reuse.default_schema]":{"schema":"invalid due to reusing the default_schema definition"}}}}}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"string doesn't match the format \"jsonschema\" (unsupported 'type' value \"invlidObject\")","schemas":{"charge.succeed":{"schema":"string doesn't match the format \"jsonschema\" (unsupported 'format' value \"invalid\")"},"reuse.default_schema":"Value is not nullable"}}}}}`, string(resp.Body())) }) }) diff --git a/test/delivery/delivery_test.go b/test/delivery/delivery_test.go index 4aa0d51d..01c35011 100644 --- a/test/delivery/delivery_test.go +++ b/test/delivery/delivery_test.go @@ -37,7 +37,7 @@ var _ = Describe("delivery", Ordered, func() { EndpointId: utils.Pointer(entitiesConfig.Endpoints[0].ID), Name: "webhookx-signature", Enabled: true, - Config: []byte(`{"key":"abcdefg"}`), + Config: map[string]interface{}{"key": "abcdefg"}, }} BeforeAll(func() { diff --git a/test/delivery/http_proxy_test.go b/test/delivery/http_proxy_test.go index 2e6ec2bb..72619e5f 100644 --- a/test/delivery/http_proxy_test.go +++ b/test/delivery/http_proxy_test.go @@ -473,6 +473,7 @@ var _ = Describe("Proxy", Ordered, func() { assert.Nil(GinkgoT(), attempt.Response.Headers) assert.Nil(GinkgoT(), attempt.Response.Body) + time.Sleep(time.Millisecond * 100) detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) assert.NoError(GinkgoT(), err) assert.NotNil(GinkgoT(), detail.RequestHeaders) diff --git a/test/fixtures/plugins/hello/hello_plugin.go b/test/fixtures/plugins/hello/hello_plugin.go index 06ffe80b..fbc03a62 100644 --- a/test/fixtures/plugins/hello/hello_plugin.go +++ b/test/fixtures/plugins/hello/hello_plugin.go @@ -1,37 +1,46 @@ package hello import ( + "context" "fmt" + "github.com/getkin/kin-openapi/openapi3" "github.com/webhookx-io/webhookx/pkg/plugin" - "github.com/webhookx-io/webhookx/utils" ) -type Config struct { - Message string `json:"message" validate:"required"` +var schemaJSON = ` +{ + "type": "object", + "properties": { + "message": { + "type": "string" + } + }, + "required": ["message"] } +` -type HelloPlugin struct { - plugin.BasePlugin[Config] +type Config struct { + Message string `json:"message"` } -func New(config []byte) (plugin.Plugin, error) { - p := &HelloPlugin{} - p.Name = "hello" - - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } +func (c Config) Schema() *openapi3.Schema { + schema := openapi3.NewSchema() + err := schema.UnmarshalJSON([]byte(schemaJSON)) + if err != nil { + panic(err) } + return schema +} - return p, nil +type HelloPlugin struct { + plugin.BasePlugin[Config] } -func (p *HelloPlugin) ValidateConfig() error { - return utils.Validate(p.Config) +func (p *HelloPlugin) Name() string { + return "hello" } -func (p *HelloPlugin) ExecuteOutbound(outbound *plugin.Outbound, _ *plugin.Context) error { +func (p *HelloPlugin) ExecuteOutbound(ctx context.Context, outbound *plugin.Outbound) error { fmt.Println(p.Config.Message) return nil } diff --git a/test/fixtures/plugins/inbound/inbound_plugin.go b/test/fixtures/plugins/inbound/inbound_plugin.go index 02da9bef..692b28e1 100644 --- a/test/fixtures/plugins/inbound/inbound_plugin.go +++ b/test/fixtures/plugins/inbound/inbound_plugin.go @@ -1,34 +1,26 @@ package inbound import ( + "context" + "github.com/getkin/kin-openapi/openapi3" "github.com/webhookx-io/webhookx/pkg/plugin" - "github.com/webhookx-io/webhookx/utils" ) type Config struct { } -type InboundPlugin struct { - plugin.BasePlugin[Config] +func (c Config) Schema() *openapi3.Schema { + return openapi3.NewObjectSchema() } -func New(config []byte) (plugin.Plugin, error) { - p := &InboundPlugin{} - p.Name = "inbound" - - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } - } - - return p, nil +type InboundPlugin struct { + plugin.BasePlugin[Config] } -func (p *InboundPlugin) ValidateConfig() error { - return utils.Validate(p.Config) +func (p *InboundPlugin) Name() string { + return "inbound" } -func (p *InboundPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plugin.InboundResult, err error) { +func (p *InboundPlugin) ExecuteInbound(ctx context.Context, inbound *plugin.Inbound) (res plugin.InboundResult, err error) { return } diff --git a/test/fixtures/plugins/outbound/outbound_plugin.go b/test/fixtures/plugins/outbound/outbound_plugin.go index 8739d4ad..2605acea 100644 --- a/test/fixtures/plugins/outbound/outbound_plugin.go +++ b/test/fixtures/plugins/outbound/outbound_plugin.go @@ -1,34 +1,26 @@ package outbound import ( + "context" + "github.com/getkin/kin-openapi/openapi3" "github.com/webhookx-io/webhookx/pkg/plugin" - "github.com/webhookx-io/webhookx/utils" ) type Config struct { } -type OutboundPlugin struct { - plugin.BasePlugin[Config] +func (c Config) Schema() *openapi3.Schema { + return openapi3.NewObjectSchema() } -func New(config []byte) (plugin.Plugin, error) { - p := &OutboundPlugin{} - p.Name = "outbound" - - if config != nil { - if err := p.UnmarshalConfig(config); err != nil { - return nil, err - } - } - - return p, nil +type OutboundPlugin struct { + plugin.BasePlugin[Config] } -func (p *OutboundPlugin) ValidateConfig() error { - return utils.Validate(p.Config) +func (p *OutboundPlugin) Name() string { + return "outbound" } -func (p *OutboundPlugin) ExecuteOutbound(outbound *plugin.Outbound, _ *plugin.Context) error { +func (p *OutboundPlugin) ExecuteOutbound(ctx context.Context, outbound *plugin.Outbound) error { return nil } diff --git a/test/helper/factory/factory.go b/test/helper/factory/factory.go index 447cd3e4..fdfdb3e6 100644 --- a/test/helper/factory/factory.go +++ b/test/helper/factory/factory.go @@ -6,6 +6,7 @@ import ( "github.com/getkin/kin-openapi/openapi3" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/pkg/openapi" + "github.com/webhookx-io/webhookx/pkg/plugin" "github.com/webhookx-io/webhookx/utils" ) @@ -150,7 +151,7 @@ func defaultPlugin() entities.Plugin { SetDefault(entities.LookupSchema("Plugin"), &entity) entity.ID = utils.KSUID() - entity.Config = entities.PluginConfiguration("{}") + entity.Config = make(map[string]interface{}) return entity } @@ -187,19 +188,13 @@ func WithPluginMetadata(metadata map[string]string) PluginOption { } } -func WithPluginConfig(config interface{}) PluginOption { +func WithPluginConfig(config plugin.Configuration) PluginOption { return func(e *entities.Plugin) { - b, err := json.Marshal(config) + properties, err := utils.StructToMap(config) if err != nil { panic(err) } - e.Config = b - } -} - -func WithPluginConfigJSON(json string) PluginOption { - return func(e *entities.Plugin) { - e.Config = entities.PluginConfiguration(json) + e.Config = properties } } diff --git a/test/helper/helper.go b/test/helper/helper.go index 64010e15..89ae086f 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -151,6 +151,7 @@ func ExecAppCommand(args ...string) (output string, err error) { func AdminClient() *resty.Client { c := resty.New() c.SetBaseURL(AdminHttpURL) + c.DisableWarn = true return c } @@ -164,6 +165,7 @@ func AdminTLSClient() *resty.Client { func ProxyClient() *resty.Client { c := resty.New() c.SetBaseURL(ProxyHttpURL) + c.DisableWarn = true return c } @@ -177,6 +179,7 @@ func ProxyTLSClient() *resty.Client { func StatusClient() *resty.Client { c := resty.New() c.SetBaseURL(StatusHttpURL) + c.DisableWarn = true return c } diff --git a/test/metrics/opentelemetry_test.go b/test/metrics/opentelemetry_test.go index 11c54626..863bb403 100644 --- a/test/metrics/opentelemetry_test.go +++ b/test/metrics/opentelemetry_test.go @@ -11,6 +11,7 @@ import ( "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/helper/factory" + "github.com/webhookx-io/webhookx/utils" "testing" "time" ) @@ -28,21 +29,26 @@ var _ = Describe("opentelemetry", Ordered, func() { BeforeAll(func() { entitiesConfig := helper.EntitiesConfig{ - Endpoints: []*entities.Endpoint{factory.EndpointP(), factory.EndpointP()}, - Sources: []*entities.Source{factory.SourceP(factory.WithSourceAsync(true))}, + Endpoints: []*entities.Endpoint{ + factory.EndpointP(), + factory.EndpointP(func(e *entities.Endpoint) { + e.Request.Timeout = 1 + }), + }, + Sources: []*entities.Source{factory.SourceP()}, } - entitiesConfig.Endpoints[1].Request.Timeout = 1 helper.InitDB(true, &entitiesConfig) helper.InitOtelOutput() proxyClient = helper.ProxyClient() - var err error - app, err = helper.Start(map[string]string{ + app = utils.Must(helper.Start(map[string]string{ "WEBHOOKX_METRICS_EXPORTS": "opentelemetry", "WEBHOOKX_METRICS_PUSH_INTERVAL": "3", "WEBHOOKX_METRICS_OPENTELEMETRY_PROTOCOL": protocol, "WEBHOOKX_METRICS_OPENTELEMETRY_ENDPOINT": endpoints[protocol], - }) - assert.Nil(GinkgoT(), err) + })) + + err := helper.WaitForServer(helper.ProxyHttpURL, time.Second) + assert.NoError(GinkgoT(), err) }) AfterAll(func() { @@ -50,16 +56,17 @@ var _ = Describe("opentelemetry", Ordered, func() { }) It("sanity", func() { - assert.Eventually(GinkgoT(), func() bool { + n, err := helper.FileCountLine(helper.OtelCollectorMetricsFile) + assert.Nil(GinkgoT(), err) + n++ + + for i := 0; i < 3; i++ { resp, err := proxyClient.R(). - SetBody(`{ - "event_type": "foo.bar", - "data": { - "key": "value" - } - }`).Post("/") - return err == nil && resp.StatusCode() == 200 - }, time.Second*5, time.Second) + SetBody(`{"event_type": "foo.bar","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + } expected := []string{ "webhookx.runtime.num_goroutine", @@ -84,9 +91,6 @@ var _ = Describe("opentelemetry", Ordered, func() { "webhookx.attempt.failed", } - n, err := helper.FileCountLine(helper.OtelCollectorMetricsFile) - assert.Nil(GinkgoT(), err) - n++ uploaded := make(map[string]bool) assert.Eventually(GinkgoT(), func() bool { line, err := helper.FileLine(helper.OtelCollectorMetricsFile, n) diff --git a/worker/worker.go b/worker/worker.go index e81a1fe9..53c36079 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -383,18 +383,21 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er Payload: data.Event, } maps.Copy(outbound.Headers, endpoint.Request.Headers) - pluginCtx := &plugin.Context{ - //Workspace: workspace, - } + for _, p := range plugins { - executor, err := p.Plugin() + executor, err := p.ToPlugin() + if err != nil { + return err + } + + err = executor.Init(p.Config) if err != nil { return err } - err = executor.ExecuteOutbound(&outbound, pluginCtx) + err = executor.ExecuteOutbound(context.TODO(), &outbound) if err != nil { - return fmt.Errorf("failed to execute %s plugin: %v", p.Name, err) + return fmt.Errorf("failed to execute %s plugin: %v", executor.Name(), err) } }