-
Notifications
You must be signed in to change notification settings - Fork 0
/
datamesh.go
118 lines (107 loc) · 3.07 KB
/
datamesh.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package datamesh
import (
"fmt"
"github.com/openziti-incubator/datamesh/channel"
"github.com/openziti/dilithium"
"github.com/openziti/foundation/identity/identity"
"github.com/openziti/foundation/transport"
"github.com/openziti/foundation/util/sequence"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"os"
"sync"
)
type Circuit string
type Datamesh struct {
cf *Config
self *identity.TokenId
listeners map[string]*Listener
dialers map[string]*Dialer
nics map[string]NIC
overlay *Overlay
Fwd *Forwarder
Handlers *Handlers
sequence *sequence.Sequence
pool *dilithium.Pool
lock sync.Mutex
instrument dilithium.Instrument
}
func NewDatamesh(cf *Config) *Datamesh {
d := &Datamesh{
cf: cf,
listeners: make(map[string]*Listener),
dialers: make(map[string]*Dialer),
nics: make(map[string]NIC),
overlay: newGraph(),
Fwd: newForwarder(),
Handlers: &Handlers{},
sequence: sequence.NewSequence(),
pool: dilithium.NewPool("link", 128*1024, dilithium.NilInstrumentInstance{}),
}
channel.SetUnderlayRegistrySequence(d.sequence)
d.overlay.addLinkCb = d.addLinkCb
for _, listenerCf := range cf.Listeners {
d.listeners[listenerCf.Id] = NewListener(listenerCf, &identity.TokenId{Token: listenerCf.Id})
logrus.Infof("added listener at [%s]", listenerCf.BindAddress)
}
for _, dialerCf := range cf.Dialers {
d.dialers[dialerCf.Id] = NewDialer(dialerCf, &identity.TokenId{Token: dialerCf.Id})
logrus.Infof("added dialer at [%s]", dialerCf.BindAddress)
}
return d
}
func (self *Datamesh) Start() {
for _, v := range self.listeners {
go v.Listen(self, self.overlay.incoming)
}
self.overlay.start()
}
func (self *Datamesh) DialLink(id string, endpoint transport.Address) (Link, error) {
if dialer, found := self.dialers[id]; found {
l, err := dialer.Dial(self, endpoint)
if err != nil {
return nil, errors.Wrapf(err, "error dialing link at [%s]", endpoint)
}
self.overlay.addLink(l)
return l, nil
} else {
return nil, errors.Errorf("no dialer [%s]", id)
}
}
func (self *Datamesh) InsertNIC(circuitId Circuit, endpoint Endpoint) (NIC, error) {
self.lock.Lock()
defer self.lock.Unlock()
addr, err := self.sequence.NextHash()
if err != nil {
return nil, err
}
path := fmt.Sprintf("/home/plorenz/tmp/dilithium/%v", os.Getpid())
if err := os.MkdirAll(path, 0700); err != nil {
return nil, err
}
instrument, err := dilithium.NewMetricsInstrumentNoCf(path, 250, true)
if err != nil {
return nil, err
}
ii := instrument.NewInstance("test")
nic := newNIC(self, circuitId, Address(addr), endpoint, ii)
txAlg, err := self.cf.Profile.(dilithium.TxAlgorithmProfile).Create(ii)
if err != nil {
return nil, err
}
if err := nic.(*nicImpl).SetTxAlgorithm(txAlg); err != nil {
return nil, err
}
if err := nic.(*nicImpl).Start(); err != nil {
return nil, err
}
self.nics[addr] = nic
self.Fwd.AddDestination(nic)
return nic, nil
}
func (self *Datamesh) addLinkCb(l *link) {
self.Fwd.AddDestination(l)
for _, handler := range self.Handlers.linkUpHandlers {
handler(l)
}
}