/
main.go
101 lines (78 loc) · 3.83 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"sync"
envcomm "github.com/iterum-provenance/iterum-go/env"
"github.com/iterum-provenance/iterum-go/minio"
"github.com/iterum-provenance/iterum-go/transmit"
"github.com/iterum-provenance/iterum-go/util"
"github.com/iterum-provenance/sidecar/env"
"github.com/iterum-provenance/sidecar/env/config"
"github.com/iterum-provenance/sidecar/garbage"
"github.com/iterum-provenance/sidecar/lineage"
"github.com/iterum-provenance/sidecar/manager"
"github.com/iterum-provenance/sidecar/messageq"
"github.com/iterum-provenance/sidecar/socket"
"github.com/iterum-provenance/sidecar/store"
)
// main builds the sidecar by connecting its components through buffered
// channels ("bridges"), starts every component against one shared WaitGroup,
// and then blocks on that WaitGroup.
func main() {
	// log.Base().SetLevel("Debug")
	var wg sync.WaitGroup

	// All bridges share the same element type and the same buffer capacity.
	const bridgeCapacity = 10
	newBridge := func() chan transmit.Serializable {
		return make(chan transmit.Serializable, bridgeCapacity)
	}

	// Each bridge is named after the two components it is handed to below.
	var (
		mqToDownloader       = newBridge()
		downloaderToSocket   = newBridge()
		socketToUploader     = newBridge()
		uploaderToMq         = newBridge()
		socketToAcknowledger = newBridge()
		mqToLineage          = newBridge()
	)

	// The fragment collector is shared by both socket handlers and the upload manager.
	collector := garbage.NewFragmentCollector()
	collector.Start(&wg)

	// Sockets: one on the transformation step's input socket file, one on its output socket file.
	inSocket, err := socket.NewSocket(
		env.TransformationStepInputSocket,
		downloaderToSocket,
		socket.SendFileHandler(collector),
	)
	util.Ensure(err, "Towards Socket succesfully opened and listening")
	inSocket.Start(&wg)

	outSocket, err := socket.NewSocket(
		env.TransformationStepOutputSocket,
		socketToUploader,
		socket.ProcessedFileHandler(socketToAcknowledger, collector),
	)
	util.Ensure(err, "From Socket succesfully opened and listening")
	outSocket.Start(&wg)

	// Download side. The env-based minio config defaults to an output setup,
	// so its target bucket is overwritten with a placeholder value to make
	// the target output unusable for this connection.
	downloadCfg, err := minio.NewMinioConfigFromEnv()
	util.PanicIfErr(err, "")
	downloadCfg.TargetBucket = "INVALID"
	util.PanicIfErr(downloadCfg.Connect(), "")

	downloads := store.NewDownloadManager(downloadCfg, mqToDownloader, downloaderToSocket)
	downloads.Start(&wg)

	// The config downloader shares the download-side minio connection.
	cfgDownloader := config.NewDownloader(env.SidecarConfig, downloadCfg)
	cfgDownloader.Start(&wg)

	// Upload side: a second, unmodified minio config (defaults to an output setup).
	uploadCfg, err := minio.NewMinioConfigFromEnv()
	util.PanicIfErr(err, "")
	util.PanicIfErr(uploadCfg.Connect(), "")

	uploads := store.NewUploadManager(uploadCfg, socketToUploader, uploaderToMq, env.SidecarConfig, collector)
	uploads.Start(&wg)

	// Message queue: the listener and the sender use the same broker but different queues.
	broker := envcomm.MQBrokerURL

	listener, err := messageq.NewListener(
		mqToDownloader,
		socketToAcknowledger,
		broker,
		envcomm.MQInputQueue,
		envcomm.MQPrefetchCount,
	)
	util.Ensure(err, "MessageQueue listener succesfully created and listening")
	listener.Start(&wg)

	sender, err := messageq.NewSender(uploaderToMq, mqToLineage, broker, envcomm.MQOutputQueue)
	util.Ensure(err, "MessageQueue sender succesfully created and listening")
	sender.Start(&wg)

	// Upstream checker: the listener's CanExit value is sent on the checker's
	// Register channel after the checker has been started.
	upstream := manager.NewUpstreamChecker(envcomm.ManagerURL, envcomm.PipelineHash, envcomm.ProcessName, 5)
	upstream.Start(&wg)
	upstream.Register <- listener.CanExit

	// Lineage tracker is given the bridge that was also passed to the MQ sender.
	tracker := lineage.NewMqTracker(envcomm.ProcessName, envcomm.PipelineHash, broker, mqToLineage)
	tracker.Start(&wg)

	// Block until the shared WaitGroup is released.
	wg.Wait()
}