Skip to content

Commit

Permalink
Merge branch 'master' into feature/compression
Browse files Browse the repository at this point in the history
  • Loading branch information
traysh committed Apr 19, 2018
2 parents 96cdf00 + 7fbacbd commit cd79bc7
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func listen() {
app.running = true
}

// SetDictionary set routes map
// SetDictionary sets routes map
func SetDictionary(dict map[string]uint16) error {
if app.running {
return constants.ErrChangeDictionaryWhileRunning
Expand Down
1 change: 0 additions & 1 deletion component.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,4 @@ func shutdownComponents() {
for i := length - 1; i >= 0; i-- {
remoteComp[i].comp.Shutdown()
}

}
12 changes: 7 additions & 5 deletions service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ func NewHandlerService(

// Dispatch message to corresponding logic handler
func (h *HandlerService) Dispatch(thread int) {
// close chLocalProcess & chCloseSession when application quit
// close chLocalProcess & chCloseSession when application quits
// TODO: This timer is being stopped multiple times, it probably doesn't need to be stopped here
defer timer.GlobalTicker.Stop()

// handle packet that sent to chLocalProcess
for {
// Calls to remote servers block calls to local server
select {
case lm := <-h.chLocalProcess:
h.localProcess(lm.agent, lm.route, lm.msg)

case rm := <-h.chRemoteProcess:
h.remoteService.remoteProcess(nil, rm.agent, rm.route, rm.msg)

Expand Down Expand Up @@ -157,7 +159,7 @@ func (h *HandlerService) Handle(conn net.Conn) {

logger.Log.Debugf("New session established: %s", a.String())

// guarantee agent related resource be destroyed
// guarantee agent related resource is destroyed
defer func() {
a.Session.Close()
logger.Log.Debugf("Session read goroutine exit, SessionID=%d, UID=%d", a.Session.ID(), a.Session.UID())
Expand All @@ -172,7 +174,7 @@ func (h *HandlerService) Handle(conn net.Conn) {
return
}

// (warning): decoder use slice for performance, packet data should be copy before next Decode
// (warning): decoder uses slice for performance, packet data should be copied before next Decode
packets, err := h.decoder.Decode(buf[:n])
if err != nil {
logger.Log.Error(err.Error())
Expand Down Expand Up @@ -209,7 +211,7 @@ func (h *HandlerService) processPacket(a *agent.Agent, p *packet.Packet) error {

case packet.Data:
if a.GetStatus() < constants.StatusWorking {
return fmt.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s",
return fmt.Errorf("receive data on socket which is not yet ACK, session will be closed immediately, remote=%s",
a.RemoteAddr().String())
}

Expand Down
2 changes: 1 addition & 1 deletion service/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func processHandlerMessage(
// This is a special case and should only happen with nats rpc client
// because we used nats request we have to answer to it or else a timeout
// will happen in the caller server and will be returned to the client
// the reason why we not just Publish is to keep track of failed rpc requests
// the reason why we don't just Publish is to keep track of failed rpc requests
// with timeouts, maybe we can improve this flow
resp = []byte("ack")
}
Expand Down

0 comments on commit cd79bc7

Please sign in to comment.