Skip to content

Commit

Permalink
fix frame cache and conn pool
Browse files Browse the repository at this point in the history
  • Loading branch information
pingworlds committed Mar 6, 2022
1 parent ef0a3ca commit bc2e7a9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 61 deletions.
150 changes: 91 additions & 59 deletions proxy/pong/pong.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,30 @@ var Conn_TimeOut int64 = 1 * 60
var Stream_Idle_TimeOut int64 = 5 * 60
var Stream_Close_Time time.Duration = 3
var Peek_Time time.Duration = 30
var Max_Live int64 = 1 * 60 * 60

var Frame_Buf_Size uint16 = 8192
var cfg = config.Config

func NewPong(p *proxy.Proto) pong {

if cfg != nil {
if cfg.PerMaxCount < 10 {
Per_Max_Count = cfg.PerMaxCount
} else if cfg.PerMaxCount > 300 {
Per_Max_Count = 300
}

if cfg.WriteBuffer > 2048 {
Frame_Buf_Size = uint16(cfg.WriteBuffer)
}
}
pg := pong{Proto: p, connPool: &connPool{conns: []*pongConn{}}}

pg := pong{Proto: p, connPool: &connPool{id: uuid.NewString(), conns: []*pongConn{}}}
go pg.start()
return pg
}

