Skip to content

Commit

Permalink
feat(mediator) : Add TTL for message collection (#290)
Browse files Browse the repository at this point in the history
Add TTL for message collection to do house keeping
update time stamp field from String to Intant
Update initdb.js
Add migration script for ttl
  • Loading branch information
mineme0110 committed Apr 30, 2024
1 parent 56b0c8a commit f6d19f6
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 10 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ To set up the mediator storage (MongoDB):
- `MONGODB_PASSWORD` - is the password used by the Mediator service to connect to the database.
- `MONGODB_DB_NAME` - is the name of the database used by the Mediator.

#### Mediator storage
- The `messages` collection contains two types of messages: `Mediator` and `User`.
1. **Mediator Messages**:
- These messages received by mediator for any interactions with the mediator.
- Examples include messages for setting up mediation, requesting mediation, or picking up messages from the mediator.
- These messages stored in collection can be used for debugging purpose mediator functionality and interactions with the mediator. Hence they can be deleted after a period of time.
- This message type `Mediator` can be setup to have a configurable Time-To-Live (TTL) value, after which they can expire.
- This is how the TTL can be configured for the collection messages [initdb.js](initdb.js)
2. **User Messages**:
- These are the actual messages e.g like the Forward message from the mediator, contain a `User` message inside. This inside message is stored as type `User` to be delivered to user.
- They do not have a TTL, and will persist in the system until the user retrieves them using a pickup protocol and deletes them.
- The mediator is responsible for storing and making these user messages available for delivery to the intended recipients.

ℹ️ For existing users, please utilize the migration script [migration_mediator_collection.js](migration_mediator_collection.js) to migrate the collection.

## Run

The DIDComm Mediator comprises two elements: a backend service and a database.
Expand Down
10 changes: 10 additions & 0 deletions infrastructure/charts/mediator/templates/mongodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ data:
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true , partialFilterExpression: { "alias.0": { $exists: true } }});
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });
// 7 day * 24 hours * 60 minutes * 60 seconds
const expireAfterSeconds = 7 * 24 * 60 * 60;
db.getCollection(collectionMessages).createIndex(
{ ts: 1 },
{
name: "message-ttl-index",
partialFilterExpression: { "message_type" : "Mediator" },
expireAfterSeconds: expireAfterSeconds
}
);
---
apiVersion: v1
kind: Service
Expand Down
11 changes: 11 additions & 0 deletions initdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true
// Only enforce uniqueness on non-empty arrays
db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true, partialFilterExpression: { "alias.0": { $exists: true } } });
db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });

// There are 2 message types `Mediator` and `User` Please follow the Readme for more details in the section Mediator storage
const expireAfterSeconds = 7 * 24 * 60 * 60; // 7 day * 24 hours * 60 minutes * 60 seconds
db.getCollection(collectionMessages).createIndex(
{ ts: 1 },
{
name: "message-ttl-index",
partialFilterExpression: { "message_type" : "Mediator" },
expireAfterSeconds: expireAfterSeconds
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ object AgentExecutorMediator {
em.`protected`.obj match
case AnonProtectedHeader(epk, apv, typ, enc, alg) => ops.anonDecrypt(em)
case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => ops.authDecrypt(em)
}.flatMap(decrypt _)
}.flatMap(decrypt)
case sm: SignedMessage =>
ops.verify(sm).flatMap {
case false => ZIO.fail(ValidationFailed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,33 @@ type HASH = String
// messages
type XRequestID = String // x-request-id

enum MessageType {
case Mediator, User
}

case class MessageItem(
_id: HASH,
msg: SignedMessage | EncryptedMessage,
headers: ProtectedHeader | Seq[SignProtectedHeader],
ts: String,
ts: Instant,
message_type: MessageType,
xRequestId: Option[XRequestID]
)
object MessageItem {
def apply(msg: SignedMessage | EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem =
def apply(msg: SignedMessage | EncryptedMessage, messageType: MessageType, xRequestId: Option[XRequestID]): MessageItem =
val now = Instant.now()
msg match {
case sMsg: SignedMessage =>
new MessageItem(
msg.sha256,
msg,
sMsg.signatures.map(_.`protected`.obj),
Instant.now().toString,
now,
messageType,
xRequestId
)
case eMsg: EncryptedMessage =>
new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, Instant.now().toString, xRequestId)
new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, now, messageType, xRequestId)
}

given BSONWriter[ProtectedHeader | Seq[SignProtectedHeader]] with {
Expand Down Expand Up @@ -72,6 +79,25 @@ object MessageItem {
}
}

given BSONWriter[MessageType] with
def writeTry(value: MessageType): Try[BSONValue] = Try {
value match {
case MessageType.Mediator => BSONString("Mediator")
case MessageType.User => BSONString("User")
}
}

given BSONReader[MessageType] with
def readTry(bson: BSONValue): Try[MessageType] = Try {
bson match {
case BSONString("Mediator") => MessageType.Mediator
case BSONString("User") => MessageType.User
case _ => throw new RuntimeException("Invalid MessagePurpose value in BSON")
}
}



given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem]
given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.hyperledger.identus.mediator.db

import fmgp.did.*
import fmgp.did.comm.{SignedMessage, EncryptedMessage}
import fmgp.did.comm.{EncryptedMessage, SignedMessage}
import org.hyperledger.identus.mediator.db.MessageType.Mediator
import org.hyperledger.identus.mediator.{DuplicateMessage, StorageCollection, StorageError, StorageThrowable}
import reactivemongo.api.bson.*
import reactivemongo.api.bson.collection.BSONCollection
Expand All @@ -28,13 +29,13 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.map(_.collection(collectionName))
.mapError(ex => StorageCollection(ex))

def insert(msg: SignedMessage | EncryptedMessage): IO[StorageError, WriteResult] = {
def insert(msg: SignedMessage | EncryptedMessage, messageType: MessageType = Mediator): IO[StorageError, WriteResult] = {
for {
_ <- ZIO.logInfo("insert")
_ <- ZIO.logInfo(s"insert $messageType")
xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value))
coll <- collection
result <- ZIO
.fromFuture(implicit ec => coll.insert.one(MessageItem(msg, xRequestId)))
.fromFuture(implicit ec => coll.insert.one(MessageItem(msg, messageType, xRequestId)))
.tapError(err => ZIO.logError(s"insert : ${err.getMessage}"))
.mapError {
case ex: DatabaseException if (ex.code.contains(DuplicateMessage.code)) => DuplicateMessage(ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.hyperledger.identus.mediator.db.*
import zio.*
import zio.json.*
import fmgp.did.comm.protocol.pickup3.MessageDelivery
import org.hyperledger.identus.mediator.db.MessageType.User

object ForwardMessageExecuter
extends ProtocolExecuter[
Expand All @@ -36,7 +37,7 @@ object ForwardMessageExecuter
msg <-
if (numbreOfUpdated > 0) { // Or maybe we can add all the time
for {
_ <- repoMessageItem.insert(m.msg)
_ <- repoMessageItem.insert(m.msg, User)
_ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo")

// For Live Mode
Expand Down
31 changes: 31 additions & 0 deletions migration_mediator_collection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// migration script
// Please utilize the following script to update your existing collection for Mediator release v0.14.5 and beyond.
const collectionName = 'messages';
const collectionNameUserAccount = 'user.account';
let userHashes = [];

db.getCollection(collectionNameUserAccount).find({}).forEach(function(user) {
user.messagesRef.forEach(function(messageRef) {
userHashes.push(messageRef.hash);
});
});

db.getCollection('messages').find({}).forEach(function(message) {
let newTimestamp = new Date(message.ts);
if(userHashes.includes(message._id)) {
db.getCollection('messages').updateOne({ _id: message._id }, { $set: { message_type: 'User', ts: newTimestamp } });
} else {
db.getCollection('messages').updateOne({ _id: message._id }, { $set: { message_type: 'Mediator', ts: newTimestamp } });
}
});

// There are 2 message types `Mediator` and `User` Please follow the Readme for more details in the section Mediator storage
const expireAfterSeconds = 7 * 24 * 60 * 60; // 7 day * 24 hours * 60 minutes * 60 seconds
db.getCollection(collectionMessages).createIndex(
{ ts: 1 },
{
name: "message-ttl-index",
partialFilterExpression: { "message_type" : "Mediator" },
expireAfterSeconds: expireAfterSeconds
}
)

0 comments on commit f6d19f6

Please sign in to comment.