/
controller.go
129 lines (124 loc) · 3.33 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package controller
import (
"context"
"math/rand"
"time"
"github.com/ugorji/go/codec"
"github.com/p9c/node9/pkg/broadcast"
chain "github.com/p9c/node9/pkg/chain"
"github.com/p9c/node9/pkg/chain/fork"
"github.com/p9c/node9/pkg/chain/mining"
"github.com/p9c/node9/pkg/conte"
"github.com/p9c/node9/pkg/gcm"
"github.com/p9c/node9/pkg/log"
)
// Blocks is a list of block templates. Run passes one Blocks value per
// broadcast over its internal channel; an empty list is sent while the node
// has no peers or is not current.
type Blocks []*mining.BlockTemplate
// Run starts a controller instance.
//
// It launches two goroutines and registers one chain subscriber:
//
//   - a dispatcher that msgpack-encodes every Blocks value received on an
//     internal channel and sends it with broadcast.Send,
//   - a chain notification subscriber that, on every connected block, builds
//     one block template per algorithm of the current fork and hands the set
//     to the dispatcher,
//   - a once-per-second monitor that hands the dispatcher an empty Blocks
//     value while the node has no connected peers or is not current.
//
// The returned cancel function stops both goroutines. It is nil when no
// mining addresses are configured (nothing is started in that case), so
// callers must check it before calling. When the broadcast address cannot be
// set up, the error is logged and an already-invoked cancel function is
// returned.
func Run(cx *conte.Xt) (cancel context.CancelFunc) {
	if len(cx.StateCfg.ActiveMiningAddrs) < 1 {
		log.WARN("no mining addresses - not starting controller")
		return nil
	}
	log.WARN("starting controller")
	// Seed the shared generator once here rather than on every template.
	rand.Seed(time.Now().UnixNano())
	var mh codec.MsgpackHandle
	var ctx context.Context
	ctx, cancel = context.WithCancel(context.Background())
	ciph := gcm.GetCipher(*cx.Config.MinerPass)
	outAddr, err := broadcast.New(*cx.Config.BroadcastAddress)
	if err != nil {
		log.ERROR(err)
		cancel()
		return cancel
	}
	blockChan := make(chan Blocks)
	buf := make([]byte, 0, broadcast.MaxDatagramSize)
	enc := codec.NewEncoderBytes(&buf, &mh)
	// work dispatch loop
	go func() {
		for {
			select {
			case lb := <-blockChan:
				// send out block broadcast
				log.DEBUG("sending out block broadcast")
				// serialize blocks
				log.SPEW(lb)
				if err := enc.Encode(lb); err != nil {
					log.ERROR(err)
				} else if err = broadcast.Send(outAddr, buf, ciph,
					broadcast.Template); err != nil {
					log.ERROR(err)
				}
				// Reset the buffer for the next round. This runs on the error
				// paths as well so a failed encode cannot leave stale bytes
				// behind for the next message.
				buf = buf[:0]
				enc.ResetBytes(&buf)
			case <-ctx.Done():
				// cancel has been called
				return
			}
		}
	}()
	// Disabled blocking wait for peers/sync, kept for reference:
	//connCount := cx.RPCServer.Cfg.ConnMgr.ConnectedCount()
	//current := cx.RPCServer.Cfg.SyncMgr.IsCurrent()
	//// if out of sync or disconnected,
	//// once a second send out empty blocks
	//for (connCount < 1 && !*cx.Config.Solo) || !current {
	//	time.Sleep(time.Second)
	//	connCount = cx.RPCServer.Cfg.ConnMgr.ConnectedCount()
	//	current = cx.RPCServer.Cfg.SyncMgr.IsCurrent()
	//	log.WARN("waiting for sync/peers", connCount, current)
	//	select {
	//	case <-ctx.Done():
	//		log.WARN("cancelled before initial connection/sync")
	//		return
	//	default:
	//	}
	//}
	// create subscriber for new block event
	cx.RPCServer.Cfg.Chain.Subscribe(func(n *chain.Notification) {
		if n.Type != chain.NTBlockConnected {
			return
		}
		log.WARN("new block found")
		addrs := cx.StateCfg.ActiveMiningAddrs
		if len(addrs) < 1 {
			// rand.Intn panics on a zero argument.
			log.ERROR("no mining addresses - cannot create block templates")
			return
		}
		// generate Blocks; the slice is local to this notification so that
		// separate notifications never share state
		height := cx.RPCServer.Cfg.Chain.BestSnapshot().Height + 1
		blocks := Blocks{}
		for algo := range fork.List[fork.GetCurrent(height)].Algos {
			// Choose a payment address at random.
			payToAddr := addrs[rand.Intn(len(addrs))]
			template, err := cx.RPCServer.Cfg.Generator.NewBlockTemplate(0,
				payToAddr, algo)
			if err != nil {
				log.ERROR("failed to create new block template:", err)
				continue
			}
			blocks = append(blocks, template)
		}
		// Do not block the notifier forever once the dispatcher has stopped.
		select {
		case blockChan <- blocks:
		case <-ctx.Done():
		}
	})
	// goroutine loop checking for connection and sync status
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			connCount := cx.RPCServer.Cfg.ConnMgr.ConnectedCount()
			current := cx.RPCServer.Cfg.SyncMgr.IsCurrent()
			// if out of sync or disconnected,
			// once a second send out empty blocks
			if connCount < 1 || !current {
				select {
				case blockChan <- Blocks{}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return cancel
}