Skip to content

Commit

Permalink
Merge pull request #38 from renproject/fn/tcp-options
Browse files Browse the repository at this point in the history
TCP opts in builder | content type in Hash
  • Loading branch information
jazg authored Aug 20, 2020
2 parents 50b815f + 51ef017 commit 4723e75
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 32 deletions.
35 changes: 31 additions & 4 deletions aw.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/renproject/aw/gossip"
"github.com/renproject/aw/handshake"
"github.com/renproject/aw/peer"
"github.com/renproject/aw/tcp"
"github.com/renproject/aw/transport"
"github.com/renproject/aw/wire"
"github.com/renproject/id"
Expand Down Expand Up @@ -51,6 +52,19 @@ func New() *Builder {
return builder
}

// WithLogger consumes a logger instance and updates the Airwave Builder to
// use this logger for all its components
func (builder *Builder) WithLogger(logger *zap.Logger) *Builder {
builder.opts = builder.opts.WithLogger(logger)
builder.handshaker = builder.handshaker.WithLogger(logger)
builder.trans = builder.trans.WithLogger(logger)
builder.trans.TCPClientOpts = builder.trans.TCPClientOpts.WithLogger(logger)
builder.trans.TCPServerOpts = builder.trans.TCPServerOpts.WithLogger(logger)
builder.peer = builder.peer.WithLogger(logger)
builder.gossiper = builder.gossiper.WithLogger(logger)
return builder
}

func (builder *Builder) WithPrivKey(privKey *id.PrivKey) *Builder {
builder.handshaker.PrivKey = privKey
builder.dht = dht.New(
Expand Down Expand Up @@ -90,6 +104,16 @@ func (builder *Builder) WithPort(port uint16) *Builder {
return builder
}

func (builder *Builder) WithTCPClientOptions(opts tcp.ClientOptions) *Builder {
builder.trans.TCPClientOpts = opts
return builder
}

func (builder *Builder) WithTCPServerOptions(opts tcp.ServerOptions) *Builder {
builder.trans.TCPServerOpts = opts
return builder
}

func (builder *Builder) Build() *Node {
handshaker := handshake.NewECDSA(builder.handshaker)
trans := transport.New(builder.trans, handshaker)
Expand Down Expand Up @@ -133,13 +157,11 @@ func (node *Node) Run(ctx context.Context) {
wg.Wait()
}

func (node *Node) Send(ctx context.Context, signatory id.Signatory, dataType uint8, data []byte) {
hash := sha256.Sum256(data)
func (node *Node) Send(ctx context.Context, signatory id.Signatory, hash id.Hash, dataType uint8, data []byte) {
node.gossiper.Gossip(id.Hash(signatory), hash, dataType)
}

func (node *Node) Broadcast(ctx context.Context, subnet id.Hash, dataType uint8, data []byte) {
hash := sha256.Sum256(data)
func (node *Node) Broadcast(ctx context.Context, subnet id.Hash, hash id.Hash, dataType uint8, data []byte) {
node.gossiper.Gossip(subnet, hash, dataType)
}

Expand Down Expand Up @@ -170,3 +192,8 @@ func (node *Node) Identity() id.Signatory {
func (node *Node) Addr() wire.Address {
return node.peer.Addr()
}

func Hash(dataType uint8, data []byte) id.Hash {
data = append(data, byte(dataType))
return sha256.Sum256(data)
}
67 changes: 56 additions & 11 deletions aw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package aw_test
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"math/rand"
"sync/atomic"
Expand All @@ -12,8 +11,11 @@ import (
"github.com/renproject/aw"
"github.com/renproject/aw/dht"
"github.com/renproject/aw/gossip"
"github.com/renproject/aw/tcp"
"github.com/renproject/aw/wire"
"github.com/renproject/id"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -38,21 +40,37 @@ var _ = Describe("Airwave", func() {
addr1 := wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port1), uint64(time.Now().UnixNano()))
privKey1 := id.NewPrivKey()
Expect(addr1.Sign(privKey1)).To(Succeed())

tcpClientOpts := tcp.DefaultClientOptions().
WithTimeToDial(1 * time.Second)
tcpServerOpts := tcp.DefaultServerOptions().
WithHost("0.0.0.0").
WithPreventDuplicateConns(false)

logger, _ := zap.Config{
Encoding: "json",
Level: zap.NewAtomicLevelAt(zapcore.ErrorLevel),
}.Build()

node1 := aw.New().
WithPrivKey(privKey1).
WithAddr(addr1).
WithHost("0.0.0.0").
WithTCPClientOptions(tcpClientOpts).
WithTCPServerOptions(tcpServerOpts).
WithPort(port1).
WithLogger(logger).
Build()

port2 := uint16(3000 + r.Int()%3000)
addr2 := wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port2), uint64(time.Now().UnixNano()))
privKey2 := id.NewPrivKey()
Expect(addr2.Sign(privKey2)).To(Succeed())

node2 := aw.New().
WithPrivKey(privKey2).
WithAddr(addr2).
WithHost("0.0.0.0").
WithTCPClientOptions(tcpClientOpts).
WithTCPServerOptions(tcpServerOpts).
WithPort(port2).
WithContentResolver(
dht.NewDoubleCacheContentResolver(dht.DefaultDoubleCacheContentResolverOptions(), dht.CallbackContentResolver{
Expand All @@ -77,6 +95,7 @@ var _ = Describe("Airwave", func() {
},
}),
).
WithLogger(logger).
Build()

node1.DHT().InsertAddr(node2.Addr())
Expand All @@ -89,12 +108,21 @@ var _ = Describe("Airwave", func() {
time.Sleep(100 * time.Millisecond)

subnet := node1.DHT().AddSubnet([]id.Signatory{node2.Identity()})
fmt.Printf("%v\n", subnet)
for i := uint64(0); i < willSendN; i++ {
node1.Broadcast(ctx, subnet, 0, []byte("once"))
node1.Broadcast(ctx, subnet, 0, []byte(fmt.Sprintf("message #%v", i)))
data1 := []byte("once")
data2 := []byte(fmt.Sprintf("message #%v", i))
hash1 := aw.Hash(0, data1)
hash2 := aw.Hash(0, data2)

node1.DHT().InsertContent(hash1, 0, data1)
node1.Broadcast(ctx, subnet, hash1, 0, data1)
node1.DHT().InsertContent(hash2, 0, data2)
node1.Broadcast(ctx, subnet, hash2, 0, data2)
}
node1.Broadcast(ctx, subnet, 0, []byte("done"))
data := []byte("done")
hash := aw.Hash(0, data)
node1.DHT().InsertContent(hash, 0, data)
node1.Broadcast(ctx, subnet, hash, 0, data)

<-ctx.Done()

Expand All @@ -117,17 +145,33 @@ var _ = Describe("Airwave", func() {
n := 3
nodes := make([]*aw.Node, n)
addrs := make([]wire.Address, n)

tcpClientOpts := tcp.DefaultClientOptions().
WithTimeToDial(1 * time.Second)
tcpServerOpts := tcp.DefaultServerOptions().
WithHost("0.0.0.0").
WithPreventDuplicateConns(false)

logger, _ := zap.Config{
Encoding: "json",
Level: zap.NewAtomicLevelAt(zapcore.ErrorLevel),
}.Build()

for i := range nodes {
port := uint16(3000 + i)
addrs[i] = wire.NewUnsignedAddress(wire.TCP, fmt.Sprintf("0.0.0.0:%v", port), uint64(time.Now().UnixNano()))
privKey := id.NewPrivKey()
Expect(addrs[i].Sign(privKey)).To(Succeed())

node := aw.New().
WithPrivKey(privKey).
WithAddr(addrs[i]).
WithHost("0.0.0.0").
WithTCPClientOptions(tcpClientOpts).
WithTCPServerOptions(tcpServerOpts).
WithPort(port).
WithLogger(logger).
Build()

nodes[i] = node
}

Expand All @@ -150,10 +194,11 @@ var _ = Describe("Airwave", func() {
// each other.
time.Sleep(100 * time.Millisecond)

contentHash := sha256.Sum256([]byte("hello!"))
contentType := uint8(1)
content := []byte("hello!")
nodes[0].Broadcast(ctx, gossip.DefaultSubnet, contentType, content)
contentType := uint8(1)
contentHash := aw.Hash(contentType, content)
nodes[0].DHT().InsertContent(contentHash, contentType, content)
nodes[0].Broadcast(ctx, gossip.DefaultSubnet, contentHash, contentType, content)

found := map[id.Signatory]struct{}{}
for {
Expand Down
8 changes: 8 additions & 0 deletions gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/renproject/aw/transport"
"github.com/renproject/aw/wire"
"github.com/renproject/id"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -321,6 +323,10 @@ func initNodes(ctx context.Context, n uint, alpha int) []node {
signatory := id.NewSignatory((*id.PubKey)(&privKey.PublicKey))
host := "0.0.0.0"
port := uint16(3000 + rand.Int()%3000)
logger, _ := zap.Config{
Encoding: "json",
Level: zap.NewAtomicLevelAt(zapcore.ErrorLevel),
}.Build()

dht := dht.New(
signatory,
Expand All @@ -333,6 +339,7 @@ func initNodes(ctx context.Context, n uint, alpha int) []node {
transport.DefaultOptions().
WithTCPServerOptions(
tcp.DefaultServerOptions().
WithLogger(logger).
WithHost(host).
WithPort(port).
WithPreventDuplicateConns(false),
Expand All @@ -344,6 +351,7 @@ func initNodes(ctx context.Context, n uint, alpha int) []node {
)
gossiper := gossip.New(
gossip.DefaultOptions().
WithLogger(logger).
WithAlpha(int(n)),
signatory,
dht,
Expand Down
2 changes: 1 addition & 1 deletion handshake/opt_test.go
Original file line number Diff line number Diff line change
@@ -1 +1 @@
package handshake_test
package handshake_test
2 changes: 1 addition & 1 deletion opt_test.go
Original file line number Diff line number Diff line change
@@ -1 +1 @@
package aw_test
package aw_test
5 changes: 5 additions & 0 deletions peer/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func DefaultOptions() Options {
}
}

func (opts Options) WithLogger(logger *zap.Logger) Options {
opts.Logger = logger
return opts
}

func (opts Options) WithAddr(addr wire.Address) Options {
opts.Addr = addr
return opts
Expand Down
2 changes: 1 addition & 1 deletion tcp/server_test.go
Original file line number Diff line number Diff line change
@@ -1 +1 @@
package tcp_test
package tcp_test
4 changes: 2 additions & 2 deletions wire/msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var _ = Describe("Message", func() {
Context("when marshaling and unmarshaling", func() {
It("shoudl equal itself", func() {
It("should equal itself", func() {
f := func() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
msg := wireutil.NewMessageBuilder(r).Build()
Expand All @@ -27,7 +27,7 @@ var _ = Describe("Message", func() {
Expect(msg.Equal(&unmarshaledMsg)).To(BeTrue())
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
Expect(quick.Check(f, &quick.Config{MaxCount: 10})).To(Succeed())
})
})
})
2 changes: 1 addition & 1 deletion wire/wire_test.go
Original file line number Diff line number Diff line change
@@ -1 +1 @@
package wire_test
package wire_test
8 changes: 4 additions & 4 deletions wire/wireutil/addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ = Describe("Address builder", func() {
Context("when building messages", func() {
Context("when settting the protocol", func() {
Context("when setting the protocol", func() {
It("should build a message with that protocol", func() {
f := func() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -26,7 +26,7 @@ var _ = Describe("Address builder", func() {
})
})

Context("when settting the value", func() {
Context("when setting the value", func() {
It("should build a message with that value", func() {
f := func() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -39,7 +39,7 @@ var _ = Describe("Address builder", func() {
})
})

Context("when settting the nonce", func() {
Context("when setting the nonce", func() {
It("should build a message with that nonce", func() {
f := func() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -52,7 +52,7 @@ var _ = Describe("Address builder", func() {
})
})

Context("when settting the signature", func() {
Context("when setting the signature", func() {
It("should build a message with that signature", func() {
f := func() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down
12 changes: 6 additions & 6 deletions wire/wireutil/msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var _ = Describe("Message builder", func() {
Context("when building messages", func() {
Context("when settting the version", func() {
Context("when setting the version", func() {
It("should build a message with that version", func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func() bool {
Expand All @@ -23,11 +23,11 @@ var _ = Describe("Message builder", func() {
Expect(msg.Version).To(Equal(version))
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
Expect(quick.Check(f, &quick.Config{MaxCount: 10})).To(Succeed())
})
})

Context("when settting the type", func() {
Context("when setting the type", func() {
It("should build a message with that type", func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func() bool {
Expand All @@ -36,11 +36,11 @@ var _ = Describe("Message builder", func() {
Expect(msg.Type).To(Equal(ty))
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
Expect(quick.Check(f, &quick.Config{MaxCount: 10})).To(Succeed())
})
})

Context("when settting the data", func() {
Context("when setting the data", func() {
It("should build a message with that data", func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
f := func() bool {
Expand All @@ -49,7 +49,7 @@ var _ = Describe("Message builder", func() {
Expect(bytes.Equal(msg.Data, data)).To(BeTrue())
return true
}
Expect(quick.Check(f, nil)).To(Succeed())
Expect(quick.Check(f, &quick.Config{MaxCount: 10})).To(Succeed())
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion wire/wireutil/wireutil.go
Original file line number Diff line number Diff line change
@@ -1 +1 @@
package wireutil
package wireutil

0 comments on commit 4723e75

Please sign in to comment.