Skip to content

Commit

Permalink
pass Get query parameters to adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed May 16, 2018
1 parent 6a0902c commit 2272349
Show file tree
Hide file tree
Showing 15 changed files with 592 additions and 543 deletions.
478 changes: 229 additions & 249 deletions pbx/model.pb.go

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions pbx/model.proto
Expand Up @@ -72,17 +72,18 @@ message SetDesc {
}

message GetOpts {
int64 if_modified_since = 1; // timestamp in milliseconds since epoch 01/01/1970
int32 limit = 2;
}

message BrowseOpts {
// Timestamp in milliseconds since epoch 01/01/1970
int64 if_modified_since = 1;
// Limit search to this user ID
string user = 2;
// Limit search results to one topic;
string topic = 3;
// Load messages with seq id equal or greater than this
int32 since_id = 1;
int32 since_id = 4;
// Load messages with seq id lower than this
int32 before_id = 3;
// Limit the number of messages loaded
int32 limit = 5;
int32 before_id = 5;
// Maximum number of results to return
int32 limit = 6;
}

message GetQuery {
Expand All @@ -93,7 +94,7 @@ message GetQuery {
// Parameters of "sub" request
GetOpts sub = 3;
// Parameters of "data" request
BrowseOpts data = 4;
GetOpts data = 4;
}

message SetQuery {
Expand Down
237 changes: 106 additions & 131 deletions pbx/model_pb2.py

Large diffs are not rendered by default.

29 changes: 14 additions & 15 deletions server/datamodel.go
Expand Up @@ -14,8 +14,14 @@ import (
"time"
)

// MsgBrowseOpts defines parameters for queries by massage IDs.
type MsgBrowseOpts struct {
// MsgGetOpts defines Get query parameters.
type MsgGetOpts struct {
// Optional User ID to return result for one user.
User string `json:"user,omitempty"`
// Optional topic name to return result for one topic.
Topic string `json:"topic,omitempty"`
// Return results modified dince this timespamp.
IfModifiedSince *time.Time `json:"ims,omitempty"`
// Load messages/ranges with IDs equal or greater than this (inclusive or closed)
SinceId int `json:"since,omitempty"`
// Load messages/ranges with IDs lower than this (exclusive or open)
Expand All @@ -24,25 +30,18 @@ type MsgBrowseOpts struct {
Limit int `json:"limit,omitempty"`
}

// MsgGetOpts defines parameters for queries by last modified time.
type MsgGetOpts struct {
User string `json:"user,omitempty"`
IfModifiedSince *time.Time `json:"ims,omitempty"`
Limit int `json:"limit,omitempty"`
}

// MsgGetQuery is a topic metadata or data query.
type MsgGetQuery struct {
What string `json:"what"`

// Parameters of "desc" request
// Parameters of "desc" request: IfModifiedSince
Desc *MsgGetOpts `json:"desc,omitempty"`
// Parameters of "sub" request
// Parameters of "sub" request: User, Topic, IfModifiedSince, Limit.
Sub *MsgGetOpts `json:"sub,omitempty"`
// Parameters of "data" request
Data *MsgBrowseOpts `json:"data,omitempty"`
// Parameters of "del" request
Del *MsgBrowseOpts `json:"del,omitempty"`
// Parameters of "data" request: Since, Before, Limit.
Data *MsgGetOpts `json:"data,omitempty"`
// Parameters of "del" request: Since, Before, Limit.
Del *MsgGetOpts `json:"del,omitempty"`
}

// MsgSetSub is a payload in set.sub request to update current subscription or invite another user, {sub.what} == "sub"
Expand Down
134 changes: 89 additions & 45 deletions server/db/mysql/adapter.go
Expand Up @@ -757,17 +757,35 @@ func (a *adapter) TopicGet(topic string) (*t.Topic, error) {

// TopicsForUser loads user's contact list: p2p and grp topics, except for 'me' subscription.
// Reads and denormalizes Public value.
func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool) ([]t.Subscription, error) {
func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
// Fetch user's subscriptions
q := `SELECT createdat,updatedat,deletedat,topic,delid,recvseqid,
readseqid,modewant,modegiven,private FROM subscriptions WHERE userid=?`
args := []interface{}{store.DecodeUid(uid)}
if !keepDeleted {
// Filter out rows with defined DeletedAt
q += " AND deletedAt IS NULL"
}

limit := maxResults
if opts != nil {
if opts.IfModifiedSince != nil {
q += " AND updatedat>?"
args = append(args, opts.IfModifiedSince)
}
if opts.Topic != "" {
q += " AND topic=?"
args = append(args, opts.Topic)
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
}
}

q += " LIMIT ?"
args = append(args, limit)

rows, err := a.db.Queryx(q, store.DecodeUid(uid), maxResults)
rows, err := a.db.Queryx(q, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -881,19 +899,39 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool) ([]t.Subscription,
return subs, err
}

// UsersForTopic loads users subscribed to the given topic
func (a *adapter) UsersForTopic(topic string, keepDeleted bool) ([]t.Subscription, error) {
// UsersForTopic loads users subscribed to the given topic.
// The difference between UsersForTopic vs SubsForTopic is that the former loads user.public,
// the latter does not.
func (a *adapter) UsersForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
// Fetch all subscribed users. The number of users is not large
q := `SELECT s.createdat,s.updatedat,s.deletedat,s.userid,s.topic,s.delid,s.recvseqid,
s.readseqid,s.modewant,s.modegiven,u.public,s.private
FROM subscriptions AS s JOIN users AS u ON s.userid=u.id
WHERE s.topic=?`
args := []interface{}{topic}
if !keepDeleted {
// Filter out rows with DeletedAt being not null
q += " AND s.deletedAt IS NULL"
}

limit := maxSubscribers
if opts != nil {
if opts.IfModifiedSince != nil {
q += " AND s.updatedat>?"
args = append(args, opts.IfModifiedSince)
}
if !opts.User.IsZero() {
q += " AND s.userid=?"
args = append(args, store.DecodeUid(opts.User))
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
}
}
q += " LIMIT ?"
rows, err := a.db.Queryx(q, topic, maxSubscribers)
args = append(args, limit)

rows, err := a.db.Queryx(q, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1036,20 +1074,34 @@ func (a *adapter) SubsLastSeen(topic string, user t.Uid, lastSeen map[string]tim
return err
}

// SubsForUser loads a list of user's subscriptions to topics. Does NOT read Public value.
func (a *adapter) SubsForUser(forUser t.Uid, keepDeleted bool) ([]t.Subscription, error) {
if forUser.IsZero() {
return nil, t.ErrMalformed
}

// SubsForUser loads a list of user's subscriptions to topics. Does NOT load Public value.
func (a *adapter) SubsForUser(forUser t.Uid, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
q := `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid,
readseqid,modewant,modegiven,private FROM subscriptions WHERE userid=?`
args := []interface{}{store.DecodeUid(forUser)}

if !keepDeleted {
q += " AND deletedAt IS NULL"
}

limit := maxResults // maxResults here, not maxSubscribers
if opts != nil {
if opts.IfModifiedSince != nil {
q += " AND updatedat>?"
args = append(args, opts.IfModifiedSince)
}
if opts.Topic != "" {
q += " AND updatedat>?"
args = append(args, opts.Topic)
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
}
}
q += " LIMIT ?"
args = append(args, limit)

rows, err := a.db.Queryx(q, store.DecodeUid(forUser), maxResults)
rows, err := a.db.Queryx(q, args...)
if err != nil {
return nil, err
}
Expand All @@ -1068,32 +1120,37 @@ func (a *adapter) SubsForUser(forUser t.Uid, keepDeleted bool) ([]t.Subscription
return subs, err
}

// SubsForTopic fetches all subsciptions for a topic.
func (a *adapter) SubsForTopic(topic string, keepDeleted bool) ([]t.Subscription, error) {
// log.Println("Loading subscriptions for topic ", topic)

// must load User.Public for p2p topics
var p2p []t.User
var err error
if t.GetTopicCat(topic) == t.TopicCatP2P {
uid1, uid2, _ := t.ParseP2P(topic)
if p2p, err = a.UserGetAll(uid1, uid2); err != nil {
// log.Println("SubsForTopic", "UserGetAll", err)
return nil, err
} else if len(p2p) != 2 {
return nil, errors.New("failed to load two p2p users")
}
}

// SubsForTopic fetches all subsciptions for a topic. Does NOT load Public value.
// The difference between UsersForTopic vs SubsForTopic is that the former loads user.public,
// the latter does not.
func (a *adapter) SubsForTopic(topic string, keepDeleted bool, opts *t.QueryOpt) ([]t.Subscription, error) {
q := `SELECT createdat,updatedat,deletedat,userid AS user,topic,delid,recvseqid,
readseqid,modewant,modegiven,private FROM subscriptions WHERE topic=?`
args := []interface{}{topic}

if !keepDeleted {
// Filter out rows where DeletedAt is defined
q += " AND deletedAt IS NULL"
}
limit := maxSubscribers
if opts != nil {
if opts.IfModifiedSince != nil {
q += " AND updatedAt>?"
args = append(args, opts.IfModifiedSince)
}
if !opts.User.IsZero() {
q += " AND userid=?"
args = append(args, store.DecodeUid(opts.User))
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
}
}

q += " LIMIT ?"
args = append(args, limit)

rows, err := a.db.Queryx(q, topic, maxSubscribers)
rows, err := a.db.Queryx(q, args...)
if err != nil {
return nil, err
}
Expand All @@ -1102,24 +1159,11 @@ func (a *adapter) SubsForTopic(topic string, keepDeleted bool) ([]t.Subscription
var ss t.Subscription
for rows.Next() {
if err = rows.StructScan(&ss); err != nil {
// log.Println("SubsForTopic", "StructScan", err)
break
}

ss.User = encodeString(ss.User).String()
ss.Private = fromJSON(ss.Private)
if p2p != nil {
// Assigning values provided by the other user
if p2p[0].Id == ss.User {
ss.SetPublic(p2p[1].Public)
ss.SetWith(p2p[1].Id)
ss.SetDefaultAccess(p2p[1].Access.Auth, p2p[1].Access.Anon)
} else {
ss.SetPublic(p2p[0].Public)
ss.SetWith(p2p[0].Id)
ss.SetDefaultAccess(p2p[0].Access.Auth, p2p[0].Access.Anon)
}
}
subs = append(subs, ss)
// log.Printf("SubsForTopic: loaded sub %#+v", ss)
}
Expand Down Expand Up @@ -1302,7 +1346,7 @@ func (a *adapter) MessageSave(msg *t.Message) error {
return err
}

func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.BrowseOpt) ([]t.Message, error) {
func (a *adapter) MessageGetAll(topic string, forUser t.Uid, opts *t.QueryOpt) ([]t.Message, error) {
var limit = maxResults // TODO(gene): pass into adapter as a config param
var lower = 0
var upper = 1 << 31
Expand Down Expand Up @@ -1358,7 +1402,7 @@ var dellog struct {
}

// Get ranges of deleted messages
func (a *adapter) MessageGetDeleted(topic string, forUser t.Uid, opts *t.BrowseOpt) ([]t.DelMessage, error) {
func (a *adapter) MessageGetDeleted(topic string, forUser t.Uid, opts *t.QueryOpt) ([]t.DelMessage, error) {
var limit = maxResults
var lower = 0
var upper = 1 << 31
Expand Down

0 comments on commit 2272349

Please sign in to comment.