type connPool struct {
id string
conns []*pongConn
mu sync.Mutex
ctx context.Context
Expand Down Expand Up @@ -102,26 +103,44 @@ func (cp *connPool) Remove(id string) {
}

func (cp *connPool) GetIdle() (pc *pongConn) {
for _, c := range cp.conns {
if c.closing {
var t *pongConn
n := len(cp.conns)

for i := 0; i < n; {
t = cp.conns[i]
if t.old && i < n-1 {
t = cp.conns[n-1]
cp.conns[n-1] = cp.conns[i]
cp.conns[i] = t
n--
}
i++
if t.old {
continue
}
if Per_Max_Count > len(c.streams) {
pc = c

if len(t.streams) < Per_Max_Count {
pc = t
break
}
}
return
}

func (cp *connPool) close() {
for _, c := range cp.conns {
c.CloseWithCode(proxy.ERR_FORCE_CLOSE)
}
cp.clear()
if cp.cancel != nil {
cp.cancel()
}
cp.conns = nil
}

func (cp *connPool) clear() {
for _, c := range cp.conns {
c.CloseWithCode(proxy.ERR_FORCE_CLOSE)
}
}

func (cp *connPool) start() {
tiker := time.NewTicker(Peek_Time * time.Second)
defer tiker.Stop()
Expand All @@ -132,10 +151,16 @@ func (cp *connPool) start() {
return
case <-tiker.C:
t1 := time.Now().UnixMilli()
log.Printf("pool[%s] conns cap %d len:%d\n", cp.id, cap(cp.conns), len(cp.conns))
for _, pc := range cp.conns {
log.Printf("raw conn streams len:%d\n", len(pc.streams))
if pc.closing {
continue
}

if (t1-pc.created)/1000 > Max_Live {
pc.old = true
}
if len(pc.streams) <= 0 {
if pc.idling {
dt := (t1 - pc.idleTime) / 1000
Expand All @@ -153,7 +178,7 @@ func (cp *connPool) start() {
dt := min(t1-s.lastRead, t1-s.lastWrite) / 1000
if dt > Stream_Idle_TimeOut {
s.CloseWithError(proxy.ERR_CONN_TIMEOUT)
log.Printf("stream[%d]released (idle %d second)(%s)", s.id, dt, s.t.Addr)
log.Printf("stream[%d]released (idle %d second)(%s)\n", s.id, dt, s.t.Addr)
}
}
}
Expand Down Expand Up @@ -217,6 +242,10 @@ func (p pong) Who(src net.Conn) (ver byte, err error) {
return
}

func (p *pong) ClearConn() {
p.clear()
}

//override
func (p *pong) Open(t *proxy.Tunnel) (err error) {
pc := p.GetIdle()
Expand Down Expand Up @@ -294,6 +323,7 @@ type pongConn struct {
created int64
idleTime int64
idling bool
old bool
}

func (pc *pongConn) NextId() (n uint32) {
Expand Down Expand Up @@ -397,6 +427,7 @@ func (pc *pongConn) CloseWithCode(code byte) {
return
}
pc.closing = true
pc.old = true
pc.errCode = code
log.Printf("raw conn close (code=%d)", code)
pc.Release()
Expand Down Expand Up @@ -523,10 +554,13 @@ func (pc *pongConn) ReadFrame() error {
if i+m > n {
m = n - i
}
f := Frames.NewDataFrame(sid, m)
if _, err = io.ReadFull(pc.Conn, f.buf[0:m]); err != nil {

f := Frames.New(ftype, sid, FRAME_DATA)
f.payload = PayloadPool.Get().(*[]byte)
if _, err = io.ReadFull(pc.Conn, (*f.payload)[0:m]); err != nil {
return err
}
f.size = m
s.frmCh <- f
i += m
}
Expand All @@ -543,7 +577,8 @@ func (pc *pongConn) ReadFrame() error {
}
}
}
f := Frames.NewCommandFrame(ftype, sid, code)
// f := Frames.NewCommandFrame(ftype, sid, code)
f := Frames.New(ftype, sid, code)
s.frmCh <- f
}
}
Expand Down Expand Up @@ -635,8 +670,8 @@ func (s *stream) UnlockClose(code byte) {
s.t.AddError(err, "")
}
s.WriteRaw(FRAME_RST, payload)
s.release()
log.Printf("stream[%d] released with error %s (%s)", s.id, err.Error(), s.t.Addr)
s.release()

}

Expand All @@ -659,27 +694,35 @@ func (s *stream) CloseByRemote(code byte) (err error) {
}

func (s *stream) handleFrame(f *frame) (err error) {
if f.ftype == FRAME_DATA {
if _, err = s.pw.Write(f.buf[:f.size]); err != nil {

switch f.ftype {
case FRAME_DATA:
buf := *f.payload
if _, err = s.pw.Write(buf[:f.size]); err != nil {
s.CloseWithError(proxy.ERR_NET)
}
Frames.PutDataFrame(f)
} else {
switch f.ftype {
case FRAME_FINISH: //remote send data over, half close
s.pw.CloseWithError(io.EOF)
case FRAME_CLOSE:
s.Close()
case FRAME_RST: //force close
s.CloseByRemote(f.code)
default:
s.CloseWithError(proxy.ERR_COMMAND_UNKNOWN)
}
Frames.PutCommandFrame(f)
}
case FRAME_FINISH: //remote send data over, half close
s.pw.CloseWithError(io.EOF)
case FRAME_CLOSE:
s.Close()
case FRAME_RST: //force close
s.CloseByRemote(f.code)
default:
s.CloseWithError(proxy.ERR_COMMAND_UNKNOWN)
}
// Frames.PutCommandFrame(f)

Frames.Put(f)
return
}

var PayloadPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, Frame_Buf_Size)
return &buf
},
}

/**
frame header
+----+-----+-------+------+-----+----+
Expand All @@ -689,52 +732,41 @@ func (s *stream) handleFrame(f *frame) (err error) {
+----+-----+-------+------+-----+----+
*/
type frame struct {
ftype byte
sid uint32
buf []byte //payload buf
size uint16 //payload size
code byte
ftype byte
sid uint32
payload *[]byte //payload buf
size uint16 //payload size
code byte
}

var Frames = frameCache{
p: sync.Pool{
New: func() interface{} {
return &frame{}
},
},
dp: sync.Pool{
New: func() interface{} {
return &frame{buf: make([]byte, Frame_Buf_Size)}
},
},
},
}

type frameCache struct {
p sync.Pool
dp sync.Pool
p sync.Pool
}

func (fc *frameCache) NewDataFrame(sid uint32, size uint16) *frame {
f := fc.dp.Get().(*frame)
f.size = size
f.sid = sid
return f
}

func (fc *frameCache) NewCommandFrame(ftype byte, sid uint32, code byte) *frame {
func (fc *frameCache) New(ftype byte, sid uint32, code byte) *frame {
f := fc.p.Get().(*frame)
f.ftype = ftype
f.code = code
f.sid = sid
f.code = code
f.ftype = ftype
f.size = 0
return f
}

func (fc *frameCache) PutCommandFrame(f *frame) {
func (fc *frameCache) Put(f *frame) {
if f.ftype == FRAME_DATA {
PayloadPool.Put(f.payload)
}
f.payload = nil
fc.p.Put(f)
}
func (fc *frameCache) PutDataFrame(f *frame) {
fc.dp.Put(f)
}
}

type Local struct{ pong }

Expand Down
15 changes: 13 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type Peer interface {
TunnelEventNotifier
GetProto() *Proto
Stat() *PointStat
ClearConn()
}

//all proxy protocol proto implement
Expand Down Expand Up @@ -217,6 +218,10 @@ func (p *Proto) Close() error {
return nil
}

func (p *Proto) ClearConn() {

}

type controller interface {
ServiceEventNotifier
TunnelEventNotifier
Expand Down Expand Up @@ -276,6 +281,9 @@ func (c *ctrl) ClearTunnels() {
for _, t := range c.tunnels {
t.CloseWithError(err)
}
for _, p := range c.peers {
p.ClearConn()
}
}
func (c *ctrl) PutPeer(id string, peer Peer) {
c.pmu.Lock()
Expand Down Expand Up @@ -468,7 +476,7 @@ func (c *ctrl) relay(f Filter, t *Tunnel) (err error) {

t1 := time.Now().UnixMilli()
t.Connected = true

go func() {
n, err := io.Copy(t.Dst, t.Src)
// log.Printf("src -> dst %d bytes\n", n)
Expand Down Expand Up @@ -541,15 +549,18 @@ func (c *container) Start() {
case <-tiker.C:
st = c.Stat()
i++
if i > 10 {
if i > 3 {
i = 0
c.LogStatus(st)
log.Printf("coroutine number %d\n", runtime.NumGoroutine())
}
case <-autoTiker.C:
if c.isLocal {
rule.LazySave()
} else if st != nil {
c.LogStatus(st)
}
log.Printf("coroutine number %d\n", runtime.NumGoroutine())
}
}
}()
Expand Down

0 comments on commit bc2e7a9

Please sign in to comment.