From b0f9001a80fe74ef6963996bb0021a4c4dfb952c Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 8 Aug 2024 13:14:20 -0700 Subject: [PATCH 1/2] Use multiple workers for webhooks, while still ensuring order within a session --- go.mod | 2 +- go.sum | 4 +-- webhook/url_notifier.go | 35 ++++++++++++++++++---- webhook/webhook_test.go | 65 +++++++++++++++++++++++++++++++++++++---- 4 files changed, 92 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index b718cd86d..b30654b90 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/benbjohnson/clock v1.3.5 github.com/bufbuild/protoyaml-go v0.1.9 github.com/eapache/channels v1.1.0 - github.com/frostbyte73/core v0.0.10 + github.com/frostbyte73/core v0.0.12 github.com/fsnotify/fsnotify v1.7.0 github.com/gammazero/deque v0.2.1 github.com/go-jose/go-jose/v3 v3.0.3 diff --git a/go.sum b/go.sum index dcb62c34d..0c99c495c 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= -github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU= -github.com/frostbyte73/core v0.0.10/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU= +github.com/frostbyte73/core v0.0.12 h1:kySA8+Os6eqnPFoExD2T7cehjSAY1MRyIViL0yTy2uc= +github.com/frostbyte73/core v0.0.12/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= diff --git a/webhook/url_notifier.go b/webhook/url_notifier.go index 03ba4cdf9..4afc235cb 100644 --- a/webhook/url_notifier.go +++ b/webhook/url_notifier.go @@ -31,6 +31,10 @@ import ( "github.com/livekit/protocol/logger" ) +const ( + numWorkers = 10 +) + type URLNotifierParams struct { HTTPClientParams Logger logger.Logger @@ -56,7 +60,7 @@ type URLNotifier struct { params URLNotifierParams client *retryablehttp.Client dropped atomic.Int32 - worker core.QueueWorker + pool core.QueuePool } func NewURLNotifier(params URLNotifierParams) *URLNotifier { @@ -85,7 +89,8 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { client: rhc, } n.client.Logger = &logAdapter{} - n.worker = core.NewQueueWorker(core.QueueWorkerParams{ + + n.pool = core.NewQueuePool(numWorkers, core.QueueWorkerParams{ QueueSize: params.QueueSize, DropWhenFull: true, OnDropped: func() { n.dropped.Inc() }, @@ -102,7 +107,8 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) { func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error { enqueuedAt := time.Now() - n.worker.Submit(func() { + + n.pool.Submit(n.eventKey(event), func() { fields := logFields(event) fields = append(fields, "url", n.params.URL, @@ -121,11 +127,30 @@ func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error { return nil } +func (c *URLNotifier) eventKey(event *livekit.WebhookEvent) string { + if event.EgressInfo != nil { + return event.EgressInfo.EgressId + } + if event.IngressInfo != nil { + return event.IngressInfo.IngressId + } + if event.Room != nil { + return event.Room.Name + } + if event.Participant != nil { + return event.Participant.Identity + } + if event.Track != nil { + return event.Track.Sid + } + return "default" +} + func (n *URLNotifier) Stop(force bool) { if force { - n.worker.Kill() + n.pool.Kill() } else { - n.worker.Drain() + n.pool.Drain() } } diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index 1ec111c2e..42cb3f4ab 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -61,7 +61,7 @@ func TestWebHook(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - s.handler = func(r *http.Request) { + s.handler = func(w http.ResponseWriter, r *http.Request) { defer wg.Done() decodedEvent, err := ReceiveWebhookEvent(r, authProvider) require.NoError(t, err) @@ -83,7 +83,7 @@ func TestURLNotifierDropped(t *testing.T) { defer urlNotifier.Stop(true) totalDropped := atomic.Int32{} totalReceived := atomic.Int32{} - s.handler = func(r *http.Request) { + s.handler = func(w http.ResponseWriter, r *http.Request) { decodedEvent, err := ReceiveWebhookEvent(r, authProvider) require.NoError(t, err) totalReceived.Inc() @@ -116,7 +116,7 @@ func TestURLNotifierLifecycle(t *testing.T) { t.Run("stop allowing to drain", func(t *testing.T) { urlNotifier := newTestNotifier() numCalled := atomic.Int32{} - s.handler = func(r *http.Request) { + s.handler = func(w http.ResponseWriter, r *http.Request) { numCalled.Inc() } for i := 0; i < 10; i++ { @@ -130,7 +130,7 @@ func TestURLNotifierLifecycle(t *testing.T) { t.Run("force stop", func(t *testing.T) { urlNotifier := newTestNotifier() numCalled := atomic.Int32{} - s.handler = func(r *http.Request) { + s.handler = func(w http.ResponseWriter, r *http.Request) { numCalled.Inc() } for i := 0; i < 10; i++ { @@ -141,6 +141,59 @@ func TestURLNotifierLifecycle(t *testing.T) { time.Sleep(time.Second) require.Greater(t, int32(20), numCalled.Load()) }) + + t.Run("times out after accepting connection", func(t *testing.T) { + urlNotifier := NewURLNotifier(URLNotifierParams{ + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + HTTPClientParams: HTTPClientParams{ + RetryWaitMax: time.Millisecond, + MaxRetries: 1, + ClientTimeout: 100 * time.Millisecond, + }, + }) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Write([]byte("ok")) + + // delay the request to cause it to fail + time.Sleep(time.Second) + if r.Context().Err() == nil { + // inc if not canceled + numCalled.Inc() + } + } + defer urlNotifier.Stop(false) + + err := urlNotifier.send(&livekit.WebhookEvent{Event: EventRoomStarted}) + require.Error(t, err) + }) + + t.Run("times out before connection", func(t *testing.T) { + ln, err := net.Listen("tcp", ":9987") + require.NoError(t, err) + defer ln.Close() + urlNotifier := NewURLNotifier(URLNotifierParams{ + URL: "http://localhost:9987", + APIKey: apiKey, + APISecret: apiSecret, + HTTPClientParams: HTTPClientParams{ + RetryWaitMax: time.Millisecond, + MaxRetries: 1, + ClientTimeout: 100 * time.Millisecond, + }, + }) + defer urlNotifier.Stop(false) + + startedAt := time.Now() + err = urlNotifier.send(&livekit.WebhookEvent{Event: EventRoomStarted}) + require.Error(t, err) + require.Less(t, time.Since(startedAt).Seconds(), float64(2)) + }) } func newTestNotifier() *URLNotifier { @@ -153,7 +206,7 @@ func newTestNotifier() *URLNotifier { } type testServer struct { - handler func(r *http.Request) + handler func(w http.ResponseWriter, r *http.Request) server *http.Server } @@ -168,7 +221,7 @@ func newServer(addr string) *testServer { func (s *testServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.handler != nil { - s.handler(r) + s.handler(w, r) } } From 7d08b52cb02d78ff25adf9b8b387b737d3c5ba89 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 8 Aug 2024 13:16:56 -0700 Subject: [PATCH 2/2] changeset --- .changeset/swift-maps-applaud.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/swift-maps-applaud.md diff --git a/.changeset/swift-maps-applaud.md b/.changeset/swift-maps-applaud.md new file mode 100644 index 000000000..00a76dd34 --- /dev/null +++ b/.changeset/swift-maps-applaud.md @@ -0,0 +1,5 @@ +--- +"github.com/livekit/protocol": patch +--- + +Use multiple webhook workers for each URL to improve parallelism