Skip to content

Commit

Permalink
remove per-broker-cxn buffer, switch to pool
Browse files Browse the repository at this point in the history
With the sticky key partitioner, this pool will save use large memory
usage when producing.
  • Loading branch information
twmb committed Feb 2, 2020
1 parent e9177d8 commit b17c45f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
39 changes: 30 additions & 9 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,18 @@ func (b *broker) handleReqs() {
}
}

// bufPool is used to reuse issued-request buffers across writes to brokers.
type bufPool struct{ p *sync.Pool }

func newBufPool() bufPool {
return bufPool{
p: &sync.Pool{New: func() interface{} { r := make([]byte, 1<<10); return &r }},
}
}

func (p bufPool) get() []byte { return (*p.p.Get().(*[]byte))[:0] }
func (p bufPool) put(b []byte) { p.p.Put(&b) }

// loadConection returns the broker's connection, creating it if necessary
// and returning an error of if that fails.
func (b *broker) loadConnection(reqKey int16) (*brokerCxn, error) {
Expand All @@ -305,6 +317,7 @@ func (b *broker) loadConnection(reqKey int16) (*brokerCxn, error) {
}

cxn := &brokerCxn{
bufPool: b.cl.bufPool,
conn: conn,
clientID: b.cl.cfg.id,
saslCtx: b.cl.ctx,
Expand Down Expand Up @@ -343,8 +356,8 @@ type brokerCxn struct {
mechanism sasl.Mechanism
expiry time.Time

// reqBuf, corrID, and clientID are used in writing requests.
reqBuf []byte
// bufPool, corrID, and clientID are used in writing requests.
bufPool bufPool
corrID int32
clientID *string

Expand Down Expand Up @@ -496,10 +509,17 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
var challenge []byte

if !authenticate {
cxn.reqBuf = append(cxn.reqBuf[:0], 0, 0, 0, 0)
binary.BigEndian.PutUint32(cxn.reqBuf, uint32(len(clientWrite)))
cxn.reqBuf = append(cxn.reqBuf, clientWrite...)
if _, err = cxn.conn.Write(cxn.reqBuf); err != nil {

buf := cxn.bufPool.get()

buf = append(buf[:0], 0, 0, 0, 0)
binary.BigEndian.PutUint32(buf, uint32(len(clientWrite)))
buf = append(buf, clientWrite...)
_, err = cxn.conn.Write(buf)

cxn.bufPool.put(buf)

if err != nil {
return ErrConnDead
}
if challenge, err = readConn(cxn.conn); err != nil {
Expand Down Expand Up @@ -556,13 +576,14 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
// writeRequest writes a message request to the broker connection, bumping the
// connection's correlation ID as appropriate for the next write.
func (cxn *brokerCxn) writeRequest(req kmsg.Request) (int32, error) {
cxn.reqBuf = kmsg.AppendRequest(
cxn.reqBuf[:0],
buf := cxn.bufPool.get()
buf = kmsg.AppendRequest(
buf[:0],
req,
cxn.corrID,
cxn.clientID,
)
if _, err := cxn.conn.Write(cxn.reqBuf); err != nil {
if _, err := cxn.conn.Write(buf); err != nil {
return 0, ErrConnDead
}
id := cxn.corrID
Expand Down
4 changes: 4 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Client struct {
anyBrokerIdx int
stopBrokers bool // set to true on close to stop updateBrokers

bufPool bufPool // for to brokers to share underlying reusable request buffers

controllerID int32 // atomic

producer producer
Expand Down Expand Up @@ -106,6 +108,8 @@ func NewClient(opts ...Opt) (*Client, error) {
controllerID: unknownControllerID,
brokers: make(map[int32]*broker),

bufPool: newBufPool(),

decompressor: newDecompressor(),

coordinators: make(map[coordinatorKey]int32),
Expand Down

0 comments on commit b17c45f

Please sign in to comment.