Skip to content

Commit

Permalink
ensure Public is loaded for p2p topics even if one sub is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Mar 3, 2019
1 parent fa9f4ef commit fc21669
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 27 deletions.
37 changes: 32 additions & 5 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,25 +1139,39 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
// The difference between UsersForTopic vs SubsForTopic is that the former loads user.public,
// the latter does not.
func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
tcat := t.GetTopicCat(topic)

// Fetch all subscribed users. The number of users is not large
q := `SELECT s.createdat,s.updatedat,s.deletedat,s.userid,s.topic,s.delid,s.recvseqid,
s.readseqid,s.modewant,s.modegiven,u.public,s.private
FROM subscriptions AS s JOIN users AS u ON s.userid=u.id
WHERE s.topic=?`
args := []interface{}{topic}
if !keepDeleted {
// Filter out rows with DeletedAt being not null
q += " AND s.deletedAt IS NULL AND u.deletedat IS NULL"
// Filter out rows with users deleted
q += " AND u.deletedat IS NULL"

// For p2p topics we must load all subscriptions including deleted.
// Otherwise it will be impossibel to swipe Public values.
if tcat != t.TopicCatP2P {
// Filter out deletd subscriptions.
q += " AND s.deletedAt IS NULL"
}
}

limit := maxSubscribers
var oneUser t.Uid
if opts != nil {
// Ignore IfModifiedSince - we must return all entries
// Those unmodified will be stripped of Public & Private.

if !opts.User.IsZero() {
q += " AND s.userid=?"
args = append(args, store.DecodeUid(opts.User))
// For p2p topics we have to fetch both users otherwise public cannot be swapped.
if tcat != t.TopicCatP2P {
q += " AND s.userid=?"
args = append(args, store.DecodeUid(opts.User))
}
oneUser = opts.User
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
Expand Down Expand Up @@ -1191,15 +1205,28 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
}
rows.Close()

if err == nil && t.GetTopicCat(topic) == t.TopicCatP2P && len(subs) > 0 {
if err == nil && tcat == t.TopicCatP2P && len(subs) > 0 {
// Swap public values of P2P topics as expected.
if len(subs) == 1 {
// The other user is deleted, nothing we can do.
subs[0].SetPublic(nil)
} else {
pub := subs[0].GetPublic()
subs[0].SetPublic(subs[1].GetPublic())
subs[1].SetPublic(pub)
}

// Remove deleted and unneeded subscriptions
if !keepDeleted || !oneUser.IsZero() {
var xsubs []t.Subscription
for i := range subs {
if (subs[i].DeletedAt != nil && !keepDeleted) || (!oneUser.IsZero() && subs[i].Uid() != oneUser) {
continue
}
xsubs = append(xsubs, subs[i])
}
subs = xsubs
}
}

return subs, err
Expand Down
28 changes: 25 additions & 3 deletions server/db/rethinkdb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,20 +868,29 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (

// UsersForTopic loads users subscribed to the given topic
func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
tcat := t.GetTopicCat(topic)

// Fetch topic subscribers
// Fetch all subscribed users. The number of users is not large
q := rdb.DB(a.dbName).Table("subscriptions").GetAllByIndex("Topic", topic)
if !keepDeleted {
// Filter out rows with DeletedAt being not null
if !keepDeleted && tcat != t.TopicCatP2P {
// Filter out rows with DeletedAt being not null.
// P2P topics must load all subscriptions otherwise it will be impossible
// to swap Public values.
q = q.Filter(rdb.Row.HasFields("DeletedAt").Not())
}

limit := maxSubscribers
var oneUser t.Uid
if opts != nil {
// Ignore IfModifiedSince - we must return all entries
// Those unmodified will be stripped of Public & Private.

if !opts.User.IsZero() {
q = q.Filter(rdb.Row.Field("User").Eq(opts.User.String()))
if tcat != t.TopicCatP2P {
q = q.Filter(rdb.Row.Field("User").Eq(opts.User.String()))
}
oneUser = opts.User
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
Expand Down Expand Up @@ -931,12 +940,25 @@ func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt
if t.GetTopicCat(topic) == t.TopicCatP2P && len(subs) > 0 {
// Swap public values of P2P topics as expected.
if len(subs) == 1 {
// User is deleted. Nothing we can do.
subs[0].SetPublic(nil)
} else {
pub := subs[0].GetPublic()
subs[0].SetPublic(subs[1].GetPublic())
subs[1].SetPublic(pub)
}

// Remove deleted and unneeded subscriptions
if !keepDeleted || !oneUser.IsZero() {
var xsubs []t.Subscription
for i := range subs {
if (subs[i].DeletedAt != nil && !keepDeleted) || (!oneUser.IsZero() && subs[i].Uid() != oneUser) {
continue
}
xsubs = append(xsubs, subs[i])
}
subs = xsubs
}
}

return subs, nil
Expand Down
10 changes: 2 additions & 8 deletions server/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,6 @@ func topicInit(sreg *sessionJoin, h *Hub) {
recvID: subs[i].RecvSeqId,
readID: subs[i].ReadSeqId,
}

if t.perUser[uid].public == nil {
log.Println("LOADED NIL public (both subscriptions present)")
}
}

} else {
Expand Down Expand Up @@ -528,6 +524,7 @@ func topicInit(sreg *sessionJoin, h *Hub) {
// a. requester is starting a new topic
// b. requester's subscription is missing: deleted or creation failed
if sub1 == nil {

// Set user1's ModeGiven from user2's default values
userData.modeGiven = selectAccessMode(auth.Level(sreg.pkt.authLvl),
users[u2].Access.Anon,
Expand Down Expand Up @@ -640,10 +637,6 @@ func topicInit(sreg *sessionJoin, h *Hub) {
readID: sub2.ReadSeqId,
recvID: sub2.RecvSeqId,
}

if t.perUser[userID1].public == nil || t.perUser[userID2].public == nil {
log.Println("LOADED NIL public (one or both subs missing)")
}
}

// Clear original topic name.
Expand Down Expand Up @@ -905,6 +898,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea

} else {
// Case 1.2: topic is offline.

asUid := types.ParseUserId(msg.from)
// Get all subscribers: we need to know how many are left and notify them.
subs, err := store.Topics.GetSubs(topic, nil)
Expand Down
2 changes: 1 addition & 1 deletion server/pres.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (t *Topic) presProcReq(fromUserID, what string, wantReply bool) string {

if cmd == "rem" {
replyAs = "off+rem"
if !psd.enabled {
if !psd.enabled && what == "off" {
// If it was disabled before, don't send a redundant update.
what = ""
}
Expand Down
3 changes: 1 addition & 2 deletions server/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ func statsUpdater() {

for upd := range globals.statsUpdate {
if upd == nil {
// Nil update means shutdown
close(globals.statsUpdate)
globals.statsUpdate = nil
// Dont' care to close the channel.
break
}

Expand Down
6 changes: 4 additions & 2 deletions server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,14 @@ func (TopicsObjMapper) Get(topic string) (*types.Topic, error) {
return adp.TopicGet(topic)
}

// GetUsers loads subscriptions for topic plus loads user.Public
// GetUsers loads subscriptions for topic plus loads user.Public.
// Deleted subscriptions are not loaded.
func (TopicsObjMapper) GetUsers(topic string, opts *types.QueryOpt) ([]types.Subscription, error) {
return adp.UsersForTopic(topic, false, opts)
}

// GetUsersAny is the same as GetUsers, except it loads deleted subscriptions too.
// GetUsersAny loads subscriptions for topic plus loads user.Public. It's the same as GetUsers,
// except it loads deleted subscriptions too.
func (TopicsObjMapper) GetUsersAny(topic string, opts *types.QueryOpt) ([]types.Subscription, error) {
return adp.UsersForTopic(topic, true, opts)
}
Expand Down
10 changes: 4 additions & 6 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2187,8 +2187,8 @@ func (t *Topic) notifySubChange(uid, actor types.Uid, oldWant, oldGiven,
// In case of a P2P topic subscribe/unsubscribe users from each other's notifications.
if t.cat == types.TopicCatP2P {
uid2 := t.p2pOtherUser(uid)
// Remove user1's subscription to user2 without passing "off" change to user1's sessions.
presSingleUserOfflineOffline(uid, uid2.UserId(), "?none+rem", nilPresParams, "")
// Remove user1's subscription to user2 and notify user1's other sessions that he is gone.
t.presSingleUserOffline(uid, "gone", nilPresParams, skip, false)
// Tell user2 that user1 is offline but let him keep sending updates in case user1 resubscribes.
presSingleUserOfflineOffline(uid2, target, "off", nilPresParams, "")
} else if t.cat == types.TopicCatGrp {
Expand Down Expand Up @@ -2220,10 +2220,8 @@ func (t *Topic) notifySubChange(uid, actor types.Uid, oldWant, oldGiven,
t.presSingleUserOffline(uid, "?unkn+en", nilPresParams, "", false)
}

// Notify requester's other sessions.
if unsub {
t.presSingleUserOffline(uid, "gone", nilPresParams, skip, false)
} else {
// Notify requester's other sessions that permissions have changed.
if !unsub {
t.presSingleUserOffline(uid, "acs", params, skip, false)
}
}
Expand Down

0 comments on commit fc21669

Please sign in to comment.