Skip to content

Commit

Permalink
replicator: set min bytes to 1
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed May 28, 2018
1 parent 983857e commit 1c69c5c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
5 changes: 5 additions & 0 deletions jocko/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,10 @@ func (b *Broker) startReplica(replica *Replica) protocol.Error {
state := b.fsm.State()
_, topic, _ := state.GetTopic(replica.Partition.Topic)

if topic == nil {
return protocol.ErrUnknownTopicOrPartition
}

if replica.Log == nil {
log, err := commitlog.New(commitlog.Options{
Path: filepath.Join(b.config.DataDir, "data", fmt.Sprintf("%d", replica.Partition.ID)),
Expand Down Expand Up @@ -990,6 +994,7 @@ func (b *Broker) createTopic(ctx context.Context, topic *protocol.CreateTopicReq
for _, partition := range ps {
tt.Partitions[partition.ID] = partition.AR
}
// TODO: create/set topic config here
if _, err := b.raftApply(structs.RegisterTopicRequestType, structs.RegisterTopicRequest{Topic: tt}); err != nil {
return protocol.ErrUnknown.WithErr(err)
}
Expand Down
12 changes: 7 additions & 5 deletions jocko/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ type Replicator struct {
config ReplicatorConfig
logger log.Logger
replica *Replica
minBytes int32
fetchSize int32
maxWaitTime int32
highwaterMarkOffset int64
offset int64
msgs chan []byte
Expand All @@ -29,12 +27,16 @@ type Replicator struct {
}

type ReplicatorConfig struct {
MinBytes int32
MinBytes int32
// todo: make this a time.Duration
MaxWaitTime int32
}

// NewReplicator returns a new replicator instance.
func NewReplicator(config ReplicatorConfig, replica *Replica, leader client, logger log.Logger) *Replicator {
if config.MinBytes == 0 {
config.MinBytes = 1
}
r := &Replicator{
config: config,
logger: logger,
Expand All @@ -60,8 +62,8 @@ func (r *Replicator) fetchMessages() {
default:
fetchRequest := &protocol.FetchRequest{
ReplicaID: r.replica.BrokerID,
MaxWaitTime: r.maxWaitTime,
MinBytes: r.minBytes,
MaxWaitTime: r.config.MaxWaitTime,
MinBytes: r.config.MinBytes,
Topics: []*protocol.FetchTopic{{
Topic: r.replica.Partition.Topic,
Partitions: []*protocol.FetchPartition{{
Expand Down

0 comments on commit 1c69c5c

Please sign in to comment.