Skip to content

Commit

Permalink
fix for #742 (wip: mondo adapter is not don yet, will not compile)
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Apr 24, 2022
1 parent 16be737 commit 53ff987
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 108 deletions.
5 changes: 3 additions & 2 deletions server/db/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type Adapter interface {
// UserGetByCred returns user ID for the given validated credential.
UserGetByCred(method, value string) (t.Uid, error)
// UserUnreadCount returns the total number of unread messages in all topics with
// the R permission.
UserUnreadCount(uid t.Uid) (int, error)
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error)

// Credential management

Expand Down
5 changes: 3 additions & 2 deletions server/db/mongodb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,9 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) {
}

// UserUnreadCount returns the total number of unread messages in all topics with
// the R permission.
func (a *adapter) UserUnreadCount(uid t.Uid) (int, error) {
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
pipeline := b.A{
b.M{"$match": b.M{"user": uid.String()}},
// Join documents from two collection
Expand Down
20 changes: 14 additions & 6 deletions server/db/mongodb/tests/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,29 @@ func TestCredGetAll(t *testing.T) {
}

func TestUserUnreadCount(t *testing.T) {
count, err := adp.UserUnreadCount(types.ParseUserId("usr" + users[2].Id))
uids := []types.Uid{types.ParseUserId("usr" + users[1].Id), types.ParseUserId("usr" + users[2].Id)}
expected := map[types.Uid]int{uids[0]: 5, uids[1]: 100}
counts, err := adp.UserUnreadCount(uids...)
if err != nil {
t.Fatal(err)
}
if count != 100 {
t.Error(mismatchErrorString("UnreadCount", count, 100))
if len(counts) != 2 {
t.Error(mismatchErrorString("UnreadCount length", len(counts), 2))
}

for i := range counts {
if expected[counts[i].UserId] != counts[i].UnreadCount {
t.Error(mismatchErrorString("UnreadCount", counts[i].UserId, counts[0].UnreadCount))
}
}

// Test not found
count, err = adp.UserUnreadCount(types.ParseUserId("dummyuserid"))
counts, err = adp.UserUnreadCount(types.ParseUserId("dummyuserid"))
if err != nil {
t.Fatal(err)
}
if count != 0 {
t.Error(mismatchErrorString("UnreadCount", count, 0))
if len(counts) != 0 {
t.Error(mismatchErrorString("UnreadCount length (dummy)", len(counts), 0))
}
}

Expand Down
47 changes: 37 additions & 10 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,25 +1335,52 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) {
}

// UserUnreadCount returns the total number of unread messages in all topics with
// the R permission.
func (a *adapter) UserUnreadCount(uid t.Uid) (int, error) {
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
uids := make([]interface{}, len(ids))
for i, id := range ids {
uids[i] = store.DecodeUid(id)
}

q, uids, _ := sqlx.In("SELECT s.userid, SUM(t.seqid)-SUM(s.readseqid) AS unreadcount FROM topics AS t, subscriptions AS s "+
"WHERE s.userid IN (?) AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+
"INSTR(s.modewant, 'R')>0 AND INSTR(s.modegiven, 'R')>0 GROUP BY s.userid", uids, t.StateDeleted)
q = a.db.Rebind(q)

ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}
var count int
err := a.db.GetContext(ctx, &count, "SELECT SUM(t.seqid)-SUM(s.readseqid) FROM topics AS t, subscriptions AS s "+
"WHERE s.userid=? AND t.name=s.topic AND s.deletedat IS NULL AND t.state!=? AND "+
"INSTR(s.modewant, 'R')>0 AND INSTR(s.modegiven, 'R')>0", store.DecodeUid(uid), t.StateDeleted)

rows, err := a.db.QueryxContext(ctx, q, uids...)
if err != nil {
return nil, err
}

counts := make(map[t.Uid]int, len(ids))

var userId int64
var unreadCount int
for rows.Next() {
if err = rows.Scan(&userId, &unreadCount); err != nil {
break
}
counts[store.EncodeUid(userId)] = unreadCount
}
if err == nil {
return count, nil
err = rows.Err()
}
rows.Close()

if err == sql.ErrNoRows {
return 0, nil
// Ensure all original uids are always present.
for _, uid := range ids {
if _, ok := counts[uid]; !ok {
counts[uid] = 0
}
}

return -1, err
return counts, err
}

// *****************************
Expand Down
61 changes: 41 additions & 20 deletions server/db/rethinkdb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,45 +1016,66 @@ func (a *adapter) UserGetByCred(method, value string) (t.Uid, error) {
}

// UserUnreadCount returns the total number of unread messages in all topics with
// the R permission.
func (a *adapter) UserUnreadCount(uid t.Uid) (int, error) {
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
// The call expects user IDs to be plain strings like "356zaYaumiU".
uids := make([]interface{}, len(ids))
for i, id := range ids {
uids[i] = id.String()
}

/*
r.db("tinode").table("subscriptions").getAll("8L6HpDuF05c", {index: "User"})
.eqJoin("Topic", r.db("tinode").table("topics"), {index: "Id"})
.filter(
r.not(r.row.hasFields({"left": "DeletedAt"}).or(r.row("right")("State").eq(20)))
)
.zip()
.pluck("ReadSeqId", "ModeWant", "ModeGiven", "SeqId")
.filter(r.js('(function(row) {return row.ModeWant&row.ModeGiven&1 > 0;})'))
.sum(function(x) {return x.getField("SeqId").sub(x.getField("ReadSeqId"));})
Query:
r.db("tinode").table("subscriptions").getAll("356zaYaumiU", "k4cvfaq8zCQ", {index: "User"})
.eqJoin("Topic", r.db("tinode").table("topics"), {index: "Id"})
.filter(
r.not(r.row.hasFields({"left": "DeletedAt"}).or(r.row("right")("State").eq(20)))
)
.zip()
.pluck("User", "ReadSeqId", "ModeWant", "ModeGiven", "SeqId")
.filter(r.js('(function(row) {return row.ModeWant&row.ModeGiven&1 > 0;})'))
.group("User")
.sum(function(x) {return x.getField("SeqId").sub(x.getField("ReadSeqId"));})
Result:
[{group: "356zaYaumiU", reduction: 1}, {group: "k4cvfaq8zCQ", reduction: 0}]
*/
cursor, err := rdb.DB(a.dbName).Table("subscriptions").GetAllByIndex("User", uid.String()).
cursor, err := rdb.DB(a.dbName).Table("subscriptions").GetAllByIndex("User", uids...).
EqJoin("Topic", rdb.DB(a.dbName).Table("topics"), rdb.EqJoinOpts{Index: "Id"}).
// left: subscription; right: topic.
Filter(
rdb.Not(rdb.Row.HasFields(map[string]interface{}{"left": "DeletedAt"}).
Or(rdb.Row.Field("right").Field("State").Eq(t.StateDeleted)))).
Zip().
Pluck("ReadSeqId", "ModeWant", "ModeGiven", "SeqId").
Pluck("User", "ReadSeqId", "ModeWant", "ModeGiven", "SeqId").
Filter(rdb.JS("(function(row) {return (row.ModeWant & row.ModeGiven & " + strconv.Itoa(int(t.ModeRead)) + ") > 0;})")).
Group("User").
Sum(func(row rdb.Term) rdb.Term { return row.Field("SeqId").Sub(row.Field("ReadSeqId")) }).
Run(a.conn)
if err != nil {
return -1, err
return nil, err
}
defer cursor.Close()

if cursor.IsNil() {
return 0, nil
counts := make(map[t.Uid]int, len(ids))
var oneCount struct {
Group string
Reduction int
}
for cursor.Next(&oneCount) {
counts[t.ParseUid(oneCount.Group)] = oneCount.Reduction
}
err = cursor.Err()

var count int
if err = cursor.One(&count); err != nil {
return -1, err
// Ensure all original uids are always present.
for _, uid := range ids {
if _, ok := counts[uid]; !ok {
counts[uid] = 0
}
}

return count, nil
return counts, err
}

// *****************************
Expand Down
14 changes: 9 additions & 5 deletions server/store/mock_store/mock_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ type UsersPersistenceInterface interface {
GetActiveCred(id types.Uid, method string) (*types.Credential, error)
GetAllCreds(id types.Uid, method string, validatedOnly bool) ([]types.Credential, error)
DelCred(id types.Uid, method, value string) error
GetUnreadCount(id types.Uid) (int, error)
GetUnreadCount(ids ...types.Uid) (map[types.Uid]int, error)
}

// usersMapper is a concrete type which implements UsersPersistenceInterface.
Expand Down Expand Up @@ -494,9 +494,9 @@ func (usersMapper) DelCred(id types.Uid, method, value string) error {
return adp.CredDel(id, method, value)
}

// GetUnreadCount returs user's total count of unread messages in all topics with the R permissions
func (usersMapper) GetUnreadCount(id types.Uid) (int, error) {
return adp.UserUnreadCount(id)
// GetUnreadCount returs users' total count of unread messages in all topics with the R permissions.
func (usersMapper) GetUnreadCount(ids ...types.Uid) (map[types.Uid]int, error) {
return adp.UserUnreadCount(ids...)
}

// TopicsPersistenceInterface is an interface which defines methods for persistent storage of topics.
Expand Down
4 changes: 2 additions & 2 deletions server/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func TestRegisterSessionNewChannelGetSubDbError(t *testing.T) {
sess: s,
}

helper.ss.EXPECT().Get(chanName, uid, true).Return(nil, types.ErrInternal)
helper.ss.EXPECT().Get(chanName, uid, false).Return(nil, types.ErrInternal)

helper.topic.registerSession(join)
helper.finish()
Expand Down Expand Up @@ -1576,7 +1576,7 @@ func TestRegisterSessionCreateSubFailed(t *testing.T) {
sess: s,
}

helper.ss.EXPECT().Create(gomock.Any()).Return(types.ErrInternal)
helper.ss.EXPECT().Get(topicName, uid, true).Return(nil, types.ErrInternal)

helper.topic.registerSession(join)
helper.finish()
Expand Down
Loading

0 comments on commit 53ff987

Please sign in to comment.