Skip to content

Commit

Permalink
Add deleting msgs and purging a stream
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed May 22, 2024
1 parent f59f4a4 commit 75f531a
Show file tree
Hide file tree
Showing 2 changed files with 341 additions and 16 deletions.
166 changes: 150 additions & 16 deletions Sources/JetStream/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ public class Stream {
}
}

/// Retrieves a raw message from stream.
/// Retrieves a message from stream.
///
/// - Parameters:
/// - sequence: The sequence of the message in the stream.
/// - subject: The stream subject the message should be retrieved from.
/// When combined with `seq` will return the first msg with seq >= of the specified sequence.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
/// - Returns ``StreamMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
Expand All @@ -75,7 +75,7 @@ public class Stream {
///
/// - Parameter firstForSubject: The subject from which the first message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
/// - Returns ``StreamMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
Expand All @@ -89,7 +89,7 @@ public class Stream {
///
/// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
/// - Returns ``StreamMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
Expand All @@ -99,10 +99,10 @@ public class Stream {
return try await getMessage(request: request)
}

/// Retrieves a raw message from stream.
/// Retrieves a message from stream.
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(sequence:subject:)``, as it can fetch ``RawMessage``
/// This is different from ``Stream/getMsg(sequence:subject:)``, as it can fetch ``StreamMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
Expand All @@ -111,7 +111,7 @@ public class Stream {
/// - subject: The stream subject the message should be retrieved from.
/// When combined with `seq` will return the first msg with seq >= of the specified sequence.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
/// - Returns ``StreamMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
Expand All @@ -126,13 +126,13 @@ public class Stream {
/// Retrieves the first message on the stream for a given subject.
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(firstForSubject:)``, as it can fetch ``RawMessage``
/// This is different from ``Stream/getMsg(firstForSubject:)``, as it can fetch ``StreamMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameter firstForSubject: The subject from which the first message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
/// - Returns ``StreamMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
Expand All @@ -145,13 +145,13 @@ public class Stream {
/// Retrieves last message on a stream for a given subject
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(lastForSubject:)``, as it can fetch ``RawMessage``
/// This is different from ``Stream/getMsg(lastForSubject:)``, as it can fetch ``StreamMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
/// - Returns ``StreamMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
Expand All @@ -161,11 +161,95 @@ public class Stream {
return try await getMessageDirect(request: request)
}

/// Removes a message with provided sequence from the stream.
/// Requires ``StreamConfig/denyDelete`` to be false.
///
/// - Parameters:
/// - sequence: The sequence of the message in the stream.
/// - secure: If set to true, the message will be permanently removed from the stream (overwritten with random data).
/// Otherwise, it will be marked as deleted.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an error or the response is invalid
public func deleteMessage(sequence: UInt64, secure: Bool = false) async throws {
var request: DeleteMessageRequest
if secure {
request = DeleteMessageRequest(seq: sequence)
} else {
request = DeleteMessageRequest(seq: sequence, noErase: true)
}
let subject = "STREAM.MSG.DELETE.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp: Response<DeleteMessageResponse> = try await ctx.request(
subject, message: requestData)

switch resp {
case .success(_):
return
case .error(let err):
throw err.error
}
}

/// Purges messages from the stream. If `subject` is not provided, all messages on a stream will be permanently removed.
/// Requires ``StreamConfig/denyPurge`` to be false.
///
/// - Parameter subject:when set, filters the subject from which the messages will be removed (may contain wildcards).
///
/// - Returns the number of messages purged.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an error or the response is invalid.
public func purge(subject: String? = nil) async throws -> UInt64 {
let request = PurgeRequest(filter: subject)

return try await purge(request: request)
}

/// Purges messages from the stream up to the given stream sequence (non-inclusive).
/// Requires ``StreamConfig/denyPurge`` to be false.
///
/// - Parameters:
/// - sequence: the upper bound sequence for messages to be deleted (non-inclusive).
/// - subject: when set, filters the subject from which the messages will be removed (may contain wildcards).
///
/// - Returns the number of messages purged.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an error or the response is invalid.
public func purge(sequence: UInt64, subject: String? = nil) async throws -> UInt64 {
let request = PurgeRequest(seq: sequence, filter: subject)

return try await purge(request: request)
}

/// Purges messages from the stream, retaining the provided number of messages).
/// Requires ``StreamConfig/denyPurge`` to be false.
///
/// - Parameters:
/// - keep: the number of messages to be retained. If there are less matching messages on than this number, no messages will be purged.
/// - subject: when set, filters the subject from which the messages will be removed (may contain wildcards).
///
/// - Returns the number of messages purged.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an error or the response is invalid.
public func purge(keep: UInt64, subject: String? = nil) async throws -> UInt64 {
let request = PurgeRequest(keep: keep, filter: subject)

return try await purge(request: request)
}

private func getMessage(request: GetMessageRequest) async throws -> StreamMessage? {
let subject = "STREAM.MSG.GET.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp: Response<GetRawMessageResp> = try await ctx.request(subject, message: requestData)
let resp: Response<GetMessageResp> = try await ctx.request(subject, message: requestData)

switch resp {
case .success(let msg):
Expand Down Expand Up @@ -223,6 +307,20 @@ public class Stream {
time: timeStamp.description)
}

private func purge(request: PurgeRequest) async throws -> UInt64 {
let subject = "STREAM.PURGE.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp: Response<PurgeResponse> = try await ctx.request(subject, message: requestData)

switch resp {
case .success(let result):
return result.purged
case .error(let err):
throw err.error
}
}

internal struct GetMessageRequest: Codable {
internal let seq: UInt64?
internal let nextBySubject: String?
Expand Down Expand Up @@ -253,6 +351,42 @@ public class Stream {
}
}

private struct DeleteMessageRequest: Codable {
internal let seq: UInt64
internal let noErase: Bool?

init(seq: UInt64, noErase: Bool? = nil) {
self.seq = seq
self.noErase = noErase
}

enum CodingKeys: String, CodingKey {
case seq
case noErase = "no_erase"
}
}

internal struct DeleteMessageResponse: Codable {
internal let success: Bool
}

private struct PurgeRequest: Codable {
internal let seq: UInt64?
internal let keep: UInt64?
internal let filter: String?

init(seq: UInt64? = nil, keep: UInt64? = nil, filter: String? = nil) {
self.seq = seq
self.keep = keep
self.filter = filter
}
}

internal struct PurgeResponse: Codable {
internal let success: Bool
internal let purged: UInt64
}

static func validate(name: String) throws {
guard !name.isEmpty else {
throw StreamValidationError.nameRequired
Expand Down Expand Up @@ -863,8 +997,8 @@ public struct PeerInfo: Codable {
}
}

internal struct GetRawMessageResp: Codable {
internal struct StoredRawMessage: Codable {
internal struct GetMessageResp: Codable {
internal struct StoredMessage: Codable {
public let subject: String
public let sequence: UInt64
public let payload: Data
Expand All @@ -880,7 +1014,7 @@ internal struct GetRawMessageResp: Codable {
}
}

internal let message: StoredRawMessage
internal let message: StoredMessage
}

/// Represents a message persisted on a stream.
Expand Down Expand Up @@ -911,7 +1045,7 @@ public struct StreamMessage {
self.time = time
}

internal init(from storedMsg: GetRawMessageResp.StoredRawMessage) throws {
internal init(from storedMsg: GetMessageResp.StoredMessage) throws {
self.subject = storedMsg.subject
self.sequence = storedMsg.sequence
self.payload = storedMsg.payload
Expand Down
Loading

0 comments on commit 75f531a

Please sign in to comment.