-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
96 lines (83 loc) · 3.34 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package bundle
import (
"github.com/warp-contracts/syncer/src/utils/arweave"
"github.com/warp-contracts/syncer/src/utils/bundlr"
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/listener"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/monitoring"
monitor_bundler "github.com/warp-contracts/syncer/src/utils/monitoring/bundler"
"github.com/warp-contracts/syncer/src/utils/task"
"github.com/warp-contracts/syncer/src/utils/turbo"
)
type Controller struct {
*task.Task
}
// +---------------+
// | Collector |
// | |
// | |
// | +-----------+ |
// | | Poller | | +----------+ +-----------+ +-----------------+
// | +-----------+ | tx | | pd | | network_info | |
// | +------------>| Bundler +-------->| Confirmer |<-------------- | Network Monitor |
// | +-----------+ | | | | | | |
// | | Notifier | | +----------+ +-----------+ +-----------------+
// | +-----------+ |
// | |
// +---------------+
// Main class that orchestrates main syncer functionalities
func NewController(config *config.Config) (self *Controller, err error) {
self = new(Controller)
self.Task = task.NewTask(config, "bundle-controller")
// SQL database
db, err := model.NewConnection(self.Ctx, config, "bundler")
if err != nil {
return
}
// Arweave client
arweaveClient := arweave.NewClient(self.Ctx, config)
// Bundlr client
irysClient := bundlr.NewClient(self.Ctx, &config.Bundlr)
turboClient := turbo.NewClient(self.Ctx, &config.Bundlr)
// Monitoring
monitor := monitor_bundler.NewMonitor()
server := monitoring.NewServer(config).
WithMonitor(monitor)
// Gets interactions to bundle from the database
collector := NewCollector(config, db).
WithMonitor(monitor)
// Monitors latest Arweave network block height
networkMonitor := listener.NewNetworkMonitor(config).
WithClient(arweaveClient).
WithMonitor(monitor).
WithInterval(config.NetworkMonitor.Period).
WithRequiredConfirmationBlocks(0).
WithEnableOutput(false /*disable output channel to avoid blocking*/)
// Sends interactions to bundlr.network
bundler := NewBundler(config, db).
WithInputChannel(collector.Output).
WithMonitor(monitor).
WithIrysClient(irysClient).
WithTurboClient(turboClient)
// Confirmer periodically updates the state of the bundled interactions
confirmer := NewConfirmer(config).
WithDB(db).
WithMonitor(monitor).
WithNetworkMonitor(networkMonitor).
WithInputChannel(bundler.Output)
// Periodically run queries. Results stored in the monitor.
dbPoller := monitoring.NewDbPoller(config).
WithDB(db).
WithQuery(config.Bundler.DBPollerInterval, &monitor.GetReport().Bundler.State.PendingBundleItems, "SELECT count(1) FROM bundle_items WHERE state='PENDING'")
// Setup everything, will start upon calling Controller.Start()
self.Task.
WithConditionalSubtask(!config.Bundler.NotifierDisabled && config.Bundler.PollerDisabled, dbPoller.Task).
WithSubtask(confirmer.Task).
WithSubtask(bundler.Task).
WithSubtask(monitor.Task).
WithSubtask(networkMonitor.Task).
WithSubtask(server.Task).
WithSubtask(collector.Task)
return
}