forked from PlatONnetwork/PlatON-Go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consensus_dialed.go
146 lines (126 loc) · 4.24 KB
/
consensus_dialed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package p2p
import (
"fmt"
"github.com/PlatONnetwork/PlatON-Go/log"
"github.com/PlatONnetwork/PlatON-Go/p2p/discover"
"strings"
)
type dialedTasks struct {
queue []*dialTask
maxPeers int
removeConsensusPeerFn removeConsensusPeerFn
}
func NewDialedTasks(maxPeers int, removeConsensusPeerFn removeConsensusPeerFn) *dialedTasks {
tasks := &dialedTasks{
maxPeers: maxPeers,
removeConsensusPeerFn: removeConsensusPeerFn,
}
return tasks
}
func (tasks *dialedTasks) InitRemoveConsensusPeerFn(removeConsensusPeerFn removeConsensusPeerFn) {
tasks.removeConsensusPeerFn = removeConsensusPeerFn
}
func (tasks *dialedTasks) AddTask(task *dialTask) error {
// whether the task is already in the queue
// 1 if exists,remove task to the end of the queue;
// 2 if not exists(not exceeding the maximum limit),add new task directly to the end of the queue
// 3 if not exists(exceeding the maximum limit),Remove queue head task and add new task to the end of the queue
index := tasks.index(task)
log.Info("[before add]Consensus dialed task list before AddTask operation", "task queue", tasks.description())
if index != -1 {
log.Info("Consensus dialed task exists,Remove new task to the end of the queue", "index", index)
tasks.pollIndex(index)
tasks.offer(task)
} else if tasks.size() < tasks.maxPeers {
log.Info("Consensus dialed task not exists,Not exceeding the maximum limit,Add new task directly to the end of the queue", "tasks size", tasks.size(), "maxConsensusPeers", tasks.maxPeers)
tasks.offer(task)
} else {
log.Info("Consensus dialed task not exists,Exceeding the maximum limit,Remove queue head task and add new task to the end of the queue", "tasks size", tasks.size(), "maxConsensusPeers", tasks.maxPeers)
pollTask := tasks.queue[0] // queue head task
tasks.removeConsensusPeerFn(pollTask.dest) // disconnect head peer
tasks.queue = tasks.queue[1:] // remove queue head task
tasks.offer(task)
}
log.Info("[after add]Consensus dialed task list after AddTask operation", "task queue", tasks.description())
return nil
}
func (tasks *dialedTasks) RemoveTask(NodeID discover.NodeID) error {
log.Info("[before remove]Consensus dialed task list before RemoveTask operation", "task queue", tasks.description())
if !tasks.isEmpty() {
for i, t := range tasks.queue {
if t.dest.ID == NodeID {
tasks.queue = append(tasks.queue[:i], tasks.queue[i+1:]...)
break
}
}
}
log.Info("[after remove]Consensus dialed task list after RemoveTask operation", "task queue", tasks.description())
return nil
}
func (tasks *dialedTasks) ListTask() []*dialTask {
log.Info("[after list]Consensus dialed task list after ListTask operation", "task queue", tasks.description())
return tasks.queue
}
// adding new task to the end of the queue
func (tasks *dialedTasks) offer(task *dialTask) {
tasks.queue = append(tasks.queue, task)
}
// remove the first task in the queue
func (tasks *dialedTasks) poll() *dialTask {
if tasks.isEmpty() {
log.Info("dialedTasks is empty!")
return nil
}
pollTask := tasks.queue[0]
tasks.queue = tasks.queue[1:]
return pollTask
}
// remove the specify index task in the queue
func (tasks *dialedTasks) pollIndex(index int) *dialTask {
if tasks.isEmpty() {
log.Info("dialedTasks is empty!")
return nil
}
pollTask := tasks.queue[index]
tasks.queue = append(tasks.queue[:index], tasks.queue[index+1:]...)
return pollTask
}
// index of task in the queue
func (tasks *dialedTasks) index(task *dialTask) int {
for i, t := range tasks.queue {
if t.dest.ID == task.dest.ID {
return i
}
}
return -1
}
// queue size
func (tasks *dialedTasks) size() int {
return len(tasks.queue)
}
// clear queue
func (tasks *dialedTasks) clear() bool {
if tasks.isEmpty() {
log.Info("queue is empty!")
return false
}
for i := 0; i < tasks.size(); i++ {
tasks.queue[i] = nil
}
tasks.queue = nil
return true
}
// whether the queue is empty
func (tasks *dialedTasks) isEmpty() bool {
if len(tasks.queue) == 0 {
return true
}
return false
}
func (tasks *dialedTasks) description() string {
var description []string
for _, t := range tasks.queue {
description = append(description, fmt.Sprintf("%x", t.dest.ID[:8]))
}
return strings.Join(description, ",")
}