Skip to content

Commit

Permalink
Merge pull request #142 from ipfs/feat/free-space-allocation-metric
Browse files Browse the repository at this point in the history
`FreeSpace` allocation metric impl
  • Loading branch information
hsanjuan committed Sep 2, 2017
2 parents b911aa7 + b1356cd commit af8a385
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 110 deletions.
88 changes: 16 additions & 72 deletions allocator/ascendalloc/ascendalloc.go
@@ -1,13 +1,11 @@
// Package ascendalloc implements an ipfscluster.Allocator returns allocations
// based on sorting the metrics in ascending order. Thus, peers with smallest
// metrics are first in the list. This allocator can be used with a number
// of informers, as long as they provide a numeric metric value.
// Package ascendalloc implements an ipfscluster.PinAllocator, which returns
// allocations based on sorting the metrics in ascending order. Thus, peers with
// smallest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
package ascendalloc

import (
"sort"
"strconv"

"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"

rpc "github.com/hsanjuan/go-libp2p-gorpc"
Expand All @@ -18,79 +16,25 @@ import (

var logger = logging.Logger("ascendalloc")

// Allocator implements ipfscluster.Allocate.
type Allocator struct{}
// AscendAllocator extends the SimpleAllocator
type AscendAllocator struct{}

// NewAllocator returns an initialized Allocator
func NewAllocator() *Allocator {
return &Allocator{}
// NewAscendAllocator returns an initialized AscendAllocator
func NewAllocator() AscendAllocator {
return AscendAllocator{}
}

// SetClient does nothing in this allocator
func (alloc *Allocator) SetClient(c *rpc.Client) {}
func (alloc AscendAllocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator
func (alloc *Allocator) Shutdown() error { return nil }
func (alloc AscendAllocator) Shutdown() error { return nil }

// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the candidates
// based on their metric values (from smallest to largest).
func (alloc *Allocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (smallest to largest).
func (alloc AscendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
sortable := newMetricsSorter(candidates)
sort.Sort(sortable)
return sortable.peers, nil
}

// metricsSorter attaches sort.Interface methods to our metrics and sorts
// a slice of peers in the way that interest us
type metricsSorter struct {
peers []peer.ID
m map[peer.ID]int
}

func newMetricsSorter(m map[peer.ID]api.Metric) *metricsSorter {
vMap := make(map[peer.ID]int)
peers := make([]peer.ID, 0, len(m))
for k, v := range m {
if v.Discard() {
continue
}
val, err := strconv.Atoi(v.Value)
if err != nil {
continue
}
peers = append(peers, k)
vMap[k] = val
}

sorter := &metricsSorter{
m: vMap,
peers: peers,
}
return sorter
}

// Len returns the number of metrics
func (s metricsSorter) Len() int {
return len(s.peers)
}

// Less reports if the element in position i is less than the element in j
func (s metricsSorter) Less(i, j int) bool {
peeri := s.peers[i]
peerj := s.peers[j]

x := s.m[peeri]
y := s.m[peerj]

return x < y
}

// Swap swaps the elements in positions i and j
func (s metricsSorter) Swap(i, j int) {
temp := s.peers[i]
s.peers[i] = s.peers[j]
s.peers[j] = temp
return util.SortNumeric(candidates, false), nil
}
19 changes: 9 additions & 10 deletions allocator/ascendalloc/ascendalloc_test.go
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
Expand All @@ -31,25 +30,25 @@ var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
Expand All @@ -61,13 +60,13 @@ var testCases = []testcase{
{ // filter invalid
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
Expand All @@ -79,13 +78,13 @@ var testCases = []testcase{
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
Expand All @@ -97,7 +96,7 @@ var testCases = []testcase{
}

func Test(t *testing.T) {
alloc := &Allocator{}
alloc := &AscendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates)
Expand Down
40 changes: 40 additions & 0 deletions allocator/descendalloc/descendalloc.go
@@ -0,0 +1,40 @@
// Package descendalloc implements an ipfscluster.util.Allocator returns
// allocations based on sorting the metrics in descending order. Thus, peers
// with largest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
package descendalloc

import (
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"

rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)

var logger = logging.Logger("descendalloc")

// DescendAllocator extends the SimpleAllocator
type DescendAllocator struct{}

// NewDescendAllocator returns an initialized DescendAllocator
func NewAllocator() DescendAllocator {
return DescendAllocator{}
}

// SetClient does nothing in this allocator
func (alloc DescendAllocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator
func (alloc DescendAllocator) Shutdown() error { return nil }

// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
return util.SortNumeric(candidates, true), nil
}
115 changes: 115 additions & 0 deletions allocator/descendalloc/descendalloc_test.go
@@ -0,0 +1,115 @@
package descendalloc

import (
"testing"
"time"

"github.com/ipfs/ipfs-cluster/api"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)

type testcase struct {
candidates map[peer.ID]api.Metric
current map[peer.ID]api.Metric
expected []peer.ID
}

var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)

var inAMinute = time.Now().Add(time.Minute).Format(time.RFC3339Nano)

var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: api.Metric{
Name: "some-metric",
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: api.Metric{
Name: "some-metric",
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: "some-metric",
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: api.Metric{
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: "some-metric",
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: "some-metric",
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
}

func Test(t *testing.T) {
alloc := &DescendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[len(res)-i-1]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}

0 comments on commit af8a385

Please sign in to comment.