From 1c69c5c274caf8d6b3a7953c67e5de51c55378be Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Mon, 28 May 2018 19:41:51 -0400 Subject: [PATCH] replicator: set min bytes to 1 --- jocko/broker.go | 5 +++++ jocko/replicator.go | 12 +++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/jocko/broker.go b/jocko/broker.go index d59738d4..8d86d54e 100644 --- a/jocko/broker.go +++ b/jocko/broker.go @@ -958,6 +958,10 @@ func (b *Broker) startReplica(replica *Replica) protocol.Error { state := b.fsm.State() _, topic, _ := state.GetTopic(replica.Partition.Topic) + if topic == nil { + return protocol.ErrUnknownTopicOrPartition + } + if replica.Log == nil { log, err := commitlog.New(commitlog.Options{ Path: filepath.Join(b.config.DataDir, "data", fmt.Sprintf("%d", replica.Partition.ID)), @@ -990,6 +994,7 @@ func (b *Broker) createTopic(ctx context.Context, topic *protocol.CreateTopicReq for _, partition := range ps { tt.Partitions[partition.ID] = partition.AR } + // TODO: create/set topic config here if _, err := b.raftApply(structs.RegisterTopicRequestType, structs.RegisterTopicRequest{Topic: tt}); err != nil { return protocol.ErrUnknown.WithErr(err) } diff --git a/jocko/replicator.go b/jocko/replicator.go index 46aa7599..d531e5db 100644 --- a/jocko/replicator.go +++ b/jocko/replicator.go @@ -18,9 +18,7 @@ type Replicator struct { config ReplicatorConfig logger log.Logger replica *Replica - minBytes int32 fetchSize int32 - maxWaitTime int32 highwaterMarkOffset int64 offset int64 msgs chan []byte @@ -29,12 +27,16 @@ type Replicator struct { } type ReplicatorConfig struct { - MinBytes int32 + MinBytes int32 + // todo: make this a time.Duration MaxWaitTime int32 } // NewReplicator returns a new replicator instance. func NewReplicator(config ReplicatorConfig, replica *Replica, leader client, logger log.Logger) *Replicator { + if config.MinBytes == 0 { + config.MinBytes = 1 + } r := &Replicator{ config: config, logger: logger, @@ -60,8 +62,8 @@ func (r *Replicator) fetchMessages() { default: fetchRequest := &protocol.FetchRequest{ ReplicaID: r.replica.BrokerID, - MaxWaitTime: r.maxWaitTime, - MinBytes: r.minBytes, + MaxWaitTime: r.config.MaxWaitTime, + MinBytes: r.config.MinBytes, Topics: []*protocol.FetchTopic{{ Topic: r.replica.Partition.Topic, Partitions: []*protocol.FetchPartition{{