Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
wujunzhuo committed Aug 17, 2022
1 parent ec24982 commit de0d434
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 81 deletions.
129 changes: 63 additions & 66 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package core
import (
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -33,7 +33,8 @@ type Client struct {
state ConnState // state of the connection
processor func(*frame.DataFrame) // function to invoke when data arrived
receiver func(*frame.BackflowFrame) // function to invoke when data is processed
errfn func(error) // function to invoke when error occured
errorfn func(error) // function to invoke when error occured
closefn func() // function to invoke when client closed
addr string // the address of server connected to
mu sync.Mutex
opts ClientOptions
Expand Down Expand Up @@ -122,55 +123,57 @@ func (c *Client) connect(ctx context.Context, addr string) error {
return err
}

// todo: set ConnStateConnected when AcceptedFrame received
c.state = ConnStateConnected
c.localAddr = conn.LocalAddr().String()

c.logger.Printf("%s❤️ [%s][%s](%s) is connected to YoMo-Zipper %s", ClientLogPrefix, c.name, c.clientID, c.localAddr, addr)

// receiving frames
go func() {
reason, msg := c.handleFrame()
c.logger.Infof("%shandleFrame: %s | %s", ClientLogPrefix, reason, msg)
closeConn, closeClient, err := c.handleFrame()
c.logger.Errorf("%shandleFrame: %v, %v, %v", ClientLogPrefix, closeConn, closeClient, err)

stream.Close()
if closeConn {
c.conn.CloseWithError(yerr.ErrorCodeClientAbort.To(), err.Error())
}

switch reason {
case CloseReasonKeepAliveTimeout:
c.mu.Lock()
if c.state != ConnStateClosed {
c.state = ConnStateDisconnected
}
c.mu.Unlock()
case CloseReasonLocalClosed:
case CloseReasonPeerClosed:
c.closeWithError(false, reason, msg)
default:
c.closeWithError(true, reason, msg)
c.mu.Lock()

if c.state != ConnStateClosed {
c.errc <- err
}

if closeClient {
c.close()
} else if c.state != ConnStateClosed {
c.state = ConnStateDisconnected
}

c.mu.Unlock()
}()

return nil
}

// handleFrame handles the logic when receiving frame from server.
func (c *Client) handleFrame() (CloseReason, string) {
func (c *Client) handleFrame() (bool, bool, error) {
for {
// this will block until a frame is received
f, err := c.fs.ReadFrame()
if err != nil {
if e, ok := err.(*quic.IdleTimeoutError); ok {
return CloseReasonKeepAliveTimeout, e.Error()
if err == io.EOF {
return true, false, err
} else if strings.HasPrefix(err.Error(), "unknown frame type") {
continue
} else if e, ok := err.(*quic.IdleTimeoutError); ok {
return false, false, e
} else if e, ok := err.(*quic.ApplicationError); ok {
if e.Remote {
return CloseReasonPeerClosed, e.ErrorMessage
}
return CloseReasonLocalClosed, e.ErrorMessage
} else if err == io.EOF {
return CloseReasonPeerClosed, "conn read EOF"
return false, e.ErrorCode == yerr.ErrorCodeGoaway.To() || e.ErrorCode == yerr.ErrorCodeRejected.To(), e
} else if errors.Is(err, net.ErrClosed) {
return CloseReasonLocalClosed, err.Error()
return false, false, err
}
return CloseReasonUnknownError, fmt.Sprintf("%T: %s", err, err.Error())
return true, false, err
}

// read frame
Expand All @@ -180,32 +183,28 @@ func (c *Client) handleFrame() (CloseReason, string) {
switch frameType {
case frame.TagOfRejectedFrame:
if v, ok := f.(*frame.RejectedFrame); ok {
return CloseReasonReceivedRejected, v.Message()
return true, true, errors.New(v.Message())
}
case frame.TagOfGoawayFrame:
if v, ok := f.(*frame.GoawayFrame); ok {
return CloseReasonReceivedGoaway, v.Message()
return true, true, errors.New(v.Message())
}
case frame.TagOfDataFrame: // DataFrame carries user's data
if c.state == ConnStateConnected {
if v, ok := f.(*frame.DataFrame); ok {
c.logger.Debugf("%sreceive DataFrame, tag=%#x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage())
if c.processor == nil {
c.logger.Warnf("%sprocessor is nil", ClientLogPrefix)
} else {
c.processor(v)
}
if v, ok := f.(*frame.DataFrame); ok {
c.logger.Debugf("%sreceive DataFrame, tag=%#x, tid=%s, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.TransactionID(), v.GetCarriage())
if c.processor == nil {
c.logger.Warnf("%sprocessor is nil", ClientLogPrefix)
} else {
c.processor(v)
}
}
case frame.TagOfBackflowFrame:
if c.state == ConnStateConnected {
if v, ok := f.(*frame.BackflowFrame); ok {
c.logger.Debugf("%sreceive BackflowFrame, tag=%#x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage())
if c.receiver == nil {
c.logger.Warnf("%sreceiver is nil", ClientLogPrefix)
} else {
c.receiver(v)
}
if v, ok := f.(*frame.BackflowFrame); ok {
c.logger.Debugf("%sreceive BackflowFrame, tag=%#x, carry=%# x", ClientLogPrefix, v.GetDataTag(), v.GetCarriage())
if c.receiver == nil {
c.logger.Warnf("%sreceiver is nil", ClientLogPrefix)
} else {
c.receiver(v)
}
}
default:
Expand All @@ -216,30 +215,24 @@ func (c *Client) handleFrame() (CloseReason, string) {

// Close the client.
func (c *Client) Close() error {
return c.closeWithError(true, CloseReasonLocalClosed, "client ask to close")
}

func (c *Client) closeWithError(closeConn bool, reason string, msg string) error {
c.mu.Lock()
defer c.mu.Unlock()

c.conn.CloseWithError(yerr.ErrorCodeClientAbort.To(), "client ask to close")
return c.close()
}

func (c *Client) close() error {
if c.state == ConnStateClosed {
return nil
}
c.state = ConnStateClosed

c.logger.Printf("%s💔 close the connection, name:%s, id:%s, addr:%s", ClientLogPrefix, c.name, c.clientID, c.addr)

err := errors.New(reason + " | " + msg)
c.errc <- err
// close error channel so that close handler function will be called
close(c.errc)

if closeConn && c.conn != nil {
if err := c.conn.CloseWithError(yerr.ErrorCodeClientAbort.To(), err.Error()); err != nil {
c.logger.Errorf("%sconnection.Close(): %v", ClientLogPrefix, err)
}
}

c.state = ConnStateClosed
return nil
}

Expand Down Expand Up @@ -281,13 +274,12 @@ func (c *Client) reconnect(ctx context.Context, addr string) {
c.logger.Debugf("%s[%s](%s) context.Done()", ClientLogPrefix, c.name, c.localAddr)
return
case err, ok := <-c.errc:
fmt.Println("wujunzhuo error channel")
if !ok {
c.logger.Debugf("%s[%s](%s) error channel closed", ClientLogPrefix, c.name, c.localAddr)
return
if c.errorfn != nil && err != nil {
c.errorfn(err)
}
if c.errfn != nil && err != nil {
c.errfn(err)
if !ok && c.closefn != nil {
c.closefn()
return
}
case <-t.C:
if c.state == ConnStateDisconnected {
Expand Down Expand Up @@ -377,7 +369,12 @@ func (c *Client) Logger() log.Logger {

// SetErrorHandler set error handler
func (c *Client) SetErrorHandler(fn func(err error)) {
c.errfn = fn
c.errorfn = fn
}

// SetCloseHandler set close handler
func (c *Client) SetCloseHandler(fn func()) {
c.closefn = fn
}

// ClientID return the client ID
Expand Down
14 changes: 0 additions & 14 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,6 @@ const (
ConnStateClosed ConnState = "Closed"
)

// CloseReason represents the reason of the closed connection.
type CloseReason = string

// CloseReason represents the reason of the closed connection.
const (
CloseReasonUnknownError CloseReason = "Unknown Error"
CloseReasonIllegalProtocol CloseReason = "Illegal Protocol"
CloseReasonKeepAliveTimeout CloseReason = "KeepAlive Timeout"
CloseReasonLocalClosed CloseReason = "Local Closed"
CloseReasonPeerClosed CloseReason = "Peer Closed"
CloseReasonReceivedRejected CloseReason = "Received Rejected"
CloseReasonReceivedGoaway CloseReason = "Received Goaway"
)

// Prefix is the prefix for logger.
const (
ClientLogPrefix = "\033[36m[core:client]\033[0m "
Expand Down
1 change: 0 additions & 1 deletion core/stream_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func ParseFrame(stream io.Reader) (frame.Frame, error) {
case 0x80 | byte(frame.TagOfBackflowFrame):
return frame.DecodeToBackflowFrame(buf)
default:
// todo: distinguish between illegal protocol and newer unknown frame from peer side
return nil, fmt.Errorf("unknown frame type, buf[0]=%#x", buf[0])
}
}
Expand Down

0 comments on commit de0d434

Please sign in to comment.