From 6e22724b7e8fed134f9eb2e435e0cd30f8f4d276 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Fri, 7 Mar 2025 23:23:52 +0530 Subject: [PATCH 1/3] Support filtering webhook events. --- webhook/filter.go | 41 +++++++ webhook/notifier.go | 5 + webhook/resource_url_notifier.go | 14 ++- webhook/url_notifier.go | 10 ++ webhook/webhook_test.go | 195 +++++++++++++++++++++++++++++++ 5 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 webhook/filter.go diff --git a/webhook/filter.go b/webhook/filter.go new file mode 100644 index 000000000..7898a6b74 --- /dev/null +++ b/webhook/filter.go @@ -0,0 +1,41 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webhook + +import "slices" + +type filterParams struct { + Includes []string + Excludes []string +} + +type filter struct { + params filterParams +} + +func newFilter(params filterParams) *filter { + return &filter{ + params: params, + } +} + +func (f *filter) IsAllowed(event string) bool { + // includes get higher precendence than excludes + if slices.Contains(f.params.Includes, event) { + return true + } + + return !slices.Contains(f.params.Excludes, event) +} diff --git a/webhook/notifier.go b/webhook/notifier.go index 12bb8898b..93cb57ca8 100644 --- a/webhook/notifier.go +++ b/webhook/notifier.go @@ -84,6 +84,11 @@ type HTTPClientParams struct { ClientTimeout time.Duration } +type FilterParams struct { + IncludeEvents []string + ExcludeEvents []string +} + // --------------------------------- type logAdapter struct{} diff --git a/webhook/resource_url_notifier.go b/webhook/resource_url_notifier.go index 89bc51198..69a00c2ad 100644 --- a/webhook/resource_url_notifier.go +++ b/webhook/resource_url_notifier.go @@ -63,6 +63,7 @@ type ResourceURLNotifierParams struct { APIKey string APISecret string FieldsHook func(whi *livekit.WebhookInfo) + FilterParams } // ResourceURLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL. @@ -78,7 +79,10 @@ type ResourceURLNotifier struct { resourceQueues map[string]*resourceQueueInfo resourceQueueTimeoutQueue utils.TimeoutQueue[*resourceQueueInfo] - closed core.Fuse + + filter *filter + + closed core.Fuse } func NewResourceURLNotifier(params ResourceURLNotifierParams) *ResourceURLNotifier { @@ -114,6 +118,10 @@ func NewResourceURLNotifier(params ResourceURLNotifierParams) *ResourceURLNotifi params: params, client: rhc, resourceQueues: make(map[string]*resourceQueueInfo), + filter: newFilter(filterParams{ + Includes: params.IncludeEvents, + Excludes: params.ExcludeEvents, + }), } go r.sweeper() @@ -141,6 +149,10 @@ func (r *ResourceURLNotifier) getProcessedHook() func(ctx context.Context, whi * } func (r *ResourceURLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error { + if !r.filter.IsAllowed(event.Event) { + return nil + } + if r.closed.IsBroken() { return errClosed } diff --git a/webhook/url_notifier.go b/webhook/url_notifier.go index b8a866e5b..ef0e2d379 100644 --- a/webhook/url_notifier.go +++ b/webhook/url_notifier.go @@ -45,6 +45,7 @@ type URLNotifierParams struct { APIKey string APISecret string FieldsHook func(whi *livekit.WebhookInfo) + FilterParams } // URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL. @@ -56,6 +57,7 @@ type URLNotifier struct { dropped atomic.Int32 pool core.QueuePool processedHook func(ctx context.Context, whi *livekit.WebhookInfo) + filter *filter } func NewURLNotifier(params URLNotifierParams) *URLNotifier { @@ -82,6 +84,10 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { n := &URLNotifier{ params: params, client: rhc, + filter: newFilter(filterParams{ + Includes: params.IncludeEvents, + Excludes: params.ExcludeEvents, + }), } n.client.Logger = &logAdapter{} @@ -112,6 +118,10 @@ func (n *URLNotifier) getProcessedHook() func(ctx context.Context, whi *livekit. } func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error { + if !n.filter.IsAllowed(event.Event) { + return nil + } + enqueuedAt := time.Now() key := eventKey(event) diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index 3fa746cd1..feaf34032 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -198,6 +198,102 @@ func TestURLNotifierLifecycle(t *testing.T) { }) } +func TestURLNotifierFilter(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + t.Run("includes", func(t *testing.T) { + urlNotifier := NewURLNotifier(URLNotifierParams{ + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + FilterParams: FilterParams{ + IncludeEvents: []string{EventRoomStarted}, + }, + }) + defer urlNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + // as there is no explicit ExcludeEvents, EventRoomFinished should be allowed + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 2 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) + + t.Run("excludes", func(t *testing.T) { + urlNotifier := NewURLNotifier(URLNotifierParams{ + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + FilterParams: FilterParams{ + ExcludeEvents: []string{EventRoomStarted}, + }, + }) + defer urlNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 1 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) + + t.Run("includes + excludes", func(t *testing.T) { + urlNotifier := NewURLNotifier(URLNotifierParams{ + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + FilterParams: FilterParams{ + IncludeEvents: []string{EventRoomStarted}, + ExcludeEvents: []string{EventRoomStarted, EventRoomFinished}, + }, + }) + defer urlNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + // EventRoomStarted should be allowed as IncludeEvents take precedence + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 1 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) +} + func newTestNotifier() *URLNotifier { return NewURLNotifier(URLNotifierParams{ QueueSize: 20, @@ -532,6 +628,105 @@ func TestResourceURLNotifierLifecycle(t *testing.T) { }) } +func TestResourceURLNotifierFilter(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + t.Run("includes", func(t *testing.T) { + resourceURLNotifier := NewResourceURLNotifier(ResourceURLNotifierParams{ + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + MaxAge: 200 * time.Millisecond, + MaxDepth: 50, + FilterParams: FilterParams{ + IncludeEvents: []string{EventRoomStarted}, + }, + }) + defer resourceURLNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + // as there is no explicit ExcludeEvents, EventRoomFinished should be allowed + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 2 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) + + t.Run("excludes", func(t *testing.T) { + resourceURLNotifier := NewResourceURLNotifier(ResourceURLNotifierParams{ + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + MaxAge: 200 * time.Millisecond, + MaxDepth: 50, + FilterParams: FilterParams{ + ExcludeEvents: []string{EventRoomStarted}, + }, + }) + defer resourceURLNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 1 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) + + t.Run("includes + excludes", func(t *testing.T) { + resourceURLNotifier := NewResourceURLNotifier(ResourceURLNotifierParams{ + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + MaxAge: 200 * time.Millisecond, + MaxDepth: 50, + FilterParams: FilterParams{ + IncludeEvents: []string{EventRoomStarted}, + ExcludeEvents: []string{EventRoomStarted, EventRoomFinished}, + }, + }) + defer resourceURLNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + // EventRoomStarted should be allowed as IncludeEvents take precedence + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 1 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) +} + func newTestResourceNotifier(timeout time.Duration, maxAge time.Duration, maxDepth int) *ResourceURLNotifier { return NewResourceURLNotifier(ResourceURLNotifierParams{ URL: testUrl, From d596318f1011d1c69661490f5393b62f61e5c06f Mon Sep 17 00:00:00 2001 From: boks1971 Date: Fri, 7 Mar 2025 23:30:19 +0530 Subject: [PATCH 2/3] test --- webhook/filter.go | 11 +++++--- webhook/webhook_test.go | 60 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/webhook/filter.go b/webhook/filter.go index 7898a6b74..8da02dff6 100644 --- a/webhook/filter.go +++ b/webhook/filter.go @@ -33,9 +33,14 @@ func newFilter(params filterParams) *filter { func (f *filter) IsAllowed(event string) bool { // includes get higher precendence than excludes - if slices.Contains(f.params.Includes, event) { - return true + if len(f.params.Includes) != 0 { + return slices.Contains(f.params.Includes, event) } - return !slices.Contains(f.params.Excludes, event) + if len(f.params.Excludes) != 0 { + return !slices.Contains(f.params.Excludes, event) + } + + // default allow + return true } diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index feaf34032..b68c427f4 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -203,6 +203,32 @@ func TestURLNotifierFilter(t *testing.T) { require.NoError(t, s.Start()) defer s.Stop() + t.Run("none", func(t *testing.T) { + urlNotifier := NewURLNotifier(URLNotifierParams{ + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + }) + defer urlNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 2 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) + t.Run("includes", func(t *testing.T) { urlNotifier := NewURLNotifier(URLNotifierParams{ QueueSize: 20, @@ -220,13 +246,12 @@ func TestURLNotifierFilter(t *testing.T) { numCalled.Inc() } - // as there is no explicit ExcludeEvents, EventRoomFinished should be allowed _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) _ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) require.Eventually( t, func() bool { - return numCalled.Load() == 2 + return numCalled.Load() == 1 }, 5*time.Second, webhookCheckInterval, @@ -633,6 +658,34 @@ func TestResourceURLNotifierFilter(t *testing.T) { require.NoError(t, s.Start()) defer s.Stop() + t.Run("none", func(t *testing.T) { + resourceURLNotifier := NewResourceURLNotifier(ResourceURLNotifierParams{ + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + MaxAge: 200 * time.Millisecond, + MaxDepth: 50, + FilterParams: FilterParams{}, + }) + defer resourceURLNotifier.Stop(false) + + numCalled := atomic.Int32{} + s.handler = func(w http.ResponseWriter, r *http.Request) { + numCalled.Inc() + } + + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) + _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) + require.Eventually( + t, + func() bool { + return numCalled.Load() == 2 + }, + 5*time.Second, + webhookCheckInterval, + ) + }) + t.Run("includes", func(t *testing.T) { resourceURLNotifier := NewResourceURLNotifier(ResourceURLNotifierParams{ URL: testUrl, @@ -651,13 +704,12 @@ func TestResourceURLNotifierFilter(t *testing.T) { numCalled.Inc() } - // as there is no explicit ExcludeEvents, EventRoomFinished should be allowed _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted}) _ = resourceURLNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished}) require.Eventually( t, func() bool { - return numCalled.Load() == 2 + return numCalled.Load() == 1 }, 5*time.Second, webhookCheckInterval, From 757b7ad3134d6f033b9e26c0074a0a290ea9c3e2 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Fri, 7 Mar 2025 23:40:46 +0530 Subject: [PATCH 3/3] set filter --- webhook/filter.go | 21 ++++++++++----------- webhook/notifier.go | 14 ++++++++++++++ webhook/resource_url_notifier.go | 12 ++++++++---- webhook/url_notifier.go | 11 +++++++---- 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/webhook/filter.go b/webhook/filter.go index 8da02dff6..591a331f5 100644 --- a/webhook/filter.go +++ b/webhook/filter.go @@ -16,29 +16,28 @@ package webhook import "slices" -type filterParams struct { - Includes []string - Excludes []string -} - type filter struct { - params filterParams + params FilterParams } -func newFilter(params filterParams) *filter { +func newFilter(params FilterParams) *filter { return &filter{ params: params, } } +func (f *filter) SetFilter(params FilterParams) { + f.params = params +} + func (f *filter) IsAllowed(event string) bool { // includes get higher precendence than excludes - if len(f.params.Includes) != 0 { - return slices.Contains(f.params.Includes, event) + if len(f.params.IncludeEvents) != 0 { + return slices.Contains(f.params.IncludeEvents, event) } - if len(f.params.Excludes) != 0 { - return !slices.Contains(f.params.Excludes, event) + if len(f.params.ExcludeEvents) != 0 { + return !slices.Contains(f.params.ExcludeEvents, event) } // default allow diff --git a/webhook/notifier.go b/webhook/notifier.go index 93cb57ca8..0a38f9ef5 100644 --- a/webhook/notifier.go +++ b/webhook/notifier.go @@ -26,6 +26,8 @@ import ( type QueuedNotifier interface { RegisterProcessedHook(f func(ctx context.Context, whi *livekit.WebhookInfo)) + SetKeys(apiKey, apiSecret string) + SetFilter(params FilterParams) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error Stop(force bool) } @@ -75,6 +77,18 @@ func (n *DefaultNotifier) RegisterProcessedHook(hook func(ctx context.Context, w } } +func (n *DefaultNotifier) SetKeys(apiKey, apiSecret string) { + for _, u := range n.notifiers { + u.SetKeys(apiKey, apiSecret) + } +} + +func (n *DefaultNotifier) SetFilter(params FilterParams) { + for _, u := range n.notifiers { + u.SetFilter(params) + } +} + // --------------------------------- type HTTPClientParams struct { diff --git a/webhook/resource_url_notifier.go b/webhook/resource_url_notifier.go index 69a00c2ad..bce2f7f6a 100644 --- a/webhook/resource_url_notifier.go +++ b/webhook/resource_url_notifier.go @@ -118,10 +118,7 @@ func NewResourceURLNotifier(params ResourceURLNotifierParams) *ResourceURLNotifi params: params, client: rhc, resourceQueues: make(map[string]*resourceQueueInfo), - filter: newFilter(filterParams{ - Includes: params.IncludeEvents, - Excludes: params.ExcludeEvents, - }), + filter: newFilter(params.FilterParams), } go r.sweeper() @@ -136,6 +133,13 @@ func (r *ResourceURLNotifier) SetKeys(apiKey, apiSecret string) { r.params.APISecret = apiSecret } +func (r *ResourceURLNotifier) SetFilter(params FilterParams) { + r.mu.Lock() + defer r.mu.Unlock() + + r.filter.SetFilter(params) +} + func (r *ResourceURLNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) { r.mu.Lock() defer r.mu.Unlock() diff --git a/webhook/url_notifier.go b/webhook/url_notifier.go index ef0e2d379..4ed0d1a20 100644 --- a/webhook/url_notifier.go +++ b/webhook/url_notifier.go @@ -84,10 +84,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { n := &URLNotifier{ params: params, client: rhc, - filter: newFilter(filterParams{ - Includes: params.IncludeEvents, - Excludes: params.ExcludeEvents, - }), + filter: newFilter(params.FilterParams), } n.client.Logger = &logAdapter{} @@ -105,6 +102,12 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) { n.params.APISecret = apiSecret } +func (n *URLNotifier) SetFilter(params FilterParams) { + n.mu.Lock() + defer n.mu.Unlock() + n.filter.SetFilter(params) +} + func (n *URLNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) { n.mu.Lock() defer n.mu.Unlock()