Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions webhook/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhook

import "slices"

type filter struct {
params FilterParams
}

func newFilter(params FilterParams) *filter {
return &filter{
params: params,
}
}

func (f *filter) SetFilter(params FilterParams) {
f.params = params
}

func (f *filter) IsAllowed(event string) bool {
// includes get higher precendence than excludes
if len(f.params.IncludeEvents) != 0 {
return slices.Contains(f.params.IncludeEvents, event)
}

if len(f.params.ExcludeEvents) != 0 {
return !slices.Contains(f.params.ExcludeEvents, event)
}

// default allow
return true
}
19 changes: 19 additions & 0 deletions webhook/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

type QueuedNotifier interface {
RegisterProcessedHook(f func(ctx context.Context, whi *livekit.WebhookInfo))
SetKeys(apiKey, apiSecret string)
SetFilter(params FilterParams)
QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error
Stop(force bool)
}
Expand Down Expand Up @@ -75,6 +77,18 @@ func (n *DefaultNotifier) RegisterProcessedHook(hook func(ctx context.Context, w
}
}

func (n *DefaultNotifier) SetKeys(apiKey, apiSecret string) {
for _, u := range n.notifiers {
u.SetKeys(apiKey, apiSecret)
}
}

func (n *DefaultNotifier) SetFilter(params FilterParams) {
for _, u := range n.notifiers {
u.SetFilter(params)
}
}

// ---------------------------------

type HTTPClientParams struct {
Expand All @@ -84,6 +98,11 @@ type HTTPClientParams struct {
ClientTimeout time.Duration
}

type FilterParams struct {
IncludeEvents []string
ExcludeEvents []string
}

// ---------------------------------

type logAdapter struct{}
Expand Down
18 changes: 17 additions & 1 deletion webhook/resource_url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type ResourceURLNotifierParams struct {
APIKey string
APISecret string
FieldsHook func(whi *livekit.WebhookInfo)
FilterParams
}

// ResourceURLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL.
Expand All @@ -78,7 +79,10 @@ type ResourceURLNotifier struct {

resourceQueues map[string]*resourceQueueInfo
resourceQueueTimeoutQueue utils.TimeoutQueue[*resourceQueueInfo]
closed core.Fuse

filter *filter

closed core.Fuse
}

func NewResourceURLNotifier(params ResourceURLNotifierParams) *ResourceURLNotifier {
Expand Down Expand Up @@ -114,6 +118,7 @@ func NewResourceURLNotifier(params ResourceURLNotifierParams) *ResourceURLNotifi
params: params,
client: rhc,
resourceQueues: make(map[string]*resourceQueueInfo),
filter: newFilter(params.FilterParams),
}

go r.sweeper()
Expand All @@ -128,6 +133,13 @@ func (r *ResourceURLNotifier) SetKeys(apiKey, apiSecret string) {
r.params.APISecret = apiSecret
}

func (r *ResourceURLNotifier) SetFilter(params FilterParams) {
r.mu.Lock()
defer r.mu.Unlock()

r.filter.SetFilter(params)
}

func (r *ResourceURLNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -141,6 +153,10 @@ func (r *ResourceURLNotifier) getProcessedHook() func(ctx context.Context, whi *
}

func (r *ResourceURLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
if !r.filter.IsAllowed(event.Event) {
return nil
}

if r.closed.IsBroken() {
return errClosed
}
Expand Down
13 changes: 13 additions & 0 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type URLNotifierParams struct {
APIKey string
APISecret string
FieldsHook func(whi *livekit.WebhookInfo)
FilterParams
}

// URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL.
Expand All @@ -56,6 +57,7 @@ type URLNotifier struct {
dropped atomic.Int32
pool core.QueuePool
processedHook func(ctx context.Context, whi *livekit.WebhookInfo)
filter *filter
}

func NewURLNotifier(params URLNotifierParams) *URLNotifier {
Expand All @@ -82,6 +84,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
n := &URLNotifier{
params: params,
client: rhc,
filter: newFilter(params.FilterParams),
}
n.client.Logger = &logAdapter{}

Expand All @@ -99,6 +102,12 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) {
n.params.APISecret = apiSecret
}

func (n *URLNotifier) SetFilter(params FilterParams) {
n.mu.Lock()
defer n.mu.Unlock()
n.filter.SetFilter(params)
}

func (n *URLNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -112,6 +121,10 @@ func (n *URLNotifier) getProcessedHook() func(ctx context.Context, whi *livekit.
}

func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
if !n.filter.IsAllowed(event.Event) {
return nil
}

enqueuedAt := time.Now()

key := eventKey(event)
Expand Down
Loading