/
log_distribution_worker.go
142 lines (126 loc) · 3.59 KB
/
log_distribution_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package usecase
import (
"context"
"strconv"
"sync"
"github.com/log-rush/distribution-server/domain"
"github.com/log-rush/distribution-server/pkg/app"
"github.com/log-rush/distribution-server/pkg/commons"
"github.com/log-rush/distribution-server/pkg/lrp"
)
// logJob is one unit of work for the distribution workers: a batch of
// log entries that all belong to the same log stream.
type logJob struct {
	// logs is the batch of log entries to distribute.
	logs []domain.Log
	// stream identifies the log stream the entries belong to.
	stream string
}
// logDistributionWorkerPool fans incoming log batches out to a fixed set of
// workers. Jobs are posted via PostJob onto the shared jobs channel; worker
// errors flow back over results and are logged by a goroutine started in
// Start.
type logDistributionWorkerPool struct {
	// workers holds the workers created by Start, so Stop can signal them.
	workers []*logDistributionWorker
	// maxWorkers is the number of workers Start launches.
	maxWorkers int
	// subscriptionRepo resolves the subscribers of a stream.
	subscriptionRepo *domain.SubscriptionsRepository
	// logPlugins are additionally invoked for every distributed log entry.
	logPlugins *[]app.LogPlugin
	// jobs is the buffered channel the workers consume from.
	jobs chan logJob
	// results carries worker errors back to the pool's logging goroutine.
	results chan error
	// l is the shared logger.
	l *domain.Logger
}
// logDistributionWorker is a single consumer of the pool's jobs channel. It
// runs its work loop in its own goroutine until a value arrives on stop.
type logDistributionWorker struct {
	// id is used only for log output.
	id int
	// stop is unbuffered; sending on it synchronizes with the work loop.
	stop chan bool
	// jobs is the pool's shared job channel (receive side).
	jobs <-chan logJob
	// results is the pool's shared error channel (send side).
	results chan<- error
	// l is the shared logger.
	l *domain.Logger
	// repo resolves the subscribers of a stream.
	repo *domain.SubscriptionsRepository
	// logPlugins are invoked for every distributed log entry.
	logPlugins *[]app.LogPlugin
}
var (
	// encoder serializes outgoing messages into the lrp wire format. It is
	// shared by all worker goroutines; NOTE(review): this assumes
	// lrp.Encoder.Encode is safe for concurrent use — confirm in the lrp
	// package.
	encoder = lrp.NewEncoder()
)
// NewPool constructs a logDistributionWorkerPool that will run maxWorkers
// workers once Start is called. The jobs channel is buffered (64 entries) so
// short bursts of posted jobs do not block producers; the results channel is
// unbuffered.
func NewPool(maxWorkers int, subscriptionRepo *domain.SubscriptionsRepository, logPlugins *[]app.LogPlugin, logger domain.Logger) logDistributionWorkerPool {
	pool := logDistributionWorkerPool{
		workers:          []*logDistributionWorker{},
		maxWorkers:       maxWorkers,
		subscriptionRepo: subscriptionRepo,
		logPlugins:       logPlugins,
		jobs:             make(chan logJob, 64),
		results:          make(chan error),
		l:                &logger,
	}
	return pool
}
// Start launches the error-draining goroutine and maxWorkers distribution
// workers. It must be called before PostJob.
//
// The receiver must be a pointer: with the previous value receiver the
// workers were appended to a copy of the pool, so the caller's pool kept an
// empty workers slice and Stop could never signal any worker.
func (p *logDistributionWorkerPool) Start() {
	(*p.l).Debugf("started worker pool (%d instances)", p.maxWorkers)
	// Drain worker errors so sends on p.results never block; the errors are
	// only logged. This goroutine exits when p.results is closed (see Stop).
	go func() {
		defer commons.RecoverRoutine(p.l)
		for err := range p.results {
			if err != nil {
				(*p.l).Warnf("error in worker: %s", err.Error())
			}
		}
	}()
	for i := 0; i < p.maxWorkers; i++ {
		worker := newWorker(i, p.jobs, p.results, p.subscriptionRepo, p.logPlugins, p.l)
		p.workers = append(p.workers, worker)
		go worker.work()
		(*p.l).Debugf("[%d] worker started", worker.id)
	}
}
// PostJob enqueues a batch of logs belonging to stream for distribution by
// the next free worker. It blocks when the job buffer (64 entries) is full.
func (p logDistributionWorkerPool) PostJob(logs []domain.Log, stream string) {
	job := logJob{logs: logs, stream: stream}
	p.jobs <- job
}
// Stop signals every worker to exit and then closes the results channel so
// the error-draining goroutine started in Start terminates as well instead
// of leaking.
//
// Each send on worker.stop is synchronous (the channel is unbuffered), so a
// worker is guaranteed to be idle in its select loop when the signal is
// delivered; by the time the loop below finishes, no worker can still send
// on p.results, making the close safe. Stop must not be called twice.
func (p *logDistributionWorkerPool) Stop() {
	for _, worker := range p.workers {
		worker.stop <- true
		(*p.l).Debugf("[%d] worker stopped", worker.id)
	}
	close(p.results)
}
// newWorker builds a single distribution worker wired to the pool's shared
// job and result channels. The stop channel is created here, unbuffered, so
// the pool's Stop call synchronizes with the worker's select loop.
func newWorker(id int, jobs <-chan logJob, result chan<- error, repo *domain.SubscriptionsRepository, logPlugins *[]app.LogPlugin, logger *domain.Logger) *logDistributionWorker {
	w := &logDistributionWorker{
		id:         id,
		stop:       make(chan bool),
		jobs:       jobs,
		results:    result,
		repo:       repo,
		logPlugins: logPlugins,
		l:          logger,
	}
	return w
}
// work is the main loop of a distribution worker. For each job received on
// the shared jobs channel it fans the logs out concurrently to every stream
// subscriber and to every registered log plugin, then blocks until the whole
// fan-out has completed before taking the next job. The loop exits when a
// value arrives on the worker's stop channel.
func (w *logDistributionWorker) work() {
	defer commons.RecoverRoutine(w.l)
	for {
		select {
		case job := <-w.jobs:
			(*w.l).Debugf("[%d] worker received job for %s (%d)", w.id, job.stream, len(job.logs))
			wg := sync.WaitGroup{}
			subscribers, err := (*w.repo).GetSubscribers(context.Background(), job.stream)
			if err != nil {
				(*w.l).Warnf("[%d] cant get subscribers for %s ", w.id, job.stream)
				w.results <- err
				continue
			}
			(*w.l).Debugf("[%d] sending to %d subscribers", w.id, len(subscribers))
			for _, client := range subscribers {
				wg.Add(1)
				go func(client domain.Client) {
					// Done must be deferred: if the send below panics and is
					// recovered, a non-deferred Done would be skipped and
					// wg.Wait would block this worker forever.
					defer wg.Done()
					defer commons.RecoverRoutine(w.l)
					for _, log := range job.logs {
						client.Send <- encoder.Encode(lrp.NewMesssage(lrp.OprLog, []byte(job.stream+","+strconv.Itoa(log.TimeStamp)+","+log.Message)))
					}
				}(client)
			}
			(*w.l).Debugf("[%d] sending to %d plugins", w.id, len(*w.logPlugins))
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Recover here as well: previously a panicking plugin would
				// crash the whole process, while the subscriber goroutines
				// above were protected.
				defer commons.RecoverRoutine(w.l)
				for _, plugin := range *w.logPlugins {
					for _, log := range job.logs {
						plugin.HandleLog(log)
					}
				}
			}()
			wg.Wait()
		case <-w.stop:
			return
		}
	}
}