From 8f98c45b79eb5981121666c735ec1e17a8606bab Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 10 Jan 2019 10:49:11 +1100 Subject: [PATCH 1/2] Rename IpfsDHT->Node --- dht.go | 46 +++++++++++++++++++++++----------------------- dht_bootstrap.go | 10 +++++----- dht_net.go | 14 +++++++------- dht_test.go | 22 +++++++++++----------- handlers.go | 18 +++++++++--------- lookup.go | 2 +- notif.go | 6 +++--- notify_test.go | 2 +- query.go | 4 ++-- records.go | 6 +++--- routing.go | 24 ++++++++++++------------ 11 files changed, 77 insertions(+), 77 deletions(-) diff --git a/dht.go b/dht.go index aa531aa58..003c6931a 100644 --- a/dht.go +++ b/dht.go @@ -37,9 +37,9 @@ var log = logging.Logger("dht") // collect members of the routing table. const NumBootstrapQueries = 5 -// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. +// Node is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base IpfsRouting module. -type IpfsDHT struct { +type Node struct { host host.Host // the network services we need self peer.ID // Local peer (yourself) peerstore pstore.Peerstore // Peer Registry @@ -65,7 +65,7 @@ type IpfsDHT struct { } // New creates a new DHT with the specified host and options. -func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) { +func New(ctx context.Context, h host.Host, options ...opts.Option) (*Node, error) { var cfg opts.Options if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil { return nil, err @@ -95,7 +95,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er // NewDHT creates a new DHT object with the given peer as the 'local' host. // IpfsDHT's initialized with this function will respond to DHT requests, // whereas IpfsDHT's initialized with NewDHTClient will not. -func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { +func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *Node { dht, err := New(ctx, h, opts.Datastore(dstore)) if err != nil { panic(err) @@ -107,7 +107,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { // host. IpfsDHT clients initialized with this function will not respond to DHT // requests. If you need a peer to respond to DHT requests, use NewDHT instead. 
// NewDHTClient creates a new DHT object with the given peer as the 'local' host -func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { +func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *Node { dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true)) if err != nil { panic(err) @@ -115,7 +115,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT return dht } -func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT { +func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *Node { rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()) cmgr := h.ConnManager() @@ -126,7 +126,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p cmgr.UntagPeer(p, "kbucket") } - return &IpfsDHT{ + return &Node{ datastore: dstore, self: h.ID(), peerstore: h.Peerstore(), @@ -141,7 +141,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p } // putValueToPeer stores the given key/value pair at the peer 'p' -func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error { +func (dht *Node) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0) pmes.Record = rec @@ -165,7 +165,7 @@ var errInvalidRecord = errors.New("received invalid record") // key. It returns either the value or a list of closer peers. // NOTE: It will update the dht's peerstore with any new addresses // it finds for the given peer. -func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) { +func (dht *Node) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) { pmes, err := dht.getValueSingle(ctx, p, key) if err != nil { @@ -200,7 +200,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) } // getValueSingle simply performs the get value RPC with the given parameters -func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) { +func (dht *Node) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) { meta := logging.LoggableMap{ "key": key, "peer": p, @@ -224,7 +224,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) ( } // getLocal attempts to retrieve the value from the datastore -func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { +func (dht *Node) getLocal(key string) (*recpb.Record, error) { log.Debugf("getLocal %s", key) rec, err := dht.getRecordFromDatastore(mkDsKey(key)) if err != nil { @@ -243,7 +243,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { // getOwnPrivateKey attempts to load the local peers private // key from the peerstore. 
-func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) { +func (dht *Node) getOwnPrivateKey() (ci.PrivKey, error) { sk := dht.peerstore.PrivKey(dht.self) if sk == nil { log.Warningf("%s dht cannot get own private key!", dht.self) @@ -253,7 +253,7 @@ func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) { } // putLocal stores the key value pair in the datastore -func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { +func (dht *Node) putLocal(key string, rec *recpb.Record) error { log.Debugf("putLocal: %v %v", key, rec) data, err := proto.Marshal(rec) if err != nil { @@ -266,13 +266,13 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { // Update signals the routingTable to Update its last-seen status // on the given peer. -func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { +func (dht *Node) Update(ctx context.Context, p peer.ID) { log.Event(ctx, "updatePeer", p) dht.routingTable.Update(p) } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. -func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo { +func (dht *Node) FindLocal(id peer.ID) pstore.PeerInfo { switch dht.host.Network().Connectedness(id) { case inet.Connected, inet.CanConnect: return dht.peerstore.PeerInfo(id) @@ -282,7 +282,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo { } // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is -func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) { +func (dht *Node) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) { eip := log.EventBegin(ctx, "findPeerSingle", logging.LoggableMap{ "peer": p, @@ -304,7 +304,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) { +func (dht *Node) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) { eip := log.EventBegin(ctx, "findProvidersSingle", p, key) defer eip.Done() @@ -323,13 +323,13 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid. } // nearestPeersToQuery returns the routing tables closest peers. -func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { +func (dht *Node) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count) return closer } // betterPeersToQuery returns nearestPeersToQuery, but if and only if closer than self. -func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID { +func (dht *Node) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID { closer := dht.nearestPeersToQuery(pmes, count) // no node? 
nil @@ -359,21 +359,21 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [ } // Context return dht's context -func (dht *IpfsDHT) Context() context.Context { +func (dht *Node) Context() context.Context { return dht.ctx } // Process return dht's process -func (dht *IpfsDHT) Process() goprocess.Process { +func (dht *Node) Process() goprocess.Process { return dht.proc } // Close calls Process Close -func (dht *IpfsDHT) Close() error { +func (dht *Node) Close() error { return dht.proc.Close() } -func (dht *IpfsDHT) protocolStrs() []string { +func (dht *Node) protocolStrs() []string { pstrs := make([]string, len(dht.protocols)) for idx, proto := range dht.protocols { pstrs[idx] = string(proto) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index a448e7f2b..6cc027c48 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -45,7 +45,7 @@ var DefaultBootstrapConfig = BootstrapConfig{ // These parameters are configurable. // // As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface -func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { +func (dht *Node) Bootstrap(ctx context.Context) error { proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig) if err != nil { return err @@ -70,7 +70,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { // These parameters are configurable. // // BootstrapWithConfig returns a process, so the user can stop it. -func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) { +func (dht *Node) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) { if cfg.Queries <= 0 { return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) } @@ -96,7 +96,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, // These parameters are configurable. // // SignalBootstrap returns a process, so the user can stop it. -func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) { +func (dht *Node) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) { if cfg.Queries <= 0 { return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) } @@ -110,7 +110,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Ti return proc, nil } -func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) { +func (dht *Node) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) { return func(worker goprocess.Process) { // it would be useful to be able to send out signals of when we bootstrap, too... // maybe this is a good case for whole module event pub/sub? 
@@ -124,7 +124,7 @@ func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.P } // runBootstrap builds up list of peers by requesting random peer IDs -func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { +func (dht *Node) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { bslog := func(msg string) { log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) } diff --git a/dht_net.go b/dht_net.go index 03a7cfc43..955ec167e 100644 --- a/dht_net.go +++ b/dht_net.go @@ -44,11 +44,11 @@ func (w *bufferedDelimitedWriter) Flush() error { } // handleNewStream implements the inet.StreamHandler -func (dht *IpfsDHT) handleNewStream(s inet.Stream) { +func (dht *Node) handleNewStream(s inet.Stream) { go dht.handleNewMessage(s) } -func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { +func (dht *Node) handleNewMessage(s inet.Stream) { ctx := dht.Context() cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func @@ -110,7 +110,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { // sendRequest sends out a request, but also makes sure to // measure the RTT for latency measurements. -func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { +func (dht *Node) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { ms, err := dht.messageSenderForPeer(p) if err != nil { @@ -133,7 +133,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message } // sendMessage sends out a message -func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { +func (dht *Node) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { ms, err := dht.messageSenderForPeer(p) if err != nil { return err @@ -146,7 +146,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message return nil } -func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error { +func (dht *Node) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error { // Make sure that this node is actually a DHT server, not just a client. protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...) 
if err == nil && len(protos) > 0 { @@ -155,7 +155,7 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me return nil } -func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { +func (dht *Node) messageSenderForPeer(p peer.ID) (*messageSender, error) { dht.smlk.Lock() ms, ok := dht.strmap[p] if ok { @@ -193,7 +193,7 @@ type messageSender struct { w bufferedWriteCloser lk sync.Mutex p peer.ID - dht *IpfsDHT + dht *Node invalid bool singleMes int diff --git a/dht_test.go b/dht_test.go index dde2f4eb9..47f0708fb 100644 --- a/dht_test.go +++ b/dht_test.go @@ -71,7 +71,7 @@ func (testValidator) Validate(_ string, b []byte) error { return nil } -func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, client bool) *Node { d, err := New( ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), @@ -85,9 +85,9 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { return d } -func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) { +func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*Node) { addrs := make([]ma.Multiaddr, n) - dhts := make([]*IpfsDHT, n) + dhts := make([]*Node, n) peers := make([]peer.ID, n) sanityAddrsMap := make(map[string]struct{}) @@ -113,7 +113,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer return addrs, peers, dhts } -func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { +func connectNoSync(t *testing.T, ctx context.Context, a, b *Node) { t.Helper() idB := b.self @@ -129,7 +129,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { } } -func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) { +func wait(t *testing.T, ctx context.Context, a, b *Node) { t.Helper() // loop until connection notification has been received. @@ -143,14 +143,14 @@ func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) { } } -func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { +func connect(t *testing.T, ctx context.Context, a, b *Node) { t.Helper() connectNoSync(t, ctx, a, b) wait(t, ctx, a, b) wait(t, ctx, b, a) } -func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { +func bootstrap(t *testing.T, ctx context.Context, dhts []*Node) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -177,7 +177,7 @@ func TestValueGetSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var dhts [5]*IpfsDHT + var dhts [5]*Node for i := range dhts { dhts[i] = setupDHT(ctx, t, false) @@ -561,7 +561,7 @@ func TestLocalProvides(t *testing.T) { } // if minPeers or avgPeers is 0, dont test for it. -func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool { +func waitForWellFormedTables(t *testing.T, dhts []*Node, minPeers, avgPeers int, timeout time.Duration) bool { // test "well-formed-ness" (>= minPeers peers in every routing table) checkTables := func() bool { @@ -597,7 +597,7 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i } } -func printRoutingTables(dhts []*IpfsDHT) { +func printRoutingTables(dhts []*Node) { // the routing tables should be full now. let's inspect them. 
fmt.Printf("checking routing table of %d\n", len(dhts)) for _, dht := range dhts { @@ -794,7 +794,7 @@ func TestProvidesMany(t *testing.T) { defer cancel() var wg sync.WaitGroup - getProvider := func(dht *IpfsDHT, k cid.Cid) { + getProvider := func(dht *Node, k cid.Cid) { defer wg.Done() expected := providers[k.KeyString()] diff --git a/handlers.go b/handlers.go index 15e68b482..485329ffd 100644 --- a/handlers.go +++ b/handlers.go @@ -25,7 +25,7 @@ var CloserPeerCount = KValue // dhthandler specifies the signature of functions that handle DHT messages. type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) -func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { +func (dht *Node) handlerForMsgType(t pb.Message_MessageType) dhtHandler { switch t { case pb.Message_GET_VALUE: return dht.handleGetValue @@ -44,7 +44,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { } } -func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { +func (dht *Node) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { ctx = log.Start(ctx, "handleGetValue") log.SetTag(ctx, "peer", p) defer func() { log.FinishWithErr(ctx, err) }() @@ -86,7 +86,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess return resp, nil } -func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) { +func (dht *Node) checkLocalDatastore(k []byte) (*recpb.Record, error) { log.Debugf("%s handleGetValue looking into ds", dht.self) dskey := convertToDsKey(k) buf, err := dht.datastore.Get(dskey) @@ -145,7 +145,7 @@ func cleanRecord(rec *recpb.Record) { } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { +func (dht *Node) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { ctx = log.Start(ctx, "handlePutValue") log.SetTag(ctx, "peer", p) defer func() { log.FinishWithErr(ctx, err) }() @@ -206,7 +206,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess // returns nil, nil when either nothing is found or the value found doesn't properly validate. 
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong) -func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) { +func (dht *Node) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) { buf, err := dht.datastore.Get(dskey) if err == ds.ErrNotFound { return nil, nil @@ -234,12 +234,12 @@ func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) return rec, nil } -func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { +func (dht *Node) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { +func (dht *Node) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { ctx = log.Start(ctx, "handleFindPeer") defer func() { log.FinishWithErr(ctx, _err) }() log.SetTag(ctx, "peer", p) @@ -290,7 +290,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess return resp, nil } -func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { +func (dht *Node) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { ctx = log.Start(ctx, "handleGetProviders") defer func() { log.FinishWithErr(ctx, _err) }() log.SetTag(ctx, "peer", p) @@ -338,7 +338,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. return resp, nil } -func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { +func (dht *Node) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { ctx = log.Start(ctx, "handleAddProvider") defer func() { log.FinishWithErr(ctx, _err) }() log.SetTag(ctx, "peer", p) diff --git a/lookup.go b/lookup.go index c3ff09021..3549ea8cd 100644 --- a/lookup.go +++ b/lookup.go @@ -53,7 +53,7 @@ func loggableKey(k string) logging.LoggableMap { // Kademlia 'node lookup' operation. Returns a channel of the K closest peers // to the given key -func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { +func (dht *Node) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { e := log.EventBegin(ctx, "getClosestPeers", loggableKey(key)) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) if len(tablepeers) == 0 { diff --git a/notif.go b/notif.go index fbcb073be..40580ddb3 100644 --- a/notif.go +++ b/notif.go @@ -7,10 +7,10 @@ import ( ) // netNotifiee defines methods to be used with the IpfsDHT -type netNotifiee IpfsDHT +type netNotifiee Node -func (nn *netNotifiee) DHT() *IpfsDHT { - return (*IpfsDHT)(nn) +func (nn *netNotifiee) DHT() *Node { + return (*Node)(nn) } func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { diff --git a/notify_test.go b/notify_test.go index 855f7d71d..8228b2e13 100644 --- a/notify_test.go +++ b/notify_test.go @@ -74,7 +74,7 @@ func TestNotifieeFuzz(t *testing.T) { connect(t, ctx, d1, d2) } -func checkRoutingTable(a, b *IpfsDHT) bool { +func checkRoutingTable(a, b *Node) bool { // loop until connection notification has been received. // under high load, this may not happen as immediately as we would like. 
return a.routingTable.Find(b.self) != "" && b.routingTable.Find(a.self) != "" diff --git a/query.go b/query.go index 8794deaa9..b6d6c51da 100644 --- a/query.go +++ b/query.go @@ -21,7 +21,7 @@ import ( var maxQueryConcurrency = AlphaValue type dhtQuery struct { - dht *IpfsDHT + dht *Node key string // the key we're querying for qfunc queryFunc // the function to execute per peer concurrency int // the concurrency parameter @@ -39,7 +39,7 @@ type dhtQueryResult struct { } // constructs query -func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery { +func (dht *Node) newQuery(k string, f queryFunc) *dhtQuery { return &dhtQuery{ key: k, dht: dht, diff --git a/records.go b/records.go index 29dae735d..78ca06eec 100644 --- a/records.go +++ b/records.go @@ -23,7 +23,7 @@ type pubkrs struct { err error } -func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { +func (dht *Node) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { log.Debugf("getPublicKey for: %s", p) // Check locally. Will also try to extract the public key from the peer @@ -74,7 +74,7 @@ func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, err return nil, err } -func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubKey, error) { +func (dht *Node) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubKey, error) { // Only retrieve one value, because the public key is immutable // so there's no need to retrieve multiple versions pkkey := routing.KeyForPublicKey(p) @@ -95,7 +95,7 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK return pubk, nil } -func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) { +func (dht *Node) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) { // check locally, just in case... pk := dht.peerstore.PubKey(p) if pk != nil { diff --git a/routing.go b/routing.go index c777d2d53..3362649e9 100644 --- a/routing.go +++ b/routing.go @@ -35,7 +35,7 @@ var asyncQueryBuffer = 10 // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT -func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) { +func (dht *Node) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) { eip := log.EventBegin(ctx, "PutValue") defer func() { eip.Append(loggableKey(key)) @@ -110,7 +110,7 @@ type RecvdVal struct { } // GetValue searches for the value corresponding to given Key. -func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) { +func (dht *Node) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) { eip := log.EventBegin(ctx, "GetValue") defer func() { eip.Append(loggableKey(key)) @@ -148,7 +148,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti return best, nil } -func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { +func (dht *Node) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { var cfg ropts.Options if err := cfg.Apply(opts...); err != nil { return nil, err @@ -250,7 +250,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O } // GetValues gets nvals values corresponding to the given key. 
-func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { +func (dht *Node) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { eip := log.EventBegin(ctx, "GetValues") eip.Append(loggableKey(key)) @@ -270,7 +270,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R return out, ctx.Err() } -func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) { +func (dht *Node) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) { vals := make(chan RecvdVal, 1) done := func(err error) (<-chan RecvdVal, error) { @@ -395,7 +395,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // locations of the value, similarly to Coral and Mainline DHT. // Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { +func (dht *Node) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) defer func() { if err != nil { @@ -435,7 +435,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err wg.Wait() return nil } -func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { +func (dht *Node) makeProvRecord(skey cid.Cid) (*pb.Message, error) { pi := pstore.PeerInfo{ ID: dht.self, Addrs: dht.host.Addrs(), @@ -453,7 +453,7 @@ func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { } // FindProviders searches until the context expires. -func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]pstore.PeerInfo, error) { +func (dht *Node) FindProviders(ctx context.Context, c cid.Cid) ([]pstore.PeerInfo, error) { var providers []pstore.PeerInfo for p := range dht.FindProvidersAsync(ctx, c, KValue) { providers = append(providers, p) @@ -464,14 +464,14 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]pstore.Peer // FindProvidersAsync is the same thing as FindProviders, but returns a channel. // Peers will be returned on the channel as soon as they are found, even before // the search query completes. -func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan pstore.PeerInfo { +func (dht *Node) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan pstore.PeerInfo { log.Event(ctx, "findProviders", key) peerOut := make(chan pstore.PeerInfo, count) go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) return peerOut } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan pstore.PeerInfo) { +func (dht *Node) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan pstore.PeerInfo) { defer log.EventBegin(ctx, "findProvidersAsync", key).Done() defer close(peerOut) @@ -567,7 +567,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } // FindPeer searches for a peer with given ID. 
-func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) { +func (dht *Node) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) { eip := log.EventBegin(ctx, "FindPeer", id) defer func() { if err != nil { @@ -644,7 +644,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo } // FindPeersConnectedToPeer searches for peers directly connected to a given peer. -func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) { +func (dht *Node) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) { peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer) peersSeen := make(map[peer.ID]struct{}) From 550ca68fb5021c9032ebc9439f7d78515fa7154b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 10 Jan 2019 11:20:06 +1100 Subject: [PATCH 2/2] Add type alias for IpfsDHT --- dht.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dht.go b/dht.go index 003c6931a..9a513535d 100644 --- a/dht.go +++ b/dht.go @@ -37,6 +37,8 @@ var log = logging.Logger("dht") // collect members of the routing table. const NumBootstrapQueries = 5 +type IpfsDHT = Node + // Node is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base IpfsRouting module. type Node struct {
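Note on the mechanism in patch 2/2: because IpfsDHT is declared as a type alias (`type IpfsDHT = Node`) rather than a new defined type, both names denote the identical type, so downstream code that still refers to *IpfsDHT keeps compiling and interoperating with code that uses *Node, with no conversions needed. The standalone sketch below illustrates that behaviour with simplified stand-in types; the field and the two caller functions are illustrative only and are not part of the go-libp2p-kad-dht API.

package main

import "fmt"

// Node stands in for the renamed DHT type from patch 1/2 (fields simplified
// for illustration; the real struct carries the host, routing table, etc.).
type Node struct {
	self string
}

// IpfsDHT mirrors patch 2/2: an alias, not a distinct defined type, so both
// names refer to the same type and share the same method set.
type IpfsDHT = Node

// oldCaller is written against the pre-rename name.
func oldCaller(d *IpfsDHT) string { return d.self }

// newCaller is written against the post-rename name.
func newCaller(d *Node) string { return d.self }

func main() {
	d := &Node{self: "QmExamplePeer"}
	// The same *Node value satisfies both signatures without conversion,
	// which is what makes the rename non-breaking for existing callers.
	fmt.Println(oldCaller(d), newCaller(d))
}

Had the second commit used `type IpfsDHT Node` (a defined type) instead of an alias, existing callers would have needed explicit conversions and would have lost access to Node's methods, so the alias is what keeps the rename backward compatible.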