
Commit 4456965

Merge pull request redpanda-data#20 from twmb/stash
rewrite the consumer & source code; other bug fixes
twmb committed Dec 31, 2020
2 parents 385cecb + 539b06c commit 4456965
Showing 12 changed files with 1,923 additions and 1,698 deletions.
62 changes: 2 additions & 60 deletions pkg/kgo/broker.go
@@ -57,12 +57,6 @@ type promisedResp struct {
enqueue time.Time // used to calculate readWait
}

type waitingResp struct {
resp kmsg.Response
promise func(kmsg.Response, error)
err error
}

var unknownMetadata = BrokerMetadata{
NodeID: -1,
}
@@ -119,21 +113,6 @@ type broker struct {
cxnProduce *brokerCxn
cxnFetch *brokerCxn

// sink and source exist so that metadata updates can copy these
// pointers to a topicPartition's record's sink field and consumption's
// source field.
//
// Brokers are created with these two fields initialized; when a topic
// partition wants to use the broker, it copies these pointers.
sink *sink
source *source

// seqResps, guarded by seqRespsMu, contains responses that must be
// handled sequentially. These responses are handled asynchronously,
// but sequentially.
seqRespsMu sync.Mutex
seqResps []waitingResp

// dieMu guards sending to reqs in case the broker has been
// permanently stopped.
dieMu sync.RWMutex
@@ -166,8 +145,6 @@ func (cl *Client) newBroker(nodeID int32, host string, port int32, rack *string)

reqs: make(chan promisedReq, 10),
}
br.sink = newSink(cl, br)
br.source = newSource(cl, br)
go br.handleReqs()

return br
@@ -219,43 +196,6 @@ func (b *broker) do(
}
}

// doSequencedAsyncPromise is the same as do, but all requests using this
// function have their responses handled sequentially.
//
// This is important for example for ordering of produce requests.
//
// Note that the requests may finish out of order (e.g. dead connection kills
// latter request); this is handled appropriately in producing.
func (b *broker) doSequencedAsyncPromise(
ctx context.Context,
req kmsg.Request,
promise func(kmsg.Response, error),
) {
b.do(ctx, req, func(resp kmsg.Response, err error) {
b.seqRespsMu.Lock()
b.seqResps = append(b.seqResps, waitingResp{resp, promise, err})
if len(b.seqResps) == 1 {
go b.handleSeqResp(b.seqResps[0])
}
b.seqRespsMu.Unlock()
})
}

// handleSeqResp handles a sequenced response while there is one.
func (b *broker) handleSeqResp(wr waitingResp) {
more:
wr.promise(wr.resp, wr.err)

b.seqRespsMu.Lock()
b.seqResps = b.seqResps[1:]
if len(b.seqResps) > 0 {
wr = b.seqResps[0]
b.seqRespsMu.Unlock()
goto more
}
b.seqRespsMu.Unlock()
}
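
The deleted helpers above implement a reusable pattern: append under a lock, and whoever appends to an empty queue becomes the sole drainer, so promises run one at a time in enqueue order without a dedicated worker goroutine. A minimal standalone restatement of the pattern (illustrative only, not franz-go code; the goto loop is written as a for loop here):

package main

import "sync"

// seqQueue restates the doSequencedAsyncPromise/handleSeqResp pattern:
// enqueue appends under the lock, and the caller that appends to an
// empty queue starts the single drainer goroutine.
type seqQueue struct {
	mu    sync.Mutex
	items []func()
}

func (q *seqQueue) enqueue(fn func()) {
	q.mu.Lock()
	q.items = append(q.items, fn)
	if len(q.items) == 1 { // queue was empty: we become the drainer
		go q.drain(q.items[0])
	}
	q.mu.Unlock()
}

func (q *seqQueue) drain(fn func()) {
	for {
		fn() // run the callback outside the lock, as handleSeqResp does
		q.mu.Lock()
		q.items = q.items[1:]
		if len(q.items) == 0 {
			q.mu.Unlock()
			return
		}
		fn = q.items[0]
		q.mu.Unlock()
	}
}

func main() {
	var q seqQueue
	done := make(chan struct{})
	for i := 0; i < 3; i++ {
		i := i
		q.enqueue(func() { println("handled response", i) })
	}
	q.enqueue(func() { close(done) })
	<-done
}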

// waitResp runs a req, waits for the resp and returns the resp and err.
func (b *broker) waitResp(ctx context.Context, req kmsg.Request) (kmsg.Response, error) {
var resp kmsg.Response
@@ -734,6 +674,8 @@ func (cxn *brokerCxn) writeRequest(ctx context.Context, writeWait time.Duration,
}
}

// TODO: write in a goroutine, use ctx to allow for early cancel.

buf := cxn.cl.bufPool.get()
defer cxn.cl.bufPool.put(buf)
buf = cxn.cl.reqFormatter.AppendRequest(
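The TODO added in writeRequest above describes a standard Go idiom: run the blocking write in a goroutine and select on the context so the caller can give up early. A minimal sketch of that idiom, assuming a plain net.Conn rather than the actual brokerCxn internals:

package kgosketch

import (
	"context"
	"net"
)

// writeWithCancel performs the blocking write in a goroutine and returns
// early if ctx is canceled first. The channel is buffered so the goroutine
// can always send its result and exit. If the context wins, the write may
// still have partially completed, so the connection must be treated as dead.
func writeWithCancel(ctx context.Context, conn net.Conn, buf []byte) error {
	errc := make(chan error, 1)
	go func() {
		_, err := conn.Write(buf)
		errc <- err
	}()
	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}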
29 changes: 25 additions & 4 deletions pkg/kgo/client.go
@@ -47,6 +47,15 @@ type Client struct {
anyBrokerIdx int
stopBrokers bool // set to true on close to stop updateBrokers

// A sink and a source are created once per node ID and persist
// forever. We expect the list to be small.
//
// The mutex only exists to allow consumer session stopping to read
// sources to notify when starting a session; all writes happen in the
// metadata loop.
sinksAndSourcesMu sync.Mutex
sinksAndSources map[int32]sinkAndSource

reqFormatter *kmsg.RequestFormatter
connTimeoutFn func(kmsg.Request) (time.Duration, time.Duration)

@@ -88,6 +97,11 @@ type Client struct {
metadone chan struct{}
}

type sinkAndSource struct {
sink *sink
source *source
}
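
A hypothetical sketch of how the metadata loop could use this map, creating one sink/source pair per node ID on first sight and reusing it afterward. The br.meta.NodeID field and the newSink/newSource signatures are assumptions taken from the pre-rewrite code visible in the broker.go diff above, not the rewritten API:

// Hypothetical, not the actual franz-go code.
func (cl *Client) sinkAndSourceFor(br *broker) sinkAndSource {
	cl.sinksAndSourcesMu.Lock()
	defer cl.sinksAndSourcesMu.Unlock()
	sns, exists := cl.sinksAndSources[br.meta.NodeID]
	if !exists {
		// Created once and persisting forever, per the comment above.
		sns = sinkAndSource{sink: newSink(cl, br), source: newSource(cl, br)}
		cl.sinksAndSources[br.meta.NodeID] = sns
	}
	return sns
}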

// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid. Connections to brokers are lazily created only when
// requests are written to them.
@@ -145,6 +159,8 @@ func NewClient(opts ...Opt) (*Client, error) {
controllerID: unknownControllerID,
brokers: make(map[int32]*broker),

sinksAndSources: make(map[int32]sinkAndSource),

reqFormatter: new(kmsg.RequestFormatter),
connTimeoutFn: connTimeoutBuilder(cfg.connTimeoutOverhead),

@@ -255,7 +271,7 @@ func connTimeoutBuilder(defaultTimeout time.Duration) func(kmsg.Request) (time.D

// broker returns a random broker from all brokers ever known.
func (cl *Client) broker() *broker {
cl.brokersMu.Lock()
cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below
defer cl.brokersMu.Unlock()

if cl.anyBrokerIdx >= len(cl.anyBroker) { // metadata update lost us brokers
@@ -400,15 +416,19 @@ func (cl *Client) Close() {
cl.stopBrokers = true
for _, broker := range cl.brokers {
broker.stopForever()
broker.sink.maybeDrain() // awaken anything in backoff
broker.source.maybeConsume() // same
}
cl.brokersMu.Unlock()

// Wait for metadata to quit so we know no more erroring topic
// partitions will be created.
// partitions will be created. After metadata has quit, we can
// safely stop sinks and sources, as no more will be made.
<-cl.metadone

for _, sns := range cl.sinksAndSources {
sns.sink.maybeDrain() // awaken anything in backoff
sns.source.maybeConsume() // same
}

// We must manually fail all partitions that never had a sink.
for _, partitions := range cl.loadTopics() {
for _, partition := range partitions.load().all {
@@ -602,6 +622,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
}
// fetchMetadata does its own retrying, so we do not do
// retrying here.
// TODO also needs auto topic create
br, resp, err := cl.fetchMetadata(ctx, metaReq.Topics == nil, topics)
return shards(shard(br, req, resp, err)), nil

3 changes: 2 additions & 1 deletion pkg/kgo/config.go
@@ -389,7 +389,8 @@ func AutoTopicCreation() Opt {
// defaults to 100MiB.
//
// The only Kafka request that could come reasonably close to hitting this
// limit should be produce requests.
// limit should be produce requests, and thus this limit is only enforced for
// produce requests.
func BrokerMaxWriteBytes(v int32) Opt {
return clientOpt{func(cfg *cfg) { cfg.maxBrokerWriteBytes = v }}
}
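For context, a usage sketch of this option. kgo.SeedBrokers and the module import path are assumed from the library's public API; only NewClient, Close, and BrokerMaxWriteBytes appear in this diff:

package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Lower the per-broker write cap from the 100 MiB default to 10 MiB;
	// per the doc comment above, this effectively bounds produce requests.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed option name
		kgo.BrokerMaxWriteBytes(10<<20),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}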