Skip to content
Permalink
Browse files

Merge pull request #714 from ipfs/feat/monitor-accrual

Monitoring and re-allocation revamp: Accrual failure detection
  • Loading branch information...
hsanjuan committed May 16, 2019
2 parents 13ed787 + 2e8f3ad commit 5be1b6532fb6a7c916667e4d1641f2604ebc05f3
@@ -248,6 +248,16 @@ type GlobalPinInfo struct {
PeerMap map[string]*PinInfo `json:"peer_map" codec:"pm,omitempty"`
}

// String returns a human-readable, multi-line representation of a
// GlobalPinInfo: a "Cid:" line, a "Peer:" header and then one
// tab-indented line per entry in PeerMap.
//
// Entries are written in map iteration order, which Go leaves
// unspecified, so the order of the peer lines may differ between calls.
func (gpi *GlobalPinInfo) String() string {
	var b strings.Builder
	fmt.Fprintf(&b, "Cid: %v\n", gpi.Cid.String())
	b.WriteString("Peer:\n")
	for _, p := range gpi.PeerMap {
		// %+v prints each PinInfo with its field names.
		fmt.Fprintf(&b, "\t%+v\n", p)
	}
	return b.String()
}

// PinInfo holds information about local pins.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
@@ -581,6 +591,19 @@ type Pin struct {
Reference *cid.Cid `json:"reference" codec:"r,omitempty"`
}

// String renders the Pin as a sequence of "key: value" lines: cid, type,
// allocations and maxdepth are always present, and a trailing reference
// line is appended only when the Pin has a non-nil Reference.
func (pin *Pin) String() string {
	var sb strings.Builder

	// emit formats one line straight into the builder.
	emit := func(format string, args ...interface{}) {
		fmt.Fprintf(&sb, format, args...)
	}

	emit("cid: %s\n", pin.Cid.String())
	emit("type: %s\n", pin.Type)
	emit("allocations: %v\n", pin.Allocations)
	emit("maxdepth: %d\n", pin.MaxDepth)

	if pin.Reference == nil {
		return sb.String()
	}
	emit("reference: %s\n", pin.Reference)
	return sb.String()
}

