-
Notifications
You must be signed in to change notification settings - Fork 0
/
P2PPeers.go
174 lines (155 loc) · 4.68 KB
/
P2PPeers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package epsp
import (
"context"
"net"
traditionalnet "net"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
)
// P2PPeers is array of *P2PPeer
type P2PPeers []*P2PPeer
// NewP2PServers は、P2PServerを立ち上げます
func (pps *P2PPeers) NewP2PServers(ctx context.Context, mypeerid string, myagent []string, port int, codep2mp func(from *P2PPeer, retval []string) error, ConnectedIPPortPeersList func() []string, incoming uint64) (global bool, err error) {
laddr, err := traditionalnet.ResolveTCPAddr("tcp", "127.0.0.1:"+strconv.Itoa(port))
if err != nil {
err = errors.Wrap(err, "ResolveTCPAddr")
return
}
laddr.IP = nil
l, err := net.ListenTCP(`tcp`, laddr)
if err != nil {
err = errors.Wrap(err, "ListenTCP")
return
}
logln(`[DEBUG] LitenTCP: `, laddr, len(laddr.IP), strings.Join(myagent, `:`))
pschan := make(chan *P2PPeer)
go func(l *traditionalnet.TCPListener) {
for {
if ps, err := NewP2PServer(ctx, l, myagent); err != nil {
logln(`[WARN] `, err)
} else {
pschan <- ps
}
}
}(l)
go func() {
var mu sync.Mutex
timer := time.NewTicker(1 * time.Minute)
for {
select {
case ps := <-pschan:
go func() {
mu.Lock()
*pps = append(*pps, ps)
mu.Unlock()
err = ps.NetLoop(ctx, mypeerid, myagent, ConnectedIPPortPeersList, codep2mp)
if err != nil {
logln(`[INFO] ピア`, ps.PeerID+`: サーバ通信異常終了 `+strings.Join(ps.Agent, `:`), err)
} else {
logln(`[INFO] ピア`, ps.PeerID+`: サーバ通信正常終了 `+strings.Join(ps.Agent, `:`))
}
}()
case <-timer.C:
mu.Lock()
pps.deleteClosedFromList()
pps.deleteUnusedPeer()
pps.deleteManyDuplicatePeer(incoming, 100)
mu.Unlock()
case <-ctx.Done():
timer.Stop()
return
}
}
}()
global = len(laddr.IP) != 0
err = nil
return
}
// AddP2PClients は、P2PClientsを追加します
func (pps *P2PPeers) AddP2PClients(ctx context.Context, mypeerid string, otherPeers []string, myagent []string, codep2mp func(from *P2PPeer, retval []string) error, ConnectedIPPortPeersList func() []string, incoming uint64) {
var wg sync.WaitGroup
var mu sync.Mutex
for i := range otherPeers {
wg.Add(1)
go func(i int) {
pc, err := NewP2PClient(ctx, otherPeers[i], ConnectedIPPortPeersList)
if err != nil {
logln(`[INFO] ピア`+pc.GetPeerIDorIPPort()+`: 接続失敗 `, err)
wg.Done()
} else {
mu.Lock()
*pps = append(*pps, pc)
mu.Unlock()
wg.Done()
err = pc.NetLoop(ctx, mypeerid, myagent, ConnectedIPPortPeersList, codep2mp)
if err != nil {
logln(`[INFO] ピア`, pc.PeerID+`: クライアント通信異常終了 `+strings.Join(pc.Agent, `:`), err)
} else {
logln(`[INFO] ピア`, pc.PeerID+`: クライアント通信正常終了 `+strings.Join(pc.Agent, `:`))
}
}
}(i)
}
wg.Wait()
pps.deleteClosedFromList()
pps.deleteUnusedPeer()
pps.deleteManyDuplicatePeer(incoming, 10)
}
func (pps *P2PPeers) deleteClosedFromList() {
for i := 0; i < len(*pps); i++ {
if !(*pps)[i].IsConn() && time.Since(*(*pps)[i].GetDiscTime()) > 1*time.Minute {
*pps = append((*pps)[:i], (*pps)[i+1:]...)
i--
}
}
}
func (pps *P2PPeers) deleteUnusedPeer() {
for i := 0; i < len(*pps); i++ {
if ((*pps)[i].GetPingRecv() != nil && (time.Since(*(*pps)[i].GetPingRecv()) > 1*time.Hour)) || // Delete connection after 1hour from last pong.
((*pps)[i].GetPingRecv() == nil && (time.Since(*(*pps)[i].GetConnTime()) > 1*time.Hour)) { // Delete conntction if no pong and 1hour past.
(*pps)[i].Close()
logln(`[INFO] ピア` + (*pps)[i].PeerID + `: 未通信、終了`)
}
}
}
func (pps *P2PPeers) deleteManyDuplicatePeer(incoming, rxdup uint64) {
// Close connection to the peers who send many duplicate
for i := range *pps {
if (*pps)[i].RxDup > rxdup && (*pps)[i].IsConn() {
if (*pps)[i].GetRXUniqRate() > incoming/2 {
(*pps)[i].Close()
logln(`[INFO] ピア` + (*pps)[i].PeerID + `: 重複過多、終了`)
}
}
}
}
// NumOfConnectedPeers は、接続中ピアの数を返します
func (pps *P2PPeers) NumOfConnectedPeers() (n uint64) {
for i := range *pps {
if (*pps)[i].IsConn() {
n++
}
}
return n
}
// ConnectedPeersList は、接続中ピアのピアIDのリストを返します
func (pps *P2PPeers) ConnectedPeersList() (ss []string) {
for i := range *pps {
if (*pps)[i].IsConn() && (*pps)[i].PeerID != `` {
ss = append(ss, (*pps)[i].PeerID)
}
}
return
}
// ConnectedIPPortPeersList は、接続中ピアのIPアドレス,ポート,ピアIDのリストを返します
func (pps *P2PPeers) ConnectedIPPortPeersList() (ss []string) {
for i := range *pps {
if (*pps)[i].IsConn() {
ss = append(ss, (*pps)[i].GetIPPortPeerID())
}
}
return
}