Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
buf.build/go/protoyaml v0.3.1
github.com/benbjohnson/clock v1.3.5
github.com/dennwc/iters v1.0.1
github.com/frostbyte73/core v0.1.0
github.com/frostbyte73/core v0.1.1
github.com/fsnotify/fsnotify v1.8.0
github.com/gammazero/deque v1.0.0
github.com/go-jose/go-jose/v3 v3.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/frostbyte73/core v0.1.0 h1:KA4klxRjLbEHLv+judmlRtweyjcj1NWOJ+BQHQgNxfw=
github.com/frostbyte73/core v0.1.0/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
github.com/frostbyte73/core v0.1.1 h1:ChhJOR7bAKOCPbA+lqDLE2cGKlCG5JXsDvvQr4YaJIA=
github.com/frostbyte73/core v0.1.1/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
Expand Down
571 changes: 427 additions & 144 deletions livekit/livekit_analytics.pb.go

Large diffs are not rendered by default.

31 changes: 29 additions & 2 deletions protobufs/livekit_analytics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ enum AnalyticsEventType {
SIP_CALL_ENDED = 39;
REPORT = 40;
API_CALL = 41;
WEBHOOK = 42;

// NEXT_ID: 42
// NEXT_ID: 43
}

message AnalyticsClientMeta {
Expand Down Expand Up @@ -175,8 +176,9 @@ message AnalyticsEvent {
SIPDispatchRuleInfo sip_dispatch_rule = 32;
ReportInfo report = 33;
APICallInfo api_call = 34;
WebhookInfo webhook = 35;

// NEXT_ID: 35
// NEXT_ID: 36
}

message AnalyticsEvents {
Expand Down Expand Up @@ -269,3 +271,28 @@ message APICallInfo {
google.protobuf.Timestamp started_at = 14;
int64 duration_ns = 15;
}

message WebhookInfo {
string event_id = 1;
string event = 2;
string project_id = 3;
string room_name = 4;
string room_id = 5;
string participant_identity = 6;
string participant_id = 7;
string track_id = 8;
string egress_id = 9;
string ingress_id = 10;
google.protobuf.Timestamp created_at = 11;
google.protobuf.Timestamp queued_at = 12;
int64 queue_duration_ns = 13;
google.protobuf.Timestamp sent_at = 14;
int64 send_duration_ns = 15;
string url = 16;
int32 num_dropped = 17;
bool is_dropped = 18;
string service_status = 19;
int32 service_error_code = 20;
string service_error = 21;
string send_error = 22;
}
2 changes: 1 addition & 1 deletion replay/cloud_replay.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions webhook/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

type QueuedNotifier interface {
RegisterProcessedHook(f func(ctx context.Context, whi *livekit.WebhookInfo))
QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error
}

Expand Down Expand Up @@ -56,11 +57,17 @@ func (n *DefaultNotifier) Stop(force bool) {
wg.Wait()
}

func (n *DefaultNotifier) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error {
func (n *DefaultNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
for _, u := range n.urlNotifiers {
if err := u.QueueNotify(event); err != nil {
if err := u.QueueNotify(ctx, event); err != nil {
return err
}
}
return nil
}

func (n *DefaultNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) {
for _, u := range n.urlNotifiers {
u.RegisterProcessedHook(hook)
}
}
166 changes: 144 additions & 22 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package webhook

import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"sync"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/hashicorp/go-retryablehttp"
"go.uber.org/atomic"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
Expand All @@ -37,11 +39,12 @@ const (

type URLNotifierParams struct {
HTTPClientParams
Logger logger.Logger
QueueSize int
URL string
APIKey string
APISecret string
Logger logger.Logger
QueueSize int
URL string
APIKey string
APISecret string
FieldsHook func(whi *livekit.WebhookInfo)
}

type HTTPClientParams struct {
Expand All @@ -56,11 +59,12 @@ const defaultQueueSize = 100
// URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL.
// It will retry on failure, and will drop events if notification fall too far behind
type URLNotifier struct {
mu sync.RWMutex
params URLNotifierParams
client *retryablehttp.Client
dropped atomic.Int32
pool core.QueuePool
mu sync.RWMutex
params URLNotifierParams
client *retryablehttp.Client
dropped atomic.Int32
pool core.QueuePool
processedHook func(ctx context.Context, whi *livekit.WebhookInfo)
}

func NewURLNotifier(params URLNotifierParams) *URLNotifier {
Expand Down Expand Up @@ -93,7 +97,6 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
n.pool = core.NewQueuePool(numWorkers, core.QueueWorkerParams{
QueueSize: params.QueueSize,
DropWhenFull: true,
OnDropped: func() { n.dropped.Inc() },
})
return n
}
Expand All @@ -105,25 +108,76 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) {
n.params.APISecret = apiSecret
}

func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error {
func (n *URLNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) {
n.mu.Lock()
defer n.mu.Unlock()
n.processedHook = hook
}

func (n *URLNotifier) getProcessedHook() func(ctx context.Context, whi *livekit.WebhookInfo) {
n.mu.RLock()
defer n.mu.RUnlock()
return n.processedHook
}

func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
enqueuedAt := time.Now()

n.pool.Submit(n.eventKey(event), func() {
fields := logFields(event)
fields = append(fields,
"url", n.params.URL,
"queueDuration", time.Since(enqueuedAt),
)
sentStart := time.Now()
if !n.pool.Submit(n.eventKey(event), func() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the needed change to core to get a synchronous return value from Submit if it is successful or not. That will allow logging/posting event for dropped events too.

fields := logFields(event, n.params.URL)

queueDuration := time.Since(enqueuedAt)
fields = append(fields, "queueDuration", queueDuration)

sendStart := time.Now()
err := n.send(event)
fields = append(fields, "sendDuration", time.Since(sentStart))
sendDuration := time.Since(sendStart)
fields = append(fields, "sendDuration", sendDuration)
if err != nil {
n.params.Logger.Warnw("failed to send webhook", err, fields...)
n.dropped.Add(event.NumDropped + 1)
} else {
n.params.Logger.Infow("sent webhook", fields...)
}
})
if ph := n.getProcessedHook(); ph != nil {
whi := webhookInfo(
event,
enqueuedAt,
queueDuration,
sendStart,
sendDuration,
n.params.URL,
false,
err,
)
if n.params.FieldsHook != nil {
n.params.FieldsHook(whi)
}
ph(ctx, whi)
}
}) {
n.dropped.Inc()

fields := logFields(event, n.params.URL)
n.params.Logger.Infow("dropped webhook", fields...)

if ph := n.getProcessedHook(); ph != nil {
whi := webhookInfo(
event,
time.Time{},
0,
time.Time{},
0,
n.params.URL,
true,
nil,
)
if n.params.FieldsHook != nil {
n.params.FieldsHook(whi)
}
ph(ctx, whi)
}
}
return nil
}

Expand Down Expand Up @@ -197,12 +251,13 @@ type logAdapter struct{}

func (l *logAdapter) Printf(string, ...interface{}) {}

func logFields(event *livekit.WebhookEvent) []interface{} {
func logFields(event *livekit.WebhookEvent, url string) []interface{} {
fields := make([]interface{}, 0, 20)
fields = append(fields,
"event", event.Event,
"id", event.Id,
"webhookTime", event.CreatedAt,
"url", url,
)

if event.Room != nil {
Expand All @@ -217,6 +272,11 @@ func logFields(event *livekit.WebhookEvent) []interface{} {
"pID", event.Participant.Sid,
)
}
if event.Track != nil {
fields = append(fields,
"trackID", event.Track.Sid,
)
}
if event.EgressInfo != nil {
fields = append(fields,
"egressID", event.EgressInfo.EgressId,
Expand All @@ -239,3 +299,65 @@ func logFields(event *livekit.WebhookEvent) []interface{} {
}
return fields
}

func webhookInfo(
event *livekit.WebhookEvent,
queuedAt time.Time,
queueDuration time.Duration,
sentAt time.Time,
sendDuration time.Duration,
url string,
isDropped bool,
sendError error,
) *livekit.WebhookInfo {
whi := &livekit.WebhookInfo{
EventId: event.Id,
Event: event.Event,
CreatedAt: timestamppb.New(time.Unix(event.CreatedAt, 0)),
QueuedAt: timestamppb.New(queuedAt),
QueueDurationNs: queueDuration.Nanoseconds(),
SentAt: timestamppb.New(sentAt),
SendDurationNs: sendDuration.Nanoseconds(),
Url: url,
NumDropped: event.NumDropped,
IsDropped: isDropped,
}
if !queuedAt.IsZero() {
whi.QueuedAt = timestamppb.New(queuedAt)
}
if !sentAt.IsZero() {
whi.SentAt = timestamppb.New(sentAt)
}
if event.Room != nil {
whi.RoomName = event.Room.Name
whi.RoomId = event.Room.Sid
}
if event.Participant != nil {
whi.ParticipantIdentity = event.Participant.Identity
whi.ParticipantId = event.Participant.Sid
}
if event.Track != nil {
whi.TrackId = event.Track.Sid
}
if event.EgressInfo != nil {
whi.EgressId = event.EgressInfo.EgressId
whi.ServiceStatus = event.EgressInfo.Status.String()
if event.EgressInfo.Error != "" {
whi.ServiceErrorCode = event.EgressInfo.ErrorCode
whi.ServiceError = event.EgressInfo.Error
}
}
if event.IngressInfo != nil {
whi.IngressId = event.IngressInfo.IngressId
if event.IngressInfo.State != nil {
whi.ServiceStatus = event.IngressInfo.State.Status.String()
if event.IngressInfo.State.Error != "" {
whi.ServiceError = event.IngressInfo.State.Error
}
}
}
if sendError != nil {
whi.SendError = sendError.Error()
}
return whi
}
14 changes: 7 additions & 7 deletions webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func TestURLNotifierDropped(t *testing.T) {
}
// send multiple notifications
for i := 0; i < 10; i++ {
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventParticipantJoined})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished})
}

time.Sleep(webhookCheckInterval)
Expand All @@ -120,8 +120,8 @@ func TestURLNotifierLifecycle(t *testing.T) {
numCalled.Inc()
}
for i := 0; i < 10; i++ {
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished})
}
urlNotifier.Stop(false)
require.Eventually(t, func() bool { return numCalled.Load() == 20 }, 5*time.Second, webhookCheckInterval)
Expand All @@ -134,8 +134,8 @@ func TestURLNotifierLifecycle(t *testing.T) {
numCalled.Inc()
}
for i := 0; i < 10; i++ {
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished})
}
urlNotifier.Stop(true)
time.Sleep(time.Second)
Expand Down