Skip to content

Commit

Permalink
Merge b68a830 into 2c500e7
Browse files Browse the repository at this point in the history
  • Loading branch information
Dylan Lott committed Aug 6, 2018
2 parents 2c500e7 + b68a830 commit 246c78e
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 43 additions & 30 deletions pkg/overlay/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package overlay

import (
"context"
"log"
"crypto/rand"

"github.com/gogo/protobuf/proto"
"github.com/zeebo/errs"
Expand Down Expand Up @@ -108,56 +110,67 @@ func (o *Cache) Bootstrap(ctx context.Context) error {
return err
}
}
// called after kademlia is bootstrapped
// needs to take RoutingTable and start to persist it into the cache
// take bootstrap node
// get their route table
// loop through nodes in RT and get THEIR route table
// keep going forever and ever

// Other Possibilities: Randomly generate node ID's to ask for?

_, err = o.DHT.GetRoutingTable(ctx)

return err
}

// Refresh walks the network looking for new nodes and pings existing nodes to eliminate stale addresses
// Refresh updates the cache db with the current DHT.
// We currently do not penalize nodes that are unresponsive,
// but should in the future.
func (o *Cache) Refresh(ctx context.Context) error {
// iterate over all nodes
// compare responses to find new nodes
// listen for responses from existing nodes
// if no response from existing, then mark it as offline for time period
// if responds, it refreshes in DHT
_, rtErr := o.DHT.GetRoutingTable(ctx)

if rtErr != nil {
return rtErr
log.Print("starting cache refresh")
r, err := randomID()
if err != nil {
return err
}

_, err := o.DHT.GetNodes(ctx, "0", 128)

rid := kademlia.NodeID(r)
near, err := o.DHT.GetNodes(ctx, rid.String(), 128)
if err != nil {
return err
}

return nil
}

// Walk iterates over buckets to traverse the network
func (o *Cache) Walk(ctx context.Context) error {
nodes, err := o.DHT.GetNodes(ctx, "0", 128)
for _, node := range near {
pinged, err := o.DHT.Ping(ctx, *node)
if err != nil {
return err
}
err = o.DB.Put([]byte(pinged.Id), []byte(pinged.Address.Address))
if err != nil {
return err
}
}

// TODO: Kademlia hooks to do this automatically rather than at interval
nodes, err := o.DHT.GetNodes(ctx, "", 128)
if err != nil {
return err
}

for _, v := range nodes {
_, err := o.DHT.FindNode(ctx, kademlia.StringToNodeID(v.Id))
for _, node := range nodes {
pinged, err := o.DHT.Ping(ctx, *node)
if err != nil {
zap.Error(ErrNodeNotFound)
return err
} else {
err := o.DB.Put([]byte(pinged.Id), []byte(pinged.Address.Address))
if err != nil {
return err
}
}
}

return err
}

// Walk iterates over each node in each bucket to traverse the network
func (o *Cache) Walk(ctx context.Context) error {
	// Intentionally a no-op for now; ctx is accepted so the signature stays
	// stable once the traversal is implemented — presumably it will walk the
	// cache contents rather than the DHT (TODO below). Verify with the team.
	// TODO: This should walk the cache, rather than be a duplicate of refresh
	return nil
}

// randomID returns 64 bytes of cryptographically secure random data.
// The cache uses it as a synthetic node ID so each refresh probes the
// DHT from a different point in the keyspace.
func randomID() ([]byte, error) {
	id := make([]byte, 64)
	if _, err := rand.Read(id); err != nil {
		// Return nil rather than a partially-filled buffer so callers can
		// never mistake an incomplete read for a usable ID.
		return nil, err
	}
	return id, nil
}
95 changes: 95 additions & 0 deletions pkg/overlay/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package overlay
import (
"context"
"fmt"
"math/rand"
"os"
"os/exec"
"path/filepath"
"strconv"
"testing"
"time"

Expand All @@ -17,6 +19,8 @@ import (
"github.com/zeebo/errs"

"storj.io/storj/internal/test"
"storj.io/storj/pkg/dht"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/utils"
"storj.io/storj/protos/overlay"
"storj.io/storj/storage"
Expand All @@ -36,8 +40,66 @@ const (
mock dbClient = iota
bolt
_redis
testNetSize = 30
)

// newTestKademlia builds a Kademlia node with a freshly generated ID,
// listening on ip:port and introduced to the network via bootstrap node b.
// NOTE(review): the d parameter is currently unused — confirm whether it
// can be dropped from the signature.
func newTestKademlia(t *testing.T, ip, port string, d dht.DHT, b overlay.Node) *kademlia.Kademlia {
	rawID, err := kademlia.NewID()
	assert.NoError(t, err)

	nodeID := kademlia.NodeID(*rawID)
	kad, err := kademlia.NewKademlia(&nodeID, []overlay.Node{b}, ip, port)
	assert.NoError(t, err)
	return kad
}

// bootstrapTestNetwork stands up a bootstrap node on ip:port plus
// testNetSize further DHT nodes on sequential ports, each introduced to
// the network through the bootstrap node. It returns the started DHTs
// and the bootstrap node's contact info for use by the test under run.
func bootstrapTestNetwork(t *testing.T, ip, port string) ([]dht.DHT, overlay.Node) {
	bid, err := kademlia.NewID()
	assert.NoError(t, err)
	bnid := kademlia.NodeID(*bid)

	p, err := strconv.Atoi(port)
	// Check the conversion before using p (the original converted back to
	// a string first, silently working with p == 0 on a bad port).
	assert.NoError(t, err)
	pm := strconv.Itoa(p)

	intro, err := kademlia.GetIntroNode(bnid.String(), ip, pm)
	assert.NoError(t, err)

	boot, err := kademlia.NewKademlia(&bnid, []overlay.Node{*intro}, ip, pm)
	assert.NoError(t, err)

	rt, err := boot.GetRoutingTable(context.Background())
	// Previously unchecked: rt.Local() below would panic on a nil table.
	assert.NoError(t, err)
	bootNode := rt.Local()

	err = boot.ListenAndServe()
	assert.NoError(t, err)
	p++

	err = boot.Bootstrap(context.Background())
	assert.NoError(t, err)

	dhts := make([]dht.DHT, 0, testNetSize)
	for i := 0; i < testNetSize; i++ {
		// Each node gets the next port in sequence.
		nodePort := strconv.Itoa(p)

		nid, err := kademlia.NewID()
		assert.NoError(t, err)
		id := kademlia.NodeID(*nid)

		dht, err := kademlia.NewKademlia(&id, []overlay.Node{bootNode}, ip, nodePort)
		assert.NoError(t, err)

		p++
		dhts = append(dhts, dht)
		err = dht.ListenAndServe()
		assert.NoError(t, err)
		err = dht.Bootstrap(context.Background())
		assert.NoError(t, err)
	}

	return dhts, bootNode
}

var (
getCases = []struct {
testID string
Expand Down Expand Up @@ -146,6 +208,20 @@ var (
data: test.KvStore{},
},
}

refreshCases = []struct {
testID string
expectedTimesCalled int
expectedErr error
data test.KvStore
}{
{
testID: "valid update",
expectedTimesCalled: 1,
expectedErr: nil,
data: test.KvStore{},
},
}
)

func redisTestClient(t *testing.T, data test.KvStore) storage.KeyValueStore {
Expand Down Expand Up @@ -313,6 +389,25 @@ func TestMockPut(t *testing.T) {
}
}

// TestRefresh spins up a small test network, points a Cache at one of its
// nodes, and verifies Refresh completes with the expected error.
func TestRefresh(t *testing.T) {
	for _, c := range refreshCases {
		t.Run(c.testID, func(t *testing.T) {
			dhts, b := bootstrapTestNetwork(t, "127.0.0.1", "3000")
			ctx := context.Background()
			db := test.NewMockKeyValueStore(c.data)
			// Attach the cache's DHT to a random member of the network.
			dht := newTestKademlia(t, "127.0.0.1", "2999", dhts[rand.Intn(testNetSize)], b)

			_cache := &Cache{
				DB:  db,
				DHT: dht,
			}

			err := _cache.Refresh(ctx)
			// testify's argument order is (t, expected, actual); the original
			// had them swapped, producing misleading failure output.
			assert.Equal(t, c.expectedErr, err)
		})
	}
}

func TestNewRedisOverlayCache(t *testing.T) {
cases := []struct {
testName, address string
Expand Down
33 changes: 19 additions & 14 deletions pkg/overlay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"net/url"
"strconv"
"time"

"github.com/zeebo/errs"
"go.uber.org/zap"
Expand All @@ -26,7 +27,8 @@ var (
// Config is a configuration struct for everything you need to start the
// Overlay cache responsibility.
type Config struct {
DatabaseURL string `help:"the database connection string to use" default:"bolt://$CONFDIR/overlay.db"`
DatabaseURL string `help:"the database connection string to use" default:"bolt://$CONFDIR/overlay.db"`
RefreshInterval time.Duration `help:"the interval at which the cache refreshes itself in seconds" default:"30s"`
}

// Run implements the provider.Responsibility interface. Run assumes a
Expand Down Expand Up @@ -72,11 +74,23 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
return err
}

ticker := time.NewTicker(time.Duration(c.RefreshInterval))
defer ticker.Stop()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
// TODO(jt): should there be a for loop here?
err := cache.Refresh(ctx)
if err != nil {
zap.S().Fatal("cache refreshes stopped", zap.Error(err))
for {
select {
case <-ticker.C:
err := cache.Refresh(ctx)
if err != nil {
zap.S().Error("Error with cache refresh: ", err)
}
case <- ctx.Done():
return
}
}
}()

Expand All @@ -89,15 +103,6 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (
metrics: monkit.Default,
})

go func() {
// TODO(jt): should there be a for loop here?
// TODO(jt): how is this different from Refresh?
err := cache.Walk(ctx)
if err != nil {
zap.S().Fatal("cache walking stopped", zap.Error(err))
}
}()

return server.Run(ctx)
}

Expand Down

0 comments on commit 246c78e

Please sign in to comment.