Skip to content

Commit

Permalink
Add fetching individual messages from stream
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed May 20, 2024
1 parent 9d0a36a commit 523fb2f
Show file tree
Hide file tree
Showing 5 changed files with 457 additions and 38 deletions.
10 changes: 8 additions & 2 deletions Sources/JetStream/JetStreamContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class JetStreamContext {

extension JetStreamContext {
// fix the error type. Add AckError
public func publish(_ subject: String, message: Data) async throws -> AckFuture {
public func publish(_ subject: String, message: Data, headers: NatsHeaderMap? = nil) async throws -> AckFuture {

let inbox = nextNuid()
let sub = try await self.client.subscribe(subject: inbox)
try await self.client.publish(message, subject: subject, reply: inbox)
try await self.client.publish(message, subject: subject, reply: inbox, headers: headers)
return AckFuture(sub: sub, timeout: self.timeout)
}

Expand All @@ -67,6 +67,12 @@ extension JetStreamContext {

return try decoder.decode(Response<T>.self, from: payload)
}

internal func request(_ subject: String, message: Data? = nil) async throws -> NatsMessage {
let data = message ?? Data()
return try await self.client.request(
data, subject: "\(self.prefix).\(subject)", timeout: self.timeout)
}
}

public struct AckFuture {
Expand Down
293 changes: 292 additions & 1 deletion Sources/JetStream/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@
import Foundation
import Nats

/// Exposes a set of operations performed on a stream:
/// - fetching stream info
/// - fetching individual messages from the stream
/// - deleting messages from a stream
/// - purging a stream
/// - operating on Consumers
public class Stream {

/// Contains information about the stream.
/// Note that this may be out of date and reading it does not query the server.
/// For up-to-date stream info use ``Stream/info()``
public internal(set) var info: StreamInfo
internal let ctx: JetStreamContext

Expand All @@ -23,6 +33,14 @@ public class Stream {
self.info = info
}

/// Retrieves information about the stream
/// This also refreshes ``Stream/info``
///
/// - Returns ``StreamInfo`` from the server
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful
/// - ``JetStreamError`` if the server responded with an API error
public func info() async throws -> StreamInfo {
let subj = "STREAM.INFO.\(info.config.name)"
let info: Response<StreamInfo> = try await ctx.request(subj)
Expand All @@ -35,6 +53,199 @@ public class Stream {
}
}

/// Retrieves a raw message from stream.
///
/// - Parameters:
/// - sequence: The sequence of the message in the stream.
/// - subject: The stream subject the message should be retrieved from.
/// When combined with `seq` will return the first msg with seq >= of the specified sequence.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an API error.
public func getMsg(sequence: UInt64, subject: String? = nil) async throws -> RawMessage {
let request = GetMsgRequest(seq: sequence, next: subject)
return try await getRawMsg(request: request)
}

/// Retrieves the first message on the stream for a given subject.
///
/// - Parameter firstForSubject: The subject from which the first message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an API error.
public func getMsg(firstForSubject: String) async throws -> RawMessage {
let request = GetMsgRequest(next: firstForSubject)
return try await getRawMsg(request: request)
}

/// Retrieves last message on a stream for a given subject
///
/// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an API error.
public func getMsg(lastForSubject: String) async throws -> RawMessage {
let request = GetMsgRequest(last: lastForSubject)
return try await getRawMsg(request: request)
}

/// Retrieves a raw message from stream.
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(sequence:subject:)``, as it can fetch ``RawMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameters:
/// - sequence: The sequence of the message in the stream.
/// - subject: The stream subject the message should be retrieved from.
/// When combined with `seq` will return the first msg with seq >= of the specified sequence.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid
public func getMsgDirect(sequence: UInt64, subject: String? = nil) async throws -> RawMessage {
let request = GetMsgRequest(seq: sequence, next: subject)
return try await getRawMsgDirect(request: request)
}

/// Retrieves the first message on the stream for a given subject.
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(firstForSubject:)``, as it can fetch ``RawMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameter firstForSubject: The subject from which the first message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid
public func getMsgDirect(firstForSubject: String) async throws -> RawMessage {
let request = GetMsgRequest(next: firstForSubject)
return try await getRawMsgDirect(request: request)
}

/// Retrieves last message on a stream for a given subject
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(lastForSubject:)``, as it can fetch ``RawMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid
public func getMsgDirect(lastForSubject: String) async throws -> RawMessage {
let request = GetMsgRequest(last: lastForSubject)
return try await getRawMsgDirect(request: request)
}

private func getRawMsg(request: GetMsgRequest) async throws -> RawMessage {
let subject = "STREAM.MSG.GET.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp: Response<GetRawMessageResp> = try await ctx.request(subject, message: requestData)

switch resp {
case .success(let msg):
return try RawMessage(from: msg.message)
case .error(let err):
throw err.error
}
}

private func getRawMsgDirect(request: GetMsgRequest) async throws -> RawMessage {
let subject = "DIRECT.GET.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp = try await ctx.request(subject, message: requestData)

if let status = resp.status {
if status == StatusCode.notFound {
throw JetStreamDirectGetError.msgNotFound
}
throw JetStreamDirectGetError.errorResponse(status, resp.description)
}

guard let headers = resp.headers else {
throw JetStreamDirectGetError.invalidResponse("response should contain headers")
}

guard headers[.natsStream] != nil else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Stream header")
}

guard let seqHdr = headers[.natsSequence] else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Sequence header")
}

let seq = UInt64(seqHdr.description)
if seq == nil {
throw JetStreamDirectGetError.invalidResponse("invalid Nats-Sequence header: \(seqHdr)")
}

guard let timeStamp = headers[.natsTimestamp] else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Timestamp header")
}

guard let subject = headers[.natsSubject] else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Subject header")
}

let payload = resp.payload ?? Data()

return RawMessage(subject: subject.description, sequence: seq!, payload: payload, headers: resp.headers, time: timeStamp.description)
}


