Skip to content

Commit

Permalink
net: dispatcher.go. support enable & disable duplicate msg filter for…
Browse files Browse the repository at this point in the history
… subscribers.
  • Loading branch information
royshang committed Jan 30, 2018
1 parent dfed816 commit 5adbb32
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 20 deletions.
4 changes: 2 additions & 2 deletions cmd/network/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func run(mode, configPath string, packageSize, concurrentMessageCount, totalMess
}

// register dispatcher.
netService.Register(net.NewSubscriber(netService, messageCh, PingMessage))
netService.Register(net.NewSubscriber(netService, messageCh, PongMessage))
netService.Register(net.NewSubscriber(netService, messageCh, false, PingMessage))
netService.Register(net.NewSubscriber(netService, messageCh, false, PongMessage))

// start server.
netService.Start()
Expand Down
6 changes: 3 additions & 3 deletions core/block_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func NewBlockPool(size int) (*BlockPool, error) {

// RegisterInNetwork register message subscriber in network.
func (pool *BlockPool) RegisterInNetwork(ns net.Service) {
ns.Register(net.NewSubscriber(pool, pool.receiveBlockMessageCh, MessageTypeNewBlock))
ns.Register(net.NewSubscriber(pool, pool.receiveBlockMessageCh, MessageTypeDownloadedBlockReply))
ns.Register(net.NewSubscriber(pool, pool.receiveDownloadBlockMessageCh, MessageTypeDownloadedBlock))
ns.Register(net.NewSubscriber(pool, pool.receiveBlockMessageCh, true, MessageTypeNewBlock))
ns.Register(net.NewSubscriber(pool, pool.receiveBlockMessageCh, true, MessageTypeDownloadedBlockReply))
ns.Register(net.NewSubscriber(pool, pool.receiveDownloadBlockMessageCh, true, MessageTypeDownloadedBlock))
pool.ns = ns
}

Expand Down
2 changes: 1 addition & 1 deletion core/transaction_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (pool *TransactionPool) SetGasConfig(gasPrice, gasLimit *util.Uint128) {

// RegisterInNetwork register message subscriber in network.
func (pool *TransactionPool) RegisterInNetwork(ns net.Service) {
ns.Register(net.NewSubscriber(pool, pool.receivedMessageCh, MessageTypeNewTx))
ns.Register(net.NewSubscriber(pool, pool.receivedMessageCh, true, MessageTypeNewTx))
pool.ns = ns
}

Expand Down
14 changes: 10 additions & 4 deletions net/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Dispatcher struct {
quitCh chan bool
receivedMessageCh chan Message
dispatchedMessages *lru.Cache
filters map[string]bool
}

// NewDispatcher create Dispatcher instance.
Expand All @@ -48,6 +49,7 @@ func NewDispatcher() *Dispatcher {
subscribersMap: new(sync.Map),
quitCh: make(chan bool, 10),
receivedMessageCh: make(chan Message, 65536),
filters: make(map[string]bool),
}

dp.dispatchedMessages, _ = lru.New(10240)
Expand All @@ -61,6 +63,7 @@ func (dp *Dispatcher) Register(subscribers ...*Subscriber) {
for _, mt := range v.msgTypes {
m, _ := dp.subscribersMap.LoadOrStore(mt, new(sync.Map))
m.(*sync.Map).Store(v, true)
dp.filters[mt] = v.DoFilter()
}
}
}
Expand All @@ -76,6 +79,7 @@ func (dp *Dispatcher) Deregister(subscribers ...*Subscriber) {
}
m.(*sync.Map).Delete(v)
dp.subscribersMap.Delete(mt)
delete(dp.filters, mt)
}
}
}
Expand Down Expand Up @@ -134,10 +138,12 @@ func (dp *Dispatcher) Stop() {
func (dp *Dispatcher) PutMessage(msg Message) {
// it's a optimize strategy for message dispatch, according to https://github.com/nebulasio/go-nebulas/issues/50
hash := msg.Hash()
if exist, _ := dp.dispatchedMessages.ContainsOrAdd(hash, hash); exist == true {
// duplicated message, ignore.
metricsDuplicatedMessage(msg.MessageType())
return
if dp.filters[msg.MessageType()] {
if exist, _ := dp.dispatchedMessages.ContainsOrAdd(hash, hash); exist == true {
// duplicated message, ignore.
metricsDuplicatedMessage(msg.MessageType())
return
}
}

dp.receivedMessageCh <- msg
Expand Down
12 changes: 10 additions & 2 deletions net/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,14 @@ type Subscriber struct {

// msgType message types to subscribe
msgTypes []string

// doFilter dup message
doFilter bool
}

// NewSubscriber return new Subscriber instance.
func NewSubscriber(id interface{}, msgChan chan Message, msgTypes ...string) *Subscriber {
return &Subscriber{id, msgChan, msgTypes}
func NewSubscriber(id interface{}, msgChan chan Message, doFilter bool, msgTypes ...string) *Subscriber {
return &Subscriber{id, msgChan, msgTypes, doFilter}
}

// ID return id.
Expand All @@ -128,6 +131,11 @@ func (s *Subscriber) MessageChan() chan Message {
return s.msgChan
}

// DoFilter return doFilter
func (s *Subscriber) DoFilter() bool {
return s.doFilter
}

// BaseMessage base message
type BaseMessage struct {
t string
Expand Down
16 changes: 8 additions & 8 deletions sync/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func (ss *Service) Start() {

// register the network handler.
netService := ss.netService
netService.Register(net.NewSubscriber(ss, ss.messageCh, net.ChainSync))
netService.Register(net.NewSubscriber(ss, ss.messageCh, net.ChainChunks))
netService.Register(net.NewSubscriber(ss, ss.messageCh, net.ChainGetChunk))
netService.Register(net.NewSubscriber(ss, ss.messageCh, net.ChainChunkData))
netService.Register(net.NewSubscriber(ss, ss.messageCh, false, net.ChainSync))
netService.Register(net.NewSubscriber(ss, ss.messageCh, false, net.ChainChunks))
netService.Register(net.NewSubscriber(ss, ss.messageCh, false, net.ChainGetChunk))
netService.Register(net.NewSubscriber(ss, ss.messageCh, false, net.ChainChunkData))

// start loop().
go ss.startLoop()
Expand All @@ -82,10 +82,10 @@ func (ss *Service) Start() {
func (ss *Service) Stop() {
// deregister the network handler.
netService := ss.netService
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, net.ChainSync))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, net.ChainChunks))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, net.ChainGetChunk))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, net.ChainChunkData))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, false, net.ChainSync))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, false, net.ChainChunks))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, false, net.ChainGetChunk))
netService.Deregister(net.NewSubscriber(ss, ss.messageCh, false, net.ChainChunkData))

ss.StopActiveSync()

Expand Down

0 comments on commit 5adbb32

Please sign in to comment.