/
worker.go
95 lines (76 loc) · 1.95 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package worker
import (
"fmt"
"log"
"runtime"
"strconv"
"sync"
"time"
"github.com/go-co-op/gocron"
"github.com/nsqio/go-nsq"
"github.com/themartes/erd/config"
"github.com/themartes/erd/env"
"github.com/themartes/erd/queue"
"github.com/themartes/erd/replication"
)
type ReplicationWorker struct {
DBEngine string
SourceDB string
SourceCollection string
ReplicationIndex string
NSQProducer *nsq.Producer
NSQConsumer *nsq.Consumer
withInitialLoad bool
}
func CreateReplicationWorker(engine string, sourcedb string, collection string, replicationIndex string, withInitialLoad bool) *ReplicationWorker {
topicName := fmt.Sprintf("%s.%s", engine, sourcedb)
producer, err := nsq.NewProducer(
env.Params.NSQProducerURL,
config.NSQ,
)
if err != nil {
log.Fatal(err)
}
consumer, err := nsq.NewConsumer(
topicName,
collection,
config.NSQ,
)
if err != nil {
log.Fatal(err)
}
worker := ReplicationWorker{
engine,
sourcedb,
collection,
replicationIndex,
producer,
consumer,
withInitialLoad,
}
return &worker
}
func (worker ReplicationWorker) StartReplication() {
if worker.withInitialLoad {
log.Println("Starting initial load")
startInitialLoad := time.Now()
wg := new(sync.WaitGroup)
wg.Add(runtime.NumCPU())
replication.InitialLoad(worker.DBEngine, worker.SourceDB, worker.SourceCollection, wg)
endIntialLoad := time.Since(startInitialLoad).Milliseconds()
log.Println("Initial load done in", endIntialLoad, "ms")
}
producerCron := gocron.NewScheduler(time.UTC)
interval, ConvErr := strconv.Atoi(env.Params.NSQProductionTimeout)
if ConvErr != nil {
log.Fatal(ConvErr)
}
_, err := producerCron.Every(interval).Seconds().Do(func() {
go queue.StartProducer(worker.DBEngine, worker.SourceDB, worker.NSQProducer)
})
if err != nil {
log.Fatal(err)
}
producerCron.StartAsync()
queue.StartConsumer(worker.DBEngine, worker.SourceDB, worker.SourceCollection, worker.NSQConsumer)
}