internal struct GetMsgRequest: Codable {
internal let seq: UInt64?
internal let nextBySubject: String?
internal let lastBySubject: String?

internal init(seq: UInt64, next: String?) {
self.seq = seq
self.nextBySubject = next
self.lastBySubject = nil
}

internal init(next: String) {
self.seq = nil
self.nextBySubject = next
self.lastBySubject = nil
}

internal init(last: String) {
self.seq = nil
self.nextBySubject = nil
self.lastBySubject = last
}

enum CodingKeys: String, CodingKey {
case seq
case nextBySubject = "next_by_subj"
case lastBySubject = "last_by_subj"
}
}

static func validate(name: String) throws {
guard !name.isEmpty else {
throw StreamValidationError.nameRequired
Expand All @@ -47,7 +258,29 @@ public class Stream {
}
}

public enum StreamValidationError: NatsError {
public enum JetStreamDirectGetError: NatsError, Equatable {
case msgNotFound
case invalidResponse(String)
case errorResponse(StatusCode, String?)

public var description: String {
switch self {
case .msgNotFound:
return "message not found"
case .invalidResponse(let cause):
return "invalid response: \(cause)"
case .errorResponse(let code, let description):
if let description {
return "unable to get message: \(code) \(description)"
} else {
return "unable to get message: \(code)"
}
}
}
}

/// Returned when a provided ``StreamConfig`` is not valid.
public enum StreamValidationError: NatsError, Equatable {
case nameRequired
case invalidCharacterFound(String)

Expand Down Expand Up @@ -622,3 +855,61 @@ public struct PeerInfo: Codable {
case lag
}
}

internal struct GetRawMessageResp: Codable {
internal struct StoredRawMessage: Codable {
public let subject: String
public let sequence: UInt64
public let payload: Data
public let headers: Data?
public let time: String

enum CodingKeys: String, CodingKey {
case subject
case sequence = "seq"
case payload = "data"
case headers = "hdrs"
case time
}
}

internal let message: StoredRawMessage
}

public struct RawMessage {

/// Subject of the message.
public let subject: String

/// Sequence of the message.
public let sequence: UInt64

/// Raw payload of the message as a base64 encoded string.
public let payload: Data

/// Raw header string, if any.
public let headers: NatsHeaderMap?

/// The time the message was published.
public let time: String

internal init(subject: String, sequence: UInt64, payload: Data, headers: NatsHeaderMap?, time: String) {
self.subject = subject
self.sequence = sequence
self.payload = payload
self.headers = headers
self.time = time
}

internal init(from storedMsg: GetRawMessageResp.StoredRawMessage) throws {
self.subject = storedMsg.subject
self.sequence = storedMsg.sequence
self.payload = storedMsg.payload
if let headers = storedMsg.headers, let headersStr = String(data: headers, encoding: .utf8) {
self.headers = try NatsHeaderMap(from: headersStr)
} else {
self.headers = nil
}
self.time = storedMsg.time
}
}
Loading

0 comments on commit 523fb2f

Please sign in to comment.