-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
134 lines (115 loc) · 2.64 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package internal
import (
"net"
"sync"
"time"
"v2ray.com/core/common/signal"
)
// ConnectionRecyler is the interface for recycling connections.
// It is the hand-back side of a connection pool: callers that are done with a
// connection pass it to Put together with the ID it belongs to.
type ConnectionRecyler interface {
// Put returns a connection back to a connection pool.
// The ConnectionID is the key the connection is associated with; the
// net.Conn is the connection being handed back.
Put(ConnectionID, net.Conn)
}
// NoOpConnectionRecyler is a ConnectionRecyler whose Put does nothing.
type NoOpConnectionRecyler struct{}
// Put implements ConnectionRecyler.Put(). The body is empty: both arguments
// are ignored, so the connection is neither stored nor closed here.
func (NoOpConnectionRecyler) Put(ConnectionID, net.Conn) {}
// ExpiringConnection pairs a connection with the instant after which it is
// considered expired.
type ExpiringConnection struct {
	// conn is the wrapped connection.
	conn net.Conn
	// expire is the instant from which Expired starts reporting true.
	expire time.Time
}

// Expired reports whether the expiry instant lies strictly before the current
// time.
func (ec *ExpiringConnection) Expired() bool {
	now := time.Now()
	return now.After(ec.expire)
}
// Pool is a connection pool. Connections are grouped by ConnectionID and each
// pooled connection carries its own expiry time (see ExpiringConnection).
type Pool struct {
// Embedded lock; the methods of Pool take it before touching connsByDest.
sync.RWMutex
// connsByDest maps a connection ID to the connections currently pooled for it.
connsByDest map[ConnectionID][]*ExpiringConnection
// cleanupToken is taken by Put right before it starts the cleanup goroutine
// and is signalled again by cleanup when that goroutine finishes.
cleanupToken *signal.Semaphore
}
// NewConnectionPool returns a ready-to-use Pool: an empty per-ID connection
// map plus a cleanup semaphore created with signal.NewSemaphore(1).
func NewConnectionPool() *Pool {
	return &Pool{
		connsByDest:  make(map[ConnectionID][]*ExpiringConnection),
		cleanupToken: signal.NewSemaphore(1),
	}
}
// Get removes and returns the first pooled connection stored under id that
// has not expired yet. It returns nil when nothing is pooled for id or when
// every pooled connection for id has already expired.
//
// Expired entries are skipped, not removed or closed; cleanup takes care of
// those.
func (p *Pool) Get(id ConnectionID) net.Conn {
	p.Lock()
	defer p.Unlock()

	// Indexing a missing key yields a nil slice, so the loop below simply
	// does not run and nil is returned.
	list := p.connsByDest[id]
	for idx, ec := range list {
		if ec.Expired() {
			continue
		}

		// Swap-remove: the last entry takes the vacated position, so the
		// order of the remaining entries is not preserved.
		last := len(list) - 1
		list[idx] = list[last]
		// Clear the slot that falls off the end. Without this the backing
		// array keeps a pointer to an entry (and thereby its connection)
		// alive after it has left the pool.
		list[last] = nil
		p.connsByDest[id] = list[:last]
		return ec.conn
	}
	return nil
}
// isEmpty reports whether the pool currently has no connection IDs
// registered. It only reads the map, so a read lock is sufficient.
func (p *Pool) isEmpty() bool {
	p.RLock()
	empty := len(p.connsByDest) == 0
	p.RUnlock()
	return empty
}
// cleanup is the background sweeper started by Put. Every five seconds it
// drops expired connections from the pool and closes them, and it returns
// once the pool holds no connections at all.
//
// IDs whose list has no live connection left are deleted from the map.
// Without that deletion the map never becomes empty again after the first
// Put and this goroutine would run for the lifetime of the process.
func (p *Pool) cleanup() {
	for {
		time.Sleep(time.Second * 5)

		var expiredConns []net.Conn

		p.Lock()
		// Deleting or replacing entries of a map while ranging over it is
		// permitted in Go.
		for dest, list := range p.connsByDest {
			validConns := make([]*ExpiringConnection, 0, len(list))
			for _, conn := range list {
				if conn.Expired() {
					expiredConns = append(expiredConns, conn.conn)
				} else {
					validConns = append(validConns, conn)
				}
			}
			switch {
			case len(validConns) == 0:
				// Also covers lists that were already empty, e.g. after
				// Get handed out the last connection.
				delete(p.connsByDest, dest)
			case len(validConns) != len(list):
				p.connsByDest[dest] = validConns
			}
		}
		// Decide to stop and hand the token back while still holding the
		// lock. Put tries to take the token under the same lock, so it
		// either sees this goroutine still responsible for the pool or
		// finds the token available and starts a new one.
		// NOTE(review): assumes Signal does not block while this goroutine
		// holds the only token (the semaphore is created with a count of 1)
		// - confirm against signal.Semaphore.
		done := len(p.connsByDest) == 0
		if done {
			p.cleanupToken.Signal()
		}
		p.Unlock()

		// Close outside the lock so slow closes do not stall Get/Put. The
		// connections are being discarded, so a Close error is not acted on.
		for _, conn := range expiredConns {
			conn.Close()
		}

		if done {
			return
		}
	}
}
// Put implements ConnectionRecyler.Put().
// The connection is stored under id with an expiry four seconds from now.
// If the cleanup token is available, Put takes it and starts the cleanup
// goroutine; otherwise it returns without starting one.
func (p *Pool) Put(id ConnectionID, conn net.Conn) {
	entry := &ExpiringConnection{
		conn:   conn,
		expire: time.Now().Add(4 * time.Second),
	}

	p.Lock()
	defer p.Unlock()

	// A missing key yields a nil slice, and append on nil creates the list.
	p.connsByDest[id] = append(p.connsByDest[id], entry)

	// Non-blocking attempt to take the cleanup token.
	select {
	case <-p.cleanupToken.Wait():
		go p.cleanup()
	default:
	}
}