forked from iotaledger/goshimmer
/
chat_livefeed.go
60 lines (49 loc) · 1.74 KB
/
chat_livefeed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package dashboard
import (
"context"
"github.com/izuc/zipp.foundation/core/daemon"
"github.com/izuc/zipp.foundation/core/generics/event"
"github.com/izuc/zipp.foundation/core/workerpool"
"github.com/izuc/zipp/packages/app/chat"
"github.com/izuc/zipp/packages/node/shutdown"
)
// Settings and handle for the worker pool that forwards chat events to
// websocket clients.
var (
// chatLiveFeedWorkerCount is passed as the worker count option when the
// pool is created in configureChatLiveFeed.
chatLiveFeedWorkerCount = 1
// chatLiveFeedWorkerQueueSize is passed as the queue size option when the
// pool is created in configureChatLiveFeed.
chatLiveFeedWorkerQueueSize = 50
// chatLiveFeedWorkerPool is assigned by configureChatLiveFeed; it is nil
// until that function has run, and runChatLiveFeed submits to and stops it.
chatLiveFeedWorkerPool *workerpool.NonBlockingQueuedWorkerPool
)
// chatBlk is the JSON payload broadcast over the websocket (wrapped in a
// wsblk with MsgTypeChat) for each received chat block. All fields are strings
// copied from a chat.BlockReceivedEvent by configureChatLiveFeed.
type chatBlk struct {
// From is taken from the event's From field.
From string `json:"from"`
// To is taken from the event's To field.
To string `json:"to"`
// Block is taken from the event's Block field.
Block string `json:"block"`
// BlockID is taken from the event's BlockID field.
BlockID string `json:"blockID"`
// Timestamp is the event's Timestamp formatted with the layout
// "2 Jan 2006 15:04:05".
Timestamp string `json:"timestamp"`
}
// configureChatLiveFeed creates the chat live-feed worker pool and stores it in
// chatLiveFeedWorkerPool. Each task handled by the pool carries a
// *chat.BlockReceivedEvent as its first parameter; the handler converts it into
// a chatBlk and broadcasts it to websocket clients as a MsgTypeChat message.
func configureChatLiveFeed() {
	// forward handles a single queued task: convert the event and broadcast it.
	forward := func(task workerpool.Task) {
		// The only producer (runChatLiveFeed) submits *chat.BlockReceivedEvent,
		// so the assertion is left unchecked.
		received := task.Param(0).(*chat.BlockReceivedEvent)

		payload := &chatBlk{
			From:      received.From,
			To:        received.To,
			Block:     received.Block,
			BlockID:   received.BlockID,
			Timestamp: received.Timestamp.Format("2 Jan 2006 15:04:05"),
		}
		broadcastWsBlock(&wsblk{MsgTypeChat, payload})

		// Mark the task as finished.
		task.Return(nil)
	}

	chatLiveFeedWorkerPool = workerpool.NewNonBlockingQueuedWorkerPool(
		forward,
		workerpool.WorkerCount(chatLiveFeedWorkerCount),
		workerpool.QueueSize(chatLiveFeedWorkerQueueSize),
	)
}
func runChatLiveFeed() {
if err := daemon.BackgroundWorker("Dashboard[ChatUpdater]", func(ctx context.Context) {
notifyNewBlocks := event.NewClosure(func(event *chat.BlockReceivedEvent) {
chatLiveFeedWorkerPool.TrySubmit(event)
})
deps.Chat.Events.BlockReceived.Attach(notifyNewBlocks)
defer chatLiveFeedWorkerPool.Stop()
<-ctx.Done()
log.Info("Stopping Dashboard[ChatUpdater] ...")
deps.Chat.Events.BlockReceived.Detach(notifyNewBlocks)
log.Info("Stopping Dashboard[ChatUpdater] ... done")
}, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}