Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release/0.6.0' into feat/receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-pure committed Nov 16, 2021
2 parents e60a8a7 + abd8b8e commit e8df373
Showing 1 changed file with 91 additions and 22 deletions.
113 changes: 91 additions & 22 deletions aw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ var _ = Describe("Peer", func() {
peer1 := peers[0]
peer2 := peers[1]

peer1.GossipNonBlocking(contentID, nil)
err = peer1.GossipNonBlocking(contentID, nil)
if err != nil {
panic(err)
}

Eventually(func() []byte { msg, _ := peer2.ContentResolver.QueryContent(contentID); return msg }).Should(Equal(content))
})
Expand All @@ -91,7 +94,10 @@ var _ = Describe("Peer", func() {
data := []byte("hello")
peer1.ContentResolver.InsertContent(contentID, data)

peer1.GossipNonBlocking(contentID, nil)
err = peer1.GossipNonBlocking(contentID, nil)
if err != nil {
panic(err)
}

Eventually(func() []byte { msg, _ := peer2.ContentResolver.QueryContent(contentID); return msg }).Should(Equal(data))
})
Expand Down Expand Up @@ -122,7 +128,10 @@ var _ = Describe("Peer", func() {
contentID := []byte("id")
data := []byte("hello")
peers[0].ContentResolver.InsertContent(contentID, data)
peers[0].GossipNonBlocking(contentID, nil)
err = peers[0].GossipNonBlocking(contentID, nil)
if err != nil {
panic(err)
}

for _, peer := range peers {
Eventually(func() []byte { msg, _ := peer.ContentResolver.QueryContent(contentID); return msg }).Should(Equal(data))
Expand Down Expand Up @@ -155,7 +164,10 @@ var _ = Describe("Peer", func() {
}
go peer1.Run(ctx)

peer1.GossipNonBlocking(contentID, nil)
err = peer1.GossipNonBlocking(contentID, nil)
if err != nil {
panic(err)
}

time.Sleep(50 * time.Millisecond)

Expand Down Expand Up @@ -203,7 +215,10 @@ var _ = Describe("Peer", func() {
contentID := []byte("id")
data := []byte("hello")
peers[0].ContentResolver.InsertContent(contentID, data)
peers[0].GossipNonBlocking(contentID, &subnet)
err = peers[0].GossipNonBlocking(contentID, &subnet)
if err != nil {
panic(err)
}

// Give more of an opportunity for the gossip to reach everyone.
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -386,8 +401,14 @@ var _ = Describe("Peer", func() {
go peer1.Run(ctx)
go peer2.Run(ctx)

peer1.Link(peer2.ID())
peer2.Link(peer1.ID())
err = peer1.Link(peer2.ID())
if err != nil {
panic(err)
}
err = peer2.Link(peer1.ID())
if err != nil {
panic(err)
}

msg := []byte("hello")
sendCtx, sendCancel := context.WithTimeout(ctx, time.Second)
Expand Down Expand Up @@ -440,24 +461,39 @@ var _ = Describe("Peer", func() {
go stablePeer.Run(ctx)
go crashPeer.Run(ctx)

stablePeer.Link(crashPeer.ID())
crashPeer.Link(stablePeer.ID())
err = stablePeer.Link(crashPeer.ID())
if err != nil {
panic(err)
}
err = crashPeer.Link(stablePeer.ID())
if err != nil {
panic(err)
}

numMessages := 20
for i := 0; i < numMessages; i++ {
contentID := []byte(fmt.Sprintf("%v", i))
content := []byte(fmt.Sprintf("message %v", i))
stablePeer.ContentResolver.InsertContent(contentID, content)
stablePeer.Gossip(ctx, contentID, nil)
err = stablePeer.Gossip(ctx, contentID, nil)
if err != nil {
panic(err)
}

if i == numMessages/2 {
// NOTE(ross): We don't check that the content was received
// because there is always a chance that it won't be; this can
// happen if the connection is killed at any point after the
// stable peer sends the gossip and before the sync is handled
// by the crash peer.
crashPeer.Unlink(stablePeer.ID())
crashPeer.Link(stablePeer.ID())
err = crashPeer.Unlink(stablePeer.ID())
if err != nil {
panic(err)
}
err = crashPeer.Link(stablePeer.ID())
if err != nil {
panic(err)
}

// NOTE(ross): Due to scheduling, it is possible that the next
// loop can begin and the stable peer can have started
Expand Down Expand Up @@ -540,14 +576,26 @@ var _ = Describe("Peer", func() {
go writePeer.Run(ctx)
go readPeer.Run(ctx)

writePeer.Link(readPeer.ID())
readPeer.Link(writePeer.ID())
err = writePeer.Link(readPeer.ID())
if err != nil {
panic(err)
}
err = readPeer.Link(writePeer.ID())
if err != nil {
panic(err)
}

data := make([]byte, int(opts.ConnectionRateLimiterOptions.Rate))

writePeer.Send(ctx, data, readPeer.ID())
err = writePeer.Send(ctx, data, readPeer.ID())
if err != nil {
panic(err)
}
Eventually(received).Should(Receive())
writePeer.Send(ctx, data, readPeer.ID())
err = writePeer.Send(ctx, data, readPeer.ID())
if err != nil {
panic(err)
}
Consistently(received).ShouldNot(Receive())
})
})
Expand Down Expand Up @@ -603,13 +651,19 @@ var _ = Describe("Peer", func() {
go lastPeer.Run(ctx)

for _, otherPeer := range otherPeers {
otherPeer.Link(peer.ID())
err = otherPeer.Link(peer.ID())
if err != nil {
panic(err)
}
}

// Give some time for the other peers to finish linking.
time.Sleep(50 * time.Millisecond)

lastPeer.Send(ctx, []byte{0}, peer.ID())
err = lastPeer.Send(ctx, []byte{0}, peer.ID())
if err != nil {
panic(err)
}

Consistently(received).ShouldNot(Receive())
})
Expand Down Expand Up @@ -721,6 +775,7 @@ var _ = Describe("Peer", func() {
Data: buf[:32],
}
bs, err := surge.ToBinary(msg)
Expect(err).ToNot(HaveOccurred())
nonceBuf = session.GetWriteNonceAndIncrement()
sealed = session.GCM.Seal(buf[4:4], nonceBuf[:], bs, nil)
binary.BigEndian.PutUint32(buf, uint32(len(sealed)))
Expand Down Expand Up @@ -749,6 +804,7 @@ var _ = Describe("Peer", func() {
Data: id[:],
}
bs, err = surge.ToBinary(msg)
Expect(err).ToNot(HaveOccurred())
nonceBuf = session.GetWriteNonceAndIncrement()
sealed = session.GCM.Seal(buf[4:4], nonceBuf[:], bs, nil)
binary.BigEndian.PutUint32(buf, uint32(len(sealed)))
Expand All @@ -771,6 +827,7 @@ var _ = Describe("Peer", func() {
SyncData: buf[:100],
}
bs, err = surge.ToBinary(msg)
Expect(err).ToNot(HaveOccurred())
nonceBuf = session.GetWriteNonceAndIncrement()
sealed = session.GCM.Seal(buf[4:4], nonceBuf[:], bs, nil)
binary.BigEndian.PutUint32(buf, uint32(len(sealed)))
Expand Down Expand Up @@ -865,8 +922,14 @@ func linkAllPeers(peers []*aw.Peer) {
for i := range peers {
for j := range peers {
if i != j {
peers[i].Link(peers[j].ID())
peers[j].Link(peers[i].ID())
err := peers[i].Link(peers[j].ID())
if err != nil {
panic(err)
}
err = peers[j].Link(peers[i].ID())
if err != nil {
panic(err)
}
}
}
}
Expand All @@ -877,8 +940,14 @@ func linkPeersRing(peers []*aw.Peer) {

for i := range peers {
next := (i + 1) % n
peers[i].Link(peers[next].ID())
peers[next].Link(peers[i].ID())
err := peers[i].Link(peers[next].ID())
if err != nil {
panic(err)
}
err = peers[next].Link(peers[i].ID())
if err != nil {
panic(err)
}
}
}

Expand Down

0 comments on commit e8df373

Please sign in to comment.