diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go
index 42cbe777392..b778c7a3471 100644
--- a/exchange/bitswap/strategy/strategy.go
+++ b/exchange/bitswap/strategy/strategy.go
@@ -5,11 +5,10 @@ import (
 	"sync"
 
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
-	"github.com/jbenet/go-ipfs/peer"
+	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
 )
 
-// TODO declare thread-safe datastore
 // TODO niceness should be on a per-peer basis. Use-case: Certain peers are
 // "trusted" and/or controlled by a single human user. The user may want for
 // these peers to exchange data freely
diff --git a/routing/dht/Makefile b/routing/dht/Makefile
deleted file mode 100644
index 563234b1d3c..00000000000
--- a/routing/dht/Makefile
+++ /dev/null
@@ -1,11 +0,0 @@
-
-PB = $(wildcard *.proto)
-GO = $(PB:.proto=.pb.go)
-
-all: $(GO)
-
-%.pb.go: %.proto
-	protoc --gogo_out=. --proto_path=../../../../:/usr/local/opt/protobuf/include:. $<
-
-clean:
-	rm *.pb.go
diff --git a/routing/dht/dht.go b/routing/dht/dht.go
index 60032f389af..52ae1f76c4b 100644
--- a/routing/dht/dht.go
+++ b/routing/dht/dht.go
@@ -11,6 +11,7 @@ import (
 	inet "github.com/jbenet/go-ipfs/net"
 	msg "github.com/jbenet/go-ipfs/net/message"
 	peer "github.com/jbenet/go-ipfs/peer"
+	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
 
@@ -128,7 +129,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 	}
 
 	// deserialize msg
-	pmes := new(Message)
+	pmes := new(pb.Message)
 	err := proto.Unmarshal(mData, pmes)
 	if err != nil {
 		log.Error("Error unmarshaling data")
@@ -140,7 +141,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 
 	// Print out diagnostic
 	log.Debugf("%s got message type: '%s' from %s",
-		dht.self, Message_MessageType_name[int32(pmes.GetType())], mPeer)
+		dht.self, pb.Message_MessageType_name[int32(pmes.GetType())], mPeer)
 
 	// get handler for this msg type.
 	handler := dht.handlerForMsgType(pmes.GetType())
@@ -174,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
 
 // sendRequest sends out a request using dht.sender, but also makes sure to
 // measure the RTT for latency measurements.
-func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
+func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
 
 	mes, err := msg.FromObject(p, pmes)
 	if err != nil {
@@ -185,7 +186,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
 
 	// Print out diagnostic
 	log.Debugf("Sent message type: '%s' to %s",
-		Message_MessageType_name[int32(pmes.GetType())], p)
+		pb.Message_MessageType_name[int32(pmes.GetType())], p)
 
 	rmes, err := dht.sender.SendRequest(ctx, mes)
 	if err != nil {
@@ -198,7 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
 	rtt := time.Since(start)
 	rmes.Peer().SetLatency(rtt)
 
-	rpmes := new(Message)
+	rpmes := new(pb.Message)
 	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
 		return nil, err
 	}
@@ -210,7 +211,7 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
 	key string, value []byte) error {
 
-	pmes := newMessage(Message_PUT_VALUE, string(key), 0)
+	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
 	pmes.Value = value
 
 	rpmes, err := dht.sendRequest(ctx, p, pmes)
 	if err != nil {
@@ -225,10 +226,10 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
 
 func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
 
-	pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
+	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
 
 	// add self as the provider
-	pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
+	pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
 
 	rpmes, err := dht.sendRequest(ctx, p, pmes)
 	if err != nil {
@@ -290,9 +291,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
 
 // getValueSingle simply performs the get value RPC with the given parameters
 func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
-	key u.Key, level int) (*Message, error) {
+	key u.Key, level int) (*pb.Message, error) {
 
-	pmes := newMessage(Message_GET_VALUE, string(key), level)
+	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
 	return dht.sendRequest(ctx, p, pmes)
 }
 
@@ -301,7 +302,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
 // one to get the value from? Or just connect to one at a time until we get a
 // successful connection and request the value from it?
 func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
-	peerlist []*Message_Peer, level int) ([]byte, error) {
+	peerlist []*pb.Message_Peer, level int) ([]byte, error) {
 
 	for _, pinfo := range peerlist {
 		p, err := dht.ensureConnectedToPeer(pinfo)
@@ -379,17 +380,17 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
 	return nil, nil
 }
 
-func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
-	pmes := newMessage(Message_FIND_NODE, string(id), level)
+func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
+	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
 	return dht.sendRequest(ctx, p, pmes)
 }
 
-func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
-	pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
+func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
+	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
 	return dht.sendRequest(ctx, p, pmes)
 }
 
-func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
+func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
 	var provArr []peer.Peer
 	for _, prov := range peers {
 		p, err := dht.peerFromInfo(prov)
@@ -413,7 +414,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
 }
 
 // nearestPeersToQuery returns the routing table's closest peers.
-func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
+func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
 	level := pmes.GetClusterLevel()
 
 	cluster := dht.routingTables[level]
@@ -423,7 +424,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
 }
 
 // betterPeersToQuery returns nearestPeersToQuery, but only those closer than self.
-func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
+func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
 	closer := dht.nearestPeersToQuery(pmes, count)
 
 	// no node? nil
@@ -462,7 +463,7 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
 	return p, nil
 }
 
-func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
+func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
 
 	id := peer.ID(pbp.GetId())
 
@@ -485,7 +486,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
 	return p, nil
 }
 
-func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
+func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
 	p, err := dht.peerFromInfo(pbp)
 	if err != nil {
 		return nil, err
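Note: aside from the mechanical `Message` → `pb.Message` rename, `sendRequest` above is worth a second look — it stamps a start time before dispatch and records the round-trip time on the replying peer via `SetLatency`. A minimal, self-contained sketch of that bookkeeping (the types here are stand-ins, not the real `peer.Peer` or `msg.NetMessage`):

```go
package main

import (
	"fmt"
	"time"
)

// peerStub stands in for peer.Peer, which exposes SetLatency in the real code.
type peerStub struct{ latency time.Duration }

func (p *peerStub) SetLatency(d time.Duration) { p.latency = d }

// timedRequest mirrors dht.sendRequest's bookkeeping: note the start time,
// perform the request, and record the RTT on the peer only on success.
func timedRequest(p *peerStub, send func() error) error {
	start := time.Now()
	if err := send(); err != nil {
		return err
	}
	p.SetLatency(time.Since(start))
	return nil
}

func main() {
	p := &peerStub{}
	if err := timedRequest(p, func() error {
		time.Sleep(10 * time.Millisecond) // pretend network round trip
		return nil
	}); err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println("observed RTT:", p.latency)
}
```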
diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go
index 43bd34f8a22..be6f17d96ee 100644
--- a/routing/dht/ext_test.go
+++ b/routing/dht/ext_test.go
@@ -12,6 +12,7 @@ import (
 	msg "github.com/jbenet/go-ipfs/net/message"
 	mux "github.com/jbenet/go-ipfs/net/mux"
 	peer "github.com/jbenet/go-ipfs/peer"
+	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	u "github.com/jbenet/go-ipfs/util"
 
 	"time"
@@ -127,13 +128,13 @@ func TestGetFailures(t *testing.T) {
 	// u.POut("NotFound Test\n")
 	// Reply with failures to every message
 	fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
-		pmes := new(Message)
+		pmes := new(pb.Message)
 		err := proto.Unmarshal(mes.Data(), pmes)
 		if err != nil {
 			t.Fatal(err)
 		}
 
-		resp := &Message{
+		resp := &pb.Message{
 			Type: pmes.Type,
 		}
 		m, err := msg.FromObject(mes.Peer(), resp)
@@ -153,9 +154,9 @@ func TestGetFailures(t *testing.T) {
 	fs.handlers = nil
 
 	// Now we test this DHT's handleGetValue failure
-	typ := Message_GET_VALUE
+	typ := pb.Message_GET_VALUE
 	str := "hello"
-	req := Message{
+	req := pb.Message{
 		Type:  &typ,
 		Key:   &str,
 		Value: []byte{0},
@@ -169,7 +170,7 @@ func TestGetFailures(t *testing.T) {
 
 	mes = d.HandleMessage(ctx, mes)
 
-	pmes := new(Message)
+	pmes := new(pb.Message)
 	err = proto.Unmarshal(mes.Data(), pmes)
 	if err != nil {
 		t.Fatal(err)
@@ -215,21 +216,21 @@ func TestNotFound(t *testing.T) {
 
 	// Reply with random peers to every message
 	fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
-		pmes := new(Message)
+		pmes := new(pb.Message)
 		err := proto.Unmarshal(mes.Data(), pmes)
 		if err != nil {
 			t.Fatal(err)
 		}
 
 		switch pmes.GetType() {
-		case Message_GET_VALUE:
-			resp := &Message{Type: pmes.Type}
+		case pb.Message_GET_VALUE:
+			resp := &pb.Message{Type: pmes.Type}
 
 			peers := []peer.Peer{}
 			for i := 0; i < 7; i++ {
 				peers = append(peers, _randPeer())
 			}
-			resp.CloserPeers = peersToPBPeers(peers)
+			resp.CloserPeers = pb.PeersToPBPeers(peers)
 			mes, err := msg.FromObject(mes.Peer(), resp)
 			if err != nil {
 				t.Error(err)
@@ -282,17 +283,17 @@ func TestLessThanKResponses(t *testing.T) {
 
 	// Reply with random peers to every message
 	fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
-		pmes := new(Message)
+		pmes := new(pb.Message)
 		err := proto.Unmarshal(mes.Data(), pmes)
 		if err != nil {
 			t.Fatal(err)
 		}
 
 		switch pmes.GetType() {
-		case Message_GET_VALUE:
-			resp := &Message{
+		case pb.Message_GET_VALUE:
+			resp := &pb.Message{
 				Type:        pmes.Type,
-				CloserPeers: peersToPBPeers([]peer.Peer{other}),
+				CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
 			}
 
 			mes, err := msg.FromObject(mes.Peer(), resp)
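Note: the tests above all share one shape — a fake sender intercepts outgoing messages, decodes them with `proto.Unmarshal`, and fabricates whatever reply the scenario calls for (failures, random peers, fewer than K responses). A condensed sketch of that pattern, with stand-in types rather than the real `fakeSender`/`msg.NetMessage` from ext_test.go:

```go
package main

import "fmt"

// netMessage stands in for msg.NetMessage; the handlers stand in for the
// fs.AddHandler callbacks used by TestGetFailures and friends.
type netMessage struct{ payload string }

type fakeSender struct {
	handlers []func(netMessage) netMessage
}

func (fs *fakeSender) AddHandler(h func(netMessage) netMessage) {
	fs.handlers = append(fs.handlers, h)
}

// SendRequest routes the outgoing message through the registered handlers,
// so each test decides how the "network" answers.
func (fs *fakeSender) SendRequest(m netMessage) netMessage {
	for _, h := range fs.handlers {
		m = h(m)
	}
	return m
}

func main() {
	fs := &fakeSender{}
	// Reply with a canned failure to every message, as in TestGetFailures.
	fs.AddHandler(func(m netMessage) netMessage {
		return netMessage{payload: "NOT_FOUND"}
	})
	fmt.Println(fs.SendRequest(netMessage{payload: "GET_VALUE hello"}).payload)
}
```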
diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go
index 8aeb4251b10..35355b32ff4 100644
--- a/routing/dht/handlers.go
+++ b/routing/dht/handlers.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	peer "github.com/jbenet/go-ipfs/peer"
+	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	u "github.com/jbenet/go-ipfs/util"
 
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@@ -14,32 +15,32 @@ import (
 var CloserPeerCount = 4
 
 // dhtHandler specifies the signature of functions that handle DHT messages.
-type dhtHandler func(peer.Peer, *Message) (*Message, error)
+type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error)
 
-func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
+func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
 	switch t {
-	case Message_GET_VALUE:
+	case pb.Message_GET_VALUE:
 		return dht.handleGetValue
-	case Message_PUT_VALUE:
+	case pb.Message_PUT_VALUE:
 		return dht.handlePutValue
-	case Message_FIND_NODE:
+	case pb.Message_FIND_NODE:
 		return dht.handleFindPeer
-	case Message_ADD_PROVIDER:
+	case pb.Message_ADD_PROVIDER:
 		return dht.handleAddProvider
-	case Message_GET_PROVIDERS:
+	case pb.Message_GET_PROVIDERS:
 		return dht.handleGetProviders
-	case Message_PING:
+	case pb.Message_PING:
 		return dht.handlePing
 	default:
 		return nil
 	}
 }
 
-func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) {
+func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
 	log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
 
 	// setup response
-	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
+	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
 
 	// first, is the key even a key?
 	key := pmes.GetKey()
@@ -77,7 +78,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error)
 	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
 	if len(provs) > 0 {
 		log.Debugf("handleGetValue returning %d provider[s]", len(provs))
-		resp.ProviderPeers = peersToPBPeers(provs)
+		resp.ProviderPeers = pb.PeersToPBPeers(provs)
 	}
 
 	// Find closest peer on given cluster to desired key and reply with that info
@@ -89,14 +90,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error)
 				log.Critical("no addresses on peer being sent!")
 			}
 		}
-		resp.CloserPeers = peersToPBPeers(closer)
+		resp.CloserPeers = pb.PeersToPBPeers(closer)
 	}
 
 	return resp, nil
 }
 
 // Store a value in this peer's local storage
-func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) {
+func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
 	dht.dslock.Lock()
 	defer dht.dslock.Unlock()
 	dskey := u.Key(pmes.GetKey()).DsKey()
@@ -105,13 +106,13 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error)
 	return pmes, err
 }
 
-func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) {
+func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
 	log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
 	return pmes, nil
 }
 
-func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) {
-	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
+func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+	resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
 	var closest []peer.Peer
 
 	// if looking for self... special case where we send it on CloserPeers.
@@ -136,12 +137,12 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error)
 	for _, p := range withAddresses {
 		log.Debugf("handleFindPeer: sending back '%s'", p)
 	}
-	resp.CloserPeers = peersToPBPeers(withAddresses)
+	resp.CloserPeers = pb.PeersToPBPeers(withAddresses)
 	return resp, nil
 }
 
-func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) {
-	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
+func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
 
 	// check if we have this value, to add ourselves as provider.
 	log.Debugf("handling GetProviders: '%s'", pmes.GetKey())
@@ -160,13 +161,13 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, er
 
 	// if we've got providers, send those.
 	if providers != nil && len(providers) > 0 {
-		resp.ProviderPeers = peersToPBPeers(providers)
+		resp.ProviderPeers = pb.PeersToPBPeers(providers)
 	}
 
 	// Also send closer peers.
 	closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
 	if closer != nil {
-		resp.CloserPeers = peersToPBPeers(closer)
+		resp.CloserPeers = pb.PeersToPBPeers(closer)
 	}
 
 	return resp, nil
@@ -177,7 +178,7 @@ type providerInfo struct {
 	Value peer.Peer
 }
 
-func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) {
+func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
 	key := u.Key(pmes.GetKey())
 
 	log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
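Note: `handlerForMsgType` above is a plain switch from message type to method value, with `nil` for unknown types so `HandleMessage` can reject unservable requests. The same dispatch shape in miniature (the types here are illustrative, not the real `pb.Message_MessageType`):

```go
package main

import "fmt"

type messageType int

const (
	getValue messageType = iota
	ping
	unknown
)

type handler func(key string) (string, error)

// handlerFor mirrors handlerForMsgType: map each message type to a handler,
// returning nil for types we do not serve so the caller can reject them.
func handlerFor(t messageType) handler {
	switch t {
	case getValue:
		return func(key string) (string, error) { return "value-for:" + key, nil }
	case ping:
		return func(string) (string, error) { return "pong", nil }
	default:
		return nil
	}
}

func main() {
	for _, t := range []messageType{getValue, ping, unknown} {
		h := handlerFor(t)
		if h == nil {
			fmt.Println("no handler; dropping message")
			continue
		}
		out, _ := h("some-key")
		fmt.Println(out)
	}
}
```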
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -14,32 +15,32 @@ import ( var CloserPeerCount = 4 // dhthandler specifies the signature of functions that handle DHT messages. -type dhtHandler func(peer.Peer, *Message) (*Message, error) +type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error) -func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { +func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { switch t { - case Message_GET_VALUE: + case pb.Message_GET_VALUE: return dht.handleGetValue - case Message_PUT_VALUE: + case pb.Message_PUT_VALUE: return dht.handlePutValue - case Message_FIND_NODE: + case pb.Message_FIND_NODE: return dht.handleFindPeer - case Message_ADD_PROVIDER: + case pb.Message_ADD_PROVIDER: return dht.handleAddProvider - case Message_GET_PROVIDERS: + case pb.Message_GET_PROVIDERS: return dht.handleGetProviders - case Message_PING: + case pb.Message_PING: return dht.handlePing default: return nil } } -func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) // setup response - resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) + resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // first, is the key even a key? key := pmes.GetKey() @@ -77,7 +78,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) provs := dht.providers.GetProviders(u.Key(pmes.GetKey())) if len(provs) > 0 { log.Debugf("handleGetValue returning %d provider[s]", len(provs)) - resp.ProviderPeers = peersToPBPeers(provs) + resp.ProviderPeers = pb.PeersToPBPeers(provs) } // Find closest peer on given cluster to desired key and reply with that info @@ -89,14 +90,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) log.Critical("no addresses on peer being sent!") } } - resp.CloserPeers = peersToPBPeers(closer) + resp.CloserPeers = pb.PeersToPBPeers(closer) } return resp, nil } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { dht.dslock.Lock() defer dht.dslock.Unlock() dskey := u.Key(pmes.GetKey()).DsKey() @@ -105,13 +106,13 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) return pmes, err } -func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) { - resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel()) +func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { + resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.Peer // if looking for self... special case where we send it on CloserPeers. 
diff --git a/routing/dht/messages.pb.go b/routing/dht/pb/dht.pb.go
similarity index 92%
rename from routing/dht/messages.pb.go
rename to routing/dht/pb/dht.pb.go
index 2da77e7bc20..6c488c51a6c 100644
--- a/routing/dht/messages.pb.go
+++ b/routing/dht/pb/dht.pb.go
@@ -1,19 +1,19 @@
-// Code generated by protoc-gen-go.
-// source: messages.proto
+// Code generated by protoc-gen-gogo.
+// source: dht.proto
 // DO NOT EDIT!
 
 /*
-Package dht is a generated protocol buffer package.
+Package dht_pb is a generated protocol buffer package.
 
 It is generated from these files:
-	messages.proto
+	dht.proto
 
 It has these top-level messages:
 	Message
 */
-package dht
+package dht_pb
 
-import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
 import math "math"
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -67,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error {
 
 type Message struct {
 	// defines what type of message it is.
-	Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.Message_MessageType" json:"type,omitempty"`
+	Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.pb.Message_MessageType" json:"type,omitempty"`
 	// defines what coral cluster level this query/response belongs to.
 	ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"`
 	// Used to specify the key associated with this message.
@@ -156,5 +156,5 @@ func (m *Message_Peer) GetAddr() string {
 }
 
 func init() {
-	proto.RegisterEnum("dht.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
+	proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
 }
diff --git a/routing/dht/messages.proto b/routing/dht/pb/dht.proto
similarity index 98%
rename from routing/dht/messages.proto
rename to routing/dht/pb/dht.proto
index 0676901504c..e0696e6859a 100644
--- a/routing/dht/messages.proto
+++ b/routing/dht/pb/dht.proto
@@ -1,4 +1,4 @@
-package dht;
+package dht.pb;
 
 //run `protoc --go_out=. *.proto` to generate
 
diff --git a/routing/dht/Message.go b/routing/dht/pb/message.go
similarity index 87%
rename from routing/dht/Message.go
rename to routing/dht/pb/message.go
index ae78d1f3999..a77a5b9176d 100644
--- a/routing/dht/Message.go
+++ b/routing/dht/pb/message.go
@@ -1,4 +1,4 @@
-package dht
+package dht_pb
 
 import (
 	"errors"
@@ -8,7 +8,7 @@ import (
 	peer "github.com/jbenet/go-ipfs/peer"
 )
 
-func newMessage(typ Message_MessageType, key string, level int) *Message {
+func NewMessage(typ Message_MessageType, key string, level int) *Message {
 	m := &Message{
 		Type: &typ,
 		Key:  &key,
@@ -31,7 +31,7 @@ func peerToPBPeer(p peer.Peer) *Message_Peer {
 	return pbp
}
 
-func peersToPBPeers(peers []peer.Peer) []*Message_Peer {
+func PeersToPBPeers(peers []peer.Peer) []*Message_Peer {
 	pbpeers := make([]*Message_Peer, len(peers))
 	for i, p := range peers {
 		pbpeers[i] = peerToPBPeer(p)
@@ -53,8 +53,7 @@ func (m *Message_Peer) Address() (ma.Multiaddr, error) {
 func (m *Message) GetClusterLevel() int {
 	level := m.GetClusterLevelRaw() - 1
 	if level < 0 {
-		log.Debug("GetClusterLevel: no routing level specified, assuming 0")
-		level = 0
+		return 0
 	}
 	return int(level)
 }
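Note on `GetClusterLevel`: the wire field `ClusterLevelRaw` stores the level offset by +1 so the protobuf zero value can mean "unspecified"; the getter subtracts one and now silently clamps to 0 instead of logging. A sketch of the round trip, assuming the companion setter (unchanged by this diff) adds 1 the way the getter subtracts it:

```go
package main

import "fmt"

// message mirrors just the ClusterLevelRaw convention from pb.Message:
// the wire field stores level+1 so that 0 means "not specified".
type message struct{ clusterLevelRaw int32 }

func (m *message) SetClusterLevel(level int) { m.clusterLevelRaw = int32(level) + 1 }

// GetClusterLevel matches the new behavior in message.go: subtract the offset
// and clamp missing values to level 0 without logging.
func (m *message) GetClusterLevel() int {
	level := m.clusterLevelRaw - 1
	if level < 0 {
		return 0
	}
	return int(level)
}

func main() {
	var m message
	fmt.Println(m.GetClusterLevel()) // 0: unset raw value falls back to level 0
	m.SetClusterLevel(3)
	fmt.Println(m.GetClusterLevel()) // 3
}
```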
diff --git a/routing/dht/routing.go b/routing/dht/routing.go
index 26d17cbc4c7..64a7edbd65a 100644
--- a/routing/dht/routing.go
+++ b/routing/dht/routing.go
@@ -6,6 +6,7 @@ import (
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	peer "github.com/jbenet/go-ipfs/peer"
+	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
 )
@@ -152,10 +153,10 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
 	return peerOut
 }
 
-func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
+func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
 	done := make(chan struct{})
 	for _, pbp := range peers {
-		go func(mp *Message_Peer) {
+		go func(mp *pb.Message_Peer) {
 			defer func() { done <- struct{}{} }()
 			// construct new peer
 			p, err := dht.ensureConnectedToPeer(mp)
@@ -258,7 +259,7 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
 	// Thoughts: maybe this should accept an ID and do a peer lookup?
 	log.Infof("ping %s start", p)
 
-	pmes := newMessage(Message_PING, "", 0)
+	pmes := pb.NewMessage(pb.Message_PING, "", 0)
 	_, err := dht.sendRequest(ctx, p, pmes)
 	log.Infof("ping %s end (err = %s)", p, err)
 	return err
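Note: taken together, callers outside the package now construct and inspect DHT messages through the exported `pb` API (`pb.NewMessage`, `pb.PeersToPBPeers`, `pb.Message_MessageType_name`). A minimal usage sketch; the gogoprotobuf import path is abbreviated here, while in-tree code goes through the vendored Godeps path shown in the diff:

```go
package main

import (
	"fmt"

	proto "code.google.com/p/gogoprotobuf/proto" // vendored under Godeps in-tree
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
)

func main() {
	// Build a GET_VALUE request at cluster level 0 via the now-exported constructor.
	req := pb.NewMessage(pb.Message_GET_VALUE, "some-key", 0)

	// Round-trip through the wire format, as HandleMessage and sendRequest do.
	data, err := proto.Marshal(req)
	if err != nil {
		panic(err)
	}
	resp := new(pb.Message)
	if err := proto.Unmarshal(data, resp); err != nil {
		panic(err)
	}

	// The enum name table moved with the package: pb.Message_MessageType_name.
	fmt.Printf("type=%s key=%q level=%d\n",
		pb.Message_MessageType_name[int32(resp.GetType())],
		resp.GetKey(), resp.GetClusterLevel())
}
```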