Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Identity and Configuration #766

Merged
merged 11 commits into from
May 16, 2019
72 changes: 3 additions & 69 deletions cluster_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ipfscluster

import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -15,8 +14,6 @@ import (

"github.com/ipfs/ipfs-cluster/config"

crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"
)
Expand All @@ -25,8 +22,6 @@ const configKey = "cluster"

// Configuration defaults
const (
DefaultConfigCrypto = crypto.RSA
DefaultConfigKeyLength = 2048
DefaultListenAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultStateSyncInterval = 600 * time.Second
DefaultIPFSSyncInterval = 130 * time.Second
Expand All @@ -46,11 +41,6 @@ type Config struct {
lock sync.Mutex
peerstoreLock sync.Mutex

// Libp2p ID and private key for Cluster communication (including)
// the Consensus component.
ID peer.ID
PrivateKey crypto.PrivKey

// User-defined peername for use as human-readable identifier.
Peername string

Expand Down Expand Up @@ -129,9 +119,9 @@ type Config struct {
// saved using JSON. Most configuration keys are converted into simple types
// like strings, and key names aim to be self-explanatory for the user.
type configJSON struct {
ID string `json:"id"`
ID string `json:"id,omitempty"`
Peername string `json:"peername"`
PrivateKey string `json:"private_key"`
PrivateKey string `json:"private_key,omitempty"`
Secret string `json:"secret"`
LeaveOnShutdown bool `json:"leave_on_shutdown"`
ListenMultiaddress string `json:"listen_multiaddress"`
Expand All @@ -153,26 +143,10 @@ func (cfg *Config) ConfigKey() string {

// Default fills in all the Config fields with
// default working values. This means, it will
// generate a valid random ID, PrivateKey and
// Secret.
// generate a Secret.
func (cfg *Config) Default() error {
cfg.setDefaults()

// pid and private key generation --
priv, pub, err := crypto.GenerateKeyPair(
DefaultConfigCrypto,
DefaultConfigKeyLength)
if err != nil {
return err
}
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
return err
}
cfg.ID = pid
cfg.PrivateKey = priv
// --

// cluster secret
clusterSecret, err := pnet.GenerateV1Bytes()
if err != nil {
Expand Down Expand Up @@ -202,18 +176,6 @@ func (cfg *Config) ApplyEnvVars() error {
// Validate will check that the values of this config
// seem to be working ones.
func (cfg *Config) Validate() error {
if cfg.ID == "" {
return errors.New("cluster.ID not set")
}

if cfg.PrivateKey == nil {
return errors.New("no cluster.private_key set")
}

if !cfg.ID.MatchesPrivateKey(cfg.PrivateKey) {
return errors.New("cluster.ID does not match the private_key")
}

if cfg.ListenAddr == nil {
return errors.New("cluster.listen_addr is indefined")
}
Expand Down Expand Up @@ -304,27 +266,8 @@ func (cfg *Config) LoadJSON(raw []byte) error {
func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
config.SetIfNotDefault(jcfg.PeerstoreFile, &cfg.PeerstoreFile)

id, err := peer.IDB58Decode(jcfg.ID)
if err != nil {
err = fmt.Errorf("error decoding cluster ID: %s", err)
return err
}
cfg.ID = id

config.SetIfNotDefault(jcfg.Peername, &cfg.Peername)

pkb, err := base64.StdEncoding.DecodeString(jcfg.PrivateKey)
if err != nil {
err = fmt.Errorf("error decoding private_key: %s", err)
return err
}
pKey, err := crypto.UnmarshalPrivateKey(pkb)
if err != nil {
err = fmt.Errorf("error parsing private_key ID: %s", err)
return err
}
cfg.PrivateKey = pKey

clusterSecret, err := DecodeClusterSecret(jcfg.Secret)
if err != nil {
err = fmt.Errorf("error loading cluster secret from config: %s", err)
Expand Down Expand Up @@ -381,17 +324,8 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {

jcfg = &configJSON{}

// Private Key
pkeyBytes, err := cfg.PrivateKey.Bytes()
if err != nil {
return
}
pKey := base64.StdEncoding.EncodeToString(pkeyBytes)

// Set all configuration fields
jcfg.ID = cfg.ID.Pretty()
jcfg.Peername = cfg.Peername
jcfg.PrivateKey = pKey
jcfg.Secret = EncodeProtectorKey(cfg.Secret)
jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin
jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax
Expand Down
19 changes: 0 additions & 19 deletions cluster_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ func TestLoadJSON(t *testing.T) {
return cfg, nil
}

t.Run("bad id", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ID = "abc" })
if err == nil {
t.Error("expected error decoding ID")
}
})

t.Run("empty default peername", func(t *testing.T) {
cfg, err := loadJSON2(t, func(j *configJSON) { j.Peername = "" })
if err != nil {
Expand All @@ -104,13 +97,6 @@ func TestLoadJSON(t *testing.T) {
}
})

t.Run("bad private key", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.PrivateKey = "abc" })
if err == nil {
t.Error("expected error parsing private key")
}
})

t.Run("bad listen multiaddress", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = "abc" })
if err == nil {
Expand Down Expand Up @@ -212,11 +198,6 @@ func TestApplyEnvVars(t *testing.T) {

func TestValidate(t *testing.T) {
cfg := &Config{}
cfg.Default()
cfg.ID = ""
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.MonitorPingInterval = 0
Expand Down
20 changes: 14 additions & 6 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"mime/multipart"
"os"
"path/filepath"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/datastore/inmem"
"github.com/ipfs/ipfs-cluster/informer/numpin"
Expand Down Expand Up @@ -139,17 +141,17 @@ type mockTracer struct {
}

func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ident, clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()

host, pubsub, dht, err := NewClusterHost(context.Background(), clusterCfg)
host, pubsub, dht, err := NewClusterHost(context.Background(), ident, clusterCfg)
if err != nil {
t.Fatal(err)
}

api := &mockAPI{}
proxy := &mockProxy{}
ipfs := &mockConnector{}
tracker := makePinTracker(t, clusterCfg.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
tracer := &mockTracer{}

store := inmem.New()
Expand Down Expand Up @@ -849,9 +851,15 @@ func TestClusterPeers(t *testing.T) {
t.Fatal("expected 1 peer")
}

clusterCfg := &Config{}
clusterCfg.LoadJSON(testingClusterCfg)
if peers[0].ID != clusterCfg.ID {
ident := &config.Identity{}
err := ident.LoadJSON(testingIdentity)
if err != nil {
t.Fatal(err)
}

if peers[0].ID != ident.ID {
fmt.Println(peers[0].ID)
fmt.Println(ident.ID)
t.Error("bad member")
}
}
Expand Down
4 changes: 3 additions & 1 deletion clusterhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"

"github.com/ipfs/ipfs-cluster/config"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
Expand All @@ -21,10 +22,11 @@ import (
// the DHT for routing. The resulting DHT is not bootstrapped.
func NewClusterHost(
ctx context.Context,
ident *config.Identity,
cfg *Config,
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {

h, err := newHost(ctx, cfg.Secret, cfg.PrivateKey, []ma.Multiaddr{cfg.ListenAddr})
h, err := newHost(ctx, cfg.Secret, ident.PrivateKey, []ma.Multiaddr{cfg.ListenAddr})
if err != nil {
return nil, nil, nil, err
}
Expand Down
37 changes: 33 additions & 4 deletions cmd/ipfs-cluster-service/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,39 @@ func makeConfigs() (*config.Manager, *cfgs) {
}
}

func makeAndLoadConfigs() (*config.Manager, *cfgs) {
func makeAndLoadConfigs() (*config.Manager, *config.Identity, *cfgs) {
ident := loadIdentity()
cfgMgr, cfgs := makeConfigs()
checkErr("reading configuration", cfgMgr.LoadJSONFileAndEnv(configPath))
return cfgMgr, cfgs
return cfgMgr, ident, cfgs
}

func loadIdentity() *config.Identity {
_, err := os.Stat(identityPath)

ident := &config.Identity{}
if os.IsNotExist(err) {
clusterConfig, err := config.GetClusterConfig(configPath)
checkErr("couldn not get cluster config", err)
err = ident.LoadJSON(clusterConfig)
checkErr("could not load identity from cluster config", err)

err = ident.SaveJSON(identityPath)
checkErr("could not save identity.json ", err)

err = ident.ApplyEnvVars()
checkErr("could not apply environment variables tot the identity ", err)

return ident
}

err = ident.LoadJSONFromFile(identityPath)
checkErr("could not load identity from identity.json", err)

err = ident.ApplyEnvVars()
checkErr("could not apply environment variables to the identity ", err)

return ident
}

func saveConfig(cfg *config.Manager) {
Expand All @@ -98,15 +127,15 @@ func saveConfig(cfg *config.Manager) {
out("%s configuration written to %s\n", programName, configPath)
}

func propagateTracingConfig(cfgs *cfgs, tracingFlag bool) *cfgs {
func propagateTracingConfig(ident *config.Identity, cfgs *cfgs, tracingFlag bool) *cfgs {
// tracingFlag represents the cli flag passed to ipfs-cluster-service daemon.
// It takes priority. If false, fallback to config file value.
tracingValue := tracingFlag
if !tracingFlag {
tracingValue = cfgs.tracingCfg.EnableTracing
}
// propagate to any other interested configuration
cfgs.tracingCfg.ClusterID = cfgs.clusterCfg.ID.Pretty()
cfgs.tracingCfg.ClusterID = ident.ID.Pretty()
cfgs.tracingCfg.ClusterPeername = cfgs.clusterCfg.Peername
cfgs.tracingCfg.EnableTracing = tracingValue
cfgs.clusterCfg.Tracing = tracingValue
Expand Down
19 changes: 12 additions & 7 deletions cmd/ipfs-cluster-service/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"syscall"
"time"

"github.com/ipfs/ipfs-cluster/config"

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
Expand Down Expand Up @@ -56,15 +58,16 @@ func daemon(c *cli.Context) error {
locker.lock()
defer locker.tryUnlock()

// Load all the configurations
cfgMgr, cfgs := makeAndLoadConfigs()
// Load all the configurations and identity
cfgMgr, ident, cfgs := makeAndLoadConfigs()

defer cfgMgr.Shutdown()

if c.Bool("stats") {
cfgs.metricsCfg.EnableStats = true
}

cfgs = propagateTracingConfig(cfgs, c.Bool("tracing"))
cfgs = propagateTracingConfig(ident, cfgs, c.Bool("tracing"))

// Cleanup state if bootstrapping
raftStaging := false
Expand All @@ -77,7 +80,7 @@ func daemon(c *cli.Context) error {
cfgs.clusterCfg.LeaveOnShutdown = true
}

cluster, err := createCluster(ctx, c, cfgs, raftStaging)
cluster, err := createCluster(ctx, c, ident, cfgs, raftStaging)
checkErr("starting cluster", err)

// noop if no bootstraps
Expand All @@ -96,11 +99,12 @@ func daemon(c *cli.Context) error {
func createCluster(
ctx context.Context,
c *cli.Context,
ident *config.Identity,
cfgs *cfgs,
raftStaging bool,
) (*ipfscluster.Cluster, error) {

host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg)
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, ident, cfgs.clusterCfg)
checkErr("creating libP2P Host", err)

peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
Expand Down Expand Up @@ -143,7 +147,7 @@ func createCluster(
tracer, err := observations.SetupTracing(cfgs.tracingCfg)
checkErr("setting up Tracing", err)

store := setupDatastore(c.String("consensus"), cfgs)
store := setupDatastore(c.String("consensus"), ident, cfgs)

cons, err := setupConsensus(
c.String("consensus"),
Expand Down Expand Up @@ -294,9 +298,10 @@ func setupPinTracker(

func setupDatastore(
consensus string,
ident *config.Identity,
cfgs *cfgs,
) ds.Datastore {
stmgr := newStateManager(consensus, cfgs)
stmgr := newStateManager(consensus, ident, cfgs)
store, err := stmgr.GetStore()
checkErr("creating datastore", err)
return store
Expand Down
Loading