Skip to content

Commit

Permalink
Fix metric expire type. Do not discard metrics in Allocate().
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
  • Loading branch information
hsanjuan committed Apr 5, 2018
1 parent f5f56f2 commit 0069c00
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 27 deletions.
7 changes: 4 additions & 3 deletions allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ func (c *Cluster) allocate(hash *cid.Cid, rplMin, rplMax int, blacklist []peer.I
priorityMetrics := make(map[peer.ID]api.Metric)

// Divide metrics between current and candidates.
// All metrics in metrics are valid (at least the
// moment they were compiled by the monitor)
for _, m := range metrics {
switch {
case m.Discard() || containsPeer(blacklist, m.Peer):
// discard peers with invalid metrics and
// those in the blacklist
case containsPeer(blacklist, m.Peer):
// discard blacklisted peers
continue
case containsPeer(currentAllocs, m.Peer):
currentMetrics[m.Peer] = m
Expand Down
2 changes: 1 addition & 1 deletion allocator/ascendalloc/ascendalloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)

var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
var inAMinute = time.Now().Add(time.Minute).UnixNano()

var testCases = []testcase{
{ // regular sort
Expand Down
2 changes: 1 addition & 1 deletion allocator/descendalloc/descendalloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)

var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)
var inAMinute = time.Now().Add(time.Minute).UnixNano()

var testCases = []testcase{
{ // regular sort
Expand Down
26 changes: 7 additions & 19 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ type Metric struct {
Name string
Peer peer.ID // filled-in by Cluster.
Value string
Expire string // RFC3339Nano
Valid bool // if the metric is not valid it will be discarded
Expire int64 // UnixNano
Valid bool // if the metric is not valid it will be discarded
}

// SetTTL sets Metric to expire after the given seconds
Expand All @@ -615,31 +615,19 @@ func (m *Metric) SetTTL(seconds int) {
// SetTTLDuration sets Metric to expire after the given time.Duration
func (m *Metric) SetTTLDuration(d time.Duration) {
exp := time.Now().Add(d)
m.Expire = exp.UTC().Format(time.RFC3339Nano)
m.Expire = exp.UnixNano()
}

// GetTTL returns the time left before the Metric expires
func (m *Metric) GetTTL() time.Duration {
if m.Expire == "" {
return 0
}
exp, err := time.Parse(time.RFC3339Nano, m.Expire)
if err != nil {
panic(err)
}
return exp.Sub(time.Now())
expDate := time.Unix(0, m.Expire)
return expDate.Sub(time.Now())
}

// Expired returns if the Metric has expired
func (m *Metric) Expired() bool {
if m.Expire == "" {
return true
}
exp, err := time.Parse(time.RFC3339Nano, m.Expire)
if err != nil {
panic(err)
}
return time.Now().After(exp)
expDate := time.Unix(0, m.Expire)
return time.Now().After(expDate)
}

// Discard returns if the metric not valid or has expired
Expand Down
2 changes: 1 addition & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var testingTrackerCfg = []byte(`
`)

var testingMonCfg = []byte(`{
"check_interval": "400ms"
"check_interval": "300ms"
}`)

var testingDiskInfCfg = []byte(`{
Expand Down
2 changes: 1 addition & 1 deletion ipfscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
peerstore "github.com/libp2p/go-libp2p-peerstore"

cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down
2 changes: 1 addition & 1 deletion monitor/basic/peer_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (mon *Monitor) LogMetric(m api.Metric) {
mbyp[peer] = pmets
}

logger.Debugf("logged '%s' metric from '%s'. Expires on %s", name, peer, m.Expire)
logger.Debugf("logged '%s' metric from '%s'. Expires on %d", name, peer, m.Expire)
pmets.add(m)
}

Expand Down
54 changes: 54 additions & 0 deletions monitor/basic/peer_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package basic

import (
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -51,6 +53,58 @@ func TestPeerMonitorShutdown(t *testing.T) {
}
}

func TestLogMetricConcurrent(t *testing.T) {
pm := testPeerMonitor(t)
defer pm.Shutdown()

var wg sync.WaitGroup
wg.Add(3)

f := func() {
defer wg.Done()
for i := 0; i < 25; i++ {
mt := api.Metric{
Name: "test",
Peer: test.TestPeerID1,
Value: fmt.Sprintf("%d", time.Now().UnixNano()),
Valid: true,
}
mt.SetTTLDuration(150 * time.Millisecond)
pm.LogMetric(mt)
time.Sleep(75 * time.Millisecond)
}
}
go f()
go f()
go f()

time.Sleep(150 * time.Millisecond)
last := time.Now().Add(-500 * time.Millisecond)

for i := 0; i <= 20; i++ {
lastMtrcs := pm.LastMetrics("test")

if len(lastMtrcs) != 1 {
t.Error("no valid metrics", len(lastMtrcs), i)
time.Sleep(75 * time.Millisecond)
continue
}

n, err := strconv.Atoi(lastMtrcs[0].Value)
if err != nil {
t.Fatal(err)
}
current := time.Unix(0, int64(n))
if current.Before(last) {
t.Errorf("expected newer metric: Current: %s, Last: %s", current, last)
}
last = current
time.Sleep(75 * time.Millisecond)
}

wg.Wait()
}

func TestPeerMonitorLogMetric(t *testing.T) {
pm := testPeerMonitor(t)
defer pm.Shutdown()
Expand Down
2 changes: 2 additions & 0 deletions peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,9 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
t.Fatal("error removing peer:", err)
}

delay()
waitForLeaderAndMetrics(t, clusters)
delay() // this seems to fail when not waiting enough...

for _, icid := range interestingCids {
// Now check that the allocations are new.
Expand Down

0 comments on commit 0069c00

Please sign in to comment.