Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crdt: Add batching support #1346

Merged
merged 4 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 63 additions & 9 deletions consensus/crdt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,32 @@ var envConfigKey = "cluster_crdt"

// Default configuration values
var (
DefaultClusterName = "ipfs-cluster"
DefaultPeersetMetric = "ping"
DefaultDatastoreNamespace = "/c" // from "/crdt"
DefaultRebroadcastInterval = time.Minute
DefaultTrustedPeers = []peer.ID{}
DefaultTrustAll = true
DefaultClusterName = "ipfs-cluster"
DefaultPeersetMetric = "ping"
DefaultDatastoreNamespace = "/c" // from "/crdt"
DefaultRebroadcastInterval = time.Minute
DefaultTrustedPeers = []peer.ID{}
DefaultTrustAll = true
DefaultBatchingMaxQueueSize = 1000
)

// BatchingConfig configures parameters for batching multiple pins in a single
// CRDT-put operation.
//
// MaxBatchSize will trigger a commit whenever the number of pins in the batch
// reaches the limit.
//
// MaxBatchAge will trigger a commit when the oldest update in the batch
// reaches it. Batching is only enabled when both MaxBatchSize and
// MaxBatchAge are greater than 0; leaving either at 0 disables it.
//
// MaxQueueSize specifies how many items can be waiting to be batched before
// the LogPin/Unpin operations block.
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
type BatchingConfig struct {
// MaxBatchSize is the number of pins in a batch that triggers a commit.
MaxBatchSize int
// MaxBatchAge is the age of the oldest update in a batch that triggers
// a commit.
MaxBatchAge time.Duration
// MaxQueueSize is how many items can be waiting to be batched before the
// LogPin/Unpin operations block. Validate() rejects values <= 0.
MaxQueueSize int
}

// Config is the configuration object for Consensus.
type Config struct {
config.Saver
Expand All @@ -45,6 +63,10 @@ type Config struct {
// for this peer that are forbidden for other peers.
TrustedPeers []peer.ID

// Specifies whether to batch CRDT updates for increased
// performance.
Batching BatchingConfig

// The interval before re-announcing the current state
// to the network when no activity is observed.
RebroadcastInterval time.Duration
Expand All @@ -60,10 +82,17 @@ type Config struct {
Tracing bool
}

// batchingConfigJSON is the JSON form of BatchingConfig, stored under the
// "batching" key of jsonConfig.
type batchingConfigJSON struct {
// MaxBatchSize is copied as-is to and from Batching.MaxBatchSize.
MaxBatchSize int `json:"max_batch_size"`
// MaxBatchAge is a duration string (the test fixture uses "5s"). It is
// parsed into Batching.MaxBatchAge by applyJSONConfig and produced with
// Duration.String() by toJSONConfig.
MaxBatchAge string `json:"max_batch_age"`
// MaxQueueSize is left at 0 by toJSONConfig when the configured value
// equals DefaultBatchingMaxQueueSize, so omitempty hides it.
MaxQueueSize int `json:"max_queue_size,omitempty"`
}

type jsonConfig struct {
ClusterName string `json:"cluster_name"`
TrustedPeers []string `json:"trusted_peers"`
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`
ClusterName string `json:"cluster_name"`
TrustedPeers []string `json:"trusted_peers"`
Batching batchingConfigJSON `json:"batching"`
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`

PeersetMetric string `json:"peerset_metric,omitempty"`
DatastoreNamespace string `json:"datastore_namespace,omitempty"`
Expand All @@ -87,6 +116,10 @@ func (cfg *Config) Validate() error {
if cfg.RebroadcastInterval <= 0 {
return errors.New("crdt.rebroadcast_interval is invalid")
}

if cfg.Batching.MaxQueueSize <= 0 {
return errors.New("crdt.batching.max_queue_size is invalid")
}
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down Expand Up @@ -123,11 +156,15 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
cfg.TrustedPeers = append(cfg.TrustedPeers, pid)
}

cfg.Batching.MaxBatchSize = jcfg.Batching.MaxBatchSize

config.SetIfNotDefault(jcfg.Batching.MaxQueueSize, &cfg.Batching.MaxQueueSize)
config.SetIfNotDefault(jcfg.PeersetMetric, &cfg.PeersetMetric)
config.SetIfNotDefault(jcfg.DatastoreNamespace, &cfg.DatastoreNamespace)
config.ParseDurations(
"crdt",
&config.DurationOpt{Duration: jcfg.RebroadcastInterval, Dst: &cfg.RebroadcastInterval, Name: "rebroadcast_interval"},
&config.DurationOpt{Duration: jcfg.Batching.MaxBatchAge, Dst: &cfg.Batching.MaxBatchAge, Name: "max_batch_age"},
)
return cfg.Validate()
}
Expand All @@ -152,6 +189,13 @@ func (cfg *Config) toJSONConfig() *jsonConfig {
jcfg.TrustedPeers = api.PeersToStrings(cfg.TrustedPeers)
}

jcfg.Batching.MaxBatchSize = cfg.Batching.MaxBatchSize
jcfg.Batching.MaxBatchAge = cfg.Batching.MaxBatchAge.String()
if cfg.Batching.MaxQueueSize != DefaultBatchingMaxQueueSize {
jcfg.Batching.MaxQueueSize = cfg.Batching.MaxQueueSize
// otherwise leave as 0/hidden
}

if cfg.PeersetMetric != DefaultPeersetMetric {
jcfg.PeersetMetric = cfg.PeersetMetric
// otherwise leave empty/hidden
Expand All @@ -177,6 +221,11 @@ func (cfg *Config) Default() error {
cfg.DatastoreNamespace = DefaultDatastoreNamespace
cfg.TrustedPeers = DefaultTrustedPeers
cfg.TrustAll = DefaultTrustAll
cfg.Batching = BatchingConfig{
MaxBatchSize: 0,
MaxBatchAge: 0,
MaxQueueSize: DefaultBatchingMaxQueueSize,
}
return nil
}

Expand All @@ -197,3 +246,8 @@ func (cfg *Config) ApplyEnvVars() error {
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
	// Build the jsonConfig form first, then let config.DisplayJSON
	// serialize it.
	jcfg := cfg.toJSONConfig()
	return config.DisplayJSON(jcfg)
}

// batchingEnabled reports whether batching is active: both MaxBatchSize and
// MaxBatchAge must be greater than 0.
func (cfg *Config) batchingEnabled() bool {
	batching := cfg.Batching
	if batching.MaxBatchSize <= 0 {
		return false
	}
	return batching.MaxBatchAge > 0
}
37 changes: 35 additions & 2 deletions consensus/crdt/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package crdt
import (
"os"
"testing"
"time"
)

// cfgJSON is a JSON fixture with a cluster name, one trusted peer and
// explicit batching options (batch size 30, batch age 5s, queue size 150).
var cfgJSON = []byte(`
{
"cluster_name": "test",
"trusted_peers": ["QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6"],
"batching": {
"max_batch_size": 30,
"max_batch_age": "5s",
"max_queue_size": 150
}
}
`)

Expand All @@ -22,6 +28,12 @@ func TestLoadJSON(t *testing.T) {
t.Error("TrustAll should not be enabled when peers in trusted peers")
}

if cfg.Batching.MaxBatchSize != 30 ||
cfg.Batching.MaxBatchAge != 5*time.Second ||
cfg.Batching.MaxQueueSize != 150 {
t.Error("Batching options were not parsed correctly")
}

cfg = &Config{}
err = cfg.LoadJSON([]byte(`
{
Expand Down Expand Up @@ -59,6 +71,10 @@ func TestLoadJSON(t *testing.T) {
if !cfg.TrustAll {
t.Error("expected TrustAll to be true")
}

if cfg.Batching.MaxQueueSize != DefaultBatchingMaxQueueSize {
t.Error("MaxQueueSize should be default when unset")
}
}

func TestToJSON(t *testing.T) {
Expand Down Expand Up @@ -99,15 +115,32 @@ func TestDefault(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.Batching.MaxQueueSize = -3
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}

// TestApplyEnvVars checks that CLUSTER_CRDT_* environment variables override
// the cluster name and the batching size/age of a default configuration.
func TestApplyEnvVars(t *testing.T) {
	// Remove the overrides when the test ends so they do not leak into
	// other tests running in the same process.
	os.Setenv("CLUSTER_CRDT_CLUSTERNAME", "test2")
	defer os.Unsetenv("CLUSTER_CRDT_CLUSTERNAME")
	os.Setenv("CLUSTER_CRDT_BATCHING_MAXBATCHSIZE", "5")
	defer os.Unsetenv("CLUSTER_CRDT_BATCHING_MAXBATCHSIZE")
	os.Setenv("CLUSTER_CRDT_BATCHING_MAXBATCHAGE", "10s")
	defer os.Unsetenv("CLUSTER_CRDT_BATCHING_MAXBATCHAGE")

	cfg := &Config{}
	cfg.Default()
	// A failure here would make every check below meaningless, so stop.
	if err := cfg.ApplyEnvVars(); err != nil {
		t.Fatal(err)
	}

	// Use t.Error (not t.Fatal) so that all three overrides are reported
	// in a single run.
	if cfg.ClusterName != "test2" {
		t.Error("failed to override cluster_name with env var")
	}

	if cfg.Batching.MaxBatchSize != 5 {
		t.Error("MaxBatchSize as env var does not work")
	}

	if cfg.Batching.MaxBatchAge != 10*time.Second {
		t.Error("MaxBatchAge as env var does not work")
	}
}
Loading