diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index 61504414b667..0eae9a6bcbd7 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -285,6 +285,18 @@ func (b *broker) handleReqs() { } } +// bufPool is used to reuse issued-request buffers across writes to brokers. +type bufPool struct{ p *sync.Pool } + +func newBufPool() bufPool { + return bufPool{ + p: &sync.Pool{New: func() interface{} { r := make([]byte, 1<<10); return &r }}, + } +} + +func (p bufPool) get() []byte { return (*p.p.Get().(*[]byte))[:0] } +func (p bufPool) put(b []byte) { p.p.Put(&b) } + // loadConection returns the broker's connection, creating it if necessary // and returning an error of if that fails. func (b *broker) loadConnection(reqKey int16) (*brokerCxn, error) { @@ -305,6 +317,7 @@ func (b *broker) loadConnection(reqKey int16) (*brokerCxn, error) { } cxn := &brokerCxn{ + bufPool: b.cl.bufPool, conn: conn, clientID: b.cl.cfg.id, saslCtx: b.cl.ctx, @@ -343,8 +356,8 @@ type brokerCxn struct { mechanism sasl.Mechanism expiry time.Time - // reqBuf, corrID, and clientID are used in writing requests. - reqBuf []byte + // bufPool, corrID, and clientID are used in writing requests. + bufPool bufPool corrID int32 clientID *string @@ -496,10 +509,17 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { var challenge []byte if !authenticate { - cxn.reqBuf = append(cxn.reqBuf[:0], 0, 0, 0, 0) - binary.BigEndian.PutUint32(cxn.reqBuf, uint32(len(clientWrite))) - cxn.reqBuf = append(cxn.reqBuf, clientWrite...) - if _, err = cxn.conn.Write(cxn.reqBuf); err != nil { + + buf := cxn.bufPool.get() + + buf = append(buf[:0], 0, 0, 0, 0) + binary.BigEndian.PutUint32(buf, uint32(len(clientWrite))) + buf = append(buf, clientWrite...) + _, err = cxn.conn.Write(buf) + + cxn.bufPool.put(buf) + + if err != nil { return ErrConnDead } if challenge, err = readConn(cxn.conn); err != nil { @@ -556,13 +576,14 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { // writeRequest writes a message request to the broker connection, bumping the // connection's correlation ID as appropriate for the next write. func (cxn *brokerCxn) writeRequest(req kmsg.Request) (int32, error) { - cxn.reqBuf = kmsg.AppendRequest( - cxn.reqBuf[:0], + buf := cxn.bufPool.get() + buf = kmsg.AppendRequest( + buf[:0], req, cxn.corrID, cxn.clientID, ) - if _, err := cxn.conn.Write(cxn.reqBuf); err != nil { + if _, err := cxn.conn.Write(buf); err != nil { return 0, ErrConnDead } id := cxn.corrID diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 29a13d473a1d..722eb6844786 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -33,6 +33,8 @@ type Client struct { anyBrokerIdx int stopBrokers bool // set to true on close to stop updateBrokers + bufPool bufPool // for to brokers to share underlying reusable request buffers + controllerID int32 // atomic producer producer @@ -106,6 +108,8 @@ func NewClient(opts ...Opt) (*Client, error) { controllerID: unknownControllerID, brokers: make(map[int32]*broker), + bufPool: newBufPool(), + decompressor: newDecompressor(), coordinators: make(map[coordinatorKey]int32),