This repository has been archived by the owner on Jul 8, 2023. It is now read-only.
/
coordinator.go
102 lines (88 loc) · 1.81 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (C) 2023 myl7
// SPDX-License-Identifier: Apache-2.0
package coordinator
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/go-redis/redis/v9"
"github.com/myl7/karaoke/pkg/rpc"
"go.mongodb.org/mongo-driver/mongo"
)
type Coordinator struct {
c CoordinatorConfig
round int
rC *redis.Client
mDB *mongo.Database
}
type CoordinatorConfig struct {
ServerN int
// Redis addr
RAddr string
// MongoDB URI
MURI string
}
func NewCoordinator(c CoordinatorConfig) *Coordinator {
return &Coordinator{c: c}
}
func (co *Coordinator) Run(ctx context.Context) error {
// To make sure all servers are waiting rounds but still separate Run & Bootstrap,
// poll subscriber number until enough.
for {
nM, err := co.rC.PubSubNumSub(ctx, "karaoke/round").Result()
if err != nil {
return err
}
if nM["karaoke/round"] >= int64(co.c.ServerN) {
break
}
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
for {
co.round += 1
rS := rpc.RoundStartMsg{Round: co.round}
rSB, err := json.Marshal(rS)
if err != nil {
panic(err)
}
err = co.rC.Publish(ctx, "karaoke/round", rSB).Err()
if err != nil {
return err
}
sub := co.rC.Subscribe(ctx, "karaoke/round_ok")
ch := sub.Channel()
m := make(map[string]bool, co.c.ServerN)
L:
for {
select {
case rEB := <-ch:
var rE rpc.RoundEndMsg
err = json.Unmarshal([]byte(rEB.Payload), &rE)
if err != nil {
panic(err)
}
round := rE.Round
id := rE.ID
if round != co.round {
panic(ErrRoundNotMatch)
}
m[id] = true
if len(m) >= co.c.ServerN {
break L
}
case <-ctx.Done():
return ctx.Err()
}
}
err = sub.Close()
if err != nil {
panic(err)
}
}
}
var ErrRoundNotMatch = errors.New("start and end round not match")