/
wsc.go
96 lines (86 loc) · 2.2 KB
/
wsc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package garbanzo
import (
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
type wsClient struct {
// socketはこのクライアントのためのWebSocket
socket *websocket.Conn
// sendはイベントが送られるチャネル。WebSocketを通じてユーザのブラウザに送られるのを待機する
send chan *Event
// 統計
stats chan *Stats
// roomはこのクライアントが参加している接続
room *room
// doneはクライアントに送信済みの通知IDを保持する
done map[string]bool
mu *sync.RWMutex
}
type mark struct {
Source sourceType
ID string
HTMLURL string
ProxyURL string
}
// 無限ループでwebsocketを受信し続ける
func (wsc *wsClient) read() {
for {
var m *mark
err := wsc.socket.ReadJSON(&m)
if err != nil {
// 読み込めないと終了
// このループを抜けるとハンドラの実行が終了する。deferによってleaveチャンネルに送られ、送信対象から外される
log.Println(err)
break
}
wsc.room.markRead <- m
}
wsc.socket.Close()
}
// 直近〜分だけブラウザ通知する
const notifyMinutesAgo = 60
// clientへのメッセージをwebsocketに書き込む
func (wsc *wsClient) write() {
go func() {
for stats := range wsc.stats {
wsc.mu.Lock()
err := wsc.socket.WriteJSON(stats)
wsc.mu.Unlock()
if err != nil {
break
}
}
wsc.socket.Close()
}()
go func() {
for send := range wsc.send {
// doneに存在しないときだけ書き込み
wsc.mu.RLock()
_, exists := wsc.done[send.NotificationID]
wsc.mu.RUnlock()
if exists {
continue
}
// 直近のイベントだけブラウザ通知する
now := time.Now()
minutesAgo := now.Add(-notifyMinutesAgo * time.Minute)
// 「更新時間」が、「更新時刻よりN分前」より未来にあるか?
// (過去) ---> 今-N分前 ---> |-> 通知有効期間 <-| ---> 今 ---> (未来)
if send.UpdatedAt.After(minutesAgo) {
send.IsNotifyBrowser = true
}
wsc.mu.Lock()
err := wsc.socket.WriteJSON(send)
wsc.mu.Unlock()
if err != nil {
break
}
wsc.mu.Lock()
wsc.done[send.NotificationID] = true
wsc.mu.Unlock()
}
wsc.socket.Close()
}()
}