Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename IpfsDHT to DHT #191

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ var log = logging.Logger("dht")
// collect members of the routing table.
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// DHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
type DHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore pstore.Peerstore // Peer Registry
Expand All @@ -66,8 +66,13 @@ type IpfsDHT struct {
protocols []protocol.ID // DHT protocols
}

// IpfsDHT is a type alias for backwards compatibility.
//
// Deprecated: Use the DHT type directly.
type IpfsDHT = DHT

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
func New(ctx context.Context, h host.Host, options ...opts.Option) (*DHT, error) {
var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
Expand Down Expand Up @@ -95,9 +100,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// DHT's initialized with this function will respond to DHT requests,
// whereas DHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *DHT {
dht, err := New(ctx, h, opts.Datastore(dstore))
if err != nil {
panic(err)
Expand All @@ -106,18 +111,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// host. DHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *DHT {
dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
if err != nil {
panic(err)
}
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *DHT {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
Expand All @@ -128,7 +133,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
cmgr.UntagPeer(p, "kbucket")
}

return &IpfsDHT{
return &DHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
Expand All @@ -143,7 +148,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
func (dht *DHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
Expand All @@ -167,7 +172,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
func (dht *DHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
Expand Down Expand Up @@ -202,7 +207,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
}

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
func (dht *DHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
meta := logging.LoggableMap{
"key": key,
"peer": p,
Expand All @@ -226,7 +231,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
}

// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *DHT) getLocal(key string) (*recpb.Record, error) {
log.Debugf("getLocal %s", key)
rec, err := dht.getRecordFromDatastore(mkDsKey(key))
if err != nil {
Expand All @@ -245,7 +250,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {

// getOwnPrivateKey attempts to load the local peers private
// key from the peerstore.
func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
func (dht *DHT) getOwnPrivateKey() (ci.PrivKey, error) {
sk := dht.peerstore.PrivKey(dht.self)
if sk == nil {
log.Warningf("%s dht cannot get own private key!", dht.self)
Expand All @@ -255,7 +260,7 @@ func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
func (dht *DHT) putLocal(key string, rec *recpb.Record) error {
log.Debugf("putLocal: %v %v", key, rec)
data, err := proto.Marshal(rec)
if err != nil {
Expand All @@ -268,13 +273,13 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {

// Update signals the routingTable to Update its last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
func (dht *DHT) Update(ctx context.Context, p peer.ID) {
log.Event(ctx, "updatePeer", p)
dht.routingTable.Update(p)
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
func (dht *DHT) FindLocal(id peer.ID) pstore.PeerInfo {
switch dht.host.Network().Connectedness(id) {
case inet.Connected, inet.CanConnect:
return dht.peerstore.PeerInfo(id)
Expand All @@ -284,7 +289,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
func (dht *DHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
eip := log.EventBegin(ctx, "findPeerSingle",
logging.LoggableMap{
"peer": p,
Expand All @@ -306,7 +311,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
func (dht *DHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
eip := log.EventBegin(ctx, "findProvidersSingle", p, key)
defer eip.Done()

Expand All @@ -325,13 +330,13 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
func (dht *DHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}

// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
func (dht *DHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand Down Expand Up @@ -361,21 +366,21 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
func (dht *DHT) Context() context.Context {
return dht.ctx
}

// Process return dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
func (dht *DHT) Process() goprocess.Process {
return dht.proc
}

// Close calls Process Close
func (dht *IpfsDHT) Close() error {
func (dht *DHT) Close() error {
return dht.proc.Close()
}

func (dht *IpfsDHT) protocolStrs() []string {
func (dht *DHT) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
Expand Down
10 changes: 5 additions & 5 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var DefaultBootstrapConfig = BootstrapConfig{
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
func (dht *DHT) Bootstrap(ctx context.Context) error {
proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
if err != nil {
return err
Expand All @@ -72,7 +72,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
func (dht *DHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
Expand All @@ -98,7 +98,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process,
// These parameters are configurable.
//
// SignalBootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
func (dht *DHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
Expand All @@ -112,7 +112,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Ti
return proc, nil
}

func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
func (dht *DHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
return func(worker goprocess.Process) {
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?
Expand All @@ -126,7 +126,7 @@ func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.P
}

// runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
func (dht *DHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
bslog := func(msg string) {
log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
}
Expand Down
14 changes: 7 additions & 7 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func (w *bufferedDelimitedWriter) Flush() error {
}

// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
func (dht *DHT) handleNewStream(s inet.Stream) {
go dht.handleNewMessage(s)
}

func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
func (dht *DHT) handleNewMessage(s inet.Stream) {
ctx := dht.Context()
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
Expand Down Expand Up @@ -110,7 +110,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
func (dht *DHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {

ms, err := dht.messageSenderForPeer(p)
if err != nil {
Expand All @@ -133,7 +133,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
}

// sendMessage sends out a message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
func (dht *DHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ms, err := dht.messageSenderForPeer(p)
if err != nil {
return err
Expand All @@ -146,7 +146,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
func (dht *DHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
Expand All @@ -155,7 +155,7 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
func (dht *DHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
if ok {
Expand Down Expand Up @@ -193,7 +193,7 @@ type messageSender struct {
w bufferedWriteCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT
dht *DHT

invalid bool
singleMes int
Expand Down
22 changes: 11 additions & 11 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (testValidator) Validate(_ string, b []byte) error {
return nil
}

func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, client bool) *DHT {
d, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
Expand All @@ -85,9 +85,9 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
return d
}

func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*DHT) {
addrs := make([]ma.Multiaddr, n)
dhts := make([]*IpfsDHT, n)
dhts := make([]*DHT, n)
peers := make([]peer.ID, n)

sanityAddrsMap := make(map[string]struct{})
Expand All @@ -113,7 +113,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
return addrs, peers, dhts
}

func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connectNoSync(t *testing.T, ctx context.Context, a, b *DHT) {
t.Helper()

idB := b.self
Expand All @@ -129,7 +129,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func wait(t *testing.T, ctx context.Context, a, b *DHT) {
t.Helper()

// loop until connection notification has been received.
Expand All @@ -143,14 +143,14 @@ func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connect(t *testing.T, ctx context.Context, a, b *DHT) {
t.Helper()
connectNoSync(t, ctx, a, b)
wait(t, ctx, a, b)
wait(t, ctx, b, a)
}

func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
func bootstrap(t *testing.T, ctx context.Context, dhts []*DHT) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -177,7 +177,7 @@ func TestValueGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var dhts [5]*IpfsDHT
var dhts [5]*DHT

for i := range dhts {
dhts[i] = setupDHT(ctx, t, false)
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestLocalProvides(t *testing.T) {
}

// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
func waitForWellFormedTables(t *testing.T, dhts []*DHT, minPeers, avgPeers int, timeout time.Duration) bool {
// test "well-formed-ness" (>= minPeers peers in every routing table)

checkTables := func() bool {
Expand Down Expand Up @@ -597,7 +597,7 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i
}
}

func printRoutingTables(dhts []*IpfsDHT) {
func printRoutingTables(dhts []*DHT) {
// the routing tables should be full now. let's inspect them.
fmt.Printf("checking routing table of %d\n", len(dhts))
for _, dht := range dhts {
Expand Down Expand Up @@ -794,7 +794,7 @@ func TestProvidesMany(t *testing.T) {
defer cancel()

var wg sync.WaitGroup
getProvider := func(dht *IpfsDHT, k *cid.Cid) {
getProvider := func(dht *DHT, k *cid.Cid) {
defer wg.Done()

expected := providers[k.KeyString()]
Expand Down
Loading