Skip to content

Commit

Permalink
Merge pull request #766 from ipfs/issue_760
Browse files Browse the repository at this point in the history
Separate Identity and Configuration
  • Loading branch information
hsanjuan committed May 16, 2019
2 parents 26b0949 + 50c9aa6 commit f428a3b
Show file tree
Hide file tree
Showing 20 changed files with 492 additions and 160 deletions.
72 changes: 3 additions & 69 deletions cluster_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ipfscluster

import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -15,8 +14,6 @@ import (

"github.com/ipfs/ipfs-cluster/config"

crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"
)
Expand All @@ -25,8 +22,6 @@ const configKey = "cluster"

// Configuration defaults
const (
DefaultConfigCrypto = crypto.RSA
DefaultConfigKeyLength = 2048
DefaultListenAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultStateSyncInterval = 600 * time.Second
DefaultIPFSSyncInterval = 130 * time.Second
Expand All @@ -46,11 +41,6 @@ type Config struct {
lock sync.Mutex
peerstoreLock sync.Mutex

// Libp2p ID and private key for Cluster communication (including)
// the Consensus component.
ID peer.ID
PrivateKey crypto.PrivKey

// User-defined peername for use as human-readable identifier.
Peername string

Expand Down Expand Up @@ -129,9 +119,9 @@ type Config struct {
// saved using JSON. Most configuration keys are converted into simple types
// like strings, and key names aim to be self-explanatory for the user.
type configJSON struct {
ID string `json:"id"`
ID string `json:"id,omitempty"`
Peername string `json:"peername"`
PrivateKey string `json:"private_key"`
PrivateKey string `json:"private_key,omitempty"`
Secret string `json:"secret"`
LeaveOnShutdown bool `json:"leave_on_shutdown"`
ListenMultiaddress string `json:"listen_multiaddress"`
Expand All @@ -153,26 +143,10 @@ func (cfg *Config) ConfigKey() string {

// Default fills in all the Config fields with
// default working values. This means, it will
// generate a valid random ID, PrivateKey and
// Secret.
// generate a Secret.
func (cfg *Config) Default() error {
cfg.setDefaults()

// pid and private key generation --
priv, pub, err := crypto.GenerateKeyPair(
DefaultConfigCrypto,
DefaultConfigKeyLength)
if err != nil {
return err
}
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
return err
}
cfg.ID = pid
cfg.PrivateKey = priv
// --

// cluster secret
clusterSecret, err := pnet.GenerateV1Bytes()
if err != nil {
Expand Down Expand Up @@ -202,18 +176,6 @@ func (cfg *Config) ApplyEnvVars() error {
// Validate will check that the values of this config
// seem to be working ones.
func (cfg *Config) Validate() error {
if cfg.ID == "" {
return errors.New("cluster.ID not set")
}

if cfg.PrivateKey == nil {
return errors.New("no cluster.private_key set")
}

if !cfg.ID.MatchesPrivateKey(cfg.PrivateKey) {
return errors.New("cluster.ID does not match the private_key")
}

if cfg.ListenAddr == nil {
return errors.New("cluster.listen_addr is indefined")
}
Expand Down Expand Up @@ -304,27 +266,8 @@ func (cfg *Config) LoadJSON(raw []byte) error {
func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
config.SetIfNotDefault(jcfg.PeerstoreFile, &cfg.PeerstoreFile)

id, err := peer.IDB58Decode(jcfg.ID)
if err != nil {
err = fmt.Errorf("error decoding cluster ID: %s", err)
return err
}
cfg.ID = id

config.SetIfNotDefault(jcfg.Peername, &cfg.Peername)

pkb, err := base64.StdEncoding.DecodeString(jcfg.PrivateKey)
if err != nil {
err = fmt.Errorf("error decoding private_key: %s", err)
return err
}
pKey, err := crypto.UnmarshalPrivateKey(pkb)
if err != nil {
err = fmt.Errorf("error parsing private_key ID: %s", err)
return err
}
cfg.PrivateKey = pKey

clusterSecret, err := DecodeClusterSecret(jcfg.Secret)
if err != nil {
err = fmt.Errorf("error loading cluster secret from config: %s", err)
Expand Down Expand Up @@ -381,17 +324,8 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {

jcfg = &configJSON{}

// Private Key
pkeyBytes, err := cfg.PrivateKey.Bytes()
if err != nil {
return
}
pKey := base64.StdEncoding.EncodeToString(pkeyBytes)

// Set all configuration fields
jcfg.ID = cfg.ID.Pretty()
jcfg.Peername = cfg.Peername
jcfg.PrivateKey = pKey
jcfg.Secret = EncodeProtectorKey(cfg.Secret)
jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin
jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax
Expand Down
19 changes: 0 additions & 19 deletions cluster_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ func TestLoadJSON(t *testing.T) {
return cfg, nil
}

t.Run("bad id", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ID = "abc" })
if err == nil {
t.Error("expected error decoding ID")
}
})

t.Run("empty default peername", func(t *testing.T) {
cfg, err := loadJSON2(t, func(j *configJSON) { j.Peername = "" })
if err != nil {
Expand All @@ -104,13 +97,6 @@ func TestLoadJSON(t *testing.T) {
}
})

t.Run("bad private key", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.PrivateKey = "abc" })
if err == nil {
t.Error("expected error parsing private key")
}
})

t.Run("bad listen multiaddress", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = "abc" })
if err == nil {
Expand Down Expand Up @@ -212,11 +198,6 @@ func TestApplyEnvVars(t *testing.T) {

func TestValidate(t *testing.T) {
cfg := &Config{}
cfg.Default()
cfg.ID = ""
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.MonitorPingInterval = 0
Expand Down
20 changes: 14 additions & 6 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"mime/multipart"
"os"
"path/filepath"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/datastore/inmem"
"github.com/ipfs/ipfs-cluster/informer/numpin"
Expand Down Expand Up @@ -139,17 +141,17 @@ type mockTracer struct {
}

func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ident, clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()

host, pubsub, dht, err := NewClusterHost(context.Background(), clusterCfg)
host, pubsub, dht, err := NewClusterHost(context.Background(), ident, clusterCfg)
if err != nil {
t.Fatal(err)
}

api := &mockAPI{}
proxy := &mockProxy{}
ipfs := &mockConnector{}
tracker := makePinTracker(t, clusterCfg.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
tracker := makePinTracker(t, ident.ID, maptrackerCfg, statelesstrackerCfg, clusterCfg.Peername)
tracer := &mockTracer{}

store := inmem.New()
Expand Down Expand Up @@ -849,9 +851,15 @@ func TestClusterPeers(t *testing.T) {
t.Fatal("expected 1 peer")
}

clusterCfg := &Config{}
clusterCfg.LoadJSON(testingClusterCfg)
if peers[0].ID != clusterCfg.ID {
ident := &config.Identity{}
err := ident.LoadJSON(testingIdentity)
if err != nil {
t.Fatal(err)
}

if peers[0].ID != ident.ID {
fmt.Println(peers[0].ID)
fmt.Println(ident.ID)
t.Error("bad member")
}
}
Expand Down
4 changes: 3 additions & 1 deletion clusterhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"

"github.com/ipfs/ipfs-cluster/config"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
Expand All @@ -21,10 +22,11 @@ import (
// the DHT for routing. The resulting DHT is not bootstrapped.
func NewClusterHost(
ctx context.Context,
ident *config.Identity,
cfg *Config,
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {

h, err := newHost(ctx, cfg.Secret, cfg.PrivateKey, []ma.Multiaddr{cfg.ListenAddr})
h, err := newHost(ctx, cfg.Secret, ident.PrivateKey, []ma.Multiaddr{cfg.ListenAddr})
if err != nil {
return nil, nil, nil, err
}
Expand Down
37 changes: 33 additions & 4 deletions cmd/ipfs-cluster-service/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,39 @@ func makeConfigs() (*config.Manager, *cfgs) {
}
}

func makeAndLoadConfigs() (*config.Manager, *cfgs) {
func makeAndLoadConfigs() (*config.Manager, *config.Identity, *cfgs) {
ident := loadIdentity()
cfgMgr, cfgs := makeConfigs()
checkErr("reading configuration", cfgMgr.LoadJSONFileAndEnv(configPath))
return cfgMgr, cfgs
return cfgMgr, ident, cfgs
}

func loadIdentity() *config.Identity {
_, err := os.Stat(identityPath)

ident := &config.Identity{}
if os.IsNotExist(err) {
clusterConfig, err := config.GetClusterConfig(configPath)
checkErr("couldn not get cluster config", err)
err = ident.LoadJSON(clusterConfig)
checkErr("could not load identity from cluster config", err)

err = ident.SaveJSON(identityPath)
checkErr("could not save identity.json ", err)

err = ident.ApplyEnvVars()
checkErr("could not apply environment variables tot the identity ", err)

return ident
}

err = ident.LoadJSONFromFile(identityPath)
checkErr("could not load identity from identity.json", err)

err = ident.ApplyEnvVars()
checkErr("could not apply environment variables to the identity ", err)

return ident
}

func saveConfig(cfg *config.Manager) {
Expand All @@ -98,15 +127,15 @@ func saveConfig(cfg *config.Manager) {
out("%s configuration written to %s\n", programName, configPath)
}

func propagateTracingConfig(cfgs *cfgs, tracingFlag bool) *cfgs {
func propagateTracingConfig(ident *config.Identity, cfgs *cfgs, tracingFlag bool) *cfgs {
// tracingFlag represents the cli flag passed to ipfs-cluster-service daemon.
// It takes priority. If false, fallback to config file value.
tracingValue := tracingFlag
if !tracingFlag {
tracingValue = cfgs.tracingCfg.EnableTracing
}
// propagate to any other interested configuration
cfgs.tracingCfg.ClusterID = cfgs.clusterCfg.ID.Pretty()
cfgs.tracingCfg.ClusterID = ident.ID.Pretty()
cfgs.tracingCfg.ClusterPeername = cfgs.clusterCfg.Peername
cfgs.tracingCfg.EnableTracing = tracingValue
cfgs.clusterCfg.Tracing = tracingValue
Expand Down
19 changes: 12 additions & 7 deletions cmd/ipfs-cluster-service/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"syscall"
"time"

"github.com/ipfs/ipfs-cluster/config"

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
Expand Down Expand Up @@ -56,15 +58,16 @@ func daemon(c *cli.Context) error {
locker.lock()
defer locker.tryUnlock()

// Load all the configurations
cfgMgr, cfgs := makeAndLoadConfigs()
// Load all the configurations and identity
cfgMgr, ident, cfgs := makeAndLoadConfigs()

defer cfgMgr.Shutdown()

if c.Bool("stats") {
cfgs.metricsCfg.EnableStats = true
}

cfgs = propagateTracingConfig(cfgs, c.Bool("tracing"))
cfgs = propagateTracingConfig(ident, cfgs, c.Bool("tracing"))

// Cleanup state if bootstrapping
raftStaging := false
Expand All @@ -77,7 +80,7 @@ func daemon(c *cli.Context) error {
cfgs.clusterCfg.LeaveOnShutdown = true
}

cluster, err := createCluster(ctx, c, cfgs, raftStaging)
cluster, err := createCluster(ctx, c, ident, cfgs, raftStaging)
checkErr("starting cluster", err)

// noop if no bootstraps
Expand All @@ -96,11 +99,12 @@ func daemon(c *cli.Context) error {
func createCluster(
ctx context.Context,
c *cli.Context,
ident *config.Identity,
cfgs *cfgs,
raftStaging bool,
) (*ipfscluster.Cluster, error) {

host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg)
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, ident, cfgs.clusterCfg)
checkErr("creating libP2P Host", err)

peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
Expand Down Expand Up @@ -143,7 +147,7 @@ func createCluster(
tracer, err := observations.SetupTracing(cfgs.tracingCfg)
checkErr("setting up Tracing", err)

store := setupDatastore(c.String("consensus"), cfgs)
store := setupDatastore(c.String("consensus"), ident, cfgs)

cons, err := setupConsensus(
c.String("consensus"),
Expand Down Expand Up @@ -294,9 +298,10 @@ func setupPinTracker(

func setupDatastore(
consensus string,
ident *config.Identity,
cfgs *cfgs,
) ds.Datastore {
stmgr := newStateManager(consensus, cfgs)
stmgr := newStateManager(consensus, ident, cfgs)
store, err := stmgr.GetStore()
checkErr("creating datastore", err)
return store
Expand Down
Loading

0 comments on commit f428a3b

Please sign in to comment.