Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/swift-maps-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"github.com/livekit/protocol": patch
---

Use multiple webhook workers for each URL to improve parallelism
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/benbjohnson/clock v1.3.5
github.com/bufbuild/protoyaml-go v0.1.9
github.com/eapache/channels v1.1.0
github.com/frostbyte73/core v0.0.10
github.com/frostbyte73/core v0.0.12
github.com/fsnotify/fsnotify v1.7.0
github.com/gammazero/deque v0.2.1
github.com/go-jose/go-jose/v3 v3.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU=
github.com/frostbyte73/core v0.0.10/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU=
github.com/frostbyte73/core v0.0.12 h1:kySA8+Os6eqnPFoExD2T7cehjSAY1MRyIViL0yTy2uc=
github.com/frostbyte73/core v0.0.12/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand Down
35 changes: 30 additions & 5 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"github.com/livekit/protocol/logger"
)

const (
numWorkers = 10
)

type URLNotifierParams struct {
HTTPClientParams
Logger logger.Logger
Expand All @@ -56,7 +60,7 @@ type URLNotifier struct {
params URLNotifierParams
client *retryablehttp.Client
dropped atomic.Int32
worker core.QueueWorker
pool core.QueuePool
}

func NewURLNotifier(params URLNotifierParams) *URLNotifier {
Expand Down Expand Up @@ -85,7 +89,8 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
client: rhc,
}
n.client.Logger = &logAdapter{}
n.worker = core.NewQueueWorker(core.QueueWorkerParams{

n.pool = core.NewQueuePool(numWorkers, core.QueueWorkerParams{
QueueSize: params.QueueSize,
DropWhenFull: true,
OnDropped: func() { n.dropped.Inc() },
Expand All @@ -102,7 +107,8 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) {

func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error {
enqueuedAt := time.Now()
n.worker.Submit(func() {

n.pool.Submit(n.eventKey(event), func() {
fields := logFields(event)
fields = append(fields,
"url", n.params.URL,
Expand All @@ -121,11 +127,30 @@ func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error {
return nil
}

func (c *URLNotifier) eventKey(event *livekit.WebhookEvent) string {
if event.EgressInfo != nil {
return event.EgressInfo.EgressId
}
if event.IngressInfo != nil {
return event.IngressInfo.IngressId
}
if event.Room != nil {
return event.Room.Name
}
if event.Participant != nil {
return event.Participant.Identity
}
if event.Track != nil {
return event.Track.Sid
}
return "default"
}

func (n *URLNotifier) Stop(force bool) {
if force {
n.worker.Kill()
n.pool.Kill()
} else {
n.worker.Drain()
n.pool.Drain()
}
}

Expand Down
65 changes: 59 additions & 6 deletions webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestWebHook(t *testing.T) {

wg := sync.WaitGroup{}
wg.Add(1)
s.handler = func(r *http.Request) {
s.handler = func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
require.NoError(t, err)
Expand All @@ -83,7 +83,7 @@ func TestURLNotifierDropped(t *testing.T) {
defer urlNotifier.Stop(true)
totalDropped := atomic.Int32{}
totalReceived := atomic.Int32{}
s.handler = func(r *http.Request) {
s.handler = func(w http.ResponseWriter, r *http.Request) {
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
require.NoError(t, err)
totalReceived.Inc()
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestURLNotifierLifecycle(t *testing.T) {
t.Run("stop allowing to drain", func(t *testing.T) {
urlNotifier := newTestNotifier()
numCalled := atomic.Int32{}
s.handler = func(r *http.Request) {
s.handler = func(w http.ResponseWriter, r *http.Request) {
numCalled.Inc()
}
for i := 0; i < 10; i++ {
Expand All @@ -130,7 +130,7 @@ func TestURLNotifierLifecycle(t *testing.T) {
t.Run("force stop", func(t *testing.T) {
urlNotifier := newTestNotifier()
numCalled := atomic.Int32{}
s.handler = func(r *http.Request) {
s.handler = func(w http.ResponseWriter, r *http.Request) {
numCalled.Inc()
}
for i := 0; i < 10; i++ {
Expand All @@ -141,6 +141,59 @@ func TestURLNotifierLifecycle(t *testing.T) {
time.Sleep(time.Second)
require.Greater(t, int32(20), numCalled.Load())
})

t.Run("times out after accepting connection", func(t *testing.T) {
urlNotifier := NewURLNotifier(URLNotifierParams{
QueueSize: 20,
URL: testUrl,
APIKey: apiKey,
APISecret: apiSecret,
HTTPClientParams: HTTPClientParams{
RetryWaitMax: time.Millisecond,
MaxRetries: 1,
ClientTimeout: 100 * time.Millisecond,
},
})

numCalled := atomic.Int32{}
s.handler = func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("ok"))

// delay the request to cause it to fail
time.Sleep(time.Second)
if r.Context().Err() == nil {
// inc if not canceled
numCalled.Inc()
}
}
defer urlNotifier.Stop(false)

err := urlNotifier.send(&livekit.WebhookEvent{Event: EventRoomStarted})
require.Error(t, err)
})

t.Run("times out before connection", func(t *testing.T) {
ln, err := net.Listen("tcp", ":9987")
require.NoError(t, err)
defer ln.Close()
urlNotifier := NewURLNotifier(URLNotifierParams{
URL: "http://localhost:9987",
APIKey: apiKey,
APISecret: apiSecret,
HTTPClientParams: HTTPClientParams{
RetryWaitMax: time.Millisecond,
MaxRetries: 1,
ClientTimeout: 100 * time.Millisecond,
},
})
defer urlNotifier.Stop(false)

startedAt := time.Now()
err = urlNotifier.send(&livekit.WebhookEvent{Event: EventRoomStarted})
require.Error(t, err)
require.Less(t, time.Since(startedAt).Seconds(), float64(2))
})
}

func newTestNotifier() *URLNotifier {
Expand All @@ -153,7 +206,7 @@ func newTestNotifier() *URLNotifier {
}

type testServer struct {
handler func(r *http.Request)
handler func(w http.ResponseWriter, r *http.Request)
server *http.Server
}

Expand All @@ -168,7 +221,7 @@ func newServer(addr string) *testServer {

func (s *testServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.handler != nil {
s.handler(r)
s.handler(w, r)
}
}

Expand Down