Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redux: refactor(dht/pb) move proto to pb package #205

Merged
merged 3 commits into from
Oct 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions exchange/bitswap/strategy/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"sync"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)

// TODO declare thread-safe datastore
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely
Expand Down
11 changes: 0 additions & 11 deletions routing/dht/Makefile

This file was deleted.

41 changes: 21 additions & 20 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
inet "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"

Expand Down Expand Up @@ -128,7 +129,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
}

// deserialize msg
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
log.Error("Error unmarshaling data")
Expand All @@ -140,7 +141,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N

// Print out diagnostic
log.Debugf("%s got message type: '%s' from %s",
dht.self, Message_MessageType_name[int32(pmes.GetType())], mPeer)
dht.self, pb.Message_MessageType_name[int32(pmes.GetType())], mPeer)

// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
Expand Down Expand Up @@ -174,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N

// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {

mes, err := msg.FromObject(p, pmes)
if err != nil {
Expand All @@ -185,7 +186,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)

// Print out diagnostic
log.Debugf("Sent message type: '%s' to %s",
Message_MessageType_name[int32(pmes.GetType())], p)
pb.Message_MessageType_name[int32(pmes.GetType())], p)

rmes, err := dht.sender.SendRequest(ctx, mes)
if err != nil {
Expand All @@ -198,7 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)

rpmes := new(Message)
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
Expand All @@ -210,7 +211,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {

pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Value = value
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
Expand All @@ -225,10 +226,10 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,

func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {

pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)

// add self as the provider
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})

rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
Expand Down Expand Up @@ -290,9 +291,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*Message, error) {
key u.Key, level int) (*pb.Message, error) {

pmes := newMessage(Message_GET_VALUE, string(key), level)
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}

Expand All @@ -301,7 +302,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*Message_Peer, level int) ([]byte, error) {
peerlist []*pb.Message_Peer, level int) ([]byte, error) {

for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(pinfo)
Expand Down Expand Up @@ -379,17 +380,17 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
return nil, nil
}

func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {
p, err := dht.peerFromInfo(prov)
Expand All @@ -413,7 +414,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]

Expand All @@ -423,7 +424,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
}

// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand Down Expand Up @@ -462,7 +463,7 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
return p, nil
}

func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {

id := peer.ID(pbp.GetId())

Expand All @@ -485,7 +486,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
return p, nil
}

func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err
Expand Down
27 changes: 14 additions & 13 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"

"time"
Expand Down Expand Up @@ -127,13 +128,13 @@ func TestGetFailures(t *testing.T) {
// u.POut("NotFound Test\n")
// Reply with failures to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

resp := &Message{
resp := &pb.Message{
Type: pmes.Type,
}
m, err := msg.FromObject(mes.Peer(), resp)
Expand All @@ -153,9 +154,9 @@ func TestGetFailures(t *testing.T) {

fs.handlers = nil
// Now we test this DHT's handleGetValue failure
typ := Message_GET_VALUE
typ := pb.Message_GET_VALUE
str := "hello"
req := Message{
req := pb.Message{
Type: &typ,
Key: &str,
Value: []byte{0},
Expand All @@ -169,7 +170,7 @@ func TestGetFailures(t *testing.T) {

mes = d.HandleMessage(ctx, mes)

pmes := new(Message)
pmes := new(pb.Message)
err = proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -215,21 +216,21 @@ func TestNotFound(t *testing.T) {

// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{Type: pmes.Type}
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}

peers := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
}
resp.CloserPeers = peersToPBPeers(peers)
resp.CloserPeers = pb.PeersToPBPeers(peers)
mes, err := msg.FromObject(mes.Peer(), resp)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -282,17 +283,17 @@ func TestLessThanKResponses(t *testing.T) {

// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{
case pb.Message_GET_VALUE:
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: peersToPBPeers([]peer.Peer{other}),
CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
}

mes, err := msg.FromObject(mes.Peer(), resp)
Expand Down
Loading