/
registry.go
110 lines (89 loc) · 3.47 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package stats
import (
"context"
"log"
"sync"
"github.com/patrickmn/go-cache"
authinterceptor "github.com/tnyim/jungletv/server/interceptors/auth"
"gopkg.in/alexcesaro/statsd.v2"
)
// Registry keeps track of spectator and gRPC stream subscriber counts and
// reports them as gauges through a statsd client.
type Registry struct {
// log is the logger supplied to NewRegistry.
log *log.Logger
// statsClient is the statsd client the gauges are sent to.
statsClient *statsd.Client
// spectatorsByRemoteAddress maps each remote address to the number of
// spectators currently registered from it. An address is removed once its
// count drops to zero, so the length of the map is the number of distinct
// addresses with at least one spectator.
spectatorsByRemoteAddress map[string]int
// spectatorsMutex guards spectatorsByRemoteAddress.
spectatorsMutex sync.RWMutex
// streamingSubsCounters holds the current subscriber count for each stream
// type, plus a separate "<type>_authenticated" entry per type that counts
// only the authenticated subscribers.
streamingSubsCounters *cache.OrderedCache[StatStreamConsumersType, int]
}
// StatStreamConsumersType is a type of gRPC stream consumer count.
type StatStreamConsumersType string

// Stream consumer count types known to the stats registry.
const (
	// StatStreamConsumersQueue is the queue gRPC stream consumer count.
	StatStreamConsumersQueue StatStreamConsumersType = "queue"

	// StatStreamConsumersCommunitySkipping is the community skipping gRPC
	// stream consumer count.
	StatStreamConsumersCommunitySkipping StatStreamConsumersType = "community_skipping"

	// StatStreamConsumersChat is the chat gRPC stream consumer count.
	StatStreamConsumersChat StatStreamConsumersType = "chat"
)
// NewRegistry creates a new stats Registry.
//
// It asynchronously resets the "spectators" gauge to zero on statsClient and
// seeds the stream subscriber counters with a zero entry for every known
// stream type and for its "_authenticated" counterpart. The returned error is
// always nil.
func NewRegistry(log *log.Logger, statsClient *statsd.Client) (*Registry, error) {
	go statsClient.Gauge("spectators", 0)

	registry := &Registry{
		log:                       log,
		statsClient:               statsClient,
		spectatorsByRemoteAddress: make(map[string]int),
		streamingSubsCounters:     cache.NewOrderedCache[StatStreamConsumersType, int](cache.NoExpiration, -1),
	}

	// Seed the counters in the same order for every stream type: the overall
	// count first, then the authenticated-only count.
	knownStreams := [...]StatStreamConsumersType{
		StatStreamConsumersQueue,
		StatStreamConsumersCommunitySkipping,
		StatStreamConsumersChat,
	}
	for _, stream := range knownStreams {
		registry.streamingSubsCounters.SetDefault(stream, 0)
		registry.streamingSubsCounters.SetDefault(stream+"_authenticated", 0)
	}

	return registry, nil
}
// RegisterSpectator records that a spectator, identified by the remote address
// carried in ctx, has started watching, and sends the "spectators" gauge with
// the number of distinct remote addresses currently registered.
//
// It returns a function that undoes the registration. That function is
// idempotent: only its first call has any effect, so an accidental repeated
// call cannot decrement the count of another spectator sharing the same
// remote address. The returned error is always nil.
//
// Requests whose IP country is "T1" are not counted; for those a no-op
// function is returned.
func (s *Registry) RegisterSpectator(ctx context.Context) (func(), error) {
	// Read everything needed from ctx before taking the lock, so the mutex is
	// only held while the map is actually touched.
	remoteAddress := authinterceptor.RemoteAddressFromContext(ctx)
	ipCountry := authinterceptor.IPCountryFromContext(ctx)
	// NOTE(review): "T1" is presumably the country code assigned to Tor
	// traffic by the upstream proxy — confirm against the auth interceptor.
	if ipCountry == "T1" {
		return func() {}, nil
	}

	s.spectatorsMutex.Lock()
	defer s.spectatorsMutex.Unlock()

	s.spectatorsByRemoteAddress[remoteAddress]++
	// The arguments of a go statement are evaluated in the calling goroutine,
	// so len is read under the lock; only the send itself is asynchronous.
	go s.statsClient.Gauge("spectators", len(s.spectatorsByRemoteAddress))

	var once sync.Once
	unregister := func() {
		once.Do(func() {
			s.spectatorsMutex.Lock()
			defer s.spectatorsMutex.Unlock()

			s.spectatorsByRemoteAddress[remoteAddress]--
			// Drop addresses with no remaining spectators so that the map
			// length keeps matching the number of distinct addresses.
			if s.spectatorsByRemoteAddress[remoteAddress] <= 0 {
				delete(s.spectatorsByRemoteAddress, remoteAddress)
			}
			go s.statsClient.Gauge("spectators", len(s.spectatorsByRemoteAddress))
		})
	}
	return unregister, nil
}
// CurrentlyWatching returns the number of distinct remote addresses that
// currently have at least one registered spectator.
func (s *Registry) CurrentlyWatching() int {
	s.spectatorsMutex.RLock()
	watching := len(s.spectatorsByRemoteAddress)
	s.spectatorsMutex.RUnlock()
	return watching
}
// RegisterStreamSubscriber records a new consumer of the given gRPC stream
// type. The counter for stream is incremented and, when authenticated is true,
// so is the separate "<stream>_authenticated" counter. Both values are then
// sent asynchronously as the "subscribers.<stream>" and
// "subscribers.<stream>_authenticated" gauges.
//
// It returns a function that undoes the registration by decrementing the same
// counters and sending the gauges again. That function is idempotent: only its
// first call has any effect, so a repeated call cannot push the counters below
// their true value.
func (s *Registry) RegisterStreamSubscriber(stream StatStreamConsumersType, authenticated bool) func() {
	authenticatedKey := stream + "_authenticated"

	// report sends the current value of both counters. It reads the counters
	// when it runs, not when it is scheduled.
	report := func() {
		// The second result of Get is deliberately ignored, as before: the
		// counters for the known stream types are created by NewRegistry.
		total, _ := s.streamingSubsCounters.Get(stream)
		s.statsClient.Gauge("subscribers."+string(stream), total)
		authed, _ := s.streamingSubsCounters.Get(authenticatedKey)
		s.statsClient.Gauge("subscribers."+string(authenticatedKey), authed)
	}

	s.streamingSubsCounters.Increment(stream, 1)
	if authenticated {
		s.streamingSubsCounters.Increment(authenticatedKey, 1)
	}
	go report()

	var once sync.Once
	return func() {
		once.Do(func() {
			s.streamingSubsCounters.Increment(stream, -1)
			if authenticated {
				s.streamingSubsCounters.Increment(authenticatedKey, -1)
			}
			go report()
		})
	}
}