-
Notifications
You must be signed in to change notification settings - Fork 4
/
context.go
137 lines (122 loc) · 4.29 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package rtmp
import (
"errors"
"fmt"
"github.com/torresjeff/rtmp/config"
"sync"
)
type ContextStore interface {
RegisterPublisher(streamKey string) error
DestroyPublisher(streamKey string) error
RegisterSubscriber(streamKey string, subscriber Subscriber) error
GetSubscribersForStream(streamKey string) ([]Subscriber, error)
DestroySubscriber(streamKey string, sessionID string) error
StreamExists(streamKey string) bool
SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)
GetAvcSequenceHeaderForPublisher(streamKey string) []byte
SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)
GetAacSequenceHeaderForPublisher(streamKey string) []byte
}
type InMemoryContext struct {
ContextStore
subMutex sync.RWMutex
subscribers map[string][]Subscriber
seqMutex sync.RWMutex
avcSequenceHeaderCache map[string][]byte
aacSequenceHeaderCache map[string][]byte
numberOfSessions uint32
}
var StreamNotFound error = errors.New("StreamNotFound")
func NewInMemoryContext() *InMemoryContext {
return &InMemoryContext{
subscribers: make(map[string][]Subscriber),
avcSequenceHeaderCache: make(map[string][]byte),
aacSequenceHeaderCache: make(map[string][]byte),
}
}
// Registers the session in the broadcaster to keep a reference to all open subscribers
func (c *InMemoryContext) RegisterPublisher(streamKey string) error {
// Assume there will be a small amount of subscribers (ie. a few instances of ffmpeg that transcode our audio/video)
c.subMutex.Lock()
c.subscribers[streamKey] = make([]Subscriber, 0, 5)
if config.Debug {
fmt.Println("context: registered publisher with stream key", streamKey)
}
c.numberOfSessions++
c.subMutex.Unlock()
return nil
}
func (c *InMemoryContext) DestroyPublisher(streamKey string) error {
c.subMutex.Lock()
defer c.subMutex.Unlock()
if _, exists := c.subscribers[streamKey]; exists {
delete(c.subscribers, streamKey)
c.numberOfSessions--
}
return nil
}
func (c *InMemoryContext) RegisterSubscriber(streamKey string, subscriber Subscriber) error {
c.subMutex.Lock()
defer c.subMutex.Unlock()
// If a stream with the key exists, then register the subscriber
if _, exists := c.subscribers[streamKey]; exists {
c.subscribers[streamKey] = append(c.subscribers[streamKey], subscriber)
return nil
}
// If no stream was found with that stream key, send an error
return StreamNotFound
}
func (c *InMemoryContext) StreamExists(streamKey string) bool {
c.subMutex.RLock()
defer c.subMutex.RUnlock()
_, exists := c.subscribers[streamKey]
return exists
}
func (c *InMemoryContext) GetSubscribersForStream(streamKey string) ([]Subscriber, error) {
// We could add a cache check if this context got the subscribers from a DB rather than from memory
c.subMutex.RLock()
defer c.subMutex.RUnlock()
// If a stream with the key exists, then return its subscribers
if subscribers, exists := c.subscribers[streamKey]; exists {
return subscribers, nil
}
// If no stream was found with that stream key, send an error
return nil, StreamNotFound
}
func (c *InMemoryContext) DestroySubscriber(streamKey string, sessionID string) error {
c.subMutex.Lock()
defer c.subMutex.Unlock()
subscribers, exists := c.subscribers[streamKey]
if !exists {
return nil
}
numberOfSubs := len(subscribers)
for i, sub := range subscribers {
if sub.GetID() == sessionID {
// Swap the subscriber we're deleting with the last element, to avoid having to delete (more efficient)
subscribers[i] = subscribers[numberOfSubs-1]
c.subscribers[streamKey] = subscribers[:numberOfSubs-1]
}
}
return nil
}
func (c *InMemoryContext) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte) {
c.seqMutex.Lock()
c.avcSequenceHeaderCache[streamKey] = payload
c.seqMutex.Unlock()
}
func (c *InMemoryContext) GetAvcSequenceHeaderForPublisher(streamKey string) []byte {
c.seqMutex.RLock()
defer c.seqMutex.RUnlock()
return c.avcSequenceHeaderCache[streamKey]
}
func (c *InMemoryContext) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte) {
c.seqMutex.Lock()
c.aacSequenceHeaderCache[streamKey] = payload
c.seqMutex.Unlock()
}
func (c *InMemoryContext) GetAacSequenceHeaderForPublisher(streamKey string) []byte {
c.seqMutex.RLock()
defer c.seqMutex.RUnlock()
return c.aacSequenceHeaderCache[streamKey]
}