diff --git a/app/app.go b/app/app.go index 2288fede..fbe91745 100644 --- a/app/app.go +++ b/app/app.go @@ -174,10 +174,29 @@ func (app *Application) initialize() error { // worker if cfg.Worker.Enabled { + d := deliverer.NewHTTPDeliverer(deliverer.Options{ + Logger: log, + RequestTimeout: time.Duration(cfg.Worker.Deliverer.Timeout) * time.Millisecond, + AccessControlOptions: deliverer.AccessControlOptions{ + Deny: cfg.Worker.Deliverer.ACL.Deny, + }, + }) + if cfg.Worker.Deliverer.Proxy != "" { + err := d.SetupProxy(deliverer.ProxyOptions{ + URL: cfg.Worker.Deliverer.Proxy, + TLSCert: cfg.Worker.Deliverer.ProxyTLSCert, + TLSKey: cfg.Worker.Deliverer.ProxyTLSKey, + TLSCaCertificate: cfg.Worker.Deliverer.ProxyTLSCaCert, + TLSVerify: cfg.Worker.Deliverer.ProxyTLSVerify, + }) + if err != nil { + return err + } + } opts := worker.Options{ PoolSize: int(cfg.Worker.Pool.Size), PoolConcurrency: int(cfg.Worker.Pool.Concurrency), - Deliverer: deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer), + Deliverer: d, DB: db, Srv: app.srv, Tracer: tracer, diff --git a/config.yml b/config.yml index 8f471175..db5ae33a 100644 --- a/config.yml +++ b/config.yml @@ -91,6 +91,15 @@ worker: # - '2606:2800:220:1:248:1893:25c8:1946' # - '*.example.com' # + #proxy: # Proxy server URL. Supports HTTP and HTTPS. + # When a proxy is enabled, the ACL is automatically disabled. + # Example of HTTP: http://: + # Example of HTTPS: https://: + #proxy_tls_cert: # Path to the client certificate file used for mTLS proxy authentication. + #proxy_tls_key: # Path to the client private key file used for mTLS proxy authentication. + #proxy_tls_ca_cert: # Path to the CA certificate file used to verify the HTTPS proxy’s certificate. + #proxy_tls_verify: true # Whether to verify the proxy server's TLS certificate. + pool: size: 10000 # pool size, default to 10000. concurrency: 0 # pool concurrency, default to 100 * CPUs diff --git a/config/config_test.go b/config/config_test.go index 924672ff..57d4ebd6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -431,6 +431,54 @@ func TestWorkerConfig(t *testing.T) { } } +func TestWorkerProxyConfig(t *testing.T) { + tests := []struct { + desc string + cfg WorkerDeliverer + validateErr error + }{ + { + desc: "sanity", + cfg: WorkerDeliverer{ + Proxy: "http://example.com:8080", + }, + validateErr: nil, + }, + { + desc: "invalid proxy url: missing schema", + cfg: WorkerDeliverer{ + Proxy: "example.com", + }, + validateErr: errors.New("invalid proxy url: 'example.com'"), + }, + { + desc: "invalid proxy url: invalid schema ", + cfg: WorkerDeliverer{ + Proxy: "ftp://example.com", + }, + validateErr: errors.New("proxy schema must be http or https"), + }, + { + desc: "invalid proxy url: missing host ", + cfg: WorkerDeliverer{ + Proxy: "http://", + }, + validateErr: errors.New("invalid proxy url: 'http://'"), + }, + { + desc: "invalid proxy url: missing host ", + cfg: WorkerDeliverer{ + Proxy: "http ://", + }, + validateErr: errors.New("invalid proxy url: parse \"http ://\": first path segment in URL cannot contain colon"), + }, + } + for _, test := range tests { + actual := test.cfg.Validate() + assert.Equal(t, test.validateErr, actual, "expected %v got %v", test.validateErr, actual) + } +} + func TestConfig(t *testing.T) { cfg, err := Init() assert.Nil(t, err) diff --git a/config/worker.go b/config/worker.go index c5c204e0..aec92baf 100644 --- a/config/worker.go +++ b/config/worker.go @@ -3,13 +3,19 @@ package config import ( "fmt" "net/netip" + "net/url" "regexp" "slices" ) type WorkerDeliverer struct { - Timeout int64 `yaml:"timeout" json:"timeout" default:"60000"` - ACL ACLConfig `yaml:"acl" json:"acl"` + Timeout int64 `yaml:"timeout" json:"timeout" default:"60000"` + ACL ACLConfig `yaml:"acl" json:"acl"` + Proxy string `yaml:"proxy" json:"proxy"` + ProxyTLSCert string `yaml:"proxy_tls_cert" json:"proxy_tls_cert" envconfig:"PROXY_TLS_CERT"` + ProxyTLSKey string `yaml:"proxy_tls_key" json:"proxy_tls_key" envconfig:"PROXY_TLS_KEY"` + ProxyTLSCaCert string `yaml:"proxy_tls_ca_cert" json:"proxy_tls_ca_cert" envconfig:"PROXY_TLS_CA_CERT"` + ProxyTLSVerify bool `yaml:"proxy_tls_verify" json:"proxy_tls_verify" envconfig:"PROXY_TLS_VERIFY"` } func (cfg *WorkerDeliverer) Validate() error { @@ -19,6 +25,19 @@ func (cfg *WorkerDeliverer) Validate() error { if err := cfg.ACL.Validate(); err != nil { return err } + if cfg.Proxy != "" { + u, err := url.Parse(cfg.Proxy) + if err != nil { + return fmt.Errorf("invalid proxy url: %s", err) + } + if u.Scheme == "" || u.Host == "" { + return fmt.Errorf("invalid proxy url: '%s'", cfg.Proxy) + } + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("proxy schema must be http or https") + } + } + return nil } diff --git a/go.mod b/go.mod index 1996cf2a..cc1c7ab3 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/creasty/defaults v1.8.0 github.com/dop251/goja v0.0.0-20250309171923-bcd7cc6bf64c + github.com/elazarl/goproxy v1.7.2 github.com/getkin/kin-openapi v0.132.0 github.com/go-kit/kit v0.13.0 github.com/go-playground/validator/v10 v10.26.0 diff --git a/go.sum b/go.sum index 0ee2d5aa..2f1e163e 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dop251/goja v0.0.0-20250309171923-bcd7cc6bf64c h1:mxWGS0YyquJ/ikZOjSrRjjFIbUqIP9ojyYQ+QZTU3Rg= github.com/dop251/goja v0.0.0-20250309171923-bcd7cc6bf64c/go.mod h1:MxLav0peU43GgvwVgNbLAj1s/bSGboKkhuULvq/7hx4= +github.com/elazarl/goproxy v1.7.2 h1:Y2o6urb7Eule09PjlhQRGNsqRfPmYI3KKQLFpCAV3+o= +github.com/elazarl/goproxy v1.7.2/go.mod h1:82vkLNir0ALaW14Rc399OTTjyNREgmdL2cVoIbS6XaE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= diff --git a/test/delivery/http_proxy_test.go b/test/delivery/http_proxy_test.go new file mode 100644 index 00000000..2361650c --- /dev/null +++ b/test/delivery/http_proxy_test.go @@ -0,0 +1,567 @@ +package delivery + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "github.com/elazarl/goproxy" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/test" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/test/helper/factory" + "github.com/webhookx-io/webhookx/utils" + "github.com/webhookx-io/webhookx/worker/deliverer" + "log" + "net" + "net/http" + "os" + "time" +) + +type NoopLogger struct{} + +func (NoopLogger) Printf(format string, v ...any) {} + +func NewHttpProxyServer(addr string) *http.Server { + proxy := goproxy.NewProxyHttpServer() + proxy.Logger = &NoopLogger{} + proxy.OnRequest(goproxy.ReqHostIs("deny.localhost:443")).HandleConnectFunc(func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) { + return &goproxy.ConnectAction{ + Action: goproxy.ConnectHijack, + Hijack: func(req *http.Request, client net.Conn, ctx *goproxy.ProxyCtx) { + body := `{"error": "Proxy Authentication Required", "code": 407}` + resp := fmt.Sprintf("HTTP/1.1 407 Proxy Authentication Required\r\n"+ + "Content-Type: application/json\r\n"+ + "Content-Length: %d\r\n"+ + "\r\n%s", len(body), body) + client.Write([]byte(resp)) + }, + }, host + }) + proxy.OnRequest().HandleConnect(goproxy.AlwaysMitm) + proxy.OnResponse().DoFunc(func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response { + resp.Header.Set("X-Proxied", "true") + return resp + }) + return &http.Server{ + Addr: addr, + Handler: proxy, + } +} + +var _ = Describe("Proxy", Ordered, func() { + + var httpsBinURL = "https://localhost:9443" + + var httpProxyListen = "127.0.0.1:9901" + var httpsProxyListen = "127.0.0.1:9902" + var mtlsProxyListen = "localhost:9903" + var httpProxyURL = "http://" + httpProxyListen + var httpsProxyURL = "https://" + httpsProxyListen + var mtlsProxyURL = "https://" + mtlsProxyListen + + BeforeAll(func() { + deliverer.DefaultTLSConfig = &tls.Config{InsecureSkipVerify: true} // mock tls config + httpProxyServer := NewHttpProxyServer(httpProxyListen) + go func() { + log.Fatal(httpProxyServer.ListenAndServe()) + }() + httpsProxyServer := NewHttpProxyServer(httpsProxyListen) + go func() { + log.Fatal(httpsProxyServer.ListenAndServeTLS(test.FilePath("server.crt"), test.FilePath("server.key"))) + }() + mTLScert, err := tls.LoadX509KeyPair( + test.FilePath("fixtures/mtls/server.crt"), + test.FilePath("fixtures/mtls/server.key")) + if err != nil { + panic(err) + } + pem, err := os.ReadFile(test.FilePath("fixtures/mtls/client-ca.crt")) + if err != nil { + panic(err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(pem) { + panic("failed to append CA certs") + } + mtlsProxyServer := NewHttpProxyServer(mtlsProxyListen) + mtlsProxyServer.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{mTLScert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: pool, + } + go func() { + log.Fatal(mtlsProxyServer.ListenAndServeTLS("", "")) + }() + }) + + AfterAll(func() { + deliverer.DefaultTLSConfig = nil // reset tls config + }) + + Context("HTTP URL", func() { + var proxyClient *resty.Client + + var app *app.Application + var db *db.DB + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{ + factory.EndpointP(func(o *entities.Endpoint) { + o.Events = []string{"http"} + }), + factory.EndpointP(func(o *entities.Endpoint) { + o.Request.URL = httpsBinURL + "/anything" + o.Events = []string{"https"} + }), + factory.EndpointP(func(o *entities.Endpoint) { + o.Request.URL = "https://deny.localhost" + o.Events = []string{"deny"} + }), + }, + Sources: []*entities.Source{factory.SourceP()}, + } + + BeforeAll(func() { + db = helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + "WEBHOOKX_WORKER_DELIVERER_PROXY": httpProxyURL, + })) + + }) + + AfterAll(func() { + app.Stop() + }) + + It("http delivery request should be proxied", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "http","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusSuccess + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[0].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "http://localhost:9999/anything", attempt.Request.URL) + assert.Nil(GinkgoT(), attempt.Request.Headers) + assert.Nil(GinkgoT(), attempt.Request.Body) + + // attempt.resposne + assert.True(GinkgoT(), attempt.Response.Latency > 0) + assert.Equal(GinkgoT(), 200, attempt.Response.Status) + assert.Nil(GinkgoT(), attempt.Response.Headers) + assert.Nil(GinkgoT(), attempt.Response.Body) + + detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) + assert.NoError(GinkgoT(), err) + assert.NotNil(GinkgoT(), detail) + assert.NotNil(GinkgoT(), detail.RequestHeaders) + assert.NotNil(GinkgoT(), detail.RequestBody) + assert.NotNil(GinkgoT(), detail.ResponseHeaders) + assert.NotNil(GinkgoT(), detail.ResponseBody) + assert.Equal(GinkgoT(), "true", (*detail.ResponseHeaders)["X-Proxied"]) + }) + + It("https delivery request should be proxied", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "https","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusSuccess + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[1].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "https://localhost:9443/anything", attempt.Request.URL) + assert.Nil(GinkgoT(), attempt.Request.Headers) + assert.Nil(GinkgoT(), attempt.Request.Body) + + // attempt.resposne + assert.True(GinkgoT(), attempt.Response.Latency > 0) + assert.Equal(GinkgoT(), 200, attempt.Response.Status) + assert.Nil(GinkgoT(), attempt.Response.Headers) + assert.Nil(GinkgoT(), attempt.Response.Body) + + detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) + assert.NoError(GinkgoT(), err) + assert.NotNil(GinkgoT(), detail.RequestHeaders) + assert.NotNil(GinkgoT(), detail.RequestBody) + assert.NotNil(GinkgoT(), detail.ResponseHeaders) + assert.NotNil(GinkgoT(), detail.ResponseBody) + assert.Equal(GinkgoT(), "true", (*detail.ResponseHeaders)["X-Proxied"]) + }) + + It("should be failed when connect ", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "deny","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusFailure + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[2].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "https://deny.localhost", attempt.Request.URL) + }) + }) + + Context("HTTPS URL", func() { + Context("scenario: tls verify = false", func() { + var proxyClient *resty.Client + + var app *app.Application + var db *db.DB + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{ + factory.EndpointP(func(o *entities.Endpoint) { + o.Events = []string{"http"} + }), + factory.EndpointP(func(o *entities.Endpoint) { + o.Request.URL = httpsBinURL + "/anything" + o.Events = []string{"https"} + }), + }, + Sources: []*entities.Source{factory.SourceP()}, + } + + BeforeAll(func() { + deliverer.DefaultTLSConfig = &tls.Config{InsecureSkipVerify: true} // mock tls config + db = helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + "WEBHOOKX_WORKER_DELIVERER_PROXY": httpsProxyURL, + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_VERIFY": "TRUE", + })) + + }) + + AfterAll(func() { + app.Stop() + deliverer.DefaultTLSConfig = nil // reset tls config + }) + + It("http delivery request should be proxied", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "http","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusSuccess + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[0].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "http://localhost:9999/anything", attempt.Request.URL) + assert.Nil(GinkgoT(), attempt.Request.Headers) + assert.Nil(GinkgoT(), attempt.Request.Body) + + // attempt.resposne + assert.True(GinkgoT(), attempt.Response.Latency > 0) + assert.Equal(GinkgoT(), 200, attempt.Response.Status) + assert.Nil(GinkgoT(), attempt.Response.Headers) + assert.Nil(GinkgoT(), attempt.Response.Body) + + detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) + assert.NoError(GinkgoT(), err) + assert.NotNil(GinkgoT(), detail.RequestHeaders) + assert.NotNil(GinkgoT(), detail.RequestBody) + assert.NotNil(GinkgoT(), detail.ResponseHeaders) + assert.NotNil(GinkgoT(), detail.ResponseBody) + assert.Equal(GinkgoT(), "true", (*detail.ResponseHeaders)["X-Proxied"]) + }) + + It("https delivery request should be proxied", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "https","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusSuccess + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[1].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "https://localhost:9443/anything", attempt.Request.URL) + assert.Nil(GinkgoT(), attempt.Request.Headers) + assert.Nil(GinkgoT(), attempt.Request.Body) + + // attempt.resposne + assert.True(GinkgoT(), attempt.Response.Latency > 0) + assert.Equal(GinkgoT(), 200, attempt.Response.Status) + assert.Nil(GinkgoT(), attempt.Response.Headers) + assert.Nil(GinkgoT(), attempt.Response.Body) + + detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) + assert.NoError(GinkgoT(), err) + assert.NotNil(GinkgoT(), detail.RequestHeaders) + assert.NotNil(GinkgoT(), detail.RequestBody) + assert.NotNil(GinkgoT(), detail.ResponseHeaders) + assert.NotNil(GinkgoT(), detail.ResponseBody) + assert.Equal(GinkgoT(), "true", (*detail.ResponseHeaders)["X-Proxied"]) + }) + }) + + Context("scenario: mTLS", func() { + var proxyClient *resty.Client + + var app *app.Application + var db *db.DB + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{ + factory.EndpointP(func(o *entities.Endpoint) { + o.Events = []string{"http"} + }), + factory.EndpointP(func(o *entities.Endpoint) { + o.Request.URL = httpsBinURL + "/anything" + o.Events = []string{"https"} + }), + }, + Sources: []*entities.Source{factory.SourceP()}, + } + + BeforeAll(func() { + deliverer.DefaultTLSConfig = &tls.Config{InsecureSkipVerify: true} // mock tls config + db = helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + "WEBHOOKX_WORKER_DELIVERER_PROXY": mtlsProxyURL, + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CERT": test.FilePath("fixtures/mtls/client.crt"), + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_KEY": test.FilePath("fixtures/mtls/client.key"), + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CA_CERT": test.FilePath("fixtures/mtls/server-ca.crt"), + })) + + }) + + AfterAll(func() { + app.Stop() + deliverer.DefaultTLSConfig = nil // reset tls config + }) + + It("http delivery request should be proxied", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "http","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusSuccess + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[0].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "http://localhost:9999/anything", attempt.Request.URL) + assert.Nil(GinkgoT(), attempt.Request.Headers) + assert.Nil(GinkgoT(), attempt.Request.Body) + + // attempt.resposne + assert.True(GinkgoT(), attempt.Response.Latency > 0) + assert.Equal(GinkgoT(), 200, attempt.Response.Status) + assert.Nil(GinkgoT(), attempt.Response.Headers) + assert.Nil(GinkgoT(), attempt.Response.Body) + + detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) + assert.NoError(GinkgoT(), err) + assert.NotNil(GinkgoT(), detail.RequestHeaders) + assert.NotNil(GinkgoT(), detail.RequestBody) + assert.NotNil(GinkgoT(), detail.ResponseHeaders) + assert.NotNil(GinkgoT(), detail.ResponseBody) + assert.Equal(GinkgoT(), "true", (*detail.ResponseHeaders)["X-Proxied"]) + }) + + It("https delivery request should be proxied", func() { + err := waitForServer("0.0.0.0:8081", time.Second) + assert.NoError(GinkgoT(), err) + + resp, err := proxyClient.R(). + SetBody(`{"event_type": "https","data": {"key": "value"}}`). + Post("/") + assert.NoError(GinkgoT(), err) + assert.Equal(GinkgoT(), 200, resp.StatusCode()) + eventId := resp.Header().Get(constants.HeaderEventId) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + q := query.AttemptQuery{} + q.EventId = &eventId + list, err := db.Attempts.List(context.TODO(), &q) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusSuccess + }, time.Second*5, time.Second) + + assert.Equal(GinkgoT(), entitiesConfig.Endpoints[1].ID, attempt.EndpointId) + + // attempt.request + assert.Equal(GinkgoT(), "POST", attempt.Request.Method) + assert.Equal(GinkgoT(), "https://localhost:9443/anything", attempt.Request.URL) + assert.Nil(GinkgoT(), attempt.Request.Headers) + assert.Nil(GinkgoT(), attempt.Request.Body) + + // attempt.resposne + assert.True(GinkgoT(), attempt.Response.Latency > 0) + assert.Equal(GinkgoT(), 200, attempt.Response.Status) + assert.Nil(GinkgoT(), attempt.Response.Headers) + assert.Nil(GinkgoT(), attempt.Response.Body) + + detail, err := db.AttemptDetails.Get(context.TODO(), attempt.ID) + assert.NoError(GinkgoT(), err) + assert.NotNil(GinkgoT(), detail.RequestHeaders) + assert.NotNil(GinkgoT(), detail.RequestBody) + assert.NotNil(GinkgoT(), detail.ResponseHeaders) + assert.NotNil(GinkgoT(), detail.ResponseBody) + assert.Equal(GinkgoT(), "true", (*detail.ResponseHeaders)["X-Proxied"]) + }) + }) + }) + + Context("error", func() { + It("returns error when certificate not found", func() { + _, err := helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + "WEBHOOKX_WORKER_DELIVERER_PROXY": mtlsProxyURL, + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CERT": test.FilePath("fixtures/mtls/notfound.crt"), + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_KEY": test.FilePath("fixtures/mtls/client.key"), + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CA_CERT": test.FilePath("fixtures/mtls/server-ca.crt"), + }) + assert.Equal(GinkgoT(), + fmt.Sprintf("failed to load client certificate: open %s: no such file or directory", test.FilePath("fixtures/mtls/notfound.crt")), + err.Error()) + }) + It("returns error when ca cert not found", func() { + _, err := helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + "WEBHOOKX_WORKER_DELIVERER_PROXY": mtlsProxyURL, + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CLIENT_CERT": test.FilePath("fixtures/mtls/client.crt"), + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CLIENT_KEY": test.FilePath("fixtures/mtls/client.key"), + "WEBHOOKX_WORKER_DELIVERER_PROXY_TLS_CA_CERT": test.FilePath("fixtures/mtls/notfound.crt"), + }) + assert.Equal(GinkgoT(), + fmt.Sprintf("failed to read ca certificate: open %s: no such file or directory", test.FilePath("fixtures/mtls/notfound.crt")), + err.Error()) + }) + }) +}) diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 3d2568f3..1578152d 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -24,6 +24,17 @@ services: ports: - 9999:80 + httpsbin: + image: nginx:latest + depends_on: + - httpbin + ports: + - "9443:443" + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf:ro + - ./server.crt:/etc/nginx/certs/server.crt:ro + - ./server.key:/etc/nginx/certs/server.key:ro + otel-collector: image: otel/opentelemetry-collector-contrib volumes: diff --git a/test/fixtures/mtls/client-ca.crt b/test/fixtures/mtls/client-ca.crt new file mode 100644 index 00000000..c0bc86e3 --- /dev/null +++ b/test/fixtures/mtls/client-ca.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUYQOjr8UTj94da6sV/oeLhC2Ew3EwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTEwMjcwOTU4NTdaFw0zNTEw +MjUwOTU4NTdaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDkDK/kY66ADvClFR7CH+W3PquvrSBgybse1szv6FIX +QJDQhd5lWgf8Q0MnbA/c1l8QFNKhVqI1EP5ld3O7Ag8mJfzFqeuJRLE7wEFi06Q2 +k9HJMhDoHB6r8ZucVhNbih9oIdna8wpwfhunGT1VZvge7XsfylMrVxPQ9CCSOPGO +umYrYnQarzFIAT82ztHMI8hGwLAKHWMNl7kA7WBL6fsuRYgq85KDo7dXdVhmavBt +m2pzF5NofTmarVM797aN8DC4v1oGByyfKchTopiwrLynMehm7H2HwaWs8vGcs7+s +BfRqVP7b14x4f4ckUwPwrcekePU2FLQRFXQIQ/PL2o0rAgMBAAGjUzBRMB0GA1Ud +DgQWBBQHX5lvbf4ckPjb93iwIwaKgPFMRjAfBgNVHSMEGDAWgBQHX5lvbf4ckPjb +93iwIwaKgPFMRjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQDP +QZoWuCLTaccvCJmzX+GW+kbQmU/3SwCSqUqfM5pa/QLvNbbS0Y1BKYL4TglfRfa7 +koaNmCxRkwBlB6P4WN/uLQm1AOKDOQPxcnPQ9SKt9AX+HRwcAczsrzuUZQvpHK1H +V12Lya/TibKoZmzspS+w5j7Bpoi2xkid3lj+p03fi05dGdXGY+EVVLvjzMydXGon +IOcD26TTbkv/aXbUZFVz05p2UBOGE3rEFop+CUiFqRcpOzYXrpC2q68XAJHg8uMG +a86av5ssM0hFbb9czweIqcJmsb/Osj7VEUQufX5dUnko1A8U4bKsQLzeR9Z4+rjq +X/+30inPRZf1kmufXOww +-----END CERTIFICATE----- diff --git a/test/fixtures/mtls/client.crt b/test/fixtures/mtls/client.crt new file mode 100644 index 00000000..a5b17336 --- /dev/null +++ b/test/fixtures/mtls/client.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDRzCCAi+gAwIBAgIBATANBgkqhkiG9w0BAQsFADBFMQswCQYDVQQGEwJBVTET +MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMB4XDTI1MTAyNzA5NTkwNVoXDTM1MTAyNTA5NTkwNVowRTELMAkGA1UE +BhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdp +ZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALi6 +P8bno65jxADyS8UhourkAJCn41Vr7B2DDSY/1TuvTL8KyOGxqN+Au2frYORD6UKA +zoPQeCk+69f161MxX1EAoxDUbGm/3A/3U19vc27TZfX6dF5fUu1XYmF56740FDgB +5mJ92Fw/C9zMxokZDqLr9hWUqHu9Y2U2HH81y4IWFgRhIgp9x5RSB3/oGkgcXw/J +QrHgu1HR5iGGJReH8yFgXMBrvHH6i1PZGOM499Cvy9v6e6q/FjxFsf5i4zDJGoBQ +1Mjx+q3jx3XZomLr7L248KkbXT3LiqN+UABNYj0j8NAFeekk3QVerHQ9mguHd/Wf +ncdugqMaHZqGXns5y+MCAwEAAaNCMEAwHQYDVR0OBBYEFDXwbms6qniw8eOdZBDX +vec0amwjMB8GA1UdIwQYMBaAFAdfmW9t/hyQ+Nv3eLAjBoqA8UxGMA0GCSqGSIb3 +DQEBCwUAA4IBAQC8m0CzKU6mZCUXCrdL4mbkEbFM4NyuzcxlYLk6d2YW0W6W3mGT +TvvSdxevGv8e5KZ6QsXncsosyt3rKJ2o3Fsw2HMXwMRV9JgoYUWgSg4pg64leDCj +Ib1OcZqo2hNUufJrBxPZHGBV8DwgkA5wXunKD8HVuj+zYD8EkSo0IJx6/tNfTHYc +aAUfkkyWp78bSF0A23I5ovtc2qnorc6mvVO3wWAZ03JekyWRGFA6nXuP/ciX+l4X +7xzkOOHxMwQUpoqFn9pLZTu7ZYF3Bfy0UI2LulCyS0L+u1+4/Xd+IZ1X71/ohi/v +lFuQg8lZ2evt/m2Pu9LXCmilYHfH4jop/1RL +-----END CERTIFICATE----- diff --git a/test/fixtures/mtls/client.key b/test/fixtures/mtls/client.key new file mode 100644 index 00000000..e25b7fbf --- /dev/null +++ b/test/fixtures/mtls/client.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC4uj/G56OuY8QA +8kvFIaLq5ACQp+NVa+wdgw0mP9U7r0y/CsjhsajfgLtn62DkQ+lCgM6D0HgpPuvX +9etTMV9RAKMQ1Gxpv9wP91Nfb3Nu02X1+nReX1LtV2Jheeu+NBQ4AeZifdhcPwvc +zMaJGQ6i6/YVlKh7vWNlNhx/NcuCFhYEYSIKfceUUgd/6BpIHF8PyUKx4LtR0eYh +hiUXh/MhYFzAa7xx+otT2RjjOPfQr8vb+nuqvxY8RbH+YuMwyRqAUNTI8fqt48d1 +2aJi6+y9uPCpG109y4qjflAATWI9I/DQBXnpJN0FXqx0PZoLh3f1n53HboKjGh2a +hl57OcvjAgMBAAECggEADEgvkKSfXAazXPa/I1u4hYA/vIMCz+s2qXkWNszA+1CN +F9AMrm1pJcA4mixGQiAryQ4Wo9f2SLhUL84FLskxpfcpFzoCtCM//aEyJ2LjHIfj +hV149fGhRabtERsRvkfES4S3rW4GttdnH2+9LPLJMNbjTA1P49v9fXQmW1KXiBcv +nOVRz63Jd0YjICGZdOJJWEitKtRm9eM8jDRh3Ypk844Cgx76Iz3lZmc07okwi/sb +j1YJm24oiERI3XEdB5Pq1tPkzePrIE1VZuMgP9twTZqR7CQjdXJl+Jm0Qpzjp9gm +36L1IYfwpEKA3yRQEmDCSh3icgL1jRuiIWtMRceexQKBgQD7VH70NYL3kogrCg1u +fN3rNkb07aFK6JwBcHYQCyXBR2vfbyzJdu9f0m6rQ2u33GqvwCmaC3x2qlzZ5tZY +9X/QW10P3I/TJvlM32zKYF03M1GVL4gU4E94XLzYAdWV8LVIO/HHQirN9oafxMNv +oKKDZwLzpUVymisZP7mtXyfOLwKBgQC8KPHAso3czG+1ivumiE/iTmF00aoHwyJr +zUnbEMIDpHF/8N6OiWq0I5m+x2n2vGe/5/h9ywkIVPuS5RqcRyg66/mchgv0/xTQ +XBrEynsgjsNSiViwc1TrJNq5IFRJw8vxT/CNmQLE33LIvD+7w+ZY211JALhD2GcY +PBTCiZOEjQKBgDMP6LcvBAvOnpG3+iCfh+rY3TO379QrTD7SnXoG+cW6AAWmLcBE +xL+AHnH3QbRaOOa6MPmWKdRmKnUu/A+Y2T34wgCN/D6XJYFjx1OannWvnHyl6ozr +QdofZVKxlLZg8EPbwfSM0euEkbd2H4rXZQ0zaZsc0e5Fuknn845wzcKLAoGBAKpS +deP0vS2tcUFoebuZkJZOVTGlyMAWB0aGIeDHHpildohVxWBJS+mcgEONx4Gtskyo +8usLqzV7l+60rI3ia6xKhz0EqjYv4OtrNGAG2cXy9SP1Z+7xt2DTj5ocha/wKOBb +eGj0pOkJS6IhpZ+WCSFOEPdQS3w+m7P4TuJ6HqrRAoGBALD36isPiolbhF3H9A0Z +CaymMne7tWexEjRj5jAgyl527soZ85Q+tZq5gc/wdR+RtJpM2Mic82aPLDlW53cn +iYTfNsMa9OM+pV+GnXVT1ynYrMYH205Wewgwf3Rve+8XCCLc6RaPHuAyTkCNHwo6 +5RUANpigw2zontXsIR8k/AA8 +-----END PRIVATE KEY----- diff --git a/test/fixtures/mtls/server-ca.crt b/test/fixtures/mtls/server-ca.crt new file mode 100644 index 00000000..39e48e4d --- /dev/null +++ b/test/fixtures/mtls/server-ca.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUNqUgZJu3fal+yfWlhhpn47cobkcwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTEwMjcwOTU4NTJaFw0zNTEw +MjUwOTU4NTJaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQC1eoyewu/5rU4okGNwtt233HN9mSlIx6SmRh3y6KfY +Rl/RZ1z1+Ghcc/1XctJmb7K9tVXyBzRN+83eT/FXP0whqmCAaY8h0CNgJkOO3Oo0 +4UN0dPZYrvexR7A0KmHUHxD/o9W6eszKtXYluY0Gv0V9KKZtU6dPl+k4TRIPTmdC +DjujIMyw2obQob1Y0KQVW8VUGj/VLGlwjdZ7QEaZ7tonRMmLyQmscgS/R1sbXf4B +zma7nxaxCL5rbSoSOTF/6kgzT70hGE8LoHsZxo4v9y3DlcVbszfQJDj0bdGEIYBM +qXEtbPhvIZMqQuoNfP1ebw9NJ+M8mRPuyC8dm7167IkPAgMBAAGjUzBRMB0GA1Ud +DgQWBBQWj+oDMvcqDSrxNnK7CbkdPqqeAjAfBgNVHSMEGDAWgBQWj+oDMvcqDSrx +NnK7CbkdPqqeAjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAN +ob2pVAtmTJiEQDWcFcP5kb+W0IANYfhXxZtTn3YDmy66zgQzVKi8mh1DECXe+g8m +AaAap9zclD+YUsprCatBVPhhC1QBcxgQEL+WE4D2uQM+nck/XqH5Okwav/TJeReH +kw38QRLVTMjnx8GkZE5w5RIWq+mgUm2A/O1pCaAESVqgP3If9AV0GBPW6MzNGKac +fmikMFvgl20Ou0RIfBsGaSUkxzcipH5hx/CSS0NnVZxfGPn4QFQp/8T7FAR0uFQT +YbC8V3p/6ji2xV+ZdEpxMbOcjaLzqXExKYC26cwm9vm+YtYEcihN4ZcbRQF9QPiL ++fOKD702GxSEafU/pg/a +-----END CERTIFICATE----- diff --git a/test/fixtures/mtls/server.crt b/test/fixtures/mtls/server.crt new file mode 100644 index 00000000..06293e91 --- /dev/null +++ b/test/fixtures/mtls/server.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDdTCCAl2gAwIBAgIBATANBgkqhkiG9w0BAQsFADBFMQswCQYDVQQGEwJBVTET +MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMB4XDTI1MTAyNzA5NTkwNVoXDTM1MTAyNTA5NTkwNVowRTELMAkGA1UE +BhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdp +ZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJcS +6xJKxZPM766wGf5ytOpOTXfAy5HEgp8U9gF69XSAMnDJV+9Cz0r5WKGR3YRbTyXn +f5tufeYoBsFMbsSYwVJImZKJSno+9blnU6fta7jrwNgorN2/BcjxrgcHVMgTlw3u +yP5c+A9qKt+KjrHsJoD9Xow5ZcxPqzY0pB+Sv1UXaWz9Iyt841sVjvt4XEBHwMUW +oLjQ5DkvsLdMed/rN1xnBv4GzfOm+0rI3b4gQH+37uyaCQFKXPC14IdAjiAvVI4h +YSkt9wsHhzm3dbBlBV/JfrWcPYrKkPyTpuaB6YOBkprNTkQjB2cSx4xTWhPvNYti +DyYKhP7o46xCNMaZlBkCAwEAAaNwMG4wHwYDVR0jBBgwFoAUFo/qAzL3Kg0q8TZy +uwm5HT6qngIwCQYDVR0TBAIwADALBgNVHQ8EBAMCBPAwFAYDVR0RBA0wC4IJbG9j +YWxob3N0MB0GA1UdDgQWBBRJPI8RYUtw5k5oF6TJPM1ZlNMm/DANBgkqhkiG9w0B +AQsFAAOCAQEANrGYVlBmC0X4/oTtHRyJRhIG7WVFJscrTIq0dSYFGg1JCPf7STyh +fL02/JQ5Kfg9OVt/8T0/eZsVON+EPME0nt2LOaMeEyDwSW/pUj/BqrmZ+JHgkALS +xJtPrV1oumzgkf2fZXHKKfYHiy/jk1hbFUnhZpl82XpoXGHtGxHJrlWsFhGd9YB/ +kWBDR+Yk4Yc8DYDRuVOTPPyCj09bdoPSHQd+fVs09YLQ2o6gYGA9zeCf0iv7+1se +q3viQhtmZ7l4Oo4YY8O4UOq3XZaYymR440qnOmw9W3t16cnVnD5DtIKLfck/trKm +LubuuUEGsHMfCWpPZBRhegGOZv5hldJJRA== +-----END CERTIFICATE----- diff --git a/test/fixtures/mtls/server.key b/test/fixtures/mtls/server.key new file mode 100644 index 00000000..9d7d647b --- /dev/null +++ b/test/fixtures/mtls/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCXEusSSsWTzO+u +sBn+crTqTk13wMuRxIKfFPYBevV0gDJwyVfvQs9K+Vihkd2EW08l53+bbn3mKAbB +TG7EmMFSSJmSiUp6PvW5Z1On7Wu468DYKKzdvwXI8a4HB1TIE5cN7sj+XPgPairf +io6x7CaA/V6MOWXMT6s2NKQfkr9VF2ls/SMrfONbFY77eFxAR8DFFqC40OQ5L7C3 +THnf6zdcZwb+Bs3zpvtKyN2+IEB/t+7smgkBSlzwteCHQI4gL1SOIWEpLfcLB4c5 +t3WwZQVfyX61nD2KypD8k6bmgemDgZKazU5EIwdnEseMU1oT7zWLYg8mCoT+6OOs +QjTGmZQZAgMBAAECggEALQnUzuU/tep44ilZ9oOX7+pcKgFuLwzYrDiBhrtzhcHa +R8mez5OpXP6tL63ezmCyXeiAIIR2QDFaojH5K98mczN1pTwM2hj/BMELLZsYbE7M +dSTbNFiIjvmOGkZTPjqo49x2S27H/UB3e6FBHUX7zKS8lS1fbeOqdUSIUWlcZS9a +Ikci5ONdXNVz/QbEzAMhXQ2dcVwNZdz4y2OlbjN6/lP63LikivWOjgTy5ev0UATX +EU6QaEeh0jTpTom1PvDSW/8EYxCZqodVdfKpTznf1g0h5uT8EGkGxOs2ap0/rZP2 +ebKNP217llKcas95KJ2wl0nKtwaQ5uWoq3cdS58SCQKBgQDRE5BF8wGhPWF2K7df +OkFsZeW05E8lanz4fO21skwnf6J9wINz8vkgeyGUH0w+fiFUIKv+F8AZS+cj1itj +yjgVPKLj17K8Hhv03boYgmlIXrawIUTscqGILFWV2CMX6brjiqGQHQBs1Q1WYmC4 +1Bdyy938TnXaP8KXv2CWyX0LwwKBgQC4+tVhxFYEZlw4ObflvjJjkYuBr4mmQQ2r +dhYdR0Oxyaa5my1vE7Pw1EWpIKNk07AJK+zDJqaDJDVWXfjkhCo51Kj2nyrSllMb +8mFs+6CbwCZvdCY4uzjLakBho8wyLsjORzb/qp+/SWCgf4/jeBg1xtkwQo9z/Asg +l9kJvEZO8wKBgAdroFU7OLWWTh05k/qHQMcuHqb662wyiVjwZidqupU0THoWGRRG +bV0fwaNWMQiOxXQM7M3J3gGH1h5JfaS/CpqGWmmnwCo5D1jzfaVdC4uMAQPjSmTx +9JW2rRryXtx8aSumQfGxddBnB2AngbNNo79pSOmphzlFxgxIuI7he9StAoGAF6zj +CrRaXg3L19ZrVxhU0rGaLWsOLx08ZqmigvTQET1B/ZeC5Sica0J/9/mZcBo3+bSJ +hSC5RyenO/qjFHxl+yjgx0/v5yweTwFivtQl5kldof43tiMgTci3nMeeJv4d7Wjn +/SkVcSIvH9uzyuVgE+Hzgl3ChpHHytAkkz5psUkCgYADfEgFyntUi93St4IDDIMr +BifnnKLZ9epPMhb+b8QcsqvrKDQyngMhF+mhMNomnpku6SUTwSrkMW9r5J4rhX/k +lMu2mIUVzaO9lmakVGkbNz4VDn7/DoMP1ihraIDkR4+fg0l41Y6w65XCwGoMUXiU +bDMj+j/9bfHl2zwo4Sxbag== +-----END PRIVATE KEY----- diff --git a/test/helper/helper.go b/test/helper/helper.go index 0b18d037..6a9bf43f 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -16,6 +16,7 @@ import ( "github.com/webhookx-io/webhookx/eventbus" "github.com/webhookx-io/webhookx/pkg/log" "github.com/webhookx-io/webhookx/test" + "maps" "os" "regexp" "time" @@ -35,6 +36,12 @@ var defaultEnvs = map[string]string{ "WEBHOOKX_WORKER_DELIVERER_ACL_DENY": "", } +func unsetEnvs(envs map[string]string) { + for k := range envs { + os.Unsetenv(k) + } +} + func setEnvs(envs map[string]string) error { for name, value := range envs { if err := os.Setenv(name, value); err != nil { @@ -45,18 +52,26 @@ func setEnvs(envs map[string]string) error { } // Start starts WebhookX with given environment variables -func Start(envs map[string]string) (*app.Application, error) { - if err := setEnvs(defaultEnvs); err != nil { - return nil, err - } +func Start(envs map[string]string) (application *app.Application, err error) { + environments := maps.Clone(defaultEnvs) + maps.Copy(environments, envs) - if err := setEnvs(envs); err != nil { - return nil, err + defer func() { + if err != nil { + unsetEnvs(environments) + } + }() + + if err = setEnvs(environments); err != nil { + return } cfg, err := config.Init() if err != nil { - return nil, err + return + } + if err = cfg.Validate(); err != nil { + return } if _, err := os.Stat(defaultEnvs["WEBHOOKX_LOG_FILE"]); err == nil { @@ -67,23 +82,21 @@ func Start(envs map[string]string) (*app.Application, error) { TruncateFile(defaultEnvs["WEBHOOKX_ACCESS_LOG_FILE"]) } - app, err := app.New(cfg) + application, err = app.New(cfg) if err != nil { - return nil, err + return } - if err := app.Start(); err != nil { + if err := application.Start(); err != nil { return nil, err } go func() { - app.Wait() - for name := range envs { - os.Unsetenv(name) - } + application.Wait() + unsetEnvs(environments) }() time.Sleep(time.Second) - return app, nil + return application, nil } func AdminClient() *resty.Client { diff --git a/test/nginx.conf b/test/nginx.conf new file mode 100644 index 00000000..760145f1 --- /dev/null +++ b/test/nginx.conf @@ -0,0 +1,18 @@ +events {} + +http { + server { + listen 443 ssl; + server_name localhost; + + ssl_certificate /etc/nginx/certs/server.crt; + ssl_certificate_key /etc/nginx/certs/server.key; + + location / { + proxy_pass http://httpbin:80; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + } +} diff --git a/test/worker/requeue_test.go b/test/worker/requeue_test.go index 316c7413..e2297eba 100644 --- a/test/worker/requeue_test.go +++ b/test/worker/requeue_test.go @@ -53,7 +53,7 @@ var _ = Describe("processRequeue", Ordered, func() { w = worker.NewWorker(worker.Options{ RequeueJobInterval: time.Second, DB: db, - Deliverer: deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), + Deliverer: deliverer.NewHTTPDeliverer(deliverer.Options{}), Metrics: metrics, Tracer: tracer, EventBus: mocks.MockBus{}, diff --git a/worker/deliverer/deliverer.go b/worker/deliverer/deliverer.go index 2990b439..29e743cc 100644 --- a/worker/deliverer/deliverer.go +++ b/worker/deliverer/deliverer.go @@ -25,13 +25,14 @@ type AclDecision struct { } type Response struct { - Request *Request - ACL AclDecision - StatusCode int - Header http.Header - ResponseBody []byte - Error error - Latancy time.Duration + Request *Request + ACL AclDecision + StatusCode int + Header http.Header + ResponseBody []byte + Error error + Latancy time.Duration + ProxyStatusCode int } func (r *Response) Is2xx() bool { diff --git a/worker/deliverer/http.go b/worker/deliverer/http.go index 4cbdf6fe..5b71cf02 100644 --- a/worker/deliverer/http.go +++ b/worker/deliverer/http.go @@ -3,13 +3,17 @@ package deliverer import ( "bytes" "context" + "crypto/tls" + "crypto/x509" "fmt" - "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/constants" + "go.uber.org/zap" "io" "net" "net/http" "net/netip" + "net/url" + "os" "time" ) @@ -18,12 +22,14 @@ type Resolver interface { } var DefaultResolver Resolver = net.DefaultResolver +var DefaultTLSConfig *tls.Config = nil type contextKey struct{} // HTTPDeliverer delivers via HTTP type HTTPDeliverer struct { - defaultTimeout time.Duration + log *zap.SugaredLogger + requestTimeout time.Duration client *http.Client } @@ -54,25 +60,107 @@ func restrictedDialFunc(acl *ACL) func(context.Context, string, string) (net.Con } } -func NewHTTPDeliverer(cfg *config.WorkerDeliverer) *HTTPDeliverer { +type ProxyOptions struct { + URL string + TLSCert string + TLSKey string + TLSCaCertificate string + TLSVerify bool +} + +type AccessControlOptions struct { + Deny []string +} + +type Options struct { + Logger *zap.SugaredLogger + RequestTimeout time.Duration + AccessControlOptions AccessControlOptions +} + +func NewHTTPDeliverer(opts Options) *HTTPDeliverer { transport := &http.Transport{ MaxIdleConns: 1000, MaxIdleConnsPerHost: 1000, IdleConnTimeout: 30 * time.Second, TLSHandshakeTimeout: 5 * time.Second, ExpectContinueTimeout: 1 * time.Second, - DialContext: restrictedDialFunc(NewACL(AclOptions{Rules: cfg.ACL.Deny})), + DialContext: restrictedDialFunc(NewACL(AclOptions{Rules: opts.AccessControlOptions.Deny})), + TLSClientConfig: DefaultTLSConfig, } client := &http.Client{ Transport: transport, } return &HTTPDeliverer{ - defaultTimeout: time.Duration(cfg.Timeout) * time.Millisecond, + log: opts.Logger, + requestTimeout: opts.RequestTimeout, client: client, } } +func (d *HTTPDeliverer) SetupProxy(opts ProxyOptions) error { + proxyURL, err := url.Parse(opts.URL) + if err != nil { + return fmt.Errorf("invalid proxy url '%s': %s", opts.URL, err) + } + + transport := d.client.Transport.(*http.Transport) + + transport.Proxy = http.ProxyURL(proxyURL) + transport.DialContext = nil + transport.OnProxyConnectResponse = func(ctx context.Context, proxyURL *url.URL, connectReq *http.Request, connectRes *http.Response) error { + if connectRes.StatusCode != 200 { + if res, ok := ctx.Value(contextKey{}).(*Response); ok { + res.ProxyStatusCode = connectRes.StatusCode + } + } + return nil + } + + if proxyURL.Scheme == "https" { + tlsConfig := &tls.Config{ + ServerName: proxyURL.Hostname(), + InsecureSkipVerify: opts.TLSVerify, + } + if opts.TLSCert != "" || opts.TLSKey != "" { + cert, err := tls.LoadX509KeyPair(opts.TLSCert, opts.TLSKey) + if err != nil { + return fmt.Errorf("failed to load client certificate: %s", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + if opts.TLSCaCertificate != "" { + caPEM, err := os.ReadFile(opts.TLSCaCertificate) + if err != nil { + return fmt.Errorf("failed to read ca certificate: %s", err) + } + cp := x509.NewCertPool() + if !cp.AppendCertsFromPEM(caPEM) { + return fmt.Errorf("failed to append ca certificate to pool") + } + tlsConfig.RootCAs = cp + } + transport.DialTLSContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + dialer := &net.Dialer{} + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + tlsConn := tls.Client(conn, tlsConfig) + if err := tlsConn.HandshakeContext(ctx); err != nil { + _ = conn.Close() + return nil, err + } + return tlsConn, nil + } + } + + d.log.Infow("proxy enabled", "proxy_url", opts.URL) + + return nil +} + func timing(fn func()) time.Duration { start := time.Now() fn() @@ -83,7 +171,7 @@ func timing(fn func()) time.Duration { func (d *HTTPDeliverer) Deliver(ctx context.Context, req *Request) (res *Response) { timeout := req.Timeout if timeout == 0 { - timeout = d.defaultTimeout + timeout = d.requestTimeout } ctx, cancel := context.WithTimeout(ctx, timeout) diff --git a/worker/deliverer/http_test.go b/worker/deliverer/http_test.go index 47bfa443..db17c962 100644 --- a/worker/deliverer/http_test.go +++ b/worker/deliverer/http_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "github.com/stretchr/testify/assert" - "github.com/webhookx-io/webhookx/config" "io" "net/http" "net/http/httptest" @@ -39,10 +38,7 @@ func Test(t *testing.T) { defer server.Close() t.Run("sanity", func(t *testing.T) { - cfg := config.WorkerDeliverer{ - Timeout: 10 * 1000, - } - deliverer := NewHTTPDeliverer(&cfg) + deliverer := NewHTTPDeliverer(Options{RequestTimeout: time.Second * 10}) req := &Request{ URL: server.URL, @@ -65,10 +61,7 @@ func Test(t *testing.T) { }) t.Run("should fail with DeadlineExceeded error", func(t *testing.T) { - cfg := config.WorkerDeliverer{ - Timeout: 10 * 1000, - } - deliverer := NewHTTPDeliverer(&cfg) + deliverer := NewHTTPDeliverer(Options{RequestTimeout: time.Millisecond * 10 * 1000}) req := &Request{ URL: server.URL,