Rename IpfsDHT structure to KadDHT #338

Closed · wants to merge 1 commit
dht.go · 72 changes: 36 additions & 36 deletions
@@ -40,9 +40,9 @@ var logger = logging.Logger("dht")
// collect members of the routing table.
const NumBootstrapQueries = 5

-// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
+// KadDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
-type IpfsDHT struct {
+type KadDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore pstore.Peerstore // Peer Registry
@@ -70,15 +70,15 @@ type IpfsDHT struct {
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
-_ routing.ContentRouting = (*IpfsDHT)(nil)
-_ routing.IpfsRouting = (*IpfsDHT)(nil)
-_ routing.PeerRouting = (*IpfsDHT)(nil)
-_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
-_ routing.ValueStore = (*IpfsDHT)(nil)
+_ routing.ContentRouting = (*KadDHT)(nil)
+_ routing.IpfsRouting = (*KadDHT)(nil)
+_ routing.PeerRouting = (*KadDHT)(nil)
+_ routing.PubKeyFetcher = (*KadDHT)(nil)
+_ routing.ValueStore = (*KadDHT)(nil)
)

// New creates a new DHT with the specified host and options.
-func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
+func New(ctx context.Context, h host.Host, options ...opts.Option) (*KadDHT, error) {
var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
@@ -106,9 +106,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
-// IpfsDHT's initialized with this function will respond to DHT requests,
-// whereas IpfsDHT's initialized with NewDHTClient will not.
-func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
+// KadDHT's initialized with this function will respond to DHT requests,
+// whereas KadDHT's initialized with NewDHTClient will not.
+func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, opts.Datastore(dstore))
if err != nil {
panic(err)
@@ -117,18 +117,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
-// host. IpfsDHT clients initialized with this function will not respond to DHT
+// host. KadDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
-func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
+func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
if err != nil {
panic(err)
}
return dht
}

-func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
+func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *KadDHT {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
@@ -139,7 +139,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
cmgr.UntagPeer(p, "kbucket")
}

-dht := &IpfsDHT{
+dht := &KadDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
@@ -158,7 +158,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
}

// putValueToPeer stores the given key/value pair at the peer 'p'
-func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
+func (dht *KadDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
@@ -182,7 +182,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
-func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
+func (dht *KadDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
@@ -217,7 +217,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
}

// getValueSingle simply performs the get value RPC with the given parameters
-func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
+func (dht *KadDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
meta := logging.LoggableMap{
"key": key,
"peer": p,
@@ -241,7 +241,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
}

// getLocal attempts to retrieve the value from the datastore
-func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
+func (dht *KadDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugf("getLocal %s", key)
rec, err := dht.getRecordFromDatastore(mkDsKey(key))
if err != nil {
Expand All @@ -259,7 +259,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
-func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
+func (dht *KadDHT) putLocal(key string, rec *recpb.Record) error {
logger.Debugf("putLocal: %v %v", key, rec)
data, err := proto.Marshal(rec)
if err != nil {
Expand All @@ -272,13 +272,13 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {

// Update signals the routingTable to Update its last-seen status
// on the given peer.
-func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
+func (dht *KadDHT) Update(ctx context.Context, p peer.ID) {
logger.Event(ctx, "updatePeer", p)
dht.routingTable.Update(p)
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
-func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
+func (dht *KadDHT) FindLocal(id peer.ID) pstore.PeerInfo {
switch dht.host.Network().Connectedness(id) {
case inet.Connected, inet.CanConnect:
return dht.peerstore.PeerInfo(id)
@@ -288,7 +288,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
-func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
+func (dht *KadDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findPeerSingle",
logging.LoggableMap{
"peer": p,
@@ -310,7 +310,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
}
}

-func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
+func (dht *KadDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
defer eip.Done()

@@ -329,13 +329,13 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
}

// nearestPeersToQuery returns the routing tables closest peers.
-func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
+func (dht *KadDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}

// betterPeersToQuery returns nearestPeersToQuery, but if and only if closer than self.
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
+func (dht *KadDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
@@ -365,26 +365,26 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
}

// Context return dht's context
-func (dht *IpfsDHT) Context() context.Context {
+func (dht *KadDHT) Context() context.Context {
return dht.ctx
}

// Process return dht's process
-func (dht *IpfsDHT) Process() goprocess.Process {
+func (dht *KadDHT) Process() goprocess.Process {
return dht.proc
}

// RoutingTable return dht's routingTable
-func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
+func (dht *KadDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}

// Close calls Process Close
-func (dht *IpfsDHT) Close() error {
+func (dht *KadDHT) Close() error {
return dht.proc.Close()
}

-func (dht *IpfsDHT) protocolStrs() []string {
+func (dht *KadDHT) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
@@ -397,19 +397,19 @@ func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}

-func (dht *IpfsDHT) PeerID() peer.ID {
+func (dht *KadDHT) PeerID() peer.ID {
return dht.self
}

-func (dht *IpfsDHT) PeerKey() []byte {
+func (dht *KadDHT) PeerKey() []byte {
return kb.ConvertPeerID(dht.self)
}

-func (dht *IpfsDHT) Host() host.Host {
+func (dht *KadDHT) Host() host.Host {
return dht.host
}

-func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
+func (dht *KadDHT) Ping(ctx context.Context, p peer.ID) error {
req := pb.NewMessage(pb.Message_PING, nil, 0)
resp, err := dht.sendRequest(ctx, p, req)
if err != nil {
@@ -424,7 +424,7 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
-func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
+func (dht *KadDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
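For reviewers checking call sites: the rename is type-name only, so the constructors keep their signatures and behaviour. A minimal usage sketch against the renamed API follows — the import paths and the libp2p.New call are assumptions based on the package layout this diff implies, not part of the PR:

package main

import (
	"context"

	libp2p "github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
)

func main() {
	ctx := context.Background()

	// A libp2p host supplies the network services the DHT wraps.
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	// Same constructor as before this PR; only the result type name
	// changes, from *dht.IpfsDHT to *dht.KadDHT.
	kad, err := dht.New(ctx, h, dhtopts.Client(false))
	if err != nil {
		panic(err)
	}
	defer kad.Close()
}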
dht_bootstrap.go · 18 changes: 9 additions & 9 deletions
@@ -68,12 +68,12 @@ var DefaultBootstrapConfig = BootstrapConfig{

// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
// the default bootstrap config.
-func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
+func (dht *KadDHT) Bootstrap(ctx context.Context) error {
return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
}

// Runs cfg.Queries bootstrap queries every cfg.Period.
-func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
+func (dht *KadDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
// Because this method is not synchronous, we have to duplicate sanity
// checks on the config so that callers aren't oblivious.
if cfg.Queries <= 0 {
@@ -97,7 +97,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {

// This is a synchronous bootstrap. cfg.Queries queries will run each with a
// timeout of cfg.Timeout. cfg.Period is not used.
-func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
+func (dht *KadDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
if cfg.Queries <= 0 {
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
@@ -112,15 +112,15 @@ func newRandomPeerId() peer.ID {
}

// Traverse the DHT toward the given ID.
-func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
+func (dht *KadDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
// TODO: Extract the query action (traversal logic?) inside FindPeer,
// don't actually call through the FindPeer machinery, which can return
// things out of the peer store etc.
return dht.FindPeer(ctx, target)
}

// Traverse the DHT toward a random ID.
-func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
+func (dht *KadDHT) randomWalk(ctx context.Context) error {
id := newRandomPeerId()
p, err := dht.walk(ctx, id)
switch err {
@@ -137,7 +137,7 @@ func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
}

// Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
+func (dht *KadDHT) selfWalk(ctx context.Context) error {
_, err := dht.walk(ctx, dht.self)
if err == routing.ErrNotFound {
return nil
@@ -146,7 +146,7 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
}

// runBootstrap builds up list of peers by requesting random peer IDs
-func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
+func (dht *KadDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
doQuery := func(n int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query (%d/%d) to %s (routing table size was %d)",
n, cfg.Queries, target, dht.routingTable.Size())
@@ -175,10 +175,10 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), dht.selfWalk)
}

-func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error {
+func (dht *KadDHT) BootstrapRandom(ctx context.Context) error {
return dht.randomWalk(ctx)
}

-func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error {
+func (dht *KadDHT) BootstrapSelf(ctx context.Context) error {
return dht.selfWalk(ctx)
}
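The bootstrap surface is untouched apart from the receiver type: Bootstrap delegates to BootstrapWithConfig with DefaultBootstrapConfig, and BootstrapOnce runs a single synchronous round. A sketch of a custom periodic bootstrap — the field values are arbitrary, and the ctx/kad variables plus the dht and time imports are assumed from the sketch above:

// startBootstrap kicks off the asynchronous bootstrap loop with a
// custom config; Queries and Period are the fields this diff references.
func startBootstrap(ctx context.Context, kad *dht.KadDHT) error {
	cfg := dht.DefaultBootstrapConfig
	cfg.Queries = 3              // random-walk queries per round
	cfg.Period = 5 * time.Minute // how often a round runs

	// BootstrapWithConfig re-validates cfg (e.g. Queries must be positive)
	// because the loop itself runs asynchronously.
	return kad.BootstrapWithConfig(ctx, cfg)
}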
dht_net.go · 14 changes: 7 additions & 7 deletions
@@ -56,7 +56,7 @@ func (w *bufferedDelimitedWriter) Flush() error {
}

// handleNewStream implements the inet.StreamHandler
-func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
+func (dht *KadDHT) handleNewStream(s inet.Stream) {
defer s.Reset()
if dht.handleNewMessage(s) {
// Gracefully close the stream for writes.
@@ -65,7 +65,7 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
}

// Returns true on orderly completion of writes (so we can Close the stream).
-func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
+func (dht *KadDHT) handleNewMessage(s inet.Stream) bool {
ctx := dht.ctx

cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
@@ -141,7 +141,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
-func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
+func (dht *KadDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

ms, err := dht.messageSenderForPeer(ctx, p)
@@ -175,7 +175,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
}

// sendMessage sends out a message
-func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
+func (dht *KadDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

ms, err := dht.messageSenderForPeer(ctx, p)
@@ -198,7 +198,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
return nil
}

-func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
+func (dht *KadDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
@@ -207,7 +207,7 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
return nil
}

-func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
+func (dht *KadDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
if ok {
@@ -244,7 +244,7 @@ type messageSender struct {
r ggio.ReadCloser
lk sync.Mutex
p peer.ID
-dht *IpfsDHT
+dht *KadDHT

invalid bool
singleMes int
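The interface assertions in dht.go are what make this rename cheap to absorb: downstream code that names the concrete type must be updated from *dht.IpfsDHT to *dht.KadDHT, but code that holds one of the asserted routing interfaces compiles unchanged. A sketch of that insulation — the routing and host import paths are assumptions; only the routing.IpfsRouting assertion comes from this diff:

// newRouter hides the concrete DHT type behind the routing interface,
// so the IpfsDHT -> KadDHT rename does not ripple into its callers.
func newRouter(ctx context.Context, h host.Host) (routing.IpfsRouting, error) {
	// *dht.KadDHT is assignable to routing.IpfsRouting per the
	// compile-time assertions in dht.go.
	return dht.New(ctx, h)
}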