Rename IpfsDHT to Node #223

Closed · wants to merge 2 commits
48 changes: 25 additions & 23 deletions dht.go
@@ -37,9 +37,11 @@ var log = logging.Logger("dht")
// collect members of the routing table.
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
type IpfsDHT = Node

// Node is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
type Node struct {
Member:

If we're keeping this struct public, I'd rather prefix it with DHT, and remove the DHT from the functions below.
But actually I wonder if there's any use case to keep Node public; I suspect no. In that case I'd be happy with an unqualified node type demoted to private scope.

Contributor:

the interface is wider than the routing interface; we would need a new interface for the DHT.
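
A sketch of what "wider" means here, with illustrative interface names (only `Context` and `Close` are taken from this diff): hiding the concrete type behind an interface would require extracting a new one that embeds the routing surface and adds the rest.

```go
package dht

import "context"

// Routing stands in for the existing, narrower routing interface.
type Routing interface {
	Bootstrap(ctx context.Context) error
}

// DHT is the kind of new, wider interface that would have to be
// extracted; it embeds Routing and adds the extra methods the
// node exposes (names taken from this diff).
type DHT interface {
	Routing
	Context() context.Context
	Close() error
}
```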

Member:

To my knowledge, returning a private type allows consumers to access all its public methods, so you don't need to access them through an interface. In this case, the benefit is that it disallows constructing a struct by itself, and it doesn't leak the unqualified Node type to the public namespace.

BTW – I generally agree that there may be more interfaces to extract.
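
A minimal sketch of the pattern described here, with hypothetical names rather than this PR's code: the constructor returns an unexported type, so callers can use its exported methods but can neither construct nor name the type themselves.

```go
package dht

// node is unexported: the only way to obtain one is NewNode, and
// the name never leaks into the public namespace.
type node struct {
	self string
}

// NewNode returns the unexported type; callers can still invoke
// every exported method on the returned value. (golint does flag
// exported functions that return unexported types.)
func NewNode(self string) *node {
	return &node{self: self}
}

// ID is callable from other packages even though node itself is not.
func (n *node) ID() string {
	return n.self
}
```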

Contributor:

You can't construct a variable of its type though, so you can't assign it anywhere, which makes it borderline useless.
Note that the interface of the DHT is wider than any extant interface we have.

Member (@raulk, Jan 10, 2019):

Right, you can assign it to a local variable with the := operator, but to pass it around in function arguments, you need to declare those args as interfaces. I'd argue that's the correct way, though.
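
Continuing the hypothetical sketch above, both halves of that trade-off in caller code: `:=` infers the unexported type, while passing the value onward goes through a caller-defined interface.

```go
package main

import (
	"fmt"

	"example.com/dht" // hypothetical import path for the sketch above
)

// Identifier is a caller-side interface: code outside the package
// cannot write *dht.node in a signature, but it can accept this.
type Identifier interface {
	ID() string
}

func describe(i Identifier) {
	fmt.Println("peer:", i.ID())
}

func main() {
	n := dht.NewNode("QmSelf") // fine: := infers the unexported type
	describe(n)                // passing it around requires an interface
}
```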

Contributor (author):

I think it should be a public type, and that type should be returned explicitly by its constructors. See the Go idiom on returning concrete types and accepting interfaces.
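
For reference, a standalone sketch of that idiom with illustrative names (nothing here is from this repo): return the concrete type so callers keep the full method set, and accept the narrowest interface you need.

```go
package store

import "errors"

// Getter is the narrow behaviour callers should accept.
type Getter interface {
	Get(key string) ([]byte, error)
}

// Store is the concrete type; it satisfies Getter and more.
type Store struct{ m map[string][]byte }

// New returns the concrete *Store, not the Getter interface, so
// callers keep access to the full method set (e.g. Put).
func New() *Store { return &Store{m: make(map[string][]byte)} }

func (s *Store) Get(key string) ([]byte, error) {
	v, ok := s.m[key]
	if !ok {
		return nil, errors.New("not found")
	}
	return v, nil
}

func (s *Store) Put(key string, v []byte) { s.m[key] = v }

// readOnly accepts the interface: any Getter will do, which keeps
// it testable and decoupled from Store.
func readOnly(g Getter, key string) ([]byte, error) { return g.Get(key) }
```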

host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore pstore.Peerstore // Peer Registry
@@ -65,7 +67,7 @@ type IpfsDHT struct {
}

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
func New(ctx context.Context, h host.Host, options ...opts.Option) (*Node, error) {
var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
@@ -95,7 +97,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *Node {
Member:

How about we take the plunge and rename this func to NewNode? It's more idiomatic in Go, as the package is already named dht:

node := dht.NewNode()

We'd need to introduce a method alias, though.

Contributor (author):

Yes, I concur. I also want to rename NewDHTClient to NewClientNode, or NewPassiveNode.

Contributor:

Let's not unnecessarily break existing code.

Contributor (author):

An alias would be added for a period for backwards compatibility until the next major version.
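
A sketch of what that transition period could look like. It builds on the `type IpfsDHT = Node` alias this diff already adds; the constructor wrapper and `Deprecated:` markers are assumptions, not part of the PR.

```go
package dht

// Node is the new name for the type formerly exported as IpfsDHT.
type Node struct{ /* ... */ }

// IpfsDHT is retained as a type alias for backwards compatibility.
//
// Deprecated: use Node. This alias will be removed in the next
// major version.
type IpfsDHT = Node

// NewNode is the new, package-idiomatic constructor.
func NewNode() *Node { return &Node{} }

// NewDHT forwards to NewNode during the transition period.
//
// Deprecated: use NewNode.
func NewDHT() *Node { return NewNode() }
```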

dht, err := New(ctx, h, opts.Datastore(dstore))
if err != nil {
panic(err)
@@ -107,15 +109,15 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *Node {
Member:

As per the above comment, this could become:

func NewClient(ctx context.Context, h host.Host, dstore ds.Batching) *node {

Contributor (author):

Note that while you can return private types and still operate on them, they're concealed from docs and a bit dubious. Unfortunately, being able to arbitrarily instantiate any public type without going through a constructor is just a Go wart, and not worth trying to evade.
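
To make that "wart" concrete — hypothetical code, not from this repo: any exported struct type can be instantiated as a zero value, silently bypassing whatever invariants the constructor establishes.

```go
package main

import "fmt"

type Node struct {
	table map[string]string // NewNode is responsible for initialising this
}

func NewNode() *Node {
	return &Node{table: make(map[string]string)}
}

func main() {
	n := &Node{} // legal: zero value, constructor bypassed
	if n.table == nil {
		// A later write to n.table would panic; nothing in the
		// language prevented this construction.
		fmt.Println("node created without its invariants")
	}
}
```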

dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
if err != nil {
panic(err)
}
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *Node {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
@@ -126,7 +128,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
cmgr.UntagPeer(p, "kbucket")
}

return &IpfsDHT{
return &Node{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
@@ -141,7 +143,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
func (dht *Node) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
@@ -165,7 +167,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
func (dht *Node) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
@@ -200,7 +202,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
}

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
func (dht *Node) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
meta := logging.LoggableMap{
"key": key,
"peer": p,
@@ -224,7 +226,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
}

// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *Node) getLocal(key string) (*recpb.Record, error) {
log.Debugf("getLocal %s", key)
rec, err := dht.getRecordFromDatastore(mkDsKey(key))
if err != nil {
@@ -243,7 +245,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {

// getOwnPrivateKey attempts to load the local peers private
// key from the peerstore.
func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
func (dht *Node) getOwnPrivateKey() (ci.PrivKey, error) {
sk := dht.peerstore.PrivKey(dht.self)
if sk == nil {
log.Warningf("%s dht cannot get own private key!", dht.self)
@@ -253,7 +255,7 @@ func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
func (dht *Node) putLocal(key string, rec *recpb.Record) error {
log.Debugf("putLocal: %v %v", key, rec)
data, err := proto.Marshal(rec)
if err != nil {
@@ -266,13 +268,13 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {

// Update signals the routingTable to Update its last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
func (dht *Node) Update(ctx context.Context, p peer.ID) {
log.Event(ctx, "updatePeer", p)
dht.routingTable.Update(p)
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
func (dht *Node) FindLocal(id peer.ID) pstore.PeerInfo {
switch dht.host.Network().Connectedness(id) {
case inet.Connected, inet.CanConnect:
return dht.peerstore.PeerInfo(id)
@@ -282,7 +284,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
func (dht *Node) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
eip := log.EventBegin(ctx, "findPeerSingle",
logging.LoggableMap{
"peer": p,
@@ -304,7 +306,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
func (dht *Node) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := log.EventBegin(ctx, "findProvidersSingle", p, key)
defer eip.Done()

@@ -323,13 +325,13 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
func (dht *Node) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}

// betterPeersToQuery returns nearestPeersToQuery, but if and only if closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
func (dht *Node) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
@@ -359,21 +361,21 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
func (dht *Node) Context() context.Context {
return dht.ctx
}

// Process return dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
func (dht *Node) Process() goprocess.Process {
return dht.proc
}

// Close calls Process Close
func (dht *IpfsDHT) Close() error {
func (dht *Node) Close() error {
return dht.proc.Close()
}

func (dht *IpfsDHT) protocolStrs() []string {
func (dht *Node) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
10 changes: 5 additions & 5 deletions dht_bootstrap.go
@@ -45,7 +45,7 @@ var DefaultBootstrapConfig = BootstrapConfig{
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
func (dht *Node) Bootstrap(ctx context.Context) error {
proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
if err != nil {
return err
@@ -70,7 +70,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
func (dht *Node) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
@@ -96,7 +96,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process,
// These parameters are configurable.
//
// SignalBootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
func (dht *Node) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
@@ -110,7 +110,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Ti
return proc, nil
}

func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
func (dht *Node) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
return func(worker goprocess.Process) {
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?
@@ -124,7 +124,7 @@ func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.P
}

// runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
func (dht *Node) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
bslog := func(msg string) {
log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
}
14 changes: 7 additions & 7 deletions dht_net.go
@@ -44,11 +44,11 @@ func (w *bufferedDelimitedWriter) Flush() error {
}

// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
func (dht *Node) handleNewStream(s inet.Stream) {
go dht.handleNewMessage(s)
}

func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
func (dht *Node) handleNewMessage(s inet.Stream) {
ctx := dht.Context()
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
@@ -110,7 +110,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
func (dht *Node) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {

ms, err := dht.messageSenderForPeer(p)
if err != nil {
@@ -133,7 +133,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
}

// sendMessage sends out a message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
func (dht *Node) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ms, err := dht.messageSenderForPeer(p)
if err != nil {
return err
@@ -146,7 +146,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
func (dht *Node) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
@@ -155,7 +155,7 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
func (dht *Node) messageSenderForPeer(p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
if ok {
@@ -193,7 +193,7 @@ type messageSender struct {
w bufferedWriteCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT
dht *Node

invalid bool
singleMes int
22 changes: 11 additions & 11 deletions dht_test.go
@@ -71,7 +71,7 @@ func (testValidator) Validate(_ string, b []byte) error {
return nil
}

func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, client bool) *Node {
d, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
@@ -85,9 +85,9 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
return d
}

func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*Node) {
addrs := make([]ma.Multiaddr, n)
dhts := make([]*IpfsDHT, n)
dhts := make([]*Node, n)
peers := make([]peer.ID, n)

sanityAddrsMap := make(map[string]struct{})
@@ -113,7 +113,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
return addrs, peers, dhts
}

func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connectNoSync(t *testing.T, ctx context.Context, a, b *Node) {
t.Helper()

idB := b.self
@@ -129,7 +129,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func wait(t *testing.T, ctx context.Context, a, b *Node) {
t.Helper()

// loop until connection notification has been received.
@@ -143,14 +143,14 @@ func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connect(t *testing.T, ctx context.Context, a, b *Node) {
t.Helper()
connectNoSync(t, ctx, a, b)
wait(t, ctx, a, b)
wait(t, ctx, b, a)
}

func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
func bootstrap(t *testing.T, ctx context.Context, dhts []*Node) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -177,7 +177,7 @@ func TestValueGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var dhts [5]*IpfsDHT
var dhts [5]*Node

for i := range dhts {
dhts[i] = setupDHT(ctx, t, false)
@@ -561,7 +561,7 @@ func TestLocalProvides(t *testing.T) {
}

// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
func waitForWellFormedTables(t *testing.T, dhts []*Node, minPeers, avgPeers int, timeout time.Duration) bool {
// test "well-formed-ness" (>= minPeers peers in every routing table)

checkTables := func() bool {
@@ -597,7 +597,7 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i
}
}

func printRoutingTables(dhts []*IpfsDHT) {
func printRoutingTables(dhts []*Node) {
// the routing tables should be full now. let's inspect them.
fmt.Printf("checking routing table of %d\n", len(dhts))
for _, dht := range dhts {
@@ -794,7 +794,7 @@ func TestProvidesMany(t *testing.T) {
defer cancel()

var wg sync.WaitGroup
getProvider := func(dht *IpfsDHT, k cid.Cid) {
getProvider := func(dht *Node, k cid.Cid) {
defer wg.Done()

expected := providers[k.KeyString()]