Commit
Merge pull request #308 from liftbridge-io/configure_replication_max_size

Add clustering.replication.max.bytes config
tylertreat committed Dec 28, 2020
2 parents 5a639e6 + 1654154 commit 285140e
Showing 13 changed files with 146 additions and 26 deletions.
1 change: 1 addition & 0 deletions documentation/configuration.md
@@ -196,6 +196,7 @@ the configuration file.
| replica.max.idle.wait | | The maximum amount of time a follower will wait before making a replication request once the follower is caught up with the leader. This value should always be less than `replica.max.lag.time` to avoid frequent shrinking of ISR for low-throughput streams. | duration | 10s | |
| replica.fetch.timeout | | Timeout duration for follower replication requests. | duration | 3s | |
| min.insync.replicas | | Specifies the minimum number of replicas that must acknowledge a stream write before it can be committed. If the ISR drops below this size, messages cannot be committed. | int | 1 | [1,...] |
| replication.max.bytes | | The maximum payload size, in bytes, a leader can send to followers in replication messages. This controls the amount of data that can be transferred in a single replication request. If a leader receives a published message larger than this size, it returns an ack error to the client. Because replication is done over NATS, this cannot exceed the [`max_payload`](https://docs.nats.io/nats-server/configuration#limits) limit configured on the NATS cluster. Thus, this defaults to 1MB, which is the default value for `max_payload`. This should generally be set to match the value of `max_payload`. Setting it too low will prevent replication of messages larger than the limit and can negatively impact performance. This value should also be the same for all servers in the cluster. | int | 1048576 | |
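
For illustration, a minimal sketch of the corresponding config-file entry, assuming the YAML layout used by `server/configs/full.yaml` below and a NATS cluster running with the default 1MB `max_payload`:

    clustering:
      replication.max.bytes: 1048576
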
### Activity Configuration Settings
6 changes: 3 additions & 3 deletions go.mod
@@ -16,7 +16,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/raft v1.1.2
github.com/liftbridge-io/go-liftbridge/v2 v2.0.2-0.20201118225953-b849cccb6467
github.com/liftbridge-io/liftbridge-api v1.4.1
github.com/liftbridge-io/liftbridge-api v1.4.2-0.20201228201911-4b2d99797dbb
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200818183806-bb17516cf3a3
github.com/liftbridge-io/raft-boltdb v0.0.0-20200414234651-aaf6e08d8f73
github.com/mattn/go-colorable v0.1.7 // indirect
@@ -40,8 +40,8 @@ require (
github.com/urfave/cli v1.22.4
go.etcd.io/bbolt v1.3.5 // indirect
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect
golang.org/x/net v0.0.0-20201216054612-986b41b23924 // indirect
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sys v0.0.0-20201223074533-0d417f636930 // indirect
golang.org/x/text v0.3.4 // indirect
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d // indirect
google.golang.org/grpc v1.34.0
12 changes: 6 additions & 6 deletions go.sum
@@ -192,8 +192,8 @@ github.com/liftbridge-io/go-liftbridge/v2 v2.0.2-0.20201118225953-b849cccb6467 h
github.com/liftbridge-io/go-liftbridge/v2 v2.0.2-0.20201118225953-b849cccb6467/go.mod h1:24NMu02Ba2sMO2y+IYstP1UFKVA4a6/p54Lc7KccSLc=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256 h1:2pZtC3v6IBTwE70xfb/k0DPlOJ6BlXpthCUWrxCnhwo=
github.com/liftbridge-io/liftbridge-api v1.1.1-0.20201029165056-10f2aa65f256/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.4.1 h1:7LUThKH8z9Nr1Es6Arec4r5yI3JFOz166el5WFepp7A=
github.com/liftbridge-io/liftbridge-api v1.4.1/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.4.2-0.20201228201911-4b2d99797dbb h1:XdwKrJh9gUJ+kaOwYfU6Kt+mZxqOeJQ8oLFbVjpoNuA=
github.com/liftbridge-io/liftbridge-api v1.4.2-0.20201228201911-4b2d99797dbb/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200818183806-bb17516cf3a3 h1:O4mg1NEmukgY8hxen3grrG5RY34LadMTzpbjf8kM2tA=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200818183806-bb17516cf3a3/go.mod h1:wmIIYVq+psahPlB1rvtTkGiltdihsKJbqwE1DkIPwj4=
github.com/liftbridge-io/raft-boltdb v0.0.0-20200414234651-aaf6e08d8f73 h1:8r/ReB1ns87pVDwSnPj87HIbOu/5y0uDyGChx9mUGSQ=
@@ -386,8 +386,8 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c h1:dk0ukUIHmGHqASjP0iue2261isepFCC6XRCSd1nHgDw=
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI=
golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY=
golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -426,8 +426,8 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e h1:AyodaIpKjppX+cBfTASF2E1US3H2JFBj920Ot3rtDjs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201223074533-0d417f636930 h1:vRgIt+nup/B/BwIS0g2oC0haq0iqbV3ZA+u6+0TlNCo=
golang.org/x/sys v0.0.0-20201223074533-0d417f636930/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
39 changes: 39 additions & 0 deletions server/api.go
@@ -283,6 +283,7 @@ func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) (
}

if e := a.ensureStreamNotReadonly(req.Stream, req.Partition); e != nil {
a.logger.Errorf("api: Failed to publish message: %v", e.Message)
return nil, convertPublishAsyncError(e)
}

@@ -315,6 +316,13 @@ func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) (
return nil, err
}

if ack != nil {
if e := convertAckError(ack.AckError); e != nil {
a.logger.Errorf("api: Published message was rejected: %v", e.Message)
return nil, convertPublishAsyncError(e)
}
}

resp.Ack = ack
return resp, nil
}
@@ -800,6 +808,30 @@ func convertPublishAsyncError(err *client.PublishAsyncError) error {
return status.Error(code, err.Message)
}

func convertAckError(ackError client.Ack_Error) *client.PublishAsyncError {
var (
code client.PublishAsyncError_Code
message string
)
switch ackError {
case client.Ack_OK:
return nil
case client.Ack_INCORRECT_OFFSET:
code = client.PublishAsyncError_INCORRECT_OFFSET
message = "incorrect expected offset"
case client.Ack_TOO_LARGE:
code = client.PublishAsyncError_BAD_REQUEST
message = "message exceeds max replication size"
default:
code = client.PublishAsyncError_UNKNOWN
message = "unknown error"
}
return &client.PublishAsyncError{
Code: code,
Message: message,
}
}

// publishAsyncSession maintains state for long-lived PublishAsync RPCs.
type publishAsyncSession struct {
*apiServer
@@ -833,6 +865,13 @@ func (p *publishAsyncSession) dispatchAcks() error {
p.inflight = 0
}
p.mu.Unlock()

if e := convertAckError(ack.AckError); e != nil {
p.logger.Errorf("api: Published async message was rejected: %v", e.Message)
p.sendPublishAsyncError(ack.CorrelationId, e)
return
}

if err := p.stream.Send(&client.PublishResponse{CorrelationId: ack.CorrelationId, Ack: ack}); err != nil {
p.logger.Errorf("api: Failed to send PublishAsync response: %v", err)
}
20 changes: 20 additions & 0 deletions server/api_test.go
@@ -549,6 +549,7 @@ func TestStreamPublishSubscribe(t *testing.T) {

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.Clustering.ReplicationMaxBytes = 1024
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

@@ -647,6 +648,13 @@
case <-time.After(10 * time.Second):
t.Fatal("Did not receive all expected messages")
}

// Publishing a message whose size is larger than max replication size
// returns an error.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.Publish(ctx, name, make([]byte, 1024+1))
require.Error(t, err)
}

// Ensure legacy Publish endpoint works.
@@ -659,6 +667,7 @@ func TestLegacyPublish(t *testing.T) {

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.Clustering.ReplicationMaxBytes = 1024
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

@@ -708,6 +717,17 @@
})
require.NoError(t, err)
require.Nil(t, resp.Ack)

// Publishing a message whose size is larger than max replication size
// returns an error.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = apiClient.Publish(ctx, &proto.PublishRequest{
Stream: "foo",
Partition: 1,
Value: make([]byte, 1024+1),
})
require.Error(t, err)
}

// Ensure publishing to a NATS subject works.
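The tests above exercise the rejection path through the internal API client. As a rough end-to-end sketch — assuming the external go-liftbridge v2 client API and a server configured with `clustering.replication.max.bytes: 1024` — an oversized publish surfaces the ack error directly to the caller:

package main

import (
	"context"
	"fmt"
	"time"

	lift "github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// Assumes a local Liftbridge server on the default port with
	// clustering.replication.max.bytes set to 1024, and that the stream
	// "foo" already exists.
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// A 1025-byte payload exceeds the configured limit, so the publish is
	// rejected with an error rather than being written and replicated.
	if _, err := client.Publish(ctx, "foo", make([]byte, 1025)); err != nil {
		fmt.Println("publish rejected:", err)
	}
}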
9 changes: 9 additions & 0 deletions server/config.go
@@ -38,6 +38,7 @@ const (
defaultReplicaMaxLagTime = 15 * time.Second
defaultReplicaMaxLeaderTimeout = 15 * time.Second
defaultReplicaMaxIdleWait = 10 * time.Second
defaultReplicationMaxBytes = 1024 * 1024 // 1MB
defaultRaftSnapshots = 2
defaultRaftCacheSize = 512
defaultMetadataCacheMaxAge = 2 * time.Minute
@@ -103,6 +104,7 @@ const (
configClusteringReplicaMaxIdleWait = "clustering.replica.max.idle.wait"
configClusteringReplicaFetchTimeout = "clustering.replica.fetch.timeout"
configClusteringMinInsyncReplicas = "clustering.min.insync.replicas"
configClusteringReplicationMaxBytes = "clustering.replication.max.bytes"

configActivityStreamEnabled = "activity.stream.enabled"
configActivityStreamPublishTimeout = "activity.stream.publish.timeout"
@@ -155,6 +157,7 @@ var configKeys = map[string]struct{}{
configClusteringReplicaMaxIdleWait: {},
configClusteringReplicaFetchTimeout: {},
configClusteringMinInsyncReplicas: {},
configClusteringReplicationMaxBytes: {},
configActivityStreamEnabled: {},
configActivityStreamPublishTimeout: {},
configActivityStreamPublishAckPolicy: {},
@@ -280,6 +283,7 @@ type ClusteringConfig struct {
ReplicaFetchTimeout time.Duration
ReplicaMaxIdleWait time.Duration
MinISR int
ReplicationMaxBytes int64
}

// ActivityStreamConfig contains settings for controlling activity stream
@@ -339,6 +343,7 @@ func NewDefaultConfig() *Config {
config.Clustering.RaftSnapshots = defaultRaftSnapshots
config.Clustering.RaftCacheSize = defaultRaftCacheSize
config.Clustering.MinISR = defaultMinInsyncReplicas
config.Clustering.ReplicationMaxBytes = defaultReplicationMaxBytes
config.Streams.SegmentMaxBytes = defaultMaxSegmentBytes
config.Streams.SegmentMaxAge = defaultMaxSegmentAge
config.Streams.RetentionMaxAge = defaultRetentionMaxAge
@@ -676,6 +681,10 @@ func parseClusteringConfig(config *Config, v *viper.Viper) error { // nolint: go
config.Clustering.MinISR = v.GetInt(configClusteringMinInsyncReplicas)
}

if v.IsSet(configClusteringReplicationMaxBytes) {
config.Clustering.ReplicationMaxBytes = v.GetInt64(configClusteringReplicationMaxBytes)
}

return nil
}

1 change: 1 addition & 0 deletions server/config_test.go
@@ -50,6 +50,7 @@ func TestNewConfigFromFile(t *testing.T) {
require.Equal(t, 2*time.Second, config.Clustering.ReplicaMaxIdleWait)
require.Equal(t, 3*time.Second, config.Clustering.ReplicaFetchTimeout)
require.Equal(t, 1, config.Clustering.MinISR)
require.Equal(t, int64(1024), config.Clustering.ReplicationMaxBytes)

require.Equal(t, true, config.ActivityStream.Enabled)
require.Equal(t, time.Minute, config.ActivityStream.PublishTimeout)
1 change: 1 addition & 0 deletions server/configs/full.yaml
@@ -44,6 +44,7 @@ clustering:
idle.wait: 2s
fetch.timeout: 3s
min.insync.replicas: '1'
replication.max.bytes: 1024

activity.stream:
enabled: true
4 changes: 3 additions & 1 deletion server/metadata.go
@@ -226,7 +226,9 @@ func (m *metadataAPI) fetchBrokerInfo(ctx context.Context, numPeers int) ([]*cli
if err != nil {
panic(err)
}
m.ncRaft.PublishRequest(m.getServerInfoInbox(), inbox, queryReq)
if err := m.ncRaft.PublishRequest(m.getServerInfoInbox(), inbox, queryReq); err != nil {
return nil, status.New(codes.Internal, err.Error())
}

// Gather responses.
for i := 0; i < numPeers; i++ {
51 changes: 48 additions & 3 deletions server/partition.go
@@ -791,6 +791,11 @@ func (p *partition) messageProcessingLoop(recvChan <-chan *nats.Msg, stop <-chan
p.mu.Unlock()

m := natsToProtoMessage(msg, leaderEpoch)
// Reject messages that are larger than the max replication size.
if int64(len(msg.Data)) > p.srv.config.Clustering.ReplicationMaxBytes {
p.sendTooLargeNack(m)
continue
}
msgBatch = append(msgBatch, m)
remaining := batchSize - 1

@@ -814,12 +819,18 @@ func (p *partition) messageProcessingLoop(recvChan <-chan *nats.Msg, stop <-chan
chanLen = remaining
}

added := 0
for i := 0; i < chanLen; i++ {
msg = <-recvChan
m := natsToProtoMessage(msg, leaderEpoch)
if int64(len(msg.Data)) > p.srv.config.Clustering.ReplicationMaxBytes {
p.sendTooLargeNack(m)
continue
}
msgBatch = append(msgBatch, m)
added++
}
remaining -= chanLen
remaining -= added
}

// Write uncommitted messages to log.
@@ -970,7 +981,38 @@ func (p *partition) sendAck(ack *client.Ack) {
if err != nil {
panic(err)
}
p.srv.ncAcks.Publish(ack.AckInbox, data)
if err := p.srv.ncAcks.Publish(ack.AckInbox, data); err != nil {
p.srv.logger.Errorf("Error sending ack for partition %s: %v", p, err)
}
}

// sendTooLargeNack publishes an ack containing an error indicating the message
// exceeded the max replication size to the specified AckInbox. If no AckInbox
// is set, this does nothing.
func (p *partition) sendTooLargeNack(msg *commitlog.Message) {
p.srv.logger.Errorf(
"Rejecting message received on partition %s that exceeds clustering.replication.max.bytes (%d)",
p, p.srv.config.Clustering.ReplicationMaxBytes)
if msg.AckInbox == "" {
return
}
ack := &client.Ack{
Stream: p.Stream,
PartitionSubject: p.Subject,
MsgSubject: string(msg.Headers["subject"]),
AckInbox: msg.AckInbox,
CorrelationId: msg.CorrelationID,
AckPolicy: msg.AckPolicy,
ReceptionTimestamp: msg.Timestamp,
AckError: client.Ack_TOO_LARGE,
}
data, err := proto.MarshalAck(ack)
if err != nil {
panic(err)
}
if err := p.srv.ncAcks.Publish(ack.AckInbox, data); err != nil {
p.srv.logger.Errorf("Error sending ack for partition %s: %v", p, err)
}
}

// replicationRequestLoop is a long-running loop which sends replication
@@ -1323,7 +1365,10 @@ func (p *partition) sendPartitionNotification(replica string) {
if err != nil {
panic(err)
}
p.srv.ncRepl.Publish(p.srv.getPartitionNotificationInbox(replica), req)
if err := p.srv.ncRepl.Publish(p.srv.getPartitionNotificationInbox(replica), req); err != nil {
p.srv.logger.Errorf("Error sending new data notification to replica %s for partition %s: %v",
replica, p, err)
}
}

// pauseReplication stops replication on the leader. This is for unit testing
4 changes: 3 additions & 1 deletion server/raft.go
@@ -319,7 +319,9 @@ func (s *Server) detectBootstrapMisconfig() {
case <-s.shutdownCh:
return
case <-ticker.C:
s.ncRaft.PublishRequest(subj, inbox, srvID)
if err := s.ncRaft.PublishRequest(subj, inbox, srvID); err != nil {
s.logger.Errorf("Error publishing bootstrap misconfiguration detection message: %v", err)
}
}
}
}
18 changes: 6 additions & 12 deletions server/replicator.go
@@ -13,16 +13,9 @@ import (
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

const (
// replicationMaxSize is the max payload size to send in replication
// messages to followers. The default NATS max message size is 1MB, so
// we'll use that.
replicationMaxSize = 1024 * 1024

// replicationOverhead is the non-data size overhead of replication
// messages: 8 bytes for the leader epoch and 8 bytes for the HW.
replicationOverhead = 16
)
// replicationOverhead is the non-data size overhead of replication messages: 8
// bytes for the leader epoch and 8 bytes for the HW.
const replicationOverhead = 16
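// Note: with the default 1MB clustering.replication.max.bytes, this overhead
// leaves roughly 1048576-16 bytes of message data per replication request.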

// replicationRequest wraps a ReplicationRequest protobuf and a NATS subject
// where responses should be sent.
@@ -231,7 +224,7 @@ func (r *replicator) replicate(
message commitlog.SerializedMessage
err error
)
for offset < newestOffset && r.writer.Len() < replicationMaxSize {
for offset < newestOffset && int64(r.writer.Len()) < r.partition.srv.config.Clustering.ReplicationMaxBytes {
message, offset, _, _, err = reader.ReadMessage(ctx, r.headersBuf[:])
if err != nil {
r.partition.srv.logger.Errorf("Failed to read message while replicating: %v", err)
@@ -240,7 +233,8 @@

// Check if this message will put us over the batch size limit. If it
// does, flush the batch now.
if uint32(len(message))+uint32(len(r.headersBuf))+uint32(r.writer.Len()) > replicationMaxSize {
batchSize := int64(len(message)) + int64(len(r.headersBuf)) + int64(r.writer.Len())
if batchSize > r.partition.srv.config.Clustering.ReplicationMaxBytes {
break
}
