Skip to content

Commit

Permalink
Merge pull request #176 from liftbridge-io/activity_stream
Browse files Browse the repository at this point in the history
Activity stream
  • Loading branch information
tylertreat committed Mar 31, 2020
2 parents 2a93537 + dffdda8 commit 4967b1c
Show file tree
Hide file tree
Showing 22 changed files with 454 additions and 187 deletions.
14 changes: 14 additions & 0 deletions documentation/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ An example configuration file is shown below.
---
listen: localhost:9293
data.dir: /tmp/liftbridge/server-2
activity.stream.enabled: true

# Configure logging.
logging:
Expand Down Expand Up @@ -125,6 +126,7 @@ the setting in the configuration file and the CLI flag if it exists.
| nats | | NATS configuration. | map | | [See below](#nats-configuration-settings) |
| streams | | Write-ahead log configuration for message streams. | map | | [See below](#streams-configuration-settings) |
| clustering | | Broker cluster configuration. | map | | [See below](#clustering-configuration-settings) |
| activity | | Meta activity event stream configuration. | map | | [See below](#activity-configuration-settings) |
### NATS Configuration Settings
Expand Down Expand Up @@ -172,3 +174,15 @@ the configuration file.
| replica.max.idle.wait | | The maximum amount of time a follower will wait before making a replication request once the follower is caught up with the leader. This value should always be less than `replica.max.lag.time` to avoid frequent shrinking of ISR for low-throughput streams. | duration | 10s | |
| replica.fetch.timeout | | Timeout duration for follower replication requests. | duration | 3s | |
| min.insync.replicas | | Specifies the minimum number of replicas that must acknowledge a stream write before it can be committed. If the ISR drops below this size, messages cannot be committed. | int | 1 | [1,...] |
### Activity Configuration Settings
Below is the list of the configuration settings for the `activity` part of
the configuration file.
| Name | Flag | Description | Type | Default | Valid Values |
|:----|:----|:----|:----|:----|:----|
| stream.enabled | | Enables the activity stream. This will create an internal stream called `__activity` which events will be published to. | bool | false | |
| stream.publish.timeout | | The timeout for publishes to the activity stream. This is the time to wait for an ack from the activity stream, which means it's related to `stream.publish.ack.policy`. If the ack policy is `none`, this has no effect. | duration | 5s | |
| stream.publish.ack.policy | | The ack policy to use for publishes to the activity stream. The value `none` means publishes will not wait for an ack, `leader` means publishes will wait for the ack sent when the leader has committed the event, and `all` means publishes will wait for the ack sent when all replicas have committed the event. | string | all | [none, leader, all] |
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/hashicorp/raft v1.1.2
github.com/hashicorp/raft-boltdb v0.0.0-20191021154308-4207f1bf0617
github.com/liftbridge-io/go-liftbridge v1.0.0-beta.0.20200326225148-69c47c098aea
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200326224922-0afea69beb86
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200331034816-58c8311cddaf
github.com/liftbridge-io/nats-on-a-log v0.0.0-20200303015016-68120bc11e03
github.com/natefinch/atomic v0.0.0-20150920032501-a62ce929ffcc
github.com/nats-io/nats-server/v2 v2.1.4
Expand All @@ -22,5 +22,7 @@ require (
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.4.0
github.com/urfave/cli v1.22.3
golang.org/x/sys v0.0.0-20200327173247-9dae0f8f5775 // indirect
google.golang.org/genproto v0.0.0-20200330113809-af700f360a68 // indirect
google.golang.org/grpc v1.28.0
)
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ github.com/liftbridge-io/liftbridge-api v0.0.0-20200118015119-3db283f59b10 h1:k1
github.com/liftbridge-io/liftbridge-api v0.0.0-20200118015119-3db283f59b10/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.0.0-alpha h1:NwtHTgIdYNQXkLdmLHm9rguNvbtT4OHAs6FSSrHTq1Q=
github.com/liftbridge-io/liftbridge-api v1.0.0-alpha/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200312161101-bf413bcbd765 h1:ainowikclNBfxawwBRZjm1ZF2GXIqTArUO2S2Vd+UA0=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200312161101-bf413bcbd765/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200326224922-0afea69beb86 h1:Q29vm1buKRIYHfJH8ErDBTMsuLe34wVIsg8lnlKdObg=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200326224922-0afea69beb86/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200331034816-58c8311cddaf h1:eQTD+X9IeJtU9qgC5/ObJxHvXRpSvA+qoadpNWv6rGg=
github.com/liftbridge-io/liftbridge-api v1.0.0-beta.0.20200331034816-58c8311cddaf/go.mod h1:6IFEFZ4ncnOgeDVjSt0vh1lKNhlJ5YT9xnG1eRa9LC8=
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190829220806-66e3ee4b7943/go.mod h1:ObGO38WdO4ldLsa2oUFcultUk0rggc+yZWcBb7qjnDI=
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190910222614-5694b15f251d/go.mod h1:ObGO38WdO4ldLsa2oUFcultUk0rggc+yZWcBb7qjnDI=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20180718011723-80d0727461af/go.mod h1:4tC6R+N3facyfCwDuuuLkFF/25ceiZEwoQUzIez2dVo=
Expand Down Expand Up @@ -443,6 +444,8 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200327173247-9dae0f8f5775 h1:TC0v2RSO1u2kn1ZugjrFXkRZAEaqMN/RW+OTZkBzmLE=
golang.org/x/sys v0.0.0-20200327173247-9dae0f8f5775/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -495,6 +498,8 @@ google.golang.org/genproto v0.0.0-20200313141609-30c55424f95d h1:pyQjO6BnPvrPMld
google.golang.org/genproto v0.0.0-20200313141609-30c55424f95d/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200326112834-f447254575fd h1:DVCc2PgW9UrvHGZGEv4Mt3uSeQtUrrs7r8pUw+bVwWI=
google.golang.org/genproto v0.0.0-20200326112834-f447254575fd/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200330113809-af700f360a68 h1:ay2fio+sR6N1ccqZQgr/bUoo6pwgbxU8imlLkQc9Nlo=
google.golang.org/genproto v0.0.0-20200330113809-af700f360a68/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:generate protoc -I=. -I=$GOPATH/src --gofast_out=. ./server/proto/internal.proto
//go:generate protoc -I=. -I=$GOPATH/src --gofast_out=. ./server/protocol/internal.proto

package main

Expand Down
2 changes: 1 addition & 1 deletion server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

client "github.com/liftbridge-io/liftbridge-api/go"
"github.com/liftbridge-io/liftbridge/server/commitlog"
"github.com/liftbridge-io/liftbridge/server/proto"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

const raftApplyTimeout = 30 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion server/commitlog/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/nsip/gommap"
"github.com/pkg/errors"

"github.com/liftbridge-io/liftbridge/server/proto"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

var errIndexCorrupt = errors.New("corrupt index file")
Expand Down
92 changes: 77 additions & 15 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"

client "github.com/liftbridge-io/liftbridge-api/go"
)

const (
Expand All @@ -27,21 +29,23 @@ const (

// Config setting defaults.
const (
defaultListenAddress = "0.0.0.0"
defaultConnectionAddress = "localhost"
defaultReplicaMaxLagTime = 15 * time.Second
defaultReplicaMaxLeaderTimeout = 15 * time.Second
defaultReplicaMaxIdleWait = 10 * time.Second
defaultRaftSnapshots = 2
defaultRaftCacheSize = 512
defaultMetadataCacheMaxAge = 2 * time.Minute
defaultBatchMaxMessages = 1024
defaultReplicaFetchTimeout = 3 * time.Second
defaultMinInsyncReplicas = 1
defaultRetentionMaxAge = 7 * 24 * time.Hour
defaultCleanerInterval = 5 * time.Minute
defaultMaxSegmentBytes = 1024 * 1024 * 256 // 256MB
defaultMaxSegmentAge = defaultRetentionMaxAge
defaultListenAddress = "0.0.0.0"
defaultConnectionAddress = "localhost"
defaultReplicaMaxLagTime = 15 * time.Second
defaultReplicaMaxLeaderTimeout = 15 * time.Second
defaultReplicaMaxIdleWait = 10 * time.Second
defaultRaftSnapshots = 2
defaultRaftCacheSize = 512
defaultMetadataCacheMaxAge = 2 * time.Minute
defaultBatchMaxMessages = 1024
defaultReplicaFetchTimeout = 3 * time.Second
defaultMinInsyncReplicas = 1
defaultRetentionMaxAge = 7 * 24 * time.Hour
defaultCleanerInterval = 5 * time.Minute
defaultMaxSegmentBytes = 1024 * 1024 * 256 // 256MB
defaultMaxSegmentAge = defaultRetentionMaxAge
defaultActivityStreamPublishTimeout = 5 * time.Second
defaultActivityStreamPublishAckPolicy = client.AckPolicy_ALL
)

// Config setting key names.
Expand Down Expand Up @@ -89,6 +93,10 @@ const (
configClusteringReplicaMaxIdleWait = "clustering.replica.max.idle.wait"
configClusteringReplicaFetchTimeout = "clustering.replica.fetch.timeout"
configClusteringMinInsyncReplicas = "clustering.min.insync.replicas"

configActivityStreamEnabled = "activity.stream.enabled"
configActivityStreamPublishTimeout = "activity.stream.publish.timeout"
configActivityStreamPublishAckPolicy = "activity.stream.publish.ack.policy"
)

var configKeys = map[string]struct{}{
Expand Down Expand Up @@ -129,6 +137,9 @@ var configKeys = map[string]struct{}{
configClusteringReplicaMaxIdleWait: {},
configClusteringReplicaFetchTimeout: {},
configClusteringMinInsyncReplicas: {},
configActivityStreamEnabled: {},
configActivityStreamPublishTimeout: {},
configActivityStreamPublishAckPolicy: {},
}

// StreamsConfig contains settings for controlling the message log for streams.
Expand Down Expand Up @@ -184,6 +195,14 @@ type ClusteringConfig struct {
MinISR int
}

// ActivityStreamConfig contains settings for controlling activity stream
// behavior.
type ActivityStreamConfig struct {
Enabled bool
PublishTimeout time.Duration
PublishAckPolicy client.AckPolicy
}

// Config contains all settings for a Liftbridge Server.
type Config struct {
Listen HostPort
Expand All @@ -204,6 +223,7 @@ type Config struct {
NATS nats.Options
Streams StreamsConfig
Clustering ClusteringConfig
ActivityStream ActivityStreamConfig
}

// NewDefaultConfig creates a new Config with default settings.
Expand All @@ -228,6 +248,8 @@ func NewDefaultConfig() *Config {
config.Streams.SegmentMaxAge = defaultMaxSegmentAge
config.Streams.RetentionMaxAge = defaultRetentionMaxAge
config.Streams.CleanerInterval = defaultCleanerInterval
config.ActivityStream.PublishTimeout = defaultActivityStreamPublishTimeout
config.ActivityStream.PublishAckPolicy = defaultActivityStreamPublishAckPolicy
return config
}

Expand Down Expand Up @@ -391,6 +413,7 @@ func NewConfig(configFile string) (*Config, error) { // nolint: gocyclo
parseNATSConfig(&config.NATS, v)
parseStreamsConfig(config, v)
parseClusteringConfig(config, v)
parseActivityStreamConfig(config, v)

// If SegmentMaxAge is not set, default it to the retention time.
if config.Streams.SegmentMaxAge == 0 {
Expand Down Expand Up @@ -511,6 +534,29 @@ func parseClusteringConfig(config *Config, v *viper.Viper) error { // nolint: go
return nil
}

// parseActivityStreamConfig parses the `activitystream` section of a config
// file and populates the given Config.
func parseActivityStreamConfig(config *Config, v *viper.Viper) error { // nolint: gocyclo
if v.IsSet(configActivityStreamEnabled) {
config.ActivityStream.Enabled = v.GetBool(configActivityStreamEnabled)
}

if v.IsSet(configActivityStreamPublishTimeout) {
config.ActivityStream.PublishTimeout = v.GetDuration(configActivityStreamPublishTimeout)
}

if v.IsSet(configActivityStreamPublishAckPolicy) {
ackPolicy, err := parseAckPolicy(v)
if err != nil {
return err
}

config.ActivityStream.PublishAckPolicy = ackPolicy
}

return nil
}

// HostPort is simple struct to hold parsed listen/addr strings.
type HostPort struct {
Host string
Expand Down Expand Up @@ -538,3 +584,19 @@ func parseListen(v *viper.Viper) (*HostPort, error) {
}
return hp, nil
}

// parseAckPolicy will parse the activity stream's `ack.policy` option
// containing the ack policy to use when publishing activity events.
func parseAckPolicy(v *viper.Viper) (client.AckPolicy, error) {
ackPolicy := v.GetString(configActivityStreamPublishAckPolicy)
switch ackPolicy {
case "none":
return client.AckPolicy_NONE, nil
case "leader":
return client.AckPolicy_LEADER, nil
case "all":
return client.AckPolicy_ALL, nil
default:
return defaultActivityStreamPublishAckPolicy, fmt.Errorf("Unknown activity stream publish ack policy %q", ackPolicy)
}
}
6 changes: 6 additions & 0 deletions server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/stretchr/testify/require"

client "github.com/liftbridge-io/liftbridge-api/go"
)

// Ensure NewConfig properly parses config files.
Expand Down Expand Up @@ -45,6 +47,10 @@ func TestNewConfigFromFile(t *testing.T) {
require.Equal(t, 3*time.Second, config.Clustering.ReplicaFetchTimeout)
require.Equal(t, 1, config.Clustering.MinISR)

require.Equal(t, true, config.ActivityStream.Enabled)
require.Equal(t, time.Minute, config.ActivityStream.PublishTimeout)
require.Equal(t, client.AckPolicy_LEADER, config.ActivityStream.PublishAckPolicy)

require.Equal(t, []string{"nats://localhost:4222"}, config.NATS.Servers)
require.Equal(t, "user", config.NATS.User)
require.Equal(t, "pass", config.NATS.Password)
Expand Down
6 changes: 5 additions & 1 deletion server/configs/full.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
---
listen: localhost:9293
host: 0.0.0.0
port: 5050
Expand Down Expand Up @@ -46,6 +45,11 @@ clustering:
fetch.timeout: 3s
min.insync.replicas: '1'

activity.stream:
enabled: true
publish.timeout: 1m
publish.ack.policy: leader

nats:
servers:
- nats://localhost:4222
Expand Down
2 changes: 1 addition & 1 deletion server/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/hashicorp/raft"
"github.com/pkg/errors"

"github.com/liftbridge-io/liftbridge/server/proto"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

// recoverLatestCommittedFSMLog returns the last committed Raft FSM log entry.
Expand Down
23 changes: 22 additions & 1 deletion server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

client "github.com/liftbridge-io/liftbridge-api/go"

"github.com/liftbridge-io/liftbridge/server/proto"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

const (
Expand Down Expand Up @@ -339,6 +339,17 @@ func (m *metadataAPI) CreatePartition(ctx context.Context, req *proto.CreatePart
// Wait for leader to create partition (best effort).
m.waitForPartitionLeader(ctx, req.Partition.Stream, leader, req.Partition.Id)

err := m.publishActivityEvent(client.ActivityStreamEvent{
Op: client.ActivityStreamOp_CREATE_PARTITION,
CreatePartitionOp: &client.CreatePartitionOp{
Stream: req.Partition.Stream,
Partition: req.Partition.Id,
},
})
if err != nil {
return status.Newf(codes.Internal, "Failed to publish on the activity stream: %v", err.Error())
}

return nil
}

Expand Down Expand Up @@ -381,6 +392,16 @@ func (m *metadataAPI) DeleteStream(ctx context.Context, req *proto.DeleteStreamO
return status.New(code, err.Error())
}

err := m.publishActivityEvent(client.ActivityStreamEvent{
Op: client.ActivityStreamOp_DELETE_STREAM,
DeleteStreamOp: &client.DeleteStreamOp{
Stream: req.Stream,
},
})
if err != nil {
return status.Newf(codes.Internal, "Failed to publish on the activity stream: %v", err.Error())
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

client "github.com/liftbridge-io/liftbridge-api/go"
"github.com/liftbridge-io/liftbridge/server/commitlog"
"github.com/liftbridge-io/liftbridge/server/proto"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

// recvChannelSize specifies the size of the channel that feeds the leader
Expand Down
2 changes: 1 addition & 1 deletion server/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"

client "github.com/liftbridge-io/liftbridge-api/go"
"github.com/liftbridge-io/liftbridge/server/proto"
proto "github.com/liftbridge-io/liftbridge/server/protocol"
)

func createServer(leader bool) *Server {
Expand Down
2 changes: 1 addition & 1 deletion server/proto/envelope.go → server/protocol/envelope.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proto
package protocol

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proto
package protocol

import (
"bytes"
Expand Down
Loading

0 comments on commit 4967b1c

Please sign in to comment.