-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
131 lines (116 loc) · 2.79 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package httpc
import (
"container/heap"
"fmt"
"http"
"os"
)
// client is the Sender implementation. It manages connection pools for all
// domains and hands incoming requests to the pool of their domain.
//
// Field order matters: NewClient fills the struct with a positional literal.
type client struct {
	// limitGlobal bounds the number of requests active across all pools;
	// limitPerDomain is handed to every pool created for a domain.
	limitGlobal, limitPerDomain int
	// reqs carries requests from Send to the accept loop.
	reqs chan *clientRequest
	// poolGetter carries pool lookups from getPool to the managePools loop.
	poolGetter chan poolPromise
}
// clientRequest bundles an outgoing request with the two channels on which
// Send waits for its outcome. Exactly one of success or failure is read by
// Send for each request.
// NOTE(review): the code that writes to success/failure is not in this file
// section — presumably the pool that executes the request; confirm.
type clientRequest struct {
// r is the request to send; Send fills in r.URL before queueing it.
r *http.Request
// success delivers the response when the request went through.
success chan *http.Response
// failure delivers the error when the request could not be completed.
failure chan os.Error
}
// poolPromise is a lookup request sent to the managePools loop: it names a
// domain and carries the channel on which the matching pool is sent back.
type poolPromise struct {
// name is the domain whose pool is wanted (getPool passes its domain argument).
name string
// promise receives the pool once managePools has found or created it.
promise chan *pool
}
// getPool returns the pool for domain. The lookup is delegated to the
// managePools loop through c.poolGetter, and the call blocks until that loop
// has answered on the reply channel.
func (c *client) getPool(domain string) *pool {
	reply := make(chan *pool)
	c.poolGetter <- poolPromise{name: domain, promise: reply}
	return <-reply
}
// managePools owns the domain-to-pool table. It serves lookups arriving on
// c.poolGetter forever, creating a pool with poolMaker the first time a
// domain is asked for and replying with the stored pool on the promise
// channel of each lookup. Because the table is touched only from this loop,
// it needs no locking.
func (c *client) managePools(poolMaker func(string) *pool) {
	byDomain := map[string]*pool{}
	for {
		lookup := <-c.poolGetter
		if _, known := byDomain[lookup.name]; !known {
			byDomain[lookup.name] = poolMaker(lookup.name)
		}
		lookup.promise <- byDomain[lookup.name]
	}
}
// accept is the routing loop: it takes each request queued on c.reqs, looks
// up the pool for the host of the request URL, and forwards the request to
// that pool's reqs channel. It never returns.
func (c *client) accept() {
	for {
		incoming := <-c.reqs
		c.getPool(incoming.r.URL.Host).reqs <- incoming
	}
}
// drive is the scheduling loop that decides which pool may start its next
// request. It never returns and is meant to run in its own goroutine.
//
// A pool received on incReq has one more request waiting; a pool received on
// decReq has one request fewer in flight. After either event the pool's entry
// in the priority queue is refreshed, and then pools are popped and signalled
// on their execute channel for as long as fewer than c.limitGlobal requests
// are active across all pools.
func (c *client) drive(incReq, decReq chan *pool) {
	q := new(poolQueue)
	heap.Init(q)
	active := 0 // requests currently in flight, summed over all pools
	for {
		var p *pool
		select {
		case p = <-incReq:
			p.pending++
		case p = <-decReq:
			p.active--
			active--
		}
		// Take the pool out of the queue before its priority is touched.
		// NOTE(review): assumes poolQueue keeps pos negative while a pool is
		// not queued — confirm against poolQueue.
		if p.pos >= 0 {
			heap.Remove(q, p.pos)
		}
		// Don't want pri changing concurrently under our nose, so we copy it for ourselves.
		p.pri = p.wantPri
		// Re-queue only when work is waiting and the pool is below cap(conns).
		if p.pending > 0 && p.active < cap(p.conns) {
			heap.Push(q, p)
		}
		for active < c.limitGlobal && q.Len() > 0 {
			next := heap.Pop(q).(*pool)
			next.pending--
			next.active++
			active++
			// Hand the pool to the goroutine as an argument. Capturing a
			// variable that this loop reassigns would let a goroutine that
			// starts late signal a different pool than the one just popped.
			//
			// The send stays in expression form with its result discarded by
			// the self-assignment, as before.
			// NOTE(review): in the pre-Go 1 dialect this file uses, the
			// expression form appears to be the non-blocking send — confirm
			// before porting to a current toolchain.
			go func(target *pool) {
				x := (target.execute <- true)
				x = x
			}(next)
		}
	}
}
// NewClient returns a Sender that sends http requests over the wire, making
// connections as necessary. Before returning it starts three goroutines: the
// pool table (managePools), the request router (accept) and the scheduler
// (drive).
//
// limitGlobal and limitPerDomain are the global and per-domain connection
// limits. Once either has been hit, requests queue up. Waiting requests are
// sent in order of priority (first low, then high). A request's priority is
// 5000 by default, and it can be set with the X-Pri header:
//	X-Pri: 2000
// sets the priority of the request to 2000. X-Pri is never sent over the
// wire; the client uses it only internally.
func NewClient(limitGlobal, limitPerDomain int) Sender {
	c := &client{
		limitGlobal:    limitGlobal,
		limitPerDomain: limitPerDomain,
		reqs:           make(chan *clientRequest),
		poolGetter:     make(chan poolPromise),
	}

	// Every pool reports to the scheduler over this shared pair of channels.
	incReq := make(chan *pool)
	decReq := make(chan *pool)
	makePool := func(addr string) *pool {
		return newPool(addr, c.limitPerDomain, incReq, decReq)
	}

	go c.managePools(makePool)
	go c.accept()
	go c.drive(incReq, decReq)
	return c
}
// Send parses req.RawURL into req.URL, queues the request with the client and
// blocks until either a response or an error comes back for it.
//
// It returns an error without queueing anything when the URL cannot be parsed
// or when its scheme is not "http".
func (c *client) Send(req *http.Request) (resp *http.Response, err os.Error) {
	req.URL, err = http.ParseURL(req.RawURL)
	if err != nil {
		return nil, err
	}
	if scheme := req.URL.Scheme; scheme != "http" {
		return nil, os.ErrorString(fmt.Sprintf("bad scheme %s", scheme))
	}

	pending := &clientRequest{
		r:       req,
		success: make(chan *http.Response),
		failure: make(chan os.Error),
	}
	c.reqs <- pending

	// Exactly one of the two channels delivers the outcome.
	select {
	case resp = <-pending.success:
	case err = <-pending.failure:
	}
	return resp, err
}
// shouldRedirect reports whether a response with the given status code should
// be followed as a redirect. It currently answers false for every status.
func shouldRedirect(status int) bool {
	return false
}