Skip to content

Commit

Permalink
Merge pull request #4 from renproject/feat/kv
Browse files Browse the repository at this point in the history
Use KV storage v1.0.0
  • Loading branch information
jazg committed Aug 23, 2019
2 parents f4580e6 + 54f5aa1 commit c5a15ba
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 17 deletions.
6 changes: 3 additions & 3 deletions broadcast/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ type Storage interface {
}

type storage struct {
store kv.Store
store kv.Table
}

func NewStorage(store kv.Store) Storage {
func NewStorage(store kv.Table) Storage {
return &storage{store: store}
}

Expand All @@ -24,7 +24,7 @@ func (storage *storage) InsertMessageHash(hash protocol.MessageHash) error {

func (storage *storage) MessageHash(hash protocol.MessageHash) (bool, error) {
var exists bool
if err := storage.store.Get(hash.String(), &exists); err != nil && err != kv.ErrNotFound {
if err := storage.store.Get(hash.String(), &exists); err != nil && err != kv.ErrKeyNotFound {
return false, err
}
return exists, nil
Expand Down
9 changes: 3 additions & 6 deletions dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DHT interface {
type dht struct {
me protocol.PeerAddress
codec protocol.PeerAddressCodec
store kv.Iterable
store kv.Table

inMemCacheMu *sync.RWMutex
inMemCache map[string]protocol.PeerAddress
Expand All @@ -34,7 +34,7 @@ type dht struct {
// New DHT that stores peer addresses in the given store. It will cache all peer
// addresses in memory for fast access. It is safe for concurrent use,
// regardless of the underlying store.
func New(me protocol.PeerAddress, codec protocol.PeerAddressCodec, store kv.Iterable, bootstrapAddrs ...protocol.PeerAddress) (DHT, error) {
func New(me protocol.PeerAddress, codec protocol.PeerAddressCodec, store kv.Table, bootstrapAddrs ...protocol.PeerAddress) (DHT, error) {
dht := &dht{
me: me,
codec: codec,
Expand Down Expand Up @@ -152,10 +152,7 @@ func (dht *dht) addPeerAddressWithoutLock(peerAddr protocol.PeerAddress) error {
}

func (dht *dht) fillInMemCache() error {
iter, err := dht.store.Iterator()
if err != nil {
return fmt.Errorf("error initialising dht iterator: %v", err)
}
iter := dht.store.Iterator()
for iter.Next() {
var peerAddr protocol.PeerAddress
if err := iter.Value(&peerAddr); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ethereum/go-ethereum v1.9.2
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/renproject/kv v0.1.0
github.com/renproject/kv v1.0.0
github.com/renproject/phi v0.1.0
github.com/sirupsen/logrus v1.4.2
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/ethereum/go-ethereum v1.9.2 h1:RMIHDO/diqXEgORSVzYx8xW9x2+S32PoAX5lQwya0Lw=
github.com/ethereum/go-ethereum v1.9.2/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -45,6 +47,9 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/renproject/kv v0.1.0 h1:i4XpBL2vkKMpZz7dhYhaPf/I3/3DNfSZkz9GY96UryE=
github.com/renproject/kv v0.1.0/go.mod h1:1kC6F4r7jhMeyZtqc7Of047E/DpfHDphRg/bQHt08hA=
github.com/renproject/kv v1.0.0 h1:Zk/cH8yZtoK8B/bsTkAZTKSNYPKjvA47SYpkN221g9s=
github.com/renproject/kv v1.0.0/go.mod h1:B+LKu5xLj9t65LrbsSyzTcgLcPdWCd0qxFEtNB7NRo8=
github.com/renproject/phi v0.0.0-20190713013721-51f586bc4816/go.mod h1:Hrxx2ONVpfByficRjyRd1trecalYr0lo7Z0akx8UXqg=
github.com/renproject/phi v0.1.0 h1:ZOn7QeDribk/uV46OhQWcTLxyuLg7P+xR1Hfl5cOQuI=
github.com/renproject/phi v0.1.0/go.mod h1:Hrxx2ONVpfByficRjyRd1trecalYr0lo7Z0akx8UXqg=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
13 changes: 6 additions & 7 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/renproject/aw/protocol"
"github.com/renproject/aw/tcp"
"github.com/renproject/kv"
"github.com/renproject/kv/db"
"github.com/renproject/phi"
"github.com/sirupsen/logrus"
)
Expand All @@ -35,8 +34,8 @@ type PeerOptions struct {
BootstrapWorkers int `json:"bootstrapWorkers"` // Defaults to 2x the number of CPUs
BootstrapDuration time.Duration `json:"bootstrapDuration"` // Defaults to 1 hour

DHTStore db.Iterable // Defaults to using in memory store
BroadcasterStore db.Iterable // Defaults to using in memory store
DHTStore kv.Table // Defaults to using in memory store
BroadcasterStore kv.Table // Defaults to using in memory store
SignVerifier protocol.SignVerifier // Defaults to nil
RunFns []RunFn // Defaults to nil
}
Expand Down Expand Up @@ -275,14 +274,14 @@ func Default(options PeerOptions, receiver MessageReceiver, sender MessageSender
}

if options.DHTStore == nil {
options.DHTStore = kv.NewMemDB()
options.DHTStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "dht")
}

if options.BroadcasterStore == nil {
options.BroadcasterStore = kv.NewMemDB()
options.BroadcasterStore = kv.NewTable(kv.NewMemDB(kv.GobCodec), "broadcaster")
}

dht, err := dht.New(options.Me, options.Codec, kv.NewGob(options.DHTStore), options.BootstrapAddresses...)
dht, err := dht.New(options.Me, options.Codec, options.DHTStore, options.BootstrapAddresses...)
if err != nil {
panic(fmt.Errorf("failed to initialize DHT: %v", err))
}
Expand All @@ -294,6 +293,6 @@ func Default(options PeerOptions, receiver MessageReceiver, sender MessageSender
pingpong.NewPingPonger(dht, sender, events, options.Codec, options.Logger),
cast.NewCaster(dht, sender, events, options.Logger),
multicast.NewMulticaster(dht, sender, events, options.Logger),
broadcast.NewBroadcaster(broadcast.NewStorage(kv.NewGob(options.BroadcasterStore)), dht, sender, events, options.Logger),
broadcast.NewBroadcaster(broadcast.NewStorage(options.BroadcasterStore), dht, sender, events, options.Logger),
)
}

0 comments on commit c5a15ba

Please sign in to comment.