Skip to content

Commit

Permalink
Merge branch 'master' into feat/gid
Browse files Browse the repository at this point in the history
  • Loading branch information
woorui committed Oct 20, 2023
2 parents caa2b14 + d4c8685 commit 7f0b6ea
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 187 deletions.
72 changes: 32 additions & 40 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
type Client struct {
zipperAddr string
name string // name of the client
clientID string // id of the client
clientType ClientType // type of the client
processor func(*frame.DataFrame) // function to invoke when data arrived
receiver func(*frame.BackflowFrame) // function to invoke when data is processed
errorfn func(error) // function to invoke when error occured
opts *clientOptions
logger *slog.Logger
Logger *slog.Logger
tracerProvider oteltrace.TracerProvider

// ctx and ctxCancel manage the lifecycle of client.
Expand All @@ -39,30 +40,27 @@ type Client struct {
}

// NewClient creates a new YoMo-Client.
func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Client {
func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client {
option := defaultClientOption()

for _, o := range opts {
o(option)
}
clientID := id.New()

logger := option.logger.With("component", clientType.String(), "client_id", clientID, "client_name", appName)

if option.credential != nil {
logger.Info("use credential", "credential_name", option.credential.Name())
}
logger := option.logger

ctx, ctxCancel := context.WithCancelCause(context.Background())

return &Client{
zipperAddr: zipperAddr,
name: appName,
clientID: clientID,
processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") },
receiver: func(bf *frame.BackflowFrame) { logger.Warn("the receiver has not been set") },
clientType: clientType,
opts: option,
logger: logger,
Logger: logger,
tracerProvider: option.tracerProvider,
errorfn: func(err error) { logger.Error("client err", "err", err) },
writeFrameChan: make(chan frame.Frame),
Expand All @@ -85,8 +83,8 @@ func newConnectResult(conn quic.Connection, fs *FrameStream, err error) *connect
}
}

func (c *Client) connect(ctx context.Context, addr string) *connectResult {
conn, err := quic.DialAddr(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
func (c *Client) connect(ctx context.Context) *connectResult {
conn, err := quic.DialAddr(ctx, c.zipperAddr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}
Expand All @@ -98,6 +96,10 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult {

fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter())

if credential := c.opts.credential; credential != nil {
c.Logger.Info("use credential", "credential_name", credential.Name())
}

hf := &frame.HandshakeFrame{
Name: c.name,
ID: c.clientID,
Expand Down Expand Up @@ -130,7 +132,7 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult {
}
}

func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Connection, fs *FrameStream) {
func (c *Client) runBackground(ctx context.Context, conn quic.Connection, fs *FrameStream) {
reconnection := make(chan struct{})

go c.handleReadFrames(fs, reconnection)
Expand All @@ -149,12 +151,12 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
}
case <-reconnection:
reconnect:
cr := c.connect(ctx, addr)
cr := c.connect(ctx)
if err := cr.err; err != nil {
if errors.As(err, new(ErrAuthenticateFailed)) {
return
}
c.logger.Error("reconnect to zipper error", "err", cr.err)
c.Logger.Error("reconnect to zipper error", "err", cr.err)
time.Sleep(time.Second)
goto reconnect
}
Expand All @@ -165,27 +167,21 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
}

// Connect connect client to server.
func (c *Client) Connect(ctx context.Context, addr string) error {
if c.clientType == ClientTypeStreamFunction && len(c.opts.observeDataTags) == 0 {
return errors.New("yomo: streamFunction cannot observe data because the required tag has not been set")
}

c.logger = c.logger.With("zipper_addr", addr)

func (c *Client) Connect(ctx context.Context) error {
connect:
result := c.connect(ctx, addr)
result := c.connect(ctx)
if result.err != nil {
if c.opts.connectUntilSucceed {
c.logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
time.Sleep(time.Second)
goto connect
}
c.logger.Error("can not connect to zipper", "err", result.err)
c.Logger.Error("can not connect to zipper", "err", result.err)
return result.err
}
c.logger.Info("connected to zipper")
c.Logger.Info("connected to zipper")

go c.runBackground(ctx, addr, result.conn, result.fs)
go c.runBackground(ctx, result.conn, result.fs)

return nil
}
Expand Down Expand Up @@ -217,7 +213,7 @@ func (c *Client) nonBlockWriteFrame(f frame.Frame) error {
return nil
default:
err := errors.New("yomo: client has lost connection")
c.logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err)
c.Logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err)
return err
}
}
Expand Down Expand Up @@ -275,7 +271,7 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
buf = buf[:runtime.Stack(buf, false)]

perr := fmt.Errorf("%v", e)
c.logger.Error("stream panic", "err", perr)
c.Logger.Error("stream panic", "err", perr)
c.errorfn(fmt.Errorf("yomo: stream panic: %v\n%s", perr, buf))
}
}()
Expand All @@ -287,14 +283,14 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
func (c *Client) handleFrame(f frame.Frame) {
switch ff := f.(type) {
case *frame.RejectedFrame:
c.logger.Error("rejected error", "err", ff.Message)
c.Logger.Error("rejected error", "err", ff.Message)
_ = c.Close()
case *frame.DataFrame:
c.processor(ff)
case *frame.BackflowFrame:
c.receiver(ff)
default:
c.logger.Warn("received unexpected frame", "frame_type", f.Type().String())
c.Logger.Warn("received unexpected frame", "frame_type", f.Type().String())
}
}

Expand All @@ -313,15 +309,10 @@ func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
c.opts.observeDataTags = tag
}

// Logger get client's logger instance, you can customize this using `yomo.WithLogger`
func (c *Client) Logger() *slog.Logger {
return c.logger
}

// SetErrorHandler set error handler
func (c *Client) SetErrorHandler(fn func(err error)) {
c.errorfn = fn
c.logger.Debug("the error handler has been set")
c.Logger.Debug("the error handler has been set")
}

// ClientID returns the ID of client.
Expand All @@ -330,13 +321,14 @@ func (c *Client) ClientID() string { return c.clientID }
// Name returns the name of client.
func (c *Client) Name() string { return c.name }

// FrameWriterConnection represents a frame writer that can connect to an addr.
type FrameWriterConnection interface {
// Downstream represents a frame writer that can connect to an addr.
type Downstream interface {
frame.Writer
ClientID() string
Name() string
ID() string
LocalName() string
RemoteName() string
Close() error
Connect(context.Context, string) error
Connect(context.Context) error
}

// TracerProvider returns the tracer provider of client.
Expand Down
Loading

0 comments on commit 7f0b6ea

Please sign in to comment.