Skip to content

Commit

Permalink
Add fetching individual messages from stream (#75)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed May 22, 2024
1 parent 9d0a36a commit f59f4a4
Show file tree
Hide file tree
Showing 5 changed files with 463 additions and 38 deletions.
12 changes: 10 additions & 2 deletions Sources/JetStream/JetStreamContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ public class JetStreamContext {

extension JetStreamContext {
// fix the error type. Add AckError
public func publish(_ subject: String, message: Data) async throws -> AckFuture {
public func publish(
_ subject: String, message: Data, headers: NatsHeaderMap? = nil
) async throws -> AckFuture {

let inbox = nextNuid()
let sub = try await self.client.subscribe(subject: inbox)
try await self.client.publish(message, subject: subject, reply: inbox)
try await self.client.publish(message, subject: subject, reply: inbox, headers: headers)
return AckFuture(sub: sub, timeout: self.timeout)
}

Expand All @@ -67,6 +69,12 @@ extension JetStreamContext {

return try decoder.decode(Response<T>.self, from: payload)
}

internal func request(_ subject: String, message: Data? = nil) async throws -> NatsMessage {
let data = message ?? Data()
return try await self.client.request(
data, subject: "\(self.prefix).\(subject)", timeout: self.timeout)
}
}

public struct AckFuture {
Expand Down
304 changes: 303 additions & 1 deletion Sources/JetStream/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@
import Foundation
import Nats

/// Exposes a set of operations performed on a stream:
/// - fetching stream info
/// - fetching individual messages from the stream
/// - deleting messages from a stream
/// - purging a stream
/// - operating on Consumers
public class Stream {

/// Contains information about the stream.
/// Note that this may be out of date and reading it does not query the server.
/// For up-to-date stream info use ``Stream/info()``
public internal(set) var info: StreamInfo
internal let ctx: JetStreamContext

Expand All @@ -23,6 +33,14 @@ public class Stream {
self.info = info
}

/// Retrieves information about the stream
/// This also refreshes ``Stream/info``
///
/// - Returns ``StreamInfo`` from the server
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful
/// - ``JetStreamError`` if the server responded with an API error
public func info() async throws -> StreamInfo {
let subj = "STREAM.INFO.\(info.config.name)"
let info: Response<StreamInfo> = try await ctx.request(subj)
Expand All @@ -35,6 +53,206 @@ public class Stream {
}
}

/// Retrieves a raw message from stream.
///
/// - Parameters:
/// - sequence: The sequence of the message in the stream.
/// - subject: The stream subject the message should be retrieved from.
/// When combined with `seq` will return the first msg with seq >= of the specified sequence.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an API error.
public func getMessage(sequence: UInt64, subject: String? = nil) async throws -> StreamMessage?
{
let request = GetMessageRequest(seq: sequence, next: subject)
return try await getMessage(request: request)
}

/// Retrieves the first message on the stream for a given subject.
///
/// - Parameter firstForSubject: The subject from which the first message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an API error.
public func getMessage(firstForSubject: String) async throws -> StreamMessage? {
let request = GetMessageRequest(next: firstForSubject)
return try await getMessage(request: request)
}

/// Retrieves last message on a stream for a given subject
///
/// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamError`` if the server responded with an API error.
public func getMessage(lastForSubject: String) async throws -> StreamMessage? {
let request = GetMessageRequest(last: lastForSubject)
return try await getMessage(request: request)
}

/// Retrieves a raw message from stream.
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(sequence:subject:)``, as it can fetch ``RawMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameters:
/// - sequence: The sequence of the message in the stream.
/// - subject: The stream subject the message should be retrieved from.
/// When combined with `seq` will return the first msg with seq >= of the specified sequence.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid
public func getMessageDirect(
sequence: UInt64, subject: String? = nil
) async throws -> StreamMessage? {
let request = GetMessageRequest(seq: sequence, next: subject)
return try await getMessageDirect(request: request)
}

/// Retrieves the first message on the stream for a given subject.
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(firstForSubject:)``, as it can fetch ``RawMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameter firstForSubject: The subject from which the first message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid
public func getMessageDirect(firstForSubject: String) async throws -> StreamMessage? {
let request = GetMessageRequest(next: firstForSubject)
return try await getMessageDirect(request: request)
}

/// Retrieves last message on a stream for a given subject
///
/// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`.
/// This is different from ``Stream/getMsg(lastForSubject:)``, as it can fetch ``RawMessage``
/// from any replica member. This means read after write is possible,
/// as that given replica might not yet catch up with the leader.
///
/// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved.
///
/// - Returns ``RawMessage`` containing message payload, headers and metadata.
///
/// - Throws:
/// - ``JetStreamRequestError`` if the request was unsuccesful.
/// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid
public func getMessageDirect(lastForSubject: String) async throws -> StreamMessage? {
let request = GetMessageRequest(last: lastForSubject)
return try await getMessageDirect(request: request)
}

private func getMessage(request: GetMessageRequest) async throws -> StreamMessage? {
let subject = "STREAM.MSG.GET.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp: Response<GetRawMessageResp> = try await ctx.request(subject, message: requestData)

switch resp {
case .success(let msg):
return try StreamMessage(from: msg.message)
case .error(let err):
if err.error.errorCode == .noMessageFound {
return nil
}
throw err.error
}
}

private func getMessageDirect(request: GetMessageRequest) async throws -> StreamMessage? {
let subject = "DIRECT.GET.\(info.config.name)"
let requestData = try JSONEncoder().encode(request)

let resp = try await ctx.request(subject, message: requestData)

if let status = resp.status {
if status == StatusCode.notFound {
return nil
}
throw JetStreamDirectGetError.errorResponse(status, resp.description)
}

guard let headers = resp.headers else {
throw JetStreamDirectGetError.invalidResponse("response should contain headers")
}

guard headers[.natsStream] != nil else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Stream header")
}

guard let seqHdr = headers[.natsSequence] else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Sequence header")
}

let seq = UInt64(seqHdr.description)
if seq == nil {
throw JetStreamDirectGetError.invalidResponse("invalid Nats-Sequence header: \(seqHdr)")
}

guard let timeStamp = headers[.natsTimestamp] else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Timestamp header")
}

guard let subject = headers[.natsSubject] else {
throw JetStreamDirectGetError.invalidResponse("missing Nats-Subject header")
}

let payload = resp.payload ?? Data()

return StreamMessage(
subject: subject.description, sequence: seq!, payload: payload, headers: resp.headers,
time: timeStamp.description)
}

internal struct GetMessageRequest: Codable {
internal let seq: UInt64?
internal let nextBySubject: String?
internal let lastBySubject: String?

internal init(seq: UInt64, next: String?) {
self.seq = seq
self.nextBySubject = next
self.lastBySubject = nil
}

internal init(next: String) {
self.seq = nil
self.nextBySubject = next
self.lastBySubject = nil
}

internal init(last: String) {
self.seq = nil
self.nextBySubject = nil
self.lastBySubject = last
}

enum CodingKeys: String, CodingKey {
case seq
case nextBySubject = "next_by_subj"
case lastBySubject = "last_by_subj"
}
}

static func validate(name: String) throws {
guard !name.isEmpty else {
throw StreamValidationError.nameRequired
Expand All @@ -47,7 +265,29 @@ public class Stream {
}
}

public enum StreamValidationError: NatsError {
public enum JetStreamDirectGetError: NatsError, Equatable {
case msgNotFound
case invalidResponse(String)
case errorResponse(StatusCode, String?)

public var description: String {
switch self {
case .msgNotFound:
return "message not found"
case .invalidResponse(let cause):
return "invalid response: \(cause)"
case .errorResponse(let code, let description):
if let description {
return "unable to get message: \(code) \(description)"
} else {
return "unable to get message: \(code)"
}
}
}
}

/// Returned when a provided ``StreamConfig`` is not valid.
public enum StreamValidationError: NatsError, Equatable {
case nameRequired
case invalidCharacterFound(String)

Expand Down Expand Up @@ -622,3 +862,65 @@ public struct PeerInfo: Codable {
case lag
}
}

internal struct GetRawMessageResp: Codable {
internal struct StoredRawMessage: Codable {
public let subject: String
public let sequence: UInt64
public let payload: Data
public let headers: Data?
public let time: String

enum CodingKeys: String, CodingKey {
case subject
case sequence = "seq"
case payload = "data"
case headers = "hdrs"
case time
}
}

internal let message: StoredRawMessage
}

/// Represents a message persisted on a stream.
public struct StreamMessage {

/// Subject of the message.
public let subject: String

/// Sequence of the message.
public let sequence: UInt64

/// Raw payload of the message as a base64 encoded string.
public let payload: Data

/// Message headers, if any.
public let headers: NatsHeaderMap?

/// The time the message was published.
public let time: String

internal init(
subject: String, sequence: UInt64, payload: Data, headers: NatsHeaderMap?, time: String
) {
self.subject = subject
self.sequence = sequence
self.payload = payload
self.headers = headers
self.time = time
}

internal init(from storedMsg: GetRawMessageResp.StoredRawMessage) throws {
self.subject = storedMsg.subject
self.sequence = storedMsg.sequence
self.payload = storedMsg.payload
if let headers = storedMsg.headers, let headersStr = String(data: headers, encoding: .utf8)
{
self.headers = try NatsHeaderMap(from: headersStr)
} else {
self.headers = nil
}
self.time = storedMsg.time
}
}
Loading

0 comments on commit f59f4a4

Please sign in to comment.