// PinPath is a wrapper for holding pin options and path of the content.
type PinPath struct {
PinOptions
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"mime/multipart"
"sort"
"sync"
"time"

@@ -83,6 +84,7 @@ type Cluster struct {
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
func NewCluster(
ctx context.Context,
host host.Host,
dht *dht.IpfsDHT,
cfg *Config,
@@ -105,7 +107,7 @@ func NewCluster(
return nil, errors.New("cluster host is nil")
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)

listenAddrs := ""
for _, addr := range host.Addrs() {
@@ -297,22 +299,49 @@ func (c *Cluster) alertsHandler() {
case <-c.ctx.Done():
return
case alrt := <-c.monitor.Alerts():
// only the leader handles alerts
leader, err := c.consensus.Leader(c.ctx)
if err == nil && leader == c.id {
logger.Warningf(
"Peer %s received alert for %s in %s",
c.id, alrt.MetricName, alrt.Peer,
)
switch alrt.MetricName {
case pingMetricName:
cState, err := c.consensus.State(c.ctx)
if err != nil {
logger.Warning(err)
return
}
list, err := cState.List(c.ctx)
if err != nil {
logger.Warning(err)
return
}
for _, pin := range list {
if len(pin.Allocations) == 1 && containsPeer(pin.Allocations, alrt.Peer) {
logger.Warning("a pin with only one allocation cannot be repinned")
logger.Warning("to make repinning possible, pin with a replication factor of 2+")
continue
}
if c.shouldPeerRepinCid(alrt.Peer, pin) {
c.repinFromPeer(c.ctx, alrt.Peer)
}
}
}
}
}

// shouldPeerRepinCid reports whether the current peer is the one that
// should act on the given pin after the peer "failed" has been flagged.
//
// It returns true only when both the failed peer and the current peer
// pass the containsPeer check against the pin's allocations, and the
// current peer is at the top of the sorted allocations list. The failed
// peer is ignored for that ranking, i.e. if the current peer is second
// and the failed peer is first, the function still returns true.
//
// The pin's Allocations slice is not modified.
func (c *Cluster) shouldPeerRepinCid(failed peer.ID, pin *api.Pin) bool {
	if !containsPeer(pin.Allocations, failed) || !containsPeer(pin.Allocations, c.id) {
		return false
	}

	// Sort a copy: a plain peer.IDSlice(pin.Allocations) conversion shares
	// the backing array, so sorting it would reorder the caller's pin.
	allocs := make(peer.IDSlice, len(pin.Allocations))
	copy(allocs, pin.Allocations)
	sort.Sort(allocs)

	if allocs[0] == c.id {
		return true
	}

	// NOTE(review): indexing allocs[1] relies on containsPeer being a
	// membership test. If so, c.id is present but not first here, which
	// means there are at least two allocations.
	return allocs[1] == c.id && allocs[0] == failed
}

// detects any changes in the peerset and saves the configuration. When it
// detects that we have been removed from the peerset, it shuts down this peer.
func (c *Cluster) watchPeers() {
@@ -140,8 +140,9 @@ type mockTracer struct {

func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracker) {
clusterCfg, _, _, _, _, raftCfg, _, maptrackerCfg, statelesstrackerCfg, psmonCfg, _, _ := testingConfigs()
ctx := context.Background()

host, pubsub, dht, err := NewClusterHost(context.Background(), clusterCfg)
host, pubsub, dht, err := NewClusterHost(ctx, clusterCfg)
if err != nil {
t.Fatal(err)
}
@@ -156,7 +157,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
raftcon, _ := raft.NewConsensus(host, raftCfg, store, false)

psmonCfg.CheckInterval = 2 * time.Second
mon, err := pubsubmon.New(psmonCfg, pubsub, raftcon.Peers)
mon, err := pubsubmon.New(ctx, psmonCfg, pubsub, raftcon.Peers)
if err != nil {
t.Fatal(err)
}
@@ -169,6 +170,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, PinTracke
ReadyTimeout = raftCfg.WaitForLeaderTimeout + 1*time.Second

cl, err := NewCluster(
ctx,
host,
dht,
clusterCfg,
@@ -22,6 +22,7 @@ import (
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"go.opencensus.io/tag"

ds "github.com/ipfs/go-datastore"
host "github.com/libp2p/go-libp2p-host"
@@ -103,6 +104,9 @@ func createCluster(
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgs.clusterCfg)
checkErr("creating libP2P Host", err)

ctx, err = tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
checkErr("tag context with host id", err)

peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
// Import peers but do not connect. We cannot connect to peers until
// everything has been created (dht, pubsub, bitswap). Otherwise things
@@ -164,13 +168,14 @@ func createCluster(
peersF = cons.Peers
}

mon, err := pubsubmon.New(cfgs.pubsubmonCfg, pubsub, peersF)
mon, err := pubsubmon.New(ctx, cfgs.pubsubmonCfg, pubsub, peersF)
if err != nil {
store.Close()
checkErr("setting up PeerMonitor", err)
}

return ipfscluster.NewCluster(
ctx,
host,
dht,
cfgs.clusterCfg,
@@ -102,7 +102,8 @@ var testingTrackerCfg = []byte(`
`)

var testingMonCfg = []byte(`{
"check_interval": "300ms"
"check_interval": "300ms",
"failure_threshold": 5
}`)

var testingDiskInfCfg = []byte(`{
@@ -111,7 +112,10 @@ var testingDiskInfCfg = []byte(`{
}`)

var testingTracerCfg = []byte(`{
"enable_tracing": false
"enable_tracing": false,
"jaeger_agent_endpoint": "/ip4/0.0.0.0/udp/6831",
"sampling_prob": 1,
"service_name": "cluster-daemon"
}`)

func testingConfigs() (*Config, *rest.Config, *ipfsproxy.Config, *ipfshttp.Config, *badger.Config, *raft.Config, *crdt.Config, *maptracker.Config, *stateless.Config, *pubsubmon.Config, *disk.Config, *observations.TracingConfig) {
2 go.mod
@@ -87,5 +87,7 @@ require (
golang.org/x/sync v0.0.0-20190412183630-56d357773e84 // indirect
golang.org/x/sys v0.0.0-20190416152802-12500544f89f // indirect
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 // indirect
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a
gonum.org/v1/plot v0.0.0-20190410204940-3a5f52653745
google.golang.org/grpc v1.19.1 // indirect
)
25 go.sum
@@ -14,6 +14,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx
github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e h1:2Z+EBRrOJsA3psnUPcEWMIH2EIga1xHflQcr/EZslx8=
github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod h1:3oM7gXIttpYDAJXpVNnSCiUMYBLIZ6cb1t+Ip982MRo=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
@@ -69,6 +71,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90 h1:WXb3TSNmHp2vHoCroCIB1foO/yQ36swABL8aOVeDpgg=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -81,6 +85,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
@@ -255,6 +261,8 @@ github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVY
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5 h1:PJr+ZMXIecYc1Ey2zucXdR73SMBtgjPgwa31099IMv0=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
@@ -567,7 +575,13 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49 h1:mE9V9RMa141kxdQR3pfZM3mkg0MPyw+FOPpnciBXkbE=
golang.org/x/crypto v0.0.0-20190417170229-92d88b081a49/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81 h1:00VmoueYNlNz/aHIilyyQz/MHSqGoWJzpFv/HW8xpzI=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -622,9 +636,11 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c h1:vamGzbGri8IKo20MQncCuljcQ5uAO6kaCeawQPVblAI=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@@ -633,6 +649,13 @@ golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3 h1:P6iTFmrTQqWrqLZPX1VMz
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373 h1:PPwnA7z1Pjf7XYaBP9GL1VAMZmcIWyFz7QCMSIIa3Bg=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a h1:XffIu/i+IJIC+M8WoBEmJm8N/YYbA8Pvh748YgzU7kI=
gonum.org/v1/gonum v0.0.0-20190321072728-ca4d35bc590a/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190410204940-3a5f52653745 h1:Xaq5xR1I2KM/MWp1vwZxOosUPa1U8wtNN8zRbVko0ZY=
gonum.org/v1/plot v0.0.0-20190410204940-3a5f52653745/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/api v0.0.0-20181220000619-583d854617af h1:iQMS7JKv/0w/iiWf1M49Cg3dmOkBoBZT5KheqPDpaac=
google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.3.1 h1:oJra/lMfmtm13/rgY/8i3MzjFWYXvQIAKjQ3HqofMk8=
@@ -667,3 +690,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
rsc.io/pdf v0.1.1 h1:k1MczvYDUvJBe93bYd7wrZLLUEcLZAuF824/I4e5Xr4=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
@@ -56,6 +56,7 @@ func TestDefault(t *testing.T) {
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

}

func TestApplyEnvVars(t *testing.T) {

0 comments on commit 5be1b65

Please sign in to comment.
You can’t perform that action at this time.