diff --git a/proxy/pong/pong.go b/proxy/pong/pong.go index f05202d..2aaf702 100644 --- a/proxy/pong/pong.go +++ b/proxy/pong/pong.go @@ -48,29 +48,30 @@ var Conn_TimeOut int64 = 1 * 60 var Stream_Idle_TimeOut int64 = 5 * 60 var Stream_Close_Time time.Duration = 3 var Peek_Time time.Duration = 30 +var Max_Live int64 = 1 * 60 * 60 var Frame_Buf_Size uint16 = 8192 var cfg = config.Config func NewPong(p *proxy.Proto) pong { - if cfg != nil { if cfg.PerMaxCount < 10 { Per_Max_Count = cfg.PerMaxCount } else if cfg.PerMaxCount > 300 { Per_Max_Count = 300 } - if cfg.WriteBuffer > 2048 { Frame_Buf_Size = uint16(cfg.WriteBuffer) } } - pg := pong{Proto: p, connPool: &connPool{conns: []*pongConn{}}} + + pg := pong{Proto: p, connPool: &connPool{id: uuid.NewString(), conns: []*pongConn{}}} go pg.start() return pg } type connPool struct { + id string conns []*pongConn mu sync.Mutex ctx context.Context @@ -102,12 +103,24 @@ func (cp *connPool) Remove(id string) { } func (cp *connPool) GetIdle() (pc *pongConn) { - for _, c := range cp.conns { - if c.closing { + var t *pongConn + n := len(cp.conns) + + for i := 0; i < n; { + t = cp.conns[i] + if t.old && i < n-1 { + t = cp.conns[n-1] + cp.conns[n-1] = cp.conns[i] + cp.conns[i] = t + n-- + } + i++ + if t.old { continue } - if Per_Max_Count > len(c.streams) { - pc = c + + if len(t.streams) < Per_Max_Count { + pc = t break } } @@ -115,13 +128,19 @@ func (cp *connPool) GetIdle() (pc *pongConn) { } func (cp *connPool) close() { - for _, c := range cp.conns { - c.CloseWithCode(proxy.ERR_FORCE_CLOSE) - } + cp.clear() if cp.cancel != nil { cp.cancel() } + cp.conns = nil } + +func (cp *connPool) clear() { + for _, c := range cp.conns { + c.CloseWithCode(proxy.ERR_FORCE_CLOSE) + } +} + func (cp *connPool) start() { tiker := time.NewTicker(Peek_Time * time.Second) defer tiker.Stop() @@ -132,10 +151,16 @@ func (cp *connPool) start() { return case <-tiker.C: t1 := time.Now().UnixMilli() + log.Printf("pool[%s] conns cap %d len:%d\n", cp.id, cap(cp.conns), len(cp.conns)) for _, pc := range cp.conns { + log.Printf("raw conn streams len:%d\n", len(pc.streams)) if pc.closing { continue } + + if (t1-pc.created)/1000 > Max_Live { + pc.old = true + } if len(pc.streams) <= 0 { if pc.idling { dt := (t1 - pc.idleTime) / 1000 @@ -153,7 +178,7 @@ func (cp *connPool) start() { dt := min(t1-s.lastRead, t1-s.lastWrite) / 1000 if dt > Stream_Idle_TimeOut { s.CloseWithError(proxy.ERR_CONN_TIMEOUT) - log.Printf("stream[%d]released (idle %d second)(%s)", s.id, dt, s.t.Addr) + log.Printf("stream[%d]released (idle %d second)(%s)\n", s.id, dt, s.t.Addr) } } } @@ -217,6 +242,10 @@ func (p pong) Who(src net.Conn) (ver byte, err error) { return } +func (p *pong) ClearConn() { + p.clear() +} + //override func (p *pong) Open(t *proxy.Tunnel) (err error) { pc := p.GetIdle() @@ -294,6 +323,7 @@ type pongConn struct { created int64 idleTime int64 idling bool + old bool } func (pc *pongConn) NextId() (n uint32) { @@ -397,6 +427,7 @@ func (pc *pongConn) CloseWithCode(code byte) { return } pc.closing = true + pc.old = true pc.errCode = code log.Printf("raw conn close (code=%d)", code) pc.Release() @@ -523,10 +554,13 @@ func (pc *pongConn) ReadFrame() error { if i+m > n { m = n - i } - f := Frames.NewDataFrame(sid, m) - if _, err = io.ReadFull(pc.Conn, f.buf[0:m]); err != nil { + + f := Frames.New(ftype, sid, FRAME_DATA) + f.payload = PayloadPool.Get().(*[]byte) + if _, err = io.ReadFull(pc.Conn, (*f.payload)[0:m]); err != nil { return err } + f.size = m s.frmCh <- f i += m } @@ -543,7 +577,8 @@ func (pc *pongConn) ReadFrame() error { } } } - f := Frames.NewCommandFrame(ftype, sid, code) + // f := Frames.NewCommandFrame(ftype, sid, code) + f := Frames.New(ftype, sid, code) s.frmCh <- f } } @@ -635,8 +670,8 @@ func (s *stream) UnlockClose(code byte) { s.t.AddError(err, "") } s.WriteRaw(FRAME_RST, payload) - s.release() log.Printf("stream[%d] released with error %s (%s)", s.id, err.Error(), s.t.Addr) + s.release() } @@ -659,27 +694,35 @@ func (s *stream) CloseByRemote(code byte) (err error) { } func (s *stream) handleFrame(f *frame) (err error) { - if f.ftype == FRAME_DATA { - if _, err = s.pw.Write(f.buf[:f.size]); err != nil { + + switch f.ftype { + case FRAME_DATA: + buf := *f.payload + if _, err = s.pw.Write(buf[:f.size]); err != nil { s.CloseWithError(proxy.ERR_NET) } - Frames.PutDataFrame(f) - } else { - switch f.ftype { - case FRAME_FINISH: //remote send data over, half close - s.pw.CloseWithError(io.EOF) - case FRAME_CLOSE: - s.Close() - case FRAME_RST: //force close - s.CloseByRemote(f.code) - default: - s.CloseWithError(proxy.ERR_COMMAND_UNKNOWN) - } - Frames.PutCommandFrame(f) - } + case FRAME_FINISH: //remote send data over, half close + s.pw.CloseWithError(io.EOF) + case FRAME_CLOSE: + s.Close() + case FRAME_RST: //force close + s.CloseByRemote(f.code) + default: + s.CloseWithError(proxy.ERR_COMMAND_UNKNOWN) + } + // Frames.PutCommandFrame(f) + + Frames.Put(f) return } +var PayloadPool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, Frame_Buf_Size) + return &buf + }, +} + /** frame header +----+-----+-------+------+-----+----+ @@ -689,11 +732,11 @@ func (s *stream) handleFrame(f *frame) (err error) { +----+-----+-------+------+-----+----+ */ type frame struct { - ftype byte - sid uint32 - buf []byte //payload buf - size uint16 //payload size - code byte + ftype byte + sid uint32 + payload *[]byte //payload buf + size uint16 //payload size + code byte } var Frames = frameCache{ @@ -701,40 +744,29 @@ var Frames = frameCache{ New: func() interface{} { return &frame{} }, - }, - dp: sync.Pool{ - New: func() interface{} { - return &frame{buf: make([]byte, Frame_Buf_Size)} - }, - }, + }, } type frameCache struct { - p sync.Pool - dp sync.Pool + p sync.Pool } -func (fc *frameCache) NewDataFrame(sid uint32, size uint16) *frame { - f := fc.dp.Get().(*frame) - f.size = size - f.sid = sid - return f -} - -func (fc *frameCache) NewCommandFrame(ftype byte, sid uint32, code byte) *frame { +func (fc *frameCache) New(ftype byte, sid uint32, code byte) *frame { f := fc.p.Get().(*frame) - f.ftype = ftype - f.code = code f.sid = sid + f.code = code + f.ftype = ftype + f.size = 0 return f } -func (fc *frameCache) PutCommandFrame(f *frame) { +func (fc *frameCache) Put(f *frame) { + if f.ftype == FRAME_DATA { + PayloadPool.Put(f.payload) + } + f.payload = nil fc.p.Put(f) -} -func (fc *frameCache) PutDataFrame(f *frame) { - fc.dp.Put(f) -} +} type Local struct{ pong } diff --git a/proxy/proxy.go b/proxy/proxy.go index 7f9a363..fafebd7 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -164,6 +164,7 @@ type Peer interface { TunnelEventNotifier GetProto() *Proto Stat() *PointStat + ClearConn() } //all proxy protocol proto implement @@ -217,6 +218,10 @@ func (p *Proto) Close() error { return nil } +func (p *Proto) ClearConn() { + +} + type controller interface { ServiceEventNotifier TunnelEventNotifier @@ -276,6 +281,9 @@ func (c *ctrl) ClearTunnels() { for _, t := range c.tunnels { t.CloseWithError(err) } + for _, p := range c.peers { + p.ClearConn() + } } func (c *ctrl) PutPeer(id string, peer Peer) { c.pmu.Lock() @@ -468,7 +476,7 @@ func (c *ctrl) relay(f Filter, t *Tunnel) (err error) { t1 := time.Now().UnixMilli() t.Connected = true - + go func() { n, err := io.Copy(t.Dst, t.Src) // log.Printf("src -> dst %d bytes\n", n) @@ -541,7 +549,7 @@ func (c *container) Start() { case <-tiker.C: st = c.Stat() i++ - if i > 10 { + if i > 3 { i = 0 c.LogStatus(st) log.Printf("coroutine number %d\n", runtime.NumGoroutine()) @@ -549,7 +557,10 @@ func (c *container) Start() { case <-autoTiker.C: if c.isLocal { rule.LazySave() + } else if st != nil { + c.LogStatus(st) } + log.Printf("coroutine number %d\n", runtime.NumGoroutine()) } } }()