diff --git a/server/hub.go b/server/hub.go index 94ed0a0cb..b6abdf893 100644 --- a/server/hub.go +++ b/server/hub.go @@ -156,10 +156,28 @@ func (h *Hub) run() { // 2. Check access rights and reject, if appropriate // 3. Attach session to the topic - t := h.topicGet(sreg.topic) // is the topic already loaded? + // Is the topic already loaded? + t := h.topicGet(sreg.topic) if t == nil { - // Topic does not exist or not loaded - go topicInit(sreg, h) + // Topic does not exist or not loaded. + t = &Topic{name: sreg.topic, + xoriginal: sreg.pkt.topic, + sessions: make(map[*Session]types.UidSlice), + broadcast: make(chan *ServerComMessage, 256), + reg: make(chan *sessionJoin, 32), + unreg: make(chan *sessionLeave, 32), + meta: make(chan *metaReq, 32), + perUser: make(map[types.Uid]perUserData), + exit: make(chan *shutDown, 1), + } + // Topic is created in suspended state because it's not yet configured. + t.suspend() + // Save topic now to prevent race condition. + h.topicPut(sreg.topic, t) + + // Create the topic. + go topicInit(t, sreg, h) + } else { // Topic found. // Topic will check access rights and send appropriate {ctrl} @@ -309,6 +327,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea sess.queueOut(NoErr(msg.id, msg.topic, now)) h.topicDel(topic) + t.markDeleted() t.exit <- &shutDown{reason: StopDeleted} statsInc("LiveTopics", -1) } else { @@ -401,7 +420,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea // Case 2: just unregister. // If t is nil, it's not registered, no action is needed if t := h.topicGet(topic); t != nil { - t.suspend() + t.markDeleted() h.topicDel(topic) t.exit <- &shutDown{reason: reason} @@ -434,7 +453,7 @@ func (h *Hub) stopTopicsForUser(uid types.Uid, reason int, alldone chan<- bool) if _, isMember := topic.perUser[uid]; (topic.cat != types.TopicCatGrp && isMember) || topic.owner == uid { - topic.suspend() + topic.markDeleted() h.topics.Delete(name) diff --git a/server/pres.go b/server/pres.go index 78e771d17..bc227de1c 100644 --- a/server/pres.go +++ b/server/pres.go @@ -90,7 +90,7 @@ func (t *Topic) loadContacts(uid types.Uid) error { // "+dis" disable subscription withot removing it, the opposite of "en". // The "+en/rem/dis" command itself is stripped from the notification. func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string { - if t.isSuspended() { + if !t.isReady() { return "" } diff --git a/server/store/types/types.go b/server/store/types/types.go index a82c7bf13..5ada065d7 100644 --- a/server/store/types/types.go +++ b/server/store/types/types.go @@ -38,7 +38,11 @@ const ( ErrPolicy = StoreError("policy") // ErrCredentials means credentials like email or captcha must be validated ErrCredentials = StoreError("credentials") - // ErrNotFound means the objevy was not found + // ErrUserNotFound means the user was not found + ErrUserNotFound = StoreError("user not found") + // ErrNotFound means the topic was not found + ErrTopicNotFound = StoreError("topic not found") + // ErrNotFound means the object other then user or topic was not found ErrNotFound = StoreError("not found") // ErrPermissionDenied means the operation is not permitted ErrPermissionDenied = StoreError("denied") diff --git a/server/topic.go b/server/topic.go index 351a1d809..0860353c0 100644 --- a/server/topic.go +++ b/server/topic.go @@ -176,7 +176,7 @@ func (t *Topic) run(hub *Hub) { case sreg := <-t.reg: // Request to add a connection to this topic - if t.isSuspended() { + if !t.isReady() { sreg.sess.queueOut(ErrLocked(sreg.pkt.id, t.original(sreg.sess.uid), types.TimeNow())) } else { // The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die @@ -203,7 +203,7 @@ func (t *Topic) run(hub *Hub) { // userId.IsZero() == true when the entire session is being dropped. asUid := leave.userId - if t.isSuspended() { + if !t.isReady() { if !asUid.IsZero() && leave.reqID != "" { leave.sess.queueOut(ErrLocked(leave.reqID, t.original(asUid), now)) } @@ -271,7 +271,7 @@ func (t *Topic) run(hub *Hub) { var pushRcpt *push.Receipt asUid := types.ParseUserId(msg.from) if msg.Data != nil { - if t.isSuspended() { + if !t.isReady() { msg.sess.queueOut(ErrLocked(msg.id, t.original(asUid), msg.timestamp)) continue } @@ -322,7 +322,7 @@ func (t *Topic) run(hub *Hub) { pluginMessage(msg.Data, plgActCreate) } else if msg.Pres != nil { - if t.isSuspended() { + if !t.isReady() { // Ignore presence update - topic is being deleted continue } @@ -336,7 +336,7 @@ func (t *Topic) run(hub *Hub) { // "what" may have changed, i.e. unset or "+command" removed ("on+en" -> "on") msg.Pres.What = what } else if msg.Info != nil { - if t.isSuspended() { + if !t.isReady() { // Ignore info messages - topic is being deleted continue } @@ -2351,12 +2351,20 @@ func (t *Topic) suspend() { atomic.StoreInt32((*int32)(&t.suspended), 1) } +func (t *Topic) markDeleted() { + atomic.StoreInt32((*int32)(&t.suspended), 2) +} + func (t *Topic) resume() { atomic.StoreInt32((*int32)(&t.suspended), 0) } -func (t *Topic) isSuspended() bool { - return atomic.LoadInt32((*int32)(&t.suspended)) != 0 +func (t *Topic) isReady() bool { + return atomic.LoadInt32((*int32)(&t.suspended)) == 0 +} + +func (t *Topic) isDeleted() bool { + return atomic.LoadInt32((*int32)(&t.suspended)) == 2 } // Get topic name suitable for the given client diff --git a/server/utils.go b/server/utils.go index 115ef49a2..506c4376a 100644 --- a/server/utils.go +++ b/server/utils.go @@ -263,6 +263,10 @@ func decodeStoreError(err error, id, topic string, timestamp time.Time, errmsg = ErrPolicy(id, topic, timestamp) case types.ErrCredentials: errmsg = InfoValidateCredentials(id, timestamp) + case types.ErrUserNotFound: + errmsg = ErrUserNotFound(id, topic, timestamp) + case types.ErrTopicNotFound: + errmsg = ErrTopicNotFound(id, topic, timestamp) case types.ErrNotFound: errmsg = ErrNotFound(id, topic, timestamp) default: