/
roomallocator.go
129 lines (111 loc) · 3.31 KB
/
roomallocator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package service
import (
"context"
"time"
"github.com/underpin-korea/protocol/livekit"
"github.com/underpin-korea/protocol/logger"
"github.com/underpin-korea/protocol/utils"
"github.com/underpin-korea/livekit_server_go/pkg/config"
"github.com/underpin-korea/livekit_server_go/pkg/routing"
"github.com/underpin-korea/livekit_server_go/pkg/routing/selector"
)
type StandardRoomAllocator struct {
config *config.Config
router routing.Router
selector selector.NodeSelector
roomStore ObjectStore
}
func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore) (RoomAllocator, error) {
ns, err := selector.CreateNodeSelector(conf)
if err != nil {
return nil, err
}
return &StandardRoomAllocator{
config: conf,
router: router,
selector: ns,
roomStore: rs,
}, nil
}
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor its state, and cleans it up when appropriate
func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second)
if err != nil {
return nil, err
}
defer func() {
_ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token)
}()
// find existing room and update it
rm, err := r.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name))
if err == ErrRoomNotFound {
rm = &livekit.Room{
Sid: utils.NewGuid(utils.RoomPrefix),
Name: req.Name,
CreationTime: time.Now().Unix(),
TurnPassword: utils.RandomSecret(),
}
applyDefaultRoomConfig(rm, &r.config.Room)
} else if err != nil {
return nil, err
}
if req.EmptyTimeout > 0 {
rm.EmptyTimeout = req.EmptyTimeout
}
if req.MaxParticipants > 0 {
rm.MaxParticipants = req.MaxParticipants
}
if req.RoomType != "" {
rm.RoomType = req.RoomType
}
if req.Metadata != "" {
rm.Metadata = req.Metadata
}
if err := r.roomStore.StoreRoom(ctx, rm); err != nil {
return nil, err
}
// check if room already assigned
existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name))
if err != routing.ErrNotFound && err != nil {
return nil, err
}
// if already assigned and still available, keep it on that node
if err == nil && selector.IsAvailable(existing) {
// if node hosting the room is full, deny entry
if selector.LimitsReached(r.config.Limit, existing.Stats) {
return nil, routing.ErrNodeLimitReached
}
return rm, nil
}
// select a new node
nodeID := livekit.NodeID(req.NodeId)
if nodeID == "" {
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
}
node, err := r.selector.SelectNode(nodes)
if err != nil {
return nil, err
}
nodeID = livekit.NodeID(node.Id)
}
logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeID)
err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID)
if err != nil {
return nil, err
}
return rm, nil
}
func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) {
room.EmptyTimeout = conf.EmptyTimeout
room.MaxParticipants = conf.MaxParticipants
room.RoomType = conf.RoomType
for _, codec := range conf.EnabledCodecs {
room.EnabledCodecs = append(room.EnabledCodecs, &livekit.Codec{
Mime: codec.Mime,
FmtpLine: codec.FmtpLine,
})
}
}