From 0f39f6693cc3b84357e190eab367a35b72abe253 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Jochum?= Date: Fri, 7 Oct 2022 21:47:03 +0200 Subject: [PATCH 1/3] fix(transport/memory): Improve the memory transport, 4x speed --- transport/memory.go | 61 ++++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/transport/memory.go b/transport/memory.go index 09f1f9d97e..fcdbc7c63e 100644 --- a/transport/memory.go +++ b/transport/memory.go @@ -2,8 +2,10 @@ package transport import ( "context" + "encoding/gob" "errors" "fmt" + "io" "math/rand" "net" "sync" @@ -14,8 +16,12 @@ import ( ) type memorySocket struct { - recv chan *Message - send chan *Message + server bool + name string + crecv *gob.Decoder + csend *gob.Encoder + srecv *gob.Decoder + ssend *gob.Encoder // sock exit exit chan bool // listener exit @@ -27,7 +33,6 @@ type memorySocket struct { // for send/recv Timeout timeout time.Duration ctx context.Context - sync.RWMutex } type memoryClient struct { @@ -52,9 +57,6 @@ type memoryTransport struct { } func (ms *memorySocket) Recv(m *Message) error { - ms.RLock() - defer ms.RUnlock() - ctx := ms.ctx if ms.timeout > 0 { var cancel context.CancelFunc @@ -69,9 +71,18 @@ func (ms *memorySocket) Recv(m *Message) error { return errors.New("connection closed") case <-ms.lexit: return errors.New("server connection closed") - case cm := <-ms.recv: - *m = *cm + default: + if ms.server { + if err := ms.srecv.Decode(m); err != nil { + return err + } + } else { + if err := ms.crecv.Decode(m); err != nil { + return err + } + } } + return nil } @@ -84,9 +95,6 @@ func (ms *memorySocket) Remote() string { } func (ms *memorySocket) Send(m *Message) error { - ms.RLock() - defer ms.RUnlock() - ctx := ms.ctx if ms.timeout > 0 { var cancel context.CancelFunc @@ -101,14 +109,22 @@ func (ms *memorySocket) Send(m *Message) error { return errors.New("connection closed") case <-ms.lexit: return errors.New("server connection closed") - case ms.send <- m: + default: + if ms.server { + if err := ms.ssend.Encode(m); err != nil { + return err + } + } else { + if err := ms.csend.Encode(m); err != nil { + return err + } + } } + return nil } func (ms *memorySocket) Close() error { - ms.Lock() - defer ms.Unlock() select { case <-ms.exit: return nil @@ -141,10 +157,12 @@ func (m *memoryListener) Accept(fn func(Socket)) error { return nil case c := <-m.conn: go fn(&memorySocket{ + server: true, + name: "server", lexit: c.lexit, exit: c.exit, - send: c.recv, - recv: c.send, + ssend: c.ssend, + srecv: c.srecv, local: c.Remote(), remote: c.Local(), timeout: m.topts.Timeout, @@ -168,10 +186,17 @@ func (m *memoryTransport) Dial(addr string, opts ...DialOption) (Client, error) o(&options) } + creader, swriter := io.Pipe() + sreader, cwriter := io.Pipe() + client := &memoryClient{ &memorySocket{ - send: make(chan *Message), - recv: make(chan *Message), + server: false, + name: "client", + csend: gob.NewEncoder(cwriter), + crecv: gob.NewDecoder(creader), + ssend: gob.NewEncoder(swriter), + srecv: gob.NewDecoder(sreader), exit: make(chan bool), lexit: listener.exit, local: addr, From 904f4d13eac8fee7dd90a9bd23724af444ed5fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Jochum?= Date: Fri, 7 Oct 2022 21:56:50 +0200 Subject: [PATCH 2/3] fix(transport/memory): comments --- transport/memory.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/transport/memory.go b/transport/memory.go index fcdbc7c63e..9450f89f67 100644 --- a/transport/memory.go +++ b/transport/memory.go @@ -16,12 +16,16 @@ import ( ) type memorySocket struct { + // True server mode, False client mode server bool - name string - crecv *gob.Decoder - csend *gob.Encoder - srecv *gob.Decoder - ssend *gob.Encoder + // Client receiver of io.Pipe with gob + crecv *gob.Decoder + // Client sender of the io.Pipe with gob + csend *gob.Encoder + // Server receiver of the io.Pip with gob + srecv *gob.Decoder + // Server sender of the io.Pip with gob + ssend *gob.Encoder // sock exit exit chan bool // listener exit @@ -158,7 +162,6 @@ func (m *memoryListener) Accept(fn func(Socket)) error { case c := <-m.conn: go fn(&memorySocket{ server: true, - name: "server", lexit: c.lexit, exit: c.exit, ssend: c.ssend, @@ -191,13 +194,11 @@ func (m *memoryTransport) Dial(addr string, opts ...DialOption) (Client, error) client := &memoryClient{ &memorySocket{ - server: false, - name: "client", - csend: gob.NewEncoder(cwriter), - crecv: gob.NewDecoder(creader), - ssend: gob.NewEncoder(swriter), - srecv: gob.NewDecoder(sreader), - exit: make(chan bool), + server: false, + csend: gob.NewEncoder(cwriter), + crecv: gob.NewDecoder(creader), + ssend: gob.NewEncoder(swriter), + srecv: gob.NewDecoder(sreader), exit: make(chan bool), lexit: listener.exit, local: addr, remote: addr, From b58a951f011dc9dc305aed99364268dc406f7777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Jochum?= Date: Fri, 7 Oct 2022 22:01:35 +0200 Subject: [PATCH 3/3] fix(transport/memory): Replace custom errors with io.EOF --- transport/memory.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/transport/memory.go b/transport/memory.go index 9450f89f67..4cfe47a364 100644 --- a/transport/memory.go +++ b/transport/memory.go @@ -72,9 +72,11 @@ func (ms *memorySocket) Recv(m *Message) error { case <-ctx.Done(): return ctx.Err() case <-ms.exit: - return errors.New("connection closed") + // connection closed + return io.EOF case <-ms.lexit: - return errors.New("server connection closed") + // Server connection closed + return io.EOF default: if ms.server { if err := ms.srecv.Decode(m); err != nil { @@ -110,9 +112,11 @@ func (ms *memorySocket) Send(m *Message) error { case <-ctx.Done(): return ctx.Err() case <-ms.exit: - return errors.New("connection closed") + // connection closed + return io.EOF case <-ms.lexit: - return errors.New("server connection closed") + // Server connection closed + return io.EOF default: if ms.server { if err := ms.ssend.Encode(m); err != nil {