Skip to content

Commit

Permalink
Collect client&server connection count, better debug message.
Browse files Browse the repository at this point in the history
Some debug messages now contain "client: <remote addr>", we can group debug
message by client by grep and sorting on remote addr.
  • Loading branch information
cyfdecyf committed Jul 28, 2013
1 parent 8147afc commit 76dbf0e
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 50 deletions.
4 changes: 2 additions & 2 deletions http.go
Expand Up @@ -449,7 +449,7 @@ func parseRequest(c *clientConn, r *Request) (err error) {
var s []byte
reader := c.bufRd
// make actual timeout a little longer than keep-alive value sent to client
setConnReadTimeout(c,
setConnReadTimeout(c.Conn,
clientConnTimeout+time.Duration(c.timeoutCnt)*time.Second, "parseRequest")
// parse request line
if s, err = reader.ReadSlice('\n'); err != nil {
Expand All @@ -458,7 +458,7 @@ func parseRequest(c *clientConn, r *Request) (err error) {
}
return err
}
unsetConnReadTimeout(c, "parseRequest")
unsetConnReadTimeout(c.Conn, "parseRequest")
// debug.Printf("Request line %s", s)

r.reset()
Expand Down
2 changes: 2 additions & 0 deletions main.go
Expand Up @@ -50,6 +50,8 @@ func main() {
initSiteStat()
initPAC() // initPAC uses siteStat, so must init after site stat

initStat()

if len(parentProxy) == 0 {
info.Println("no parent proxy server, can't handle blocked sites")
} else {
Expand Down
130 changes: 82 additions & 48 deletions proxy.go
Expand Up @@ -175,23 +175,24 @@ func (py *Proxy) Serve(done chan byte) {
debug.Println("client connection:", err)
continue
}
if debug {
debug.Println("new client:", conn.RemoteAddr())
}
c := newClientConn(conn, py)
go c.serve()
}
}

func newClientConn(rwc net.Conn, proxy *Proxy) *clientConn {
func newClientConn(cli net.Conn, proxy *Proxy) *clientConn {
buf := httpBuf.Get()
c := &clientConn{
Conn: rwc,
Conn: cli,
serverConn: map[string]*serverConn{},
buf: buf,
bufRd: bufio.NewReaderFromBuf(rwc, buf),
bufRd: bufio.NewReaderFromBuf(cli, buf),
proxy: proxy,
}
if debug {
debug.Printf("cli(%s) connected, total %d clients\n",
cli.RemoteAddr(), incCliCnt())
}
return c
}

Expand All @@ -204,15 +205,16 @@ func (c *clientConn) releaseBuf() {
}
}

func (c *clientConn) Close() error {
func (c *clientConn) Close() {
c.releaseBuf()
for _, sv := range c.serverConn {
sv.Close()
sv.Close(c)
}
if debug {
debug.Printf("Client %v connection closed\n", c.RemoteAddr())
debug.Printf("cli(%s) closed, total %d clients\n",
c.RemoteAddr(), decCliCnt())
}
return c.Conn.Close()
c.Conn.Close()
}

func isSelfURL(url string) bool {
Expand Down Expand Up @@ -269,6 +271,16 @@ func (c *clientConn) shouldRetry(r *Request, sv *serverConn, re error) bool {
return true
}

func dbgPrintRq(c *clientConn, r *Request) {
if dbgRq {
if verbose {
dbgRq.Printf("cli(%s) request %s\n%s", c.RemoteAddr(), r, r.Verbose())
} else {
dbgRq.Printf("cli(%s) request %s\n", c.RemoteAddr(), r)
}
}
}

func (c *clientConn) serve() {
var r Request
var rp Response
Expand Down Expand Up @@ -296,7 +308,7 @@ func (c *clientConn) serve() {

if err = parseRequest(c, &r); err != nil {
if debug {
debug.Printf("client: %s parse request %v\n", c.RemoteAddr(), err)
debug.Printf("cli(%s) parse request %v\n", c.RemoteAddr(), err)
}
if err == io.EOF || isErrConnReset(err) {
return
Expand All @@ -316,13 +328,7 @@ func (c *clientConn) serve() {
}
// next getRequest should start with timeout count 0
c.timeoutCnt = 0
if dbgRq {
if verbose {
dbgRq.Printf("request from client %s: %s\n%s", c.RemoteAddr(), &r, r.Verbose())
} else {
dbgRq.Printf("request from client %s: %s\n", c.RemoteAddr(), &r)
}
}
dbgPrintRq(c, &r)

if isSelfURL(r.URL.HostPort) {
if err = c.serveSelfURL(&r); err != nil {
Expand Down Expand Up @@ -359,7 +365,7 @@ func (c *clientConn) serve() {
retry:
r.tryOnce()
if bool(debug) && r.isRetry() {
errl.Printf("%s retry request tryCnt=%d %v\n", c.RemoteAddr(), r.tryCnt, &r)
errl.Printf("cli(%s) retry request tryCnt=%d %v\n", c.RemoteAddr(), r.tryCnt, &r)
}
if sv, err = c.getServerConn(&r); err != nil {
// debug.Printf("Failed to get serverConn for %s %v\n", c.RemoteAddr(), r)
Expand All @@ -374,7 +380,7 @@ func (c *clientConn) serve() {

if r.isConnect {
err = sv.doConnect(&r, c)
sv.Close()
sv.Close(c)
if c.shouldRetry(&r, sv, err) {
// connection for CONNECT is not reused, no need to remove
goto retry
Expand All @@ -394,7 +400,9 @@ func (c *clientConn) serve() {
}

if !r.ConnectionKeepAlive {
// debug.Println("close client connection because request has no keep-alive")
if debug {
debug.Println("cli(%s) close connection", c.RemoteAddr())
}
return
}
}
Expand All @@ -417,7 +425,7 @@ func (c *clientConn) handleServerReadError(r *Request, sv *serverConn, err error
var errMsg string
if err == io.EOF {
if debug {
debug.Printf("client %s; %s read from server EOF\n", c.RemoteAddr(), msg)
debug.Printf("cli(%s) %s read from server EOF\n", c.RemoteAddr(), msg)
}
return RetryError{err}
}
Expand All @@ -442,6 +450,18 @@ func (c *clientConn) handleServerWriteError(r *Request, sv *serverConn, err erro
return RetryError{err}
}

func dbgPrintRep(c *clientConn, r *Request, rp *Response) {
if dbgRep {
if verbose {
dbgRep.Printf("cli(%s) response %s %s\n%s",
c.RemoteAddr(), r, rp, rp.Verbose())
} else {
dbgRep.Printf("cli(%s) response %s %s\n",
c.RemoteAddr(), r, rp)
}
}
}

func (c *clientConn) readResponse(sv *serverConn, r *Request, rp *Response) (err error) {
sv.initBuf()
defer func() {
Expand Down Expand Up @@ -476,14 +496,8 @@ func (c *clientConn) readResponse(sv *serverConn, r *Request, rp *Response) (err
if _, err = c.Write(rp.rawResponse()); err != nil {
return err
}
if dbgRep {
if verbose {
// extra space after resposne to align with request debug message
dbgRep.Printf("response to client %v: %s %s\n%s", c.RemoteAddr(), r, rp, rp.Verbose())
} else {
dbgRep.Printf("response to client %v: %s %s\n", c.RemoteAddr(), r, rp)
}
}
dbgPrintRep(c, r, rp)

rp.releaseBuf()

if rp.hasBody(r.Method) {
Expand Down Expand Up @@ -513,13 +527,21 @@ func (c *clientConn) readResponse(sv *serverConn, r *Request, rp *Response) (err
debug.Printf("[Finished] %v request %s %s\n", c.RemoteAddr(), r.Method, r.URL)
}
*/
var remoteAddr string // avoid evaluating c.RemoteAddr() in the following debug call
if debug {
remoteAddr = c.RemoteAddr().String()
}
if rp.ConnectionKeepAlive {
if rp.KeepAlive == time.Duration(0) {
sv.willCloseOn = time.Now().Add(defaultServerConnTimeout)
} else {
sv.willCloseOn = time.Now().Add(rp.KeepAlive - time.Second)
debug.Printf("cli(%s) server %s keep-alive %v\n",
remoteAddr, sv.url.HostPort, rp.KeepAlive)
sv.willCloseOn = time.Now().Add(rp.KeepAlive)
}
} else {
debug.Printf("cli(%s) server %s close connection\n",
remoteAddr, sv.url.HostPort)
c.removeServerConn(sv)
}
return
Expand All @@ -532,16 +554,17 @@ func (c *clientConn) shouldCleanServerConn() bool {

// Remove all maybe closed server connection
func (c *clientConn) cleanServerConn() {
if debug {
debug.Printf("%s client clean up idle server connection", c.RemoteAddr())
}
now := time.Now()
c.cleanedOn = now
for _, sv := range c.serverConn {
if now.After(sv.willCloseOn) {
c.removeServerConn(sv)
}
}
if debug {
debug.Printf("cli(%s) close idle connections, remains %d\n",
c.RemoteAddr(), len(c.serverConn))
}
}

func (c *clientConn) getServerConn(r *Request) (sv *serverConn, err error) {
Expand All @@ -558,7 +581,7 @@ func (c *clientConn) getServerConn(r *Request) (sv *serverConn, err error) {
}

func (c *clientConn) removeServerConn(sv *serverConn) {
sv.Close()
sv.Close(c)
delete(c.serverConn, sv.url.HostPort)
}

Expand All @@ -573,7 +596,7 @@ func connectDirect(url *URL, siteInfo *VisitCnt) (conn, error) {
debug.Printf("error direct connect to: %s %v\n", url.HostPort, err)
return zeroConn, err
}
debug.Println("connected to", url.HostPort)
// debug.Println("directly connected to", url.HostPort)
return conn{ctDirectConn, c, nil}, nil
}

Expand All @@ -588,6 +611,8 @@ func maybeBlocked(err error) bool {
return isErrTimeout(err) || isErrConnReset(err)
}

// Connect to requested server according to whether it's visit count.
// If direct connection fails, try parent proxies.
func (c *clientConn) connect(r *Request, siteInfo *VisitCnt) (srvconn conn, err error) {
var errMsg string
if config.AlwaysProxy {
Expand Down Expand Up @@ -662,6 +687,10 @@ func (c *clientConn) createServerConn(r *Request) (*serverConn, error) {
return sv, nil
}
c.serverConn[sv.url.HostPort] = sv
if debug {
debug.Printf("cli(%s) connected to %s %d concurrent connections\n",
c.RemoteAddr(), sv.url.HostPort, incSrvConnCnt(sv.url.HostPort))
}
// client will connect to differnet servers in a single proxy connection
// debug.Printf("serverConn to for client %v %v\n", c.RemoteAddr(), c.serverConn)
return sv, nil
Expand Down Expand Up @@ -702,14 +731,17 @@ func (sv *serverConn) initBuf() {
}
}

func (sv *serverConn) Close() error {
debug.Println("Closing server conn:", sv.url.HostPort)
func (sv *serverConn) Close(c *clientConn) error {
sv.bufRd = nil
if sv.buf != nil {
// debug.Println("release server buffer")
httpBuf.Put(sv.buf)
sv.buf = nil
}
if debug {
debug.Printf("cli(%s) close connection to %s remains %d concurrent connections\n",
c.RemoteAddr(), sv.url.HostPort, decSrvConnCnt(sv.url.HostPort))
}
return sv.Conn.Close()
}

Expand All @@ -736,11 +768,11 @@ func (sv *serverConn) setReadTimeout(msg string) {
if sv.siteInfo.OnceBlocked() && to > defaultReadTimeout {
to = minReadTimeout
}
setConnReadTimeout(sv, to, msg)
setConnReadTimeout(sv.Conn, to, msg)
}

func (sv *serverConn) unsetReadTimeout(msg string) {
unsetConnReadTimeout(sv, msg)
unsetConnReadTimeout(sv.Conn, msg)
}

func (sv *serverConn) maybeSSLErr(cliStart time.Time) bool {
Expand Down Expand Up @@ -851,7 +883,7 @@ func copyClient2Server(c *clientConn, sv *serverConn, r *Request, srvStopped not
defer func() {
if deadlineIsSet {
// maybe need to retry, should unset timeout here because
unsetConnReadTimeout(c, "cli->srv after err")
unsetConnReadTimeout(c.Conn, "cli->srv after err")
}
done <- 1
}()
Expand Down Expand Up @@ -880,7 +912,8 @@ func copyClient2Server(c *clientConn, sv *serverConn, r *Request, srvStopped not
}
}
if debug {
debug.Printf("cli->srv client %s released read buffer\n", c.RemoteAddr())
debug.Printf("cli(%s)->srv(%s) released read buffer\n",
c.RemoteAddr(), r.URL.HostPort)
}
c.releaseBuf()
}
Expand All @@ -896,11 +929,11 @@ func copyClient2Server(c *clientConn, sv *serverConn, r *Request, srvStopped not
for {
// debug.Println("cli->srv")
if sv.maybeFake() {
setConnReadTimeout(c, time.Second, "cli->srv")
setConnReadTimeout(c.Conn, time.Second, "cli->srv")
deadlineIsSet = true
} else if deadlineIsSet {
// maybeFake may trun to false after timeout, but timeout should be unset
unsetConnReadTimeout(c, "cli->srv before read")
unsetConnReadTimeout(c.Conn, "cli->srv before read")
deadlineIsSet = false
}
if n, err = c.Read(buf); err != nil {
Expand Down Expand Up @@ -942,7 +975,7 @@ func (sv *serverConn) doConnect(r *Request, c *clientConn) (err error) {
// debug.Printf("%s Sending CONNECT request to http proxy server\n", c.RemoteAddr())
if err = sv.sendHTTPProxyRequest(r, c); err != nil {
if debug {
debug.Printf("%s error sending CONNECT request to http proxy server: %v\n",
debug.Printf("cli(%s) error sending CONNECT request to http proxy server: %v\n",
c.RemoteAddr(), err)
}
return err
Expand All @@ -951,7 +984,8 @@ func (sv *serverConn) doConnect(r *Request, c *clientConn) (err error) {
// debug.Printf("send connection confirmation to %s->%s\n", c.RemoteAddr(), r.URL.HostPort)
if _, err = c.Write(connEstablished); err != nil {
if debug {
debug.Printf("%s error sending 200 Connecion established: %v\n", c.RemoteAddr(), err)
debug.Printf("cli(%s) error sending 200 Connecion established: %v\n",
c.RemoteAddr(), err)
}
return err
}
Expand All @@ -963,7 +997,7 @@ func (sv *serverConn) doConnect(r *Request, c *clientConn) (err error) {
go func() {
// debug.Printf("doConnect: cli(%s)->srv(%s)\n", c.RemoteAddr(), r.URL.HostPort)
cli2srvErr = copyClient2Server(c, sv, r, srvStopped, done)
sv.Close() // close sv to force read from server in copyServer2Client return
sv.Close(c) // close sv to force read from server in copyServer2Client return
}()

// debug.Printf("doConnect: srv(%s)->cli(%s)\n", r.URL.HostPort, c.RemoteAddr())
Expand Down Expand Up @@ -1049,7 +1083,7 @@ func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err er
return
}
if debug {
debug.Printf("%s %s body sent\n", c.RemoteAddr(), r)
debug.Printf("cli(%s) %s request body sent\n", c.RemoteAddr(), r)
}
}
r.state = rsSent
Expand Down

0 comments on commit 76dbf0e

Please sign in to comment.