Skip to content

Commit

Permalink
fix for data race #238
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Jun 1, 2019
1 parent 6c639a7 commit c8a418b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 14 deletions.
29 changes: 24 additions & 5 deletions server/hub.go
Expand Up @@ -156,10 +156,28 @@ func (h *Hub) run() {
// 2. Check access rights and reject, if appropriate
// 3. Attach session to the topic

t := h.topicGet(sreg.topic) // is the topic already loaded?
// Is the topic already loaded?
t := h.topicGet(sreg.topic)
if t == nil {
// Topic does not exist or not loaded
go topicInit(sreg, h)
// Topic does not exist or not loaded.
t = &Topic{name: sreg.topic,
xoriginal: sreg.pkt.topic,
sessions: make(map[*Session]types.UidSlice),
broadcast: make(chan *ServerComMessage, 256),
reg: make(chan *sessionJoin, 32),
unreg: make(chan *sessionLeave, 32),
meta: make(chan *metaReq, 32),
perUser: make(map[types.Uid]perUserData),
exit: make(chan *shutDown, 1),
}
// Topic is created in suspended state because it's not yet configured.
t.suspend()
// Save topic now to prevent race condition.
h.topicPut(sreg.topic, t)

// Create the topic.
go topicInit(t, sreg, h)

} else {
// Topic found.
// Topic will check access rights and send appropriate {ctrl}
Expand Down Expand Up @@ -309,6 +327,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea
sess.queueOut(NoErr(msg.id, msg.topic, now))

h.topicDel(topic)
t.markDeleted()
t.exit <- &shutDown{reason: StopDeleted}
statsInc("LiveTopics", -1)
} else {
Expand Down Expand Up @@ -401,7 +420,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea
// Case 2: just unregister.
// If t is nil, it's not registered, no action is needed
if t := h.topicGet(topic); t != nil {
t.suspend()
t.markDeleted()
h.topicDel(topic)

t.exit <- &shutDown{reason: reason}
Expand Down Expand Up @@ -434,7 +453,7 @@ func (h *Hub) stopTopicsForUser(uid types.Uid, reason int, alldone chan<- bool)
if _, isMember := topic.perUser[uid]; (topic.cat != types.TopicCatGrp && isMember) ||
topic.owner == uid {

topic.suspend()
topic.markDeleted()

h.topics.Delete(name)

Expand Down
2 changes: 1 addition & 1 deletion server/pres.go
Expand Up @@ -90,7 +90,7 @@ func (t *Topic) loadContacts(uid types.Uid) error {
// "+dis" disable subscription withot removing it, the opposite of "en".
// The "+en/rem/dis" command itself is stripped from the notification.
func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string {
if t.isSuspended() {
if !t.isReady() {
return ""
}

Expand Down
6 changes: 5 additions & 1 deletion server/store/types/types.go
Expand Up @@ -38,7 +38,11 @@ const (
ErrPolicy = StoreError("policy")
// ErrCredentials means credentials like email or captcha must be validated
ErrCredentials = StoreError("credentials")
// ErrNotFound means the objevy was not found
// ErrUserNotFound means the user was not found
ErrUserNotFound = StoreError("user not found")
// ErrNotFound means the topic was not found
ErrTopicNotFound = StoreError("topic not found")
// ErrNotFound means the object other then user or topic was not found
ErrNotFound = StoreError("not found")
// ErrPermissionDenied means the operation is not permitted
ErrPermissionDenied = StoreError("denied")
Expand Down
22 changes: 15 additions & 7 deletions server/topic.go
Expand Up @@ -176,7 +176,7 @@ func (t *Topic) run(hub *Hub) {
case sreg := <-t.reg:
// Request to add a connection to this topic

if t.isSuspended() {
if !t.isReady() {
sreg.sess.queueOut(ErrLocked(sreg.pkt.id, t.original(sreg.sess.uid), types.TimeNow()))
} else {
// The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die
Expand All @@ -203,7 +203,7 @@ func (t *Topic) run(hub *Hub) {
// userId.IsZero() == true when the entire session is being dropped.
asUid := leave.userId

if t.isSuspended() {
if !t.isReady() {
if !asUid.IsZero() && leave.reqID != "" {
leave.sess.queueOut(ErrLocked(leave.reqID, t.original(asUid), now))
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func (t *Topic) run(hub *Hub) {
var pushRcpt *push.Receipt
asUid := types.ParseUserId(msg.from)
if msg.Data != nil {
if t.isSuspended() {
if !t.isReady() {
msg.sess.queueOut(ErrLocked(msg.id, t.original(asUid), msg.timestamp))
continue
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (t *Topic) run(hub *Hub) {
pluginMessage(msg.Data, plgActCreate)

} else if msg.Pres != nil {
if t.isSuspended() {
if !t.isReady() {
// Ignore presence update - topic is being deleted
continue
}
Expand All @@ -336,7 +336,7 @@ func (t *Topic) run(hub *Hub) {
// "what" may have changed, i.e. unset or "+command" removed ("on+en" -> "on")
msg.Pres.What = what
} else if msg.Info != nil {
if t.isSuspended() {
if !t.isReady() {
// Ignore info messages - topic is being deleted
continue
}
Expand Down Expand Up @@ -2351,12 +2351,20 @@ func (t *Topic) suspend() {
atomic.StoreInt32((*int32)(&t.suspended), 1)
}

func (t *Topic) markDeleted() {
atomic.StoreInt32((*int32)(&t.suspended), 2)
}

func (t *Topic) resume() {
atomic.StoreInt32((*int32)(&t.suspended), 0)
}

func (t *Topic) isSuspended() bool {
return atomic.LoadInt32((*int32)(&t.suspended)) != 0
func (t *Topic) isReady() bool {
return atomic.LoadInt32((*int32)(&t.suspended)) == 0
}

func (t *Topic) isDeleted() bool {
return atomic.LoadInt32((*int32)(&t.suspended)) == 2
}

// Get topic name suitable for the given client
Expand Down
4 changes: 4 additions & 0 deletions server/utils.go
Expand Up @@ -263,6 +263,10 @@ func decodeStoreError(err error, id, topic string, timestamp time.Time,
errmsg = ErrPolicy(id, topic, timestamp)
case types.ErrCredentials:
errmsg = InfoValidateCredentials(id, timestamp)
case types.ErrUserNotFound:
errmsg = ErrUserNotFound(id, topic, timestamp)
case types.ErrTopicNotFound:
errmsg = ErrTopicNotFound(id, topic, timestamp)
case types.ErrNotFound:
errmsg = ErrNotFound(id, topic, timestamp)
default:
Expand Down

0 comments on commit c8a418b

Please sign in to comment.