Skip to content

Commit

Permalink
add from filed in the event
Browse files Browse the repository at this point in the history
  • Loading branch information
tok-kkk committed Nov 20, 2019
1 parent c2dfdab commit 4b7bb13
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 26 deletions.
12 changes: 8 additions & 4 deletions broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Broadcaster interface {
Broadcast(ctx context.Context, groupID protocol.PeerGroupID, body protocol.MessageBody) error

// AcceptBroadcast message from another peer in the network.
AcceptBroadcast(ctx context.Context, message protocol.Message) error
AcceptBroadcast(ctx context.Context, from protocol.PeerID, message protocol.Message) error
}

type broadcaster struct {
Expand Down Expand Up @@ -54,7 +54,6 @@ func NewBroadcaster(logger logrus.FieldLogger, messages protocol.MessageSender,
// Broadcast a message to multiple remote servers in an attempt to saturate the
// network.
func (broadcaster *broadcaster) Broadcast(ctx context.Context, groupID protocol.PeerGroupID, body protocol.MessageBody) error {

// Ignore message if it already been sent.
message := protocol.NewMessage(protocol.V1, protocol.Broadcast, groupID, body)
ok, err := broadcaster.messageHashAlreadySeen(message.Hash())
Expand All @@ -78,6 +77,10 @@ func (broadcaster *broadcaster) Broadcast(ctx context.Context, groupID protocol.
default:
}

if err := broadcaster.store.Insert(message.Hash().String(), true); err != nil {
return err
}

phi.ParForAll(addrs, func(i int) {
to := addrs[i]
if to == nil {
Expand All @@ -96,12 +99,12 @@ func (broadcaster *broadcaster) Broadcast(ctx context.Context, groupID protocol.
}
})

return broadcaster.store.Insert(message.Hash().String(), true)
return nil
}

// AcceptBroadcast from a remote client and propagate it to all peers in the
// network.
func (broadcaster *broadcaster) AcceptBroadcast(ctx context.Context, message protocol.Message) error {
func (broadcaster *broadcaster) AcceptBroadcast(ctx context.Context, from protocol.PeerID, message protocol.Message) error {
// Pre-condition checks
if message.Version != protocol.V1 {
return protocol.NewErrMessageVersionIsNotSupported(message.Version)
Expand All @@ -124,6 +127,7 @@ func (broadcaster *broadcaster) AcceptBroadcast(ctx context.Context, message pro
event := protocol.EventMessageReceived{
Time: time.Now(),
Message: message.Body,
From: from,
}

// Check if context is already expired
Expand Down
12 changes: 6 additions & 6 deletions broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ var _ = Describe("Broadcaster", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
message := protocol.NewMessage(protocol.V1, protocol.Broadcast, groupID, messageBody)
Expect(broadcaster.AcceptBroadcast(ctx, message)).ToNot(HaveOccurred())
Expect(broadcaster.AcceptBroadcast(ctx, RandomPeerID(), message)).ToNot(HaveOccurred())

var event protocol.EventMessageReceived
Eventually(events).Should(Receive(&event))
Expand Down Expand Up @@ -184,7 +184,7 @@ var _ = Describe("Broadcaster", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
message := protocol.NewMessage(protocol.V1, protocol.Broadcast, RandomPeerGroupID(), messageBody)
Expect(broadcaster.AcceptBroadcast(ctx, message)).To(HaveOccurred())
Expect(broadcaster.AcceptBroadcast(ctx, RandomPeerID(), message)).To(HaveOccurred())

return true
}
Expand All @@ -205,7 +205,7 @@ var _ = Describe("Broadcaster", func() {
defer cancel()
message := protocol.NewMessage(protocol.V1, protocol.Broadcast, RandomPeerGroupID(), messageBody)
message.Version = InvalidMessageVersion()
Expect(broadcaster.AcceptBroadcast(ctx, message)).To(HaveOccurred())
Expect(broadcaster.AcceptBroadcast(ctx, RandomPeerID(), message)).To(HaveOccurred())

return true
}
Expand All @@ -226,7 +226,7 @@ var _ = Describe("Broadcaster", func() {
defer cancel()
message := protocol.NewMessage(protocol.V1, protocol.Broadcast, RandomPeerGroupID(), messageBody)
message.Variant = InvalidMessageVariant(protocol.Broadcast)
Expect(broadcaster.AcceptBroadcast(ctx, message)).To(HaveOccurred())
Expect(broadcaster.AcceptBroadcast(ctx, RandomPeerID(), message)).To(HaveOccurred())

return true
}
Expand All @@ -253,7 +253,7 @@ var _ = Describe("Broadcaster", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
message := protocol.NewMessage(protocol.V1, protocol.Broadcast, groupID, messageBody)
Expect(broadcaster.AcceptBroadcast(ctx, message)).ToNot(HaveOccurred())
Expect(broadcaster.AcceptBroadcast(ctx, RandomPeerID(), message)).ToNot(HaveOccurred())

var event protocol.EventMessageReceived
Eventually(events).Should(Receive(&event))
Expand All @@ -268,7 +268,7 @@ var _ = Describe("Broadcaster", func() {
Expect(addrs).Should(ContainElement(message.To))
}

Expect(broadcaster.AcceptBroadcast(ctx, message)).ToNot(HaveOccurred())
Expect(broadcaster.AcceptBroadcast(ctx, RandomPeerID(), message)).ToNot(HaveOccurred())
Eventually(events).ShouldNot(Receive())
return true
}
Expand Down
5 changes: 3 additions & 2 deletions cast/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type Caster interface {
Cast(ctx context.Context, to protocol.PeerID, body protocol.MessageBody) error
AcceptCast(ctx context.Context, message protocol.Message) error
AcceptCast(ctx context.Context, from protocol.PeerID, message protocol.Message) error
}

type caster struct {
Expand Down Expand Up @@ -57,7 +57,7 @@ func (caster *caster) Cast(ctx context.Context, to protocol.PeerID, body protoco
}
}

func (caster *caster) AcceptCast(ctx context.Context, message protocol.Message) error {
func (caster *caster) AcceptCast(ctx context.Context, from protocol.PeerID, message protocol.Message) error {
// TODO: Update to allow message forwarding.
// Pre-condition checks
if message.Version != protocol.V1 {
Expand All @@ -70,6 +70,7 @@ func (caster *caster) AcceptCast(ctx context.Context, message protocol.Message)
event := protocol.EventMessageReceived{
Time: time.Now(),
Message: message.Body,
From: from,
}

// Check if context is already expired
Expand Down
8 changes: 4 additions & 4 deletions cast/cast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var _ = Describe("Caster", func() {
defer cancel()

message := protocol.NewMessage(protocol.V1, protocol.Cast, protocol.NilPeerGroupID, messageBody)
Expect(caster.AcceptCast(ctx, message)).NotTo(HaveOccurred())
Expect(caster.AcceptCast(ctx, RandomPeerID(), message)).NotTo(HaveOccurred())

var event protocol.EventMessageReceived
Eventually(events).Should(Receive(&event))
Expand All @@ -104,7 +104,7 @@ var _ = Describe("Caster", func() {
cancel()

message := protocol.NewMessage(protocol.V1, protocol.Cast, protocol.NilPeerGroupID, messageBody)
Expect(caster.AcceptCast(ctx, message)).Should(HaveOccurred())
Expect(caster.AcceptCast(ctx, RandomPeerID(), message)).Should(HaveOccurred())

return true
}
Expand All @@ -126,7 +126,7 @@ var _ = Describe("Caster", func() {

message := protocol.NewMessage(protocol.V1, protocol.Cast, protocol.NilPeerGroupID, messageBody)
message.Version = InvalidMessageVersion()
Expect(caster.AcceptCast(ctx, message)).Should(HaveOccurred())
Expect(caster.AcceptCast(ctx, RandomPeerID(), message)).Should(HaveOccurred())

return true
}
Expand All @@ -149,7 +149,7 @@ var _ = Describe("Caster", func() {
message := protocol.NewMessage(protocol.V1, protocol.Cast, protocol.NilPeerGroupID, messageBody)
message.Variant = InvalidMessageVariant(protocol.Cast)

Expect(caster.AcceptCast(ctx, message)).Should(HaveOccurred())
Expect(caster.AcceptCast(ctx, RandomPeerID(), message)).Should(HaveOccurred())

return true
}
Expand Down
3 changes: 3 additions & 0 deletions dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func New(me protocol.PeerAddress, codec protocol.PeerAddressCodec, store kv.Tabl

if count, err := dht.NumPeers(); count == 0 || err != nil {
for _, addr := range bootstrapAddrs {
if addr.Equal(me) {
continue
}
if err := dht.addPeerAddressWithoutLock(addr); err != nil {
return nil, fmt.Errorf("error adding bootstrap addresses: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions multicast/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type Multicaster interface {
Multicast(ctx context.Context, groupID protocol.PeerGroupID, body protocol.MessageBody) error
AcceptMulticast(ctx context.Context, message protocol.Message) error
AcceptMulticast(ctx context.Context, from protocol.PeerID, message protocol.Message) error
}

type multicaster struct {
Expand Down Expand Up @@ -65,7 +65,7 @@ func (multicaster *multicaster) Multicast(ctx context.Context, groupID protocol.
return nil
}

func (multicaster *multicaster) AcceptMulticast(ctx context.Context, message protocol.Message) error {
func (multicaster *multicaster) AcceptMulticast(ctx context.Context, from protocol.PeerID, message protocol.Message) error {
// TODO: Multicasting will always emit an event for a received message, even
// if the message has been seen before. Should this be changed?
if message.Version != protocol.V1 {
Expand All @@ -78,6 +78,7 @@ func (multicaster *multicaster) AcceptMulticast(ctx context.Context, message pro
event := protocol.EventMessageReceived{
Time: time.Now(),
Message: message.Body,
From: from,
}

// Check if context is already expired
Expand Down
8 changes: 4 additions & 4 deletions multicast/multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ var _ = Describe("Multicaster", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
message := protocol.NewMessage(protocol.V1, protocol.Multicast, RandomPeerGroupID(), messageBody)
Expect(multicaster.AcceptMulticast(ctx, message)).ToNot(HaveOccurred())
Expect(multicaster.AcceptMulticast(ctx, RandomPeerID(), message)).ToNot(HaveOccurred())

var event protocol.EventMessageReceived
Eventually(events).Should(Receive(&event))
Expand All @@ -136,7 +136,7 @@ var _ = Describe("Multicaster", func() {
cancel()

message := protocol.NewMessage(protocol.V1, protocol.Multicast, RandomPeerGroupID(), protocol.MessageBody{})
Expect(multicaster.AcceptMulticast(ctx, message)).To(HaveOccurred())
Expect(multicaster.AcceptMulticast(ctx, RandomPeerID(), message)).To(HaveOccurred())
})
})

Expand All @@ -152,7 +152,7 @@ var _ = Describe("Multicaster", func() {

message := protocol.NewMessage(protocol.V1, protocol.Multicast, RandomPeerGroupID(), protocol.MessageBody{})
message.Version = InvalidMessageVersion()
Expect(multicaster.AcceptMulticast(ctx, message)).To(HaveOccurred())
Expect(multicaster.AcceptMulticast(ctx, RandomPeerID(), message)).To(HaveOccurred())
})
})

Expand All @@ -168,7 +168,7 @@ var _ = Describe("Multicaster", func() {

message := protocol.NewMessage(protocol.V1, protocol.Multicast, RandomPeerGroupID(), protocol.MessageBody{})
message.Variant = InvalidMessageVariant(protocol.Multicast)
Expect(multicaster.AcceptMulticast(ctx, message)).To(HaveOccurred())
Expect(multicaster.AcceptMulticast(ctx, RandomPeerID(), message)).To(HaveOccurred())
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion pingpong/pingpong.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (pp *pingPonger) propagatePing(ctx context.Context, sender protocol.PeerID,
return newStorageErr(err)
}
if len(peerAddrs) == 0 {
return errors.New("dht has zero address.")
return errors.New("dht has zero address")
}

// Using the messaging sending channel protects the pinger/ponger from
Expand Down
2 changes: 2 additions & 0 deletions tcp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (pool *connPool) Send(to net.Addr, m protocol.Message) error {
pool.mu.Lock()
defer pool.mu.Unlock()

now := time.Now()
toStr := to.String()
c, ok := pool.conns[toStr]
if !ok {
Expand All @@ -98,6 +99,7 @@ func (pool *connPool) Send(to net.Addr, m protocol.Message) error {
go pool.closeConn(toStr)
}

defer pool.options.Logger.Debugf("sending a %v message to %v takes %v", m.Variant, to.String(), time.Now().Sub(now))
return c.session.WriteMessage(c.conn, m)
}

Expand Down
13 changes: 10 additions & 3 deletions tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (client *Client) Run(ctx context.Context, messages protocol.MessageReceiver
case <-ctx.Done():
return
case messageOtw := <-messages:
client.handleMessageOnTheWire(messageOtw)
go client.handleMessageOnTheWire(messageOtw)
}
}
}
Expand All @@ -45,10 +45,11 @@ func (client *Client) handleMessageOnTheWire(message protocol.MessageOnTheWire)

var DefaultServerOptions = ServerOptions{
Logger: logrus.StandardLogger(),
Timeout: 20 * time.Second,
Host: "127.0.0.1:19231",
Timeout: 20 * time.Second,
RateLimit: time.Minute,
TimeoutKeepAlive: 10 * time.Second,
MaxConnections: 256,
}

type ServerOptions struct {
Expand Down Expand Up @@ -107,6 +108,7 @@ func NewServer(options ServerOptions, handshaker handshake.Handshaker) *Server {
// for new connections, spawning each one into a background goroutine so that it
// can be handled concurrently.
func (server *Server) Run(ctx context.Context, messages protocol.MessageSender) {
server.options.Logger.Debugf("server start listening at %v", server.options.Host)
listener, err := net.Listen("tcp", server.options.Host)
if err != nil {
server.options.Logger.Fatalf("failed to listen on %s: %v", server.options.Host, err)
Expand All @@ -123,6 +125,7 @@ func (server *Server) Run(ctx context.Context, messages protocol.MessageSender)
}()

for {
server.options.Logger.Debugf("waiting for new connection...")
conn, err := listener.Accept()
if err != nil {
select {
Expand All @@ -136,6 +139,7 @@ func (server *Server) Run(ctx context.Context, messages protocol.MessageSender)
server.options.Logger.Errorf("error accepting connection: %v", err)
continue
}
server.options.Logger.Debugf("new connect from %v", conn.RemoteAddr().String())

// Spawn background goroutine to handle this connection so that it does
// not block other connections.
Expand All @@ -153,6 +157,7 @@ func (server *Server) handle(ctx context.Context, conn net.Conn, messages protoc
}

// Attempt to establish a session with the client.
now := time.Now()
session, err := server.establishSession(ctx, conn)
if err != nil {
server.options.Logger.Errorf("closing connection: error establishing session: %v", err)
Expand All @@ -162,6 +167,7 @@ func (server *Server) handle(ctx context.Context, conn net.Conn, messages protoc
server.options.Logger.Errorf("cannot establish session with %v", conn.RemoteAddr().String())
return
}
server.options.Logger.Debugf("new connection with %v takes %v", conn.RemoteAddr().String(), time.Now().Sub(now))

for {
messageOtw, err := session.ReadMessageOnTheWire(conn)
Expand All @@ -178,6 +184,7 @@ func (server *Server) handle(ctx context.Context, conn net.Conn, messages protoc
case <-ctx.Done():
return
case messages <- messageOtw:
server.options.Logger.Debugf("receive a %v message from %v", messageOtw.Message.Variant, messageOtw.From.String())
}
}
}
Expand All @@ -202,7 +209,7 @@ func (server *Server) allowRateLimit(conn net.Conn) bool {
}

if time.Now().Sub(lastConnAttempt) < server.options.RateLimit {
server.options.Logger.Warn("%s is rate limited", conn.RemoteAddr())
server.options.Logger.Debugf("%s is rate limited", conn.RemoteAddr())
return false
}
return true
Expand Down

0 comments on commit 4b7bb13

Please sign in to comment.