Skip to content

Commit

Permalink
merge with release/0.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-pure committed Mar 25, 2021
2 parents b317e69 + 6ffc2a1 commit f5b2557
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 156 deletions.
10 changes: 5 additions & 5 deletions channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ type writer struct {
// go ch.Run(ctx)
// // Read inbound messages that have been sent by the remote peer and echo
// // them back to the remote peer.
// for msg := range inbound {
// outbound <- msg
// for Msg := range inbound {
// outbound <- Msg
// }
// // Attach a network connection to the remote peer.
// // ...
Expand All @@ -96,7 +96,7 @@ type Channel struct {
opts Options
remote id.Signatory

inbound chan<- wire.Msg
inbound chan<- wire.Packet
outbound <-chan wire.Msg

readers chan reader
Expand All @@ -116,7 +116,7 @@ type Channel struct {
// outbound messaging channel, but there is no functional attached network
// connection, or when messages are being received on an attached network
// connection, but the inbound message channel is not being drained.
func New(opts Options, remote id.Signatory, inbound chan<- wire.Msg, outbound <-chan wire.Msg) *Channel {
func New(opts Options, remote id.Signatory, inbound chan<- wire.Packet, outbound <-chan wire.Msg) *Channel {
return &Channel{
opts: opts,
remote: remote,
Expand Down Expand Up @@ -292,7 +292,7 @@ func (ch *Channel) readLoop(ctx context.Context) error {
close(r.q)
}
return
case ch.inbound <- m:
case ch.inbound <- wire.Packet{Msg: m, IPAddr: r.Conn.RemoteAddr()}:
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (

var _ = Describe("Channels", func() {

run := func(ctx context.Context, remote id.Signatory) (*channel.Channel, <-chan wire.Msg, chan<- wire.Msg) {
inbound, outbound := make(chan wire.Msg), make(chan wire.Msg)
run := func(ctx context.Context, remote id.Signatory) (*channel.Channel, <-chan wire.Packet, chan<- wire.Msg) {
inbound, outbound := make(chan wire.Packet), make(chan wire.Msg)
ch := channel.New(
channel.DefaultOptions().WithDrainTimeout(1500*time.Millisecond),
remote,
Expand Down Expand Up @@ -54,7 +54,7 @@ var _ = Describe("Channels", func() {
return quit
}

stream := func(inbound <-chan wire.Msg, n uint64, inOrder bool) <-chan struct{} {
stream := func(inbound <-chan wire.Packet, n uint64, inOrder bool) <-chan struct{} {
quit := make(chan struct{})
go func() {
defer GinkgoRecover()
Expand All @@ -65,7 +65,7 @@ var _ = Describe("Channels", func() {
for iter := uint64(0); iter < n; iter++ {
select {
case msg := <-inbound:
data := binary.BigEndian.Uint64(msg.Data)
data := binary.BigEndian.Uint64(msg.Msg.Data)
if data > max {
max = data
}
Expand Down
18 changes: 9 additions & 9 deletions channel/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type receiver struct {
ctx context.Context
f func(id.Signatory, wire.Msg) error
f func(id.Signatory, wire.Packet) error
}

type sharedChannel struct {
Expand All @@ -28,15 +28,15 @@ type sharedChannel struct {
cancel context.CancelFunc
// inbound channel receives messages from the remote peer to which the
// channel is bound.
inbound <-chan wire.Msg
inbound <-chan wire.Packet
// outbound channel is sent messages that are destined for the remote peer
// to which the channel is bound.
outbound chan<- wire.Msg
}

type Msg struct {
wire.Msg
From id.Signatory
wire.Packet
From id.Signatory
}

type Client struct {
Expand Down Expand Up @@ -77,7 +77,7 @@ func (client *Client) Bind(remote id.Signatory) {
return
}

inbound := make(chan wire.Msg, client.opts.InboundBufferSize)
inbound := make(chan wire.Packet, client.opts.InboundBufferSize)
outbound := make(chan wire.Msg, client.opts.OutboundBufferSize)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -92,11 +92,11 @@ func (client *Client) Bind(remote id.Signatory) {
select {
case <-ctx.Done():
return
case msg := <-inbound:
case packet := <-inbound:
select {
case <-ctx.Done():
return
case client.inbound <- Msg{Msg: msg, From: remote}:
case client.inbound <- Msg{Packet: packet, From: remote}:
}
}
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (client *Client) Send(ctx context.Context, remote id.Signatory, msg wire.Ms
}
}

func (client *Client) Receive(ctx context.Context, f func(id.Signatory, wire.Msg) error) {
func (client *Client) Receive(ctx context.Context, f func(id.Signatory, wire.Packet) error) {
client.receiversRunningMu.Lock()
if client.receiversRunning {
client.receiversRunningMu.Unlock()
Expand Down Expand Up @@ -201,7 +201,7 @@ func (client *Client) Receive(ctx context.Context, f func(id.Signatory, wire.Msg
// Do nothing. This will implicitly mark it for
// deletion.
default:
if err := receiver.f(msg.From, msg.Msg); err != nil {
if err := receiver.f(msg.From, msg.Packet); err != nil {
// When a channel is killed, its context will be
// cancelled, its underlying network connections
// will be dropped, and sending will fail. A killed
Expand Down
4 changes: 2 additions & 2 deletions channel/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var _ = Describe("Client", func() {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
receiver := make(chan wire.Msg)
client.Receive(ctx, func(signatory id.Signatory, msg wire.Msg) error {
receiver <- msg
client.Receive(ctx, func(signatory id.Signatory, packet wire.Packet) error {
receiver <- packet.Msg
return nil
})
for iter := uint64(0); iter < n; iter++ {
Expand Down
29 changes: 0 additions & 29 deletions dht/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ type Table interface {
// PeerAddress returns the network address associated with the given peer.
PeerAddress(id.Signatory) (wire.Address, bool)

// AddPeer to the table with an associate network address.
AddIP(id.Signatory, string)
// DeleteIP from the table.
DeleteIP(id.Signatory)
// IP returns the network ip address associated with the given peer.
IP(id.Signatory) (string, bool)

// Peers returns the n closest peers to the local peer, using XORing as the
// measure of distance between two peers.
Peers(int) []id.Signatory
Expand Down Expand Up @@ -157,28 +150,6 @@ func (table *InMemTable) PeerAddress(peerID id.Signatory) (wire.Address, bool) {
return addr, ok
}

func (table *InMemTable) AddIP(peerID id.Signatory, ipAddress string) {
table.ipBySignatoryMu.Lock()
defer table.ipBySignatoryMu.Unlock()

table.ipBySignatory[peerID] = ipAddress
}

func (table *InMemTable) DeleteIP(peerID id.Signatory) {
table.ipBySignatoryMu.Lock()
defer table.ipBySignatoryMu.Unlock()

delete(table.ipBySignatory, peerID)
}

func (table *InMemTable) IP(peerID id.Signatory) (string, bool) {
table.ipBySignatoryMu.Lock()
defer table.ipBySignatoryMu.Unlock()

ip, ok := table.ipBySignatory[peerID]
return ip, ok
}

// Peers returns the n closest peer IDs.
func (table *InMemTable) Peers(n int) []id.Signatory {
table.sortedMu.Lock()
Expand Down
46 changes: 0 additions & 46 deletions dht/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,52 +294,6 @@ var _ = Describe("DHT", func() {
}, 10)
})

Describe("IP Addresses", func() {
Context("when adding an ip address", func() {
It("should be able to query it", func() {
table, _ := initDHT()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func(seed int64) bool {
privKey := id.NewPrivKey()
sig := privKey.Signatory()
ipAddr := fmt.Sprintf("%d.%d.%d.%d:%d",
r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(65536))
table.AddIP(sig, ipAddr)

signatory := id.NewSignatory((*id.PubKey)(&privKey.PublicKey))
newIPAddr, ok := table.IP(signatory)
Expect(ok).To(BeTrue())
Expect(newIPAddr).To(Equal(ipAddr))
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
})
})

Context("when deleting an ip address", func() {
It("should not be able to query it", func() {
table, _ := initDHT()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func(seed int64) bool {
privKey := id.NewPrivKey()
ipAddr := fmt.Sprintf("%d.%d.%d.%d:%d",
r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(256), r.Intn(65536))

signatory := id.NewSignatory((*id.PubKey)(&privKey.PublicKey))
table.DeleteIP(signatory)
table.AddIP(signatory, ipAddr)
table.DeleteIP(signatory)

_, ok := table.IP(signatory)
return !ok
}
Expect(quick.Check(f, nil)).To(Succeed())
})
})
})

Describe("Subnets", func() {
Context("when adding a subnet", func() {
It("should be able to query it", func() {
Expand Down
4 changes: 2 additions & 2 deletions examples/fuzz/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func main() {
peers[i] = peer.New(
opts[i],
transports[i])
peers[i].Receive(context.Background(), func(from id.Signatory, msg wire.Msg) error {
fmt.Printf("%4v: received \"%v\" from %4v\n", opts[i].PrivKey.Signatory(), string(msg.Data), from)
peers[i].Receive(context.Background(), func(from id.Signatory, packet wire.Packet) error {
fmt.Printf("%4v: received \"%v\" from %4v\n", opts[i].PrivKey.Signatory(), string(packet.Msg.Data), from)
return nil
})
peers[i].Resolve(context.Background(), contentResolver)
Expand Down
10 changes: 5 additions & 5 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,23 @@ func (p *Peer) DiscoverPeers(ctx context.Context) {
}

func (p *Peer) Run(ctx context.Context) {
p.transport.Receive(ctx, func(from id.Signatory, msg wire.Msg) error {
p.transport.Receive(ctx, func(from id.Signatory, packet wire.Packet) error {
// TODO(ross): Think about merging the syncer and the gossiper.
if err := p.syncer.DidReceiveMessage(from, msg); err != nil {
if err := p.syncer.DidReceiveMessage(from, packet.Msg); err != nil {
return err
}
if err := p.gossiper.DidReceiveMessage(from, msg); err != nil {
if err := p.gossiper.DidReceiveMessage(from, packet.Msg); err != nil {
return err
}
if err := p.discoveryClient.DidReceiveMessage(from, msg); err != nil {
if err := p.discoveryClient.DidReceiveMessage(from, packet.IPAddr, packet.Msg); err != nil {
return err
}
return nil
})
p.transport.Run(ctx)
}

func (p *Peer) Receive(ctx context.Context, f func(id.Signatory, wire.Msg) error) {
func (p *Peer) Receive(ctx context.Context, f func(id.Signatory,wire.Packet) error) {
p.transport.Receive(ctx, f)
}

Expand Down
18 changes: 6 additions & 12 deletions peer/peerdiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/binary"
"fmt"
"net"
"time"

"github.com/renproject/aw/transport"
Expand Down Expand Up @@ -71,10 +72,10 @@ Outer:
}
}

func (dc *DiscoveryClient) DidReceiveMessage(from id.Signatory, msg wire.Msg) error {
func (dc *DiscoveryClient) DidReceiveMessage(from id.Signatory, ipAddr net.Addr, msg wire.Msg) error {
switch msg.Type {
case wire.MsgTypePing:
if err := dc.didReceivePing(from, msg); err != nil {
if err := dc.didReceivePing(from, ipAddr, msg); err != nil {
return err
}
case wire.MsgTypePingAck:
Expand All @@ -85,26 +86,19 @@ func (dc *DiscoveryClient) DidReceiveMessage(from id.Signatory, msg wire.Msg) er
return nil
}

func (dc *DiscoveryClient) didReceivePing(from id.Signatory, msg wire.Msg) error {
func (dc *DiscoveryClient) didReceivePing(from id.Signatory, ipAddr net.Addr, msg wire.Msg) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if dataLen := len(msg.Data); dataLen != 2 {
return fmt.Errorf("malformed port received in ping message. expected: 2 bytes, received: %v bytes", dataLen)
}
port := binary.LittleEndian.Uint16(msg.Data)
ipAddr, ipAddrOk := dc.transport.Table().IP(from)
if !ipAddrOk {
if _, ok := dc.transport.Table().PeerAddress(from); ok {
return nil
}
return fmt.Errorf("ip address for remote peer not found")
}

dc.transport.Table().AddPeer(
from,
wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("%v:%v", ipAddr, port), uint64(time.Now().UnixNano())),
wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("%v:%v", ipAddr.(*net.TCPAddr).IP.String(), port), uint64(time.Now().UnixNano())),
)
dc.transport.Table().DeleteIP(from)

peers := dc.transport.Table().Peers(dc.opts.MaxExpectedPeers)
addrAndSig := make([]wire.SignatoryAndAddress, 0, len(peers))
Expand Down
6 changes: 3 additions & 3 deletions peer/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ var _ = Describe("Sync", func() {
go peers[0].Run(ctx)
go func(ctx context.Context) {
once := false
transports[1].Receive(ctx, func(from id.Signatory, msg wire.Msg) error {
transports[1].Receive(ctx, func(from id.Signatory, packet wire.Packet) error {
if !once {
once = true
return nil
}

if err := peers[1].Syncer().DidReceiveMessage(from, msg); err != nil {
if err := peers[1].Syncer().DidReceiveMessage(from, packet.Msg); err != nil {
return err
}
if err := peers[1].Gossiper().DidReceiveMessage(from, msg); err != nil {
if err := peers[1].Gossiper().DidReceiveMessage(from, packet.Msg); err != nil {
return err
}
return nil
Expand Down
5 changes: 1 addition & 4 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -171,7 +170,7 @@ func (t *Transport) Send(ctx context.Context, remote id.Signatory, msg wire.Msg)
return t.client.Send(ctx, remote, msg)
}

func (t *Transport) Receive(ctx context.Context, receiver func(id.Signatory, wire.Msg) error) {
func (t *Transport) Receive(ctx context.Context, receiver func(id.Signatory, wire.Packet) error) {
t.client.Receive(ctx, receiver)
}

Expand Down Expand Up @@ -244,8 +243,6 @@ func (t *Transport) run(ctx context.Context) {
enc = codec.LengthPrefixEncoder(codec.PlainEncoder, enc)
dec = codec.LengthPrefixDecoder(codec.PlainDecoder, dec)

t.table.AddIP(remote, addr[:strings.IndexByte(addr, ':')])

t.connect(remote)
defer t.disconnect(remote)

Expand Down
Loading

0 comments on commit f5b2557

Please sign in to comment.