forked from zenazn/goji
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
149 lines (119 loc) · 2.61 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package listener
import (
"errors"
"io"
"net"
"sync"
"time"
)
// conn wraps a net.Conn and carries the per-connection state this file uses
// to track whether the connection is busy, closed, or disowned. All state
// transitions in the methods below happen while holding mu.
type conn struct {
	net.Conn

	// shard is notified of this connection's lifecycle events (track,
	// markInUse, markIdle, disown) and holds the WaitGroup that init
	// increments and Close/markIdle/closeIfIdle/disown decrement.
	shard *shard

	// mode is compared against Manual in Read and against Deadline in
	// SetReadDeadline to decide whether those calls drive busy/idle
	// transitions.
	mode mode

	mu       sync.Mutex // Protects the state machine below
	busy     bool       // connection is in use (i.e., not idle)
	closed   bool       // connection is closed
	disowned bool       // if true, this connection is no longer under our management
}
// errClosing is the sentinel error the methods in this file report when a
// connection is used after, or while, it is being shut down.
//
// This intentionally looks a lot like the one in package net.
var errClosing = errors.New("use of closed network connection")
// init accounts for the connection in the shard's WaitGroup and hands it to
// the shard for tracking. When the shard answers that the connection should
// exit, the connection is closed on the spot and errClosing is returned;
// otherwise init returns nil.
func (c *conn) init() error {
	c.shard.wg.Add(1)

	if exiting := c.shard.track(c); !exiting {
		return nil
	}

	// The result of Close is deliberately dropped: the caller is told
	// errClosing regardless of how the underlying close went.
	_ = c.Close()
	return errClosing
}
// Read reads from the underlying connection and then, under the lock,
// reconciles the result with the connection's state:
//
//   - a disowned connection is left entirely alone;
//   - a closed connection turns a successful read into (0, errClosing);
//   - otherwise, unless the mode is Manual, an idle connection is marked busy
//     and the shard is told it is in use.
//
// The bookkeeping lives in a deferred function so that it runs after the
// underlying Read and can rewrite the named results.
func (c *conn) Read(b []byte) (n int, err error) {
	defer func() {
		c.mu.Lock()
		defer c.mu.Unlock()

		switch {
		case c.disowned:
			// No longer ours to manage; report the read untouched.

		case c.closed:
			// This protects against a Close/Read race. We're not really
			// concerned about the general case (it's fundamentally racy),
			// but are mostly trying to prevent a race between a new
			// request getting read off the wire in one thread while the
			// connection is being gracefully shut down in another.
			if err == nil {
				n, err = 0, errClosing
			}

		case c.mode != Manual && !c.busy:
			// First successful activity on an idle, open connection.
			c.busy = true
			c.shard.markInUse(c)
		}
	}()

	return c.Conn.Read(b)
}
// Close closes the connection.
//
// For a connection still under management, Close marks it closed, removes it
// from the shard, releases its slot in the shard's WaitGroup, and closes the
// underlying net.Conn, returning that close's error. Calling Close again on
// such a connection returns errClosing.
//
// For a disowned connection, Close simply closes the underlying net.Conn. The
// shard bookkeeping and the WaitGroup slot were already released by disown,
// so they are not touched again. Without this, whoever took the connection
// over would have no way to close the socket through this wrapper, and the
// descriptor would leak.
func (c *conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.disowned {
		// Not ours to account for any more, but the socket must still be
		// closable by its new owner.
		return c.Conn.Close()
	}
	if c.closed {
		return errClosing
	}

	c.closed = true
	c.shard.disown(c)
	// Deferred so the WaitGroup slot is released even if the underlying
	// Close panics, and only after that Close has been attempted.
	defer c.shard.wg.Done()
	return c.Conn.Close()
}
// SetReadDeadline forwards the deadline to the underlying connection. When
// the connection is still managed and its mode is Deadline, the connection is
// additionally marked idle once the underlying call has returned.
func (c *conn) SetReadDeadline(t time.Time) error {
	// Take the decision under the lock, but release it before anything else
	// runs: markIdle acquires the same mutex.
	c.mu.Lock()
	idleAfterwards := !c.disowned && c.mode == Deadline
	c.mu.Unlock()

	if idleAfterwards {
		defer c.markIdle()
	}
	return c.Conn.SetReadDeadline(t)
}
// ReadFrom copies everything from r into the underlying connection and
// returns the byte count and error reported by io.Copy. The copy targets the
// wrapped net.Conn directly rather than this wrapper, so io.Copy sees the
// underlying connection's own method set when choosing how to copy.
func (c *conn) ReadFrom(r io.Reader) (int64, error) {
	underlying := c.Conn
	return io.Copy(underlying, r)
}
// markIdle moves a busy connection to the idle state and informs the shard.
// It does nothing when the connection is not busy. If the shard answers that
// the connection should exit, and the connection is neither closed nor
// disowned, it is closed here: marked closed, removed from the shard, its
// underlying net.Conn closed, and its WaitGroup slot released.
func (c *conn) markIdle() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.busy {
		return
	}
	c.busy = false

	shouldExit := c.shard.markIdle(c)
	if !shouldExit || c.closed || c.disowned {
		return
	}

	c.closed = true
	c.shard.disown(c)
	// Deferred so the slot is released after the close attempt, even if
	// that attempt panics.
	defer c.shard.wg.Done()
	c.Conn.Close()
}
// markInUse flags the connection as busy and tells the shard it is in use.
// The call has no effect when the connection is already busy, already
// closed, or disowned.
func (c *conn) markInUse() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.busy || c.closed || c.disowned {
		return
	}

	c.busy = true
	c.shard.markInUse(c)
}
// closeIfIdle closes the connection only when it is idle, open, and still
// managed. In that case it is marked closed, removed from the shard, its
// WaitGroup slot is released, and the error from closing the underlying
// net.Conn is returned. In every other state nothing happens and the result
// is nil.
func (c *conn) closeIfIdle() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.busy || c.closed || c.disowned {
		return nil
	}

	c.closed = true
	c.shard.disown(c)
	// Deferred so the slot is released after the close attempt, even if
	// that attempt panics.
	defer c.shard.wg.Done()
	return c.Conn.Close()
}
var (
	// errAlreadyDisowned is returned by disown when the connection has
	// already been released from management by an earlier disown call.
	errAlreadyDisowned = errors.New("listener: conn already disowned")

	// errDisownClosed is returned by disown when the connection has already
	// been closed, so there is no bookkeeping left to release.
	errDisownClosed = errors.New("listener: conn already closed")
)

// disown releases the connection from management: it is removed from the
// shard, flagged as disowned, and its slot in the shard's WaitGroup is
// released.
//
// It returns errAlreadyDisowned if the connection was disowned before, and
// errDisownClosed if the connection was already closed. The closed check
// matters because every path that closes a managed connection has already
// removed it from the shard and called wg.Done for the single wg.Add made in
// init. Releasing it a second time would unbalance the WaitGroup.
func (c *conn) disown() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.disowned:
		return errAlreadyDisowned
	case c.closed:
		return errDisownClosed
	}

	c.shard.disown(c)
	c.disowned = true
	c.shard.wg.Done()
	return nil
}