Skip to content

Commit

Permalink
add PinMessage and PinnedMessage (#2180)
Browse files Browse the repository at this point in the history
* add PinMessage and PinnedMessage

* fix gruop pin messages

* add SkipGroupMessageWrap to pin messages

* update pinMessage ID generation to be symmetric
  • Loading branch information
gravityblast committed May 14, 2021
1 parent 6a930ed commit e9a42bf
Show file tree
Hide file tree
Showing 18 changed files with 1,078 additions and 245 deletions.
60 changes: 60 additions & 0 deletions protocol/common/pin_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package common

import (
"crypto/ecdsa"

"github.com/golang/protobuf/proto"

"github.com/status-im/status-go/protocol/protobuf"
)

type PinMessage struct {
protobuf.PinMessage

// ID calculated as keccak256(compressedAuthorPubKey, data) where data is unencrypted payload.
ID string `json:"id"`
// MessageID string `json:"messageID"`
// WhisperTimestamp is a timestamp of a Whisper envelope.
WhisperTimestamp uint64 `json:"whisperTimestamp"`
// From is a public key of the user who pinned the message.
From string `json:"from"`
// The chat id to be stored locally
LocalChatID string `json:"localChatId"`
SigPubKey *ecdsa.PublicKey `json:"-"`
// Identicon of the author
Identicon string `json:"identicon"`
// Random 3 words name
Alias string `json:"alias"`
}

type PinnedMessage struct {
Message
PinnedAt uint64 `json:"pinnedAt"`
}

// WrapGroupMessage indicates whether we should wrap this in membership information
func (m *PinMessage) WrapGroupMessage() bool {
return false
}

// SetMessageType a setter for the MessageType field
// this function is required to implement the ChatEntity interface
func (m *PinMessage) SetMessageType(messageType protobuf.MessageType) {
m.MessageType = messageType
}

func (m *PinMessage) GetGrant() []byte {
return nil
}

// GetProtoBuf returns the struct's embedded protobuf struct
// this function is required to implement the ChatEntity interface
func (m *PinMessage) GetProtobuf() proto.Message {
return &m.PinMessage
}

// GetSigPubKey returns an ecdsa encoded public key
// this function is required to implement the ChatEntity interface
func (m PinMessage) GetSigPubKey() *ecdsa.PublicKey {
return m.SigPubKey
}
26 changes: 26 additions & 0 deletions protocol/message_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,29 @@ func extendMessageFromChat(message *common.Message, chat *Chat, key *ecdsa.Publi
return nil

}

func extendPinMessageFromChat(message *common.PinMessage, chat *Chat, key *ecdsa.PublicKey, timesource common.TimeSource) error {
clock, timestamp := chat.NextClockAndTimestamp(timesource)

message.LocalChatID = chat.ID
message.Clock = clock
message.From = types.EncodeHex(crypto.FromECDSAPub(key))
message.SigPubKey = key
message.WhisperTimestamp = timestamp

identicon, err := identicon.GenerateBase64(message.From)
if err != nil {
return err
}

message.Identicon = identicon

alias, err := alias.GenerateFromPublicKeyString(message.From)
if err != nil {
return err
}

message.Alias = alias
return nil

}
61 changes: 57 additions & 4 deletions protocol/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"encoding/hex"
"fmt"

"github.com/status-im/status-go/protocol/transport"

"github.com/pkg/errors"

"go.uber.org/zap"

"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/images"
Expand All @@ -18,9 +18,9 @@ import (
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/ens"
"github.com/status-im/status-go/protocol/protobuf"
v1protocol "github.com/status-im/status-go/protocol/v1"

"go.uber.org/zap"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
)

const (
Expand Down Expand Up @@ -324,6 +324,59 @@ func (m *MessageHandler) HandleSyncInstallationPublicChat(state *ReceivedMessage
return true
}

func (m *MessageHandler) HandlePinMessage(state *ReceivedMessageState, message protobuf.PinMessage) error {
logger := m.logger.With(zap.String("site", "HandlePinMessage"))

logger.Info("Handling pin message")

pinMessage := &common.PinMessage{
PinMessage: message,
// MessageID: message.MessageId,
WhisperTimestamp: state.CurrentMessageState.WhisperTimestamp,
From: state.CurrentMessageState.Contact.ID,
SigPubKey: state.CurrentMessageState.PublicKey,
Identicon: state.CurrentMessageState.Contact.Identicon,
Alias: state.CurrentMessageState.Contact.Alias,
}

chat, err := m.matchChatEntity(pinMessage, state.AllChats, state.AllContacts, state.Timesource)
if err != nil {
return err // matchChatEntity returns a descriptive error message
}

pinMessage.ID, err = generatePinMessageID(&m.identity.PublicKey, pinMessage, chat)
if err != nil {
return err
}

// If deleted-at is greater, ignore message
if chat.DeletedAtClockValue >= pinMessage.Clock {
return nil
}

// Set the LocalChatID for the message
pinMessage.LocalChatID = chat.ID

if c, ok := state.AllChats.Load(chat.ID); ok {
chat = c
}

// Set the LocalChatID for the message
pinMessage.LocalChatID = chat.ID

if chat.LastClockValue < message.Clock {
chat.LastClockValue = message.Clock
}

state.Response.AddPinMessage(pinMessage)

// Set in the modified maps chat
state.Response.AddChat(chat)
state.AllChats.Store(chat.ID, chat)

return nil
}

func (m *MessageHandler) HandleContactUpdate(state *ReceivedMessageState, message protobuf.ContactUpdate) error {
logger := m.logger.With(zap.String("site", "HandleContactUpdate"))
contact := state.CurrentMessageState.Contact
Expand Down
161 changes: 161 additions & 0 deletions protocol/message_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,91 @@ func (db sqlitePersistence) MessageByChatID(chatID string, currCursor string, li
return result, newCursor, nil
}

// PinnedMessageByChatID returns all pinned messages for a given chatID in descending order.
// Ordering is accomplished using two concatenated values: ClockValue and ID.
// These two values are also used to compose a cursor which is returned to the result.
func (db sqlitePersistence) PinnedMessageByChatIDs(chatIDs []string, currCursor string, limit int) ([]*common.PinnedMessage, string, error) {
cursorWhere := ""
if currCursor != "" {
cursorWhere = "AND cursor <= ?" //nolint: goconst
}
allFields := db.tableUserMessagesAllFieldsJoin()
args := make([]interface{}, len(chatIDs))
for i, v := range chatIDs {
args[i] = v
}
if currCursor != "" {
args = append(args, currCursor)
}
// Build a new column `cursor` at the query time by having a fixed-sized clock value at the beginning
// concatenated with message ID. Results are sorted using this new column.
// This new column values can also be returned as a cursor for subsequent requests.
rows, err := db.db.Query(
fmt.Sprintf(`
SELECT
%s,
pm.clock_value as pinnedAt,
substr('0000000000000000000000000000000000000000000000000000000000000000' || m1.clock_value, -64, 64) || m1.id as cursor
FROM
pin_messages pm
JOIN
user_messages m1
ON
pm.message_id = m1.id
LEFT JOIN
user_messages m2
ON
m1.response_to = m2.id
LEFT JOIN
contacts c
ON
m1.source = c.id
WHERE
pm.pinned = 1
AND NOT(m1.hide) AND m1.local_chat_id IN %s %s
ORDER BY cursor DESC
LIMIT ?
`, allFields, "(?"+strings.Repeat(",?", len(chatIDs)-1)+")", cursorWhere),
append(args, limit+1)..., // take one more to figure our whether a cursor should be returned
)
if err != nil {
return nil, "", err
}
defer rows.Close()

var (
result []*common.PinnedMessage
cursors []string
)
for rows.Next() {
var (
message common.Message
pinnedAt uint64
cursor string
)
if err := db.tableUserMessagesScanAllFields(rows, &message, &pinnedAt, &cursor); err != nil {
return nil, "", err
}
pinnedMessage := &common.PinnedMessage{
Message: message,
PinnedAt: pinnedAt,
}
result = append(result, pinnedMessage)
cursors = append(cursors, cursor)
}

var newCursor string
if len(result) > limit && cursors != nil {
newCursor = cursors[limit]
result = result[:limit]
}
return result, newCursor, nil
}

func (db sqlitePersistence) PinnedMessageByChatID(chatID string, currCursor string, limit int) ([]*common.PinnedMessage, string, error) {
return db.PinnedMessageByChatIDs([]string{chatID}, currCursor, limit)
}

// MessageByChatIDs returns all messages for a given chatIDs in descending order.
// Ordering is accomplished using two concatenated values: ClockValue and ID.
// These two values are also used to compose a cursor which is returned to the result.
Expand Down Expand Up @@ -807,6 +892,76 @@ func (db sqlitePersistence) SaveMessages(messages []*common.Message) (err error)
return
}

func (db sqlitePersistence) SavePinMessages(messages []*common.PinMessage) (err error) {
tx, err := db.db.BeginTx(context.Background(), nil)
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()

// select
selectQuery := "SELECT clock_value FROM pin_messages WHERE id = ?"

// insert
allInsertFields := `id, message_id, whisper_timestamp, chat_id, local_chat_id, clock_value, pinned`
insertValues := strings.Repeat("?, ", strings.Count(allInsertFields, ",")) + "?"
insertQuery := "INSERT INTO pin_messages(" + allInsertFields + ") VALUES (" + insertValues + ")" // nolint: gosec
insertStmt, err := tx.Prepare(insertQuery)
if err != nil {
return
}

// update
updateQuery := "UPDATE pin_messages SET pinned = ?, clock_value = ? WHERE id = ?"
updateStmt, err := tx.Prepare(updateQuery)
if err != nil {
return
}

for _, message := range messages {
row := tx.QueryRow(selectQuery, message.ID)
var existingClock uint64
switch err = row.Scan(&existingClock); err {
case sql.ErrNoRows:
// not found, insert new record
allValues := []interface{}{
message.ID,
message.MessageId,
message.WhisperTimestamp,
message.ChatId,
message.LocalChatID,
message.Clock,
message.Pinned,
}
_, err = insertStmt.Exec(allValues...)
if err != nil {
return
}
case nil:
// found, update if current message is more recent, otherwise skip
if existingClock < message.Clock {
// update
_, err = updateStmt.Exec(message.Pinned, message.Clock, message.ID)
if err != nil {
return
}
}

default:
return
}
}

return
}

func (db sqlitePersistence) DeleteMessage(id string) error {
_, err := db.db.Exec(`DELETE FROM user_messages WHERE id = ?`, id)
return err
Expand Down Expand Up @@ -838,6 +993,12 @@ func (db sqlitePersistence) deleteMessagesByChatID(id string, tx *sql.Tx) (err e
}

_, err = tx.Exec(`DELETE FROM user_messages WHERE local_chat_id = ?`, id)
if err != nil {
return
}

_, err = tx.Exec(`DELETE FROM pin_messages WHERE local_chat_id = ?`, id)

return
}

Expand Down
15 changes: 15 additions & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2539,6 +2539,15 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
continue
}

case protobuf.PinMessage:
pinMessage := msg.ParsedMessage.Interface().(protobuf.PinMessage)
err = m.handler.HandlePinMessage(messageState, pinMessage)
if err != nil {
logger.Warn("failed to handle PinMessage", zap.Error(err))
allMessagesProcessed = false
continue
}

case protobuf.PairInstallation:
if !common.IsPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
logger.Warn("not coming from us, ignoring")
Expand Down Expand Up @@ -2939,6 +2948,12 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
return nil, err
}
}
if len(messageState.Response.pinMessages) > 0 {
err = m.SavePinMessages(messageState.Response.PinMessages())
if err != nil {
return nil, err
}
}

for _, emojiReaction := range messageState.EmojiReactions {
messageState.Response.EmojiReactions = append(messageState.Response.EmojiReactions, emojiReaction)
Expand Down
Loading

0 comments on commit e9a42bf

Please sign in to comment.