Skip to content

Commit

Permalink
make changes according to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tok-kkk committed Dec 6, 2019
1 parent 885dd89 commit 2d990c5
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 29 deletions.
2 changes: 1 addition & 1 deletion dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var _ = Describe("DHT", func() {
It("should load the address from store into cache for fast io", func() {
test := func() bool {
me, bootstrapAddress := RandomAddress(), RandomAddresses(rand.Intn(32)+1)
for ContainAddress(bootstrapAddress, me){
for ContainAddress(bootstrapAddress, me) {
me = RandomAddress()
}
store := NewTable("dht")
Expand Down
4 changes: 2 additions & 2 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func NewTCP(logger logrus.FieldLogger, codec protocol.PeerAddressCodec, options
panic(fmt.Errorf("pre-condition violation: fail to initialize dht, err = %v", err))
}
handshaker := handshake.New(signVerifier, handshake.NewGCMSessionManager())
connPool := tcp.NewConnPool(logger, poolOptions, handshaker)
connPool := tcp.NewConnPool(poolOptions, logger, handshaker)
client := tcp.NewClient(logger, connPool)
server := tcp.NewServer(logger, serverOptions, handshaker)
server := tcp.NewServer(serverOptions, logger, handshaker)
return New(logger, codec, options, dht, handshaker, client, server, events)
}

Expand Down
2 changes: 1 addition & 1 deletion tcp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type conn struct {

// NewConnPool returns a ConnPool with no existing connections. It is safe for
// concurrent use.
func NewConnPool(logger logrus.FieldLogger, options ConnPoolOptions, handshaker handshake.Handshaker) ConnPool {
func NewConnPool(options ConnPoolOptions, logger logrus.FieldLogger, handshaker handshake.Handshaker) ConnPool {
if logger == nil {
logger = logrus.New()
}
Expand Down
12 changes: 6 additions & 6 deletions tcp/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ var _ = Describe("Connection pool", func() {
Context("when initializing a ConnPool", func() {
It("should set the options to default if not provided", func() {
handshaker := handshake.New(NewMockSignVerifier(), handshake.NewGCMSessionManager())
_ = NewConnPool(logrus.New(), ConnPoolOptions{}, handshaker)
_ = NewConnPool(ConnPoolOptions{}, logrus.New(), handshaker)
})

It("should panic if providing a nil Hanshaker", func() {
Expect(func() {
_ = NewConnPool(logrus.New(), ConnPoolOptions{}, nil)
_ = NewConnPool(ConnPoolOptions{}, logrus.New(), nil)
}).Should(Panic())
})
})
Expand All @@ -46,7 +46,7 @@ var _ = Describe("Connection pool", func() {
// Initialize a connPool
clientSignVerifier := NewMockSignVerifier()
handshaker := handshake.New(clientSignVerifier, handshake.NewGCMSessionManager())
pool := NewConnPool(logrus.New(), ConnPoolOptions{}, handshaker)
pool := NewConnPool(ConnPoolOptions{}, logrus.New(), handshaker)

// Initialize a server
serverAddr, err := net.ResolveTCPAddr("tcp", ":8080")
Expand Down Expand Up @@ -81,7 +81,7 @@ var _ = Describe("Connection pool", func() {
// Initialize a connPool
clientSignVerifier := NewMockSignVerifier()
handshaker := handshake.New(clientSignVerifier, handshake.NewGCMSessionManager())
pool := NewConnPool(logrus.New(), ConnPoolOptions{MaxConnections: 1}, handshaker)
pool := NewConnPool(ConnPoolOptions{MaxConnections: 1}, logrus.New(), handshaker)

// Initialize two servers
serverAddr1, err := net.ResolveTCPAddr("tcp", ":8080")
Expand Down Expand Up @@ -115,7 +115,7 @@ var _ = Describe("Connection pool", func() {
// Initialize a connPool
clientSignVerifier := NewMockSignVerifier()
handshaker := handshake.New(clientSignVerifier, handshake.NewGCMSessionManager())
pool := NewConnPool(logrus.New(), ConnPoolOptions{TimeToLive: 100 * time.Millisecond}, handshaker)
pool := NewConnPool(ConnPoolOptions{TimeToLive: 100 * time.Millisecond}, logrus.New(), handshaker)

// Initialize a server
serverAddr, err := net.ResolveTCPAddr("tcp", ":8080")
Expand Down Expand Up @@ -160,7 +160,7 @@ var _ = Describe("Connection pool", func() {
// Initialize a connPool
clientSignVerifier := NewMockSignVerifier()
handshaker := handshake.New(clientSignVerifier, handshake.NewGCMSessionManager())
pool := NewConnPool(logrus.New(), ConnPoolOptions{Timeout: 200 * time.Millisecond}, handshaker)
pool := NewConnPool(ConnPoolOptions{Timeout: 200 * time.Millisecond}, logrus.New(), handshaker)

// Initialize a server
serverAddr, err := net.ResolveTCPAddr("tcp", ":8080")
Expand Down
28 changes: 14 additions & 14 deletions tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,16 @@ func (options *ServerOptions) setZerosToDefaults() {
}

type Server struct {
logger logrus.FieldLogger
options ServerOptions
handshaker handshake.Handshaker
logger logrus.FieldLogger
options ServerOptions
handshaker handshake.Handshaker
connections int64

lastConnAttemptsMu *sync.RWMutex
lastConnAttempts map[string]time.Time
}

func NewServer(logger logrus.FieldLogger, options ServerOptions, handshaker handshake.Handshaker) *Server {
func NewServer(options ServerOptions, logger logrus.FieldLogger, handshaker handshake.Handshaker) *Server {
if logger == nil {
logger = logrus.New()
}
Expand All @@ -88,9 +89,10 @@ func NewServer(logger logrus.FieldLogger, options ServerOptions, handshaker hand
panic("handshaker cannot be nil")
}
return &Server{
logger: logger,
options: options,
handshaker: handshaker,
logger: logger,
options: options,
handshaker: handshaker,
connections: 0,

lastConnAttemptsMu: new(sync.RWMutex),
lastConnAttempts: map[string]time.Time{},
Expand All @@ -117,7 +119,6 @@ func (server *Server) Run(ctx context.Context, messages protocol.MessageSender)
}
}()

var connections int64
for {
conn, err := listener.Accept()
if err != nil {
Expand All @@ -132,22 +133,22 @@ func (server *Server) Run(ctx context.Context, messages protocol.MessageSender)
server.logger.Errorf("error accepting connection: %v", err)
continue
}
if atomic.LoadInt64(&connections) >= int64(server.options.MaxConnections) {
if atomic.LoadInt64(&server.connections) >= int64(server.options.MaxConnections) {
server.logger.Info("tcp server reaches max number of connections")
conn.Close()
continue
}
atomic.AddInt64(&connections, 1)
atomic.AddInt64(&server.connections, 1)

// Spawn background goroutine to handle this connection so that it does
// not block other connections.

go server.handle(ctx, conn, messages, &connections)
go server.handle(ctx, conn, messages)
}
}

func (server *Server) handle(ctx context.Context, conn net.Conn, messages protocol.MessageSender, connections *int64) {
defer atomic.AddInt64(connections, -1)
func (server *Server) handle(ctx context.Context, conn net.Conn, messages protocol.MessageSender) {
defer atomic.AddInt64(&server.connections, -1)
defer conn.Close()

// Reject connections from IP addresses that have attempted to connect too recently.
Expand Down Expand Up @@ -183,7 +184,6 @@ func (server *Server) handle(ctx context.Context, conn net.Conn, messages protoc
case <-ctx.Done():
return
case messages <- messageOtw:
server.logger.Debugf("receive a %v message from %v", messageOtw.Message.Variant, messageOtw.From.String())
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var _ = Describe("TCP client and server", func() {
Context("when initializing a server", func() {
It("should panic if providing a nil handshaker", func() {
Expect(func() {
_ = NewServer(logrus.New(), ServerOptions{}, nil)
_ = NewServer(ServerOptions{}, logrus.New(), nil)
}).Should(Panic())
})
})
Expand Down
8 changes: 4 additions & 4 deletions testutil/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewConnection(port string) (io.ReadWriter, io.ReadWriter, func() error, err
func NewTCPClient(ctx context.Context, options tcp.ConnPoolOptions, verifier protocol.SignVerifier) protocol.MessageSender {
messages := make(chan protocol.MessageOnTheWire, 128)
handshaker := handshake.New(verifier, handshake.NewGCMSessionManager())
client := tcp.NewClient(logrus.StandardLogger(), tcp.NewConnPool(logrus.New(), options, handshaker))
client := tcp.NewClient(logrus.New(), tcp.NewConnPool(options, logrus.New(), handshaker))

go client.Run(ctx, messages)
return messages
Expand All @@ -71,7 +71,7 @@ func NewTCPClient(ctx context.Context, options tcp.ConnPoolOptions, verifier pro
func NewMaliciousTCPClient(ctx context.Context, options tcp.ConnPoolOptions, verifier protocol.SignVerifier) protocol.MessageSender {
messages := make(chan protocol.MessageOnTheWire, 128)
handshaker := NewMalHanshaker(verifier, handshake.NewGCMSessionManager())
client := tcp.NewClient(logrus.StandardLogger(), tcp.NewConnPool(logrus.New(), options, handshaker))
client := tcp.NewClient(logrus.StandardLogger(), tcp.NewConnPool(options, logrus.New(), handshaker))

go client.Run(ctx, messages)
return messages
Expand All @@ -85,7 +85,7 @@ func NewTCPServer(ctx context.Context, options tcp.ServerOptions, clientSignVeri
}

handshaker := handshake.New(signVerifier, handshake.NewGCMSessionManager())
server := tcp.NewServer(logrus.New(), options, handshaker)
server := tcp.NewServer(options, logrus.New(), handshaker)
messageSender := make(chan protocol.MessageOnTheWire, 128)
go server.Run(ctx, messageSender)
time.Sleep(50 * time.Millisecond)
Expand All @@ -101,7 +101,7 @@ func NewMaliciousTCPServer(ctx context.Context, options tcp.ServerOptions, clien
}

handshaker := NewMalHanshaker(signVerifier, handshake.NewGCMSessionManager())
server := tcp.NewServer(logrus.New(), options, handshaker)
server := tcp.NewServer(options, logrus.New(), handshaker)
messageSender := make(chan protocol.MessageOnTheWire, 128)
go server.Run(ctx, messageSender)
time.Sleep(50 * time.Millisecond)
Expand Down

0 comments on commit 2d990c5

Please sign in to comment.