Skip to content

Commit

Permalink
Merge pull request #1346 from ipfs/feat/1008-crdt-batch
Browse files Browse the repository at this point in the history
crdt: Add batching support
  • Loading branch information
hsanjuan committed Apr 30, 2021
2 parents 3ce8a5d + 8b33264 commit 3e0f3f1
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 22 deletions.
72 changes: 63 additions & 9 deletions consensus/crdt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,32 @@ var envConfigKey = "cluster_crdt"

// Default configuration values
var (
DefaultClusterName = "ipfs-cluster"
DefaultPeersetMetric = "ping"
DefaultDatastoreNamespace = "/c" // from "/crdt"
DefaultRebroadcastInterval = time.Minute
DefaultTrustedPeers = []peer.ID{}
DefaultTrustAll = true
DefaultClusterName = "ipfs-cluster"
DefaultPeersetMetric = "ping"
DefaultDatastoreNamespace = "/c" // from "/crdt"
DefaultRebroadcastInterval = time.Minute
DefaultTrustedPeers = []peer.ID{}
DefaultTrustAll = true
DefaultBatchingMaxQueueSize = 50000
)

// BatchingConfig configures parameters for batching multiple pins in a single
// CRDT-put operation.
//
// MaxBatchSize will trigger a commit whenever the number of pins in the batch
// reaches the limit.
//
// MaxBatchAge will trigger a commit when the oldest update in the batch
// reaches it.
//
// Batching is only enabled when both MaxBatchSize and MaxBatchAge are
// greater than 0 (see Config.batchingEnabled). Leaving either of them at 0,
// which is what Config.Default does for both, disables batching.
//
// MaxQueueSize specifies how many items can be waiting to be batched before
// the LogPin/Unpin operations block. Config.Validate rejects values that are
// not greater than 0; Config.Default sets it to DefaultBatchingMaxQueueSize.
type BatchingConfig struct {
MaxBatchSize int
MaxBatchAge time.Duration
MaxQueueSize int
}

// Config is the configuration object for Consensus.
type Config struct {
config.Saver
Expand All @@ -45,6 +63,10 @@ type Config struct {
// for this peer that are forbidden for other peers.
TrustedPeers []peer.ID

// Specifies whether to batch CRDT updates for increased
// performance.
Batching BatchingConfig

// The interval before re-announcing the current state
// to the network when no activity is observed.
RebroadcastInterval time.Duration
Expand All @@ -60,10 +82,17 @@ type Config struct {
Tracing bool
}

// batchingConfigJSON is the JSON form of BatchingConfig. It is embedded in
// jsonConfig under the "batching" key.
//
// MaxBatchAge is carried as a string: applyJSONConfig passes it to
// config.ParseDurations to fill Config.Batching.MaxBatchAge, and
// toJSONConfig writes it back using Duration.String().
//
// MaxQueueSize is tagged omitempty. toJSONConfig only sets it when the
// configured value differs from DefaultBatchingMaxQueueSize, leaving it at 0
// (hidden) otherwise.
type batchingConfigJSON struct {
MaxBatchSize int `json:"max_batch_size"`
MaxBatchAge string `json:"max_batch_age"`
MaxQueueSize int `json:"max_queue_size,omitempty"`
}

type jsonConfig struct {
ClusterName string `json:"cluster_name"`
TrustedPeers []string `json:"trusted_peers"`
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`
ClusterName string `json:"cluster_name"`
TrustedPeers []string `json:"trusted_peers"`
Batching batchingConfigJSON `json:"batching"`
RebroadcastInterval string `json:"rebroadcast_interval,omitempty"`

PeersetMetric string `json:"peerset_metric,omitempty"`
DatastoreNamespace string `json:"datastore_namespace,omitempty"`
Expand All @@ -87,6 +116,10 @@ func (cfg *Config) Validate() error {
if cfg.RebroadcastInterval <= 0 {
return errors.New("crdt.rebroadcast_interval is invalid")
}

if cfg.Batching.MaxQueueSize <= 0 {
return errors.New("crdt.batching.max_queue_size is invalid")
}
return nil
}

Expand Down Expand Up @@ -123,11 +156,15 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
cfg.TrustedPeers = append(cfg.TrustedPeers, pid)
}

cfg.Batching.MaxBatchSize = jcfg.Batching.MaxBatchSize

config.SetIfNotDefault(jcfg.Batching.MaxQueueSize, &cfg.Batching.MaxQueueSize)
config.SetIfNotDefault(jcfg.PeersetMetric, &cfg.PeersetMetric)
config.SetIfNotDefault(jcfg.DatastoreNamespace, &cfg.DatastoreNamespace)
config.ParseDurations(
"crdt",
&config.DurationOpt{Duration: jcfg.RebroadcastInterval, Dst: &cfg.RebroadcastInterval, Name: "rebroadcast_interval"},
&config.DurationOpt{Duration: jcfg.Batching.MaxBatchAge, Dst: &cfg.Batching.MaxBatchAge, Name: "max_batch_age"},
)
return cfg.Validate()
}
Expand All @@ -152,6 +189,13 @@ func (cfg *Config) toJSONConfig() *jsonConfig {
jcfg.TrustedPeers = api.PeersToStrings(cfg.TrustedPeers)
}

jcfg.Batching.MaxBatchSize = cfg.Batching.MaxBatchSize
jcfg.Batching.MaxBatchAge = cfg.Batching.MaxBatchAge.String()
if cfg.Batching.MaxQueueSize != DefaultBatchingMaxQueueSize {
jcfg.Batching.MaxQueueSize = cfg.Batching.MaxQueueSize
// otherwise leave as 0/hidden
}

if cfg.PeersetMetric != DefaultPeersetMetric {
jcfg.PeersetMetric = cfg.PeersetMetric
// otherwise leave empty/hidden
Expand All @@ -177,6 +221,11 @@ func (cfg *Config) Default() error {
cfg.DatastoreNamespace = DefaultDatastoreNamespace
cfg.TrustedPeers = DefaultTrustedPeers
cfg.TrustAll = DefaultTrustAll
cfg.Batching = BatchingConfig{
MaxBatchSize: 0,
MaxBatchAge: 0,
MaxQueueSize: DefaultBatchingMaxQueueSize,
}
return nil
}

Expand All @@ -197,3 +246,8 @@ func (cfg *Config) ApplyEnvVars() error {
func (cfg *Config) ToDisplayJSON() ([]byte, error) {
return config.DisplayJSON(cfg.toJSONConfig())
}

// batchingEnabled reports whether batching is turned on. That is the case
// only when both Batching.MaxBatchSize and Batching.MaxBatchAge are
// strictly positive.
func (cfg *Config) batchingEnabled() bool {
	batching := cfg.Batching
	if batching.MaxBatchSize <= 0 {
		return false
	}
	return batching.MaxBatchAge > 0
}
37 changes: 35 additions & 2 deletions consensus/crdt/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package crdt
import (
"os"
"testing"
"time"
)

// cfgJSON is the sample configuration loaded by TestLoadJSON. It sets a
// cluster name, a single trusted peer and all three batching options
// (max_batch_size 30, max_batch_age 5s, max_queue_size 150), which
// TestLoadJSON compares against the parsed Config.
var cfgJSON = []byte(`
{
    "cluster_name": "test",
    "trusted_peers": ["QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6"],
    "batching": {
        "max_batch_size": 30,
        "max_batch_age": "5s",
        "max_queue_size": 150
    }
}
`)

Expand All @@ -22,6 +28,12 @@ func TestLoadJSON(t *testing.T) {
t.Error("TrustAll should not be enabled when peers in trusted peers")
}

if cfg.Batching.MaxBatchSize != 30 ||
cfg.Batching.MaxBatchAge != 5*time.Second ||
cfg.Batching.MaxQueueSize != 150 {
t.Error("Batching options were not parsed correctly")
}

cfg = &Config{}
err = cfg.LoadJSON([]byte(`
{
Expand Down Expand Up @@ -59,6 +71,10 @@ func TestLoadJSON(t *testing.T) {
if !cfg.TrustAll {
t.Error("expected TrustAll to be true")
}

if cfg.Batching.MaxQueueSize != DefaultBatchingMaxQueueSize {
t.Error("MaxQueueSize should be default when unset")
}
}

func TestToJSON(t *testing.T) {
Expand Down Expand Up @@ -99,15 +115,32 @@ func TestDefault(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.Batching.MaxQueueSize = -3
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
}

// TestApplyEnvVars checks that CLUSTER_CRDT_* environment variables override
// the values set by Config.Default, including the nested batching options.
func TestApplyEnvVars(t *testing.T) {
	overrides := []struct{ key, value string }{
		{"CLUSTER_CRDT_CLUSTERNAME", "test2"},
		{"CLUSTER_CRDT_BATCHING_MAXBATCHSIZE", "5"},
		{"CLUSTER_CRDT_BATCHING_MAXBATCHAGE", "10s"},
	}
	for _, o := range overrides {
		key := o.key
		previous, wasSet := os.LookupEnv(key)
		if err := os.Setenv(key, o.value); err != nil {
			t.Fatal(err)
		}
		// Put the environment back when the test returns so the overrides
		// do not leak into other tests of the package. The defer is meant
		// to run at function exit, not at the end of each iteration.
		defer func() {
			if wasSet {
				os.Setenv(key, previous)
				return
			}
			os.Unsetenv(key)
		}()
	}

	cfg := &Config{}
	if err := cfg.Default(); err != nil {
		t.Fatal(err)
	}
	if err := cfg.ApplyEnvVars(); err != nil {
		t.Fatal(err)
	}

	// Use t.Error rather than t.Fatal so that every override is checked
	// and reported, even when an earlier one failed.
	if cfg.ClusterName != "test2" {
		t.Error("failed to override cluster_name with env var")
	}

	if cfg.Batching.MaxBatchSize != 5 {
		t.Error("MaxBatchSize as env var does not work")
	}

	if cfg.Batching.MaxBatchAge != 10*time.Second {
		t.Error("MaxBatchAge as env var does not work")
	}
}
Loading

0 comments on commit 3e0f3f1

Please sign in to comment.