Skip to content

Commit

Permalink
fix(conn): sfn reconnect and connection close (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
venjiang committed Aug 9, 2022
1 parent 3b121bd commit 2c6f12e
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 25 deletions.
1 change: 1 addition & 0 deletions core/client.go
Expand Up @@ -154,6 +154,7 @@ func (c *Client) handleFrame() {
c.logger.Errorf("%sconnection timeout, err=%v, zipper=%s", ClientLogPrefix, e, c.addr)
c.setState(ConnStateDisconnected)
} else if e, ok := err.(*quic.ApplicationError); ok {
c.setState(ConnStateDisconnected)
c.logger.Infof("%sapplication error, err=%v, errcode=%v", ClientLogPrefix, e, e.ErrorCode)
if yerr.Is(e.ErrorCode, yerr.ErrorCodeRejected) {
// if connection is rejected(eg: authenticate fails) from server
Expand Down
8 changes: 6 additions & 2 deletions core/connection.go
Expand Up @@ -53,8 +53,12 @@ func newConnection(name string, clientID string, clientType ClientType, metadata
func (c *connection) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
c.closed = true
return c.stream.Close()
var err error
if !c.closed {
c.closed = true
err = c.stream.Close()
}
return err
}

// Name returns the name of the connection, which is set by clients
Expand Down
51 changes: 31 additions & 20 deletions core/server.go
Expand Up @@ -48,6 +48,8 @@ type Server struct {
beforeHandlers []FrameHandler
afterHandlers []FrameHandler
connectionCloseHandlers []ConnectionHandler
listener Listener
wg *sync.WaitGroup
}

// NewServer create a Server instance.
Expand All @@ -56,6 +58,7 @@ func NewServer(name string, opts ...ServerOption) *Server {
name: name,
connector: newConnector(),
downstreams: make(map[string]*Client),
wg: new(sync.WaitGroup),
}
s.Init(opts...)

Expand Down Expand Up @@ -105,56 +108,57 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {
logger.Errorf("%slistener.Listen: err=%v", ServerLogPrefix, err)
return err
}
defer listener.Close()
s.listener = listener
// defer listener.Close()
logger.Printf("%s✅ [%s][%d] Listening on: %s, MODE: %s, QUIC: %v, AUTH: %s", ServerLogPrefix, s.name, os.Getpid(), listener.Addr(), mode(), listener.Versions(), s.authNames())

s.state = ConnStateConnected
// loop
for {
// create a new connection when new yomo-client connected
sctx, cancel := context.WithCancel(ctx)
defer cancel()

conn, err := listener.Accept(sctx)
conn, err := s.listener.Accept(sctx)
if err != nil {
logger.Errorf("%screate connection error: %v", ServerLogPrefix, err)
logger.Errorf("%slistener accept connections error: %v", ServerLogPrefix, err)
return err
}

// connection close handlers on server shutdown
// defer s.doConnectionCloseHandlers(conn)
s.wg.Add(1)
connID := GetConnID(conn)
logger.Infof("%s❤️1/ new connection: %s", ServerLogPrefix, connID)

go func(ctx context.Context, qconn quic.Connection) {
// connection close handlers
defer func() {
for _, h := range s.connectionCloseHandlers {
h(qconn)
}
}()
// connection close handlers on client connect timeout
defer s.doConnectionCloseHandlers(qconn)
for {
logger.Infof("%s❤️2/ waiting for new stream", ServerLogPrefix)
stream, err := qconn.AcceptStream(ctx)
if err != nil {
// if client close the connection, then we should close the connection
// @CC: when Source close the connection, it won't affect connectors
name := "--"
name := "-"
clientID := "-"
if conn := s.connector.Get(connID); conn != nil {
conn.Close()
// connector
s.connector.Remove(connID)
route := s.router.Route(conn.Metadata())
if route != nil {
route.Remove(connID)
}
name = conn.Name()
clientID = conn.ClientID()
conn.Close()
}
logger.Printf("%s💔 [%s](%s) close the connection: %v", ServerLogPrefix, name, connID, err)
logger.Printf("%s💔 [%s][%s](%s) close the connection: %v", ServerLogPrefix, name, clientID, connID, err)
break
}
defer stream.Close()

logger.Infof("%s❤️3/ [stream:%d] created, connID=%s", ServerLogPrefix, stream.StreamID(), connID)
// process frames on stream
// c := newContext(connID, stream)
c := newContext(conn, stream)
defer c.Clean()
s.handleConnection(c)
Expand All @@ -166,12 +170,10 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {

// Close will shutdown the server.
func (s *Server) Close() error {
// if s.stream != nil {
// if err := s.stream.Close(); err != nil {
// logger.Errorf("%sClose(): %v", ServerLogPrefix, err)
// return err
// }
// }
// listener
if s.listener != nil {
s.listener.Close()
}
// router
if s.router != nil {
s.router.Clean()
Expand All @@ -180,6 +182,7 @@ func (s *Server) Close() error {
if s.connector != nil {
s.connector.Clean()
}
s.wg.Wait()
return nil
}

Expand Down Expand Up @@ -588,3 +591,11 @@ func authName(name string) string {

return name
}

func (s *Server) doConnectionCloseHandlers(qconn quic.Connection) {
defer s.wg.Done()
logger.Debugf("%s🖤 [%s] quic connection closed", ServerLogPrefix, qconn.RemoteAddr())
for _, h := range s.connectionCloseHandlers {
h(qconn)
}
}
2 changes: 1 addition & 1 deletion core/yerr/errors.go
Expand Up @@ -29,7 +29,7 @@ type ErrorCode uint64

const (
// ErrorCodeClientAbort client abort
ErrorCodeClientAbort ErrorCode = 0x00
ErrorCodeClientAbort ErrorCode = 0xC7
// ErrorCodeUnknown unknown error
ErrorCodeUnknown ErrorCode = 0xC0
// ErrorCodeClosed net closed
Expand Down
6 changes: 4 additions & 2 deletions zipper.go
Expand Up @@ -229,14 +229,16 @@ func (z *zipper) Addr() string {
// Close will close a connection. If zipper is Server, close the server. If zipper is Client, close the client.
func (z *zipper) Close() error {
if z.server != nil {
logger.Debugf("%sserver close()", zipperLogPrefix)
if err := z.server.Close(); err != nil {
logger.Errorf("%s Close(): %v", zipperLogPrefix, err)
logger.Errorf("%sserver close(): %v", zipperLogPrefix, err)
return err
}
}
if z.client != nil {
logger.Debugf("%sclient close()", zipperLogPrefix)
if err := z.client.Close(); err != nil {
logger.Errorf("%s Close(): %v", zipperLogPrefix, err)
logger.Errorf("%sclient close(): %v", zipperLogPrefix, err)
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions zipper_notwindows.go
Expand Up @@ -26,6 +26,8 @@ func (z *zipper) init() {
logger.Printf("Received signal: %s", p1)
if p1 == syscall.SIGTERM || p1 == syscall.SIGINT {
logger.Printf("graceful shutting down ... %s", p1)
// waiting for the server to finish processing the current request
z.Close()
os.Exit(0)
// close(sgnl)
} else if p1 == syscall.SIGUSR2 {
Expand Down

0 comments on commit 2c6f12e

Please sign in to comment.