Skip to content

Commit

Permalink
other: 优化
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Apr 8, 2024
1 parent 409350f commit 64c1653
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions server/internal/v2/reactor/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
go func(r *Reactor[M], q *queue.Queue[int, string, M]) {
defer r.wg.Done()
for m := range q.Read() {
m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) {
if last {
r.locationRW.RLock()
mq, exist := r.location[m.Ident()]
r.locationRW.RUnlock()
if exist {
r.locationRW.Lock()
defer r.locationRW.Unlock()
mq, exist = r.location[m.Ident()]
if exist {
delete(r.location, m.Ident())
r.queueRW.RLock()
mq := r.queues[mq]
r.queueRW.RUnlock()
r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id()))
}
}
}
})
m(r.process, r.processFinish)
}
}(r, q)
}
Expand All @@ -212,3 +194,23 @@ func (r *Reactor[M]) Close() {
atomic.StoreInt32(&r.state, statusClosed)
r.queueRW.Unlock()
}

func (r *Reactor[M]) processFinish(m queue.MessageWrapper[int, string, M], last bool) {
if last {
r.locationRW.RLock()
mq, exist := r.location[m.Ident()]
r.locationRW.RUnlock()
if exist {
r.locationRW.Lock()
defer r.locationRW.Unlock()
mq, exist = r.location[m.Ident()]
if exist {
delete(r.location, m.Ident())
r.queueRW.RLock()
mq := r.queues[mq]
r.queueRW.RUnlock()
r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id()))
}
}
}
}

0 comments on commit 64c1653

Please sign in to comment.