From 523fb2f17f2e3c8abc7bf1d9628657e6f0ecabff Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 20 May 2024 16:38:17 +0200 Subject: [PATCH] Add fetching individual messages from stream Signed-off-by: Piotr Piotrowski --- Sources/JetStream/JetStreamContext.swift | 10 +- Sources/JetStream/Stream.swift | 293 +++++++++++++++++- Sources/Nats/Extensions/Data+Parser.swift | 39 +-- Sources/Nats/NatsHeaders.swift | 47 +++ .../Integration/JetStreamTests.swift | 106 +++++++ 5 files changed, 457 insertions(+), 38 deletions(-) diff --git a/Sources/JetStream/JetStreamContext.swift b/Sources/JetStream/JetStreamContext.swift index eb7f82d..d763ab6 100644 --- a/Sources/JetStream/JetStreamContext.swift +++ b/Sources/JetStream/JetStreamContext.swift @@ -45,11 +45,11 @@ public class JetStreamContext { extension JetStreamContext { // fix the error type. Add AckError - public func publish(_ subject: String, message: Data) async throws -> AckFuture { + public func publish(_ subject: String, message: Data, headers: NatsHeaderMap? = nil) async throws -> AckFuture { let inbox = nextNuid() let sub = try await self.client.subscribe(subject: inbox) - try await self.client.publish(message, subject: subject, reply: inbox) + try await self.client.publish(message, subject: subject, reply: inbox, headers: headers) return AckFuture(sub: sub, timeout: self.timeout) } @@ -67,6 +67,12 @@ extension JetStreamContext { return try decoder.decode(Response.self, from: payload) } + + internal func request(_ subject: String, message: Data? = nil) async throws -> NatsMessage { + let data = message ?? Data() + return try await self.client.request( + data, subject: "\(self.prefix).\(subject)", timeout: self.timeout) + } } public struct AckFuture { diff --git a/Sources/JetStream/Stream.swift b/Sources/JetStream/Stream.swift index 2fb42b2..5f27fa9 100644 --- a/Sources/JetStream/Stream.swift +++ b/Sources/JetStream/Stream.swift @@ -14,7 +14,17 @@ import Foundation import Nats +/// Exposes a set of operations performed on a stream: +/// - fetching stream info +/// - fetching individual messages from the stream +/// - deleting messages from a stream +/// - purging a stream +/// - operating on Consumers public class Stream { + + /// Contains information about the stream. + /// Note that this may be out of date and reading it does not query the server. + /// For up-to-date stream info use ``Stream/info()`` public internal(set) var info: StreamInfo internal let ctx: JetStreamContext @@ -23,6 +33,14 @@ public class Stream { self.info = info } + /// Retrieves information about the stream + /// This also refreshes ``Stream/info`` + /// + /// - Returns ``StreamInfo`` from the server + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful + /// - ``JetStreamError`` if the server responded with an API error public func info() async throws -> StreamInfo { let subj = "STREAM.INFO.\(info.config.name)" let info: Response = try await ctx.request(subj) @@ -35,6 +53,199 @@ public class Stream { } } + /// Retrieves a raw message from stream. + /// + /// - Parameters: + /// - sequence: The sequence of the message in the stream. + /// - subject: The stream subject the message should be retrieved from. + /// When combined with `seq` will return the first msg with seq >= of the specified sequence. + /// + /// - Returns ``RawMessage`` containing message payload, headers and metadata. + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful. + /// - ``JetStreamError`` if the server responded with an API error. + public func getMsg(sequence: UInt64, subject: String? = nil) async throws -> RawMessage { + let request = GetMsgRequest(seq: sequence, next: subject) + return try await getRawMsg(request: request) + } + + /// Retrieves the first message on the stream for a given subject. + /// + /// - Parameter firstForSubject: The subject from which the first message should be retrieved. + /// + /// - Returns ``RawMessage`` containing message payload, headers and metadata. + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful. + /// - ``JetStreamError`` if the server responded with an API error. + public func getMsg(firstForSubject: String) async throws -> RawMessage { + let request = GetMsgRequest(next: firstForSubject) + return try await getRawMsg(request: request) + } + + /// Retrieves last message on a stream for a given subject + /// + /// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved. + /// + /// - Returns ``RawMessage`` containing message payload, headers and metadata. + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful. + /// - ``JetStreamError`` if the server responded with an API error. + public func getMsg(lastForSubject: String) async throws -> RawMessage { + let request = GetMsgRequest(last: lastForSubject) + return try await getRawMsg(request: request) + } + + /// Retrieves a raw message from stream. + /// + /// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`. + /// This is different from ``Stream/getMsg(sequence:subject:)``, as it can fetch ``RawMessage`` + /// from any replica member. This means read after write is possible, + /// as that given replica might not yet catch up with the leader. + /// + /// - Parameters: + /// - sequence: The sequence of the message in the stream. + /// - subject: The stream subject the message should be retrieved from. + /// When combined with `seq` will return the first msg with seq >= of the specified sequence. + /// + /// - Returns ``RawMessage`` containing message payload, headers and metadata. + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful. + /// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid + public func getMsgDirect(sequence: UInt64, subject: String? = nil) async throws -> RawMessage { + let request = GetMsgRequest(seq: sequence, next: subject) + return try await getRawMsgDirect(request: request) + } + + /// Retrieves the first message on the stream for a given subject. + /// + /// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`. + /// This is different from ``Stream/getMsg(firstForSubject:)``, as it can fetch ``RawMessage`` + /// from any replica member. This means read after write is possible, + /// as that given replica might not yet catch up with the leader. + /// + /// - Parameter firstForSubject: The subject from which the first message should be retrieved. + /// + /// - Returns ``RawMessage`` containing message payload, headers and metadata. + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful. + /// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid + public func getMsgDirect(firstForSubject: String) async throws -> RawMessage { + let request = GetMsgRequest(next: firstForSubject) + return try await getRawMsgDirect(request: request) + } + + /// Retrieves last message on a stream for a given subject + /// + /// Requires a ``Stream`` with ``StreamConfig/allowDirect`` set to `true`. + /// This is different from ``Stream/getMsg(lastForSubject:)``, as it can fetch ``RawMessage`` + /// from any replica member. This means read after write is possible, + /// as that given replica might not yet catch up with the leader. + /// + /// - Parameter lastForSubject: The stream subject for which the last available message should be retrieved. + /// + /// - Returns ``RawMessage`` containing message payload, headers and metadata. + /// + /// - Throws: + /// - ``JetStreamRequestError`` if the request was unsuccesful. + /// - ``JetStreamDirectGetError`` if the server responded with an error or the response is invalid + public func getMsgDirect(lastForSubject: String) async throws -> RawMessage { + let request = GetMsgRequest(last: lastForSubject) + return try await getRawMsgDirect(request: request) + } + + private func getRawMsg(request: GetMsgRequest) async throws -> RawMessage { + let subject = "STREAM.MSG.GET.\(info.config.name)" + let requestData = try JSONEncoder().encode(request) + + let resp: Response = try await ctx.request(subject, message: requestData) + + switch resp { + case .success(let msg): + return try RawMessage(from: msg.message) + case .error(let err): + throw err.error + } + } + + private func getRawMsgDirect(request: GetMsgRequest) async throws -> RawMessage { + let subject = "DIRECT.GET.\(info.config.name)" + let requestData = try JSONEncoder().encode(request) + + let resp = try await ctx.request(subject, message: requestData) + + if let status = resp.status { + if status == StatusCode.notFound { + throw JetStreamDirectGetError.msgNotFound + } + throw JetStreamDirectGetError.errorResponse(status, resp.description) + } + + guard let headers = resp.headers else { + throw JetStreamDirectGetError.invalidResponse("response should contain headers") + } + + guard headers[.natsStream] != nil else { + throw JetStreamDirectGetError.invalidResponse("missing Nats-Stream header") + } + + guard let seqHdr = headers[.natsSequence] else { + throw JetStreamDirectGetError.invalidResponse("missing Nats-Sequence header") + } + + let seq = UInt64(seqHdr.description) + if seq == nil { + throw JetStreamDirectGetError.invalidResponse("invalid Nats-Sequence header: \(seqHdr)") + } + + guard let timeStamp = headers[.natsTimestamp] else { + throw JetStreamDirectGetError.invalidResponse("missing Nats-Timestamp header") + } + + guard let subject = headers[.natsSubject] else { + throw JetStreamDirectGetError.invalidResponse("missing Nats-Subject header") + } + + let payload = resp.payload ?? Data() + + return RawMessage(subject: subject.description, sequence: seq!, payload: payload, headers: resp.headers, time: timeStamp.description) + } + + + internal struct GetMsgRequest: Codable { + internal let seq: UInt64? + internal let nextBySubject: String? + internal let lastBySubject: String? + + internal init(seq: UInt64, next: String?) { + self.seq = seq + self.nextBySubject = next + self.lastBySubject = nil + } + + internal init(next: String) { + self.seq = nil + self.nextBySubject = next + self.lastBySubject = nil + } + + internal init(last: String) { + self.seq = nil + self.nextBySubject = nil + self.lastBySubject = last + } + + enum CodingKeys: String, CodingKey { + case seq + case nextBySubject = "next_by_subj" + case lastBySubject = "last_by_subj" + } + } + static func validate(name: String) throws { guard !name.isEmpty else { throw StreamValidationError.nameRequired @@ -47,7 +258,29 @@ public class Stream { } } -public enum StreamValidationError: NatsError { +public enum JetStreamDirectGetError: NatsError, Equatable { + case msgNotFound + case invalidResponse(String) + case errorResponse(StatusCode, String?) + + public var description: String { + switch self { + case .msgNotFound: + return "message not found" + case .invalidResponse(let cause): + return "invalid response: \(cause)" + case .errorResponse(let code, let description): + if let description { + return "unable to get message: \(code) \(description)" + } else { + return "unable to get message: \(code)" + } + } + } +} + +/// Returned when a provided ``StreamConfig`` is not valid. +public enum StreamValidationError: NatsError, Equatable { case nameRequired case invalidCharacterFound(String) @@ -622,3 +855,61 @@ public struct PeerInfo: Codable { case lag } } + +internal struct GetRawMessageResp: Codable { + internal struct StoredRawMessage: Codable { + public let subject: String + public let sequence: UInt64 + public let payload: Data + public let headers: Data? + public let time: String + + enum CodingKeys: String, CodingKey { + case subject + case sequence = "seq" + case payload = "data" + case headers = "hdrs" + case time + } + } + + internal let message: StoredRawMessage +} + +public struct RawMessage { + + /// Subject of the message. + public let subject: String + + /// Sequence of the message. + public let sequence: UInt64 + + /// Raw payload of the message as a base64 encoded string. + public let payload: Data + + /// Raw header string, if any. + public let headers: NatsHeaderMap? + + /// The time the message was published. + public let time: String + + internal init(subject: String, sequence: UInt64, payload: Data, headers: NatsHeaderMap?, time: String) { + self.subject = subject + self.sequence = sequence + self.payload = payload + self.headers = headers + self.time = time + } + + internal init(from storedMsg: GetRawMessageResp.StoredRawMessage) throws { + self.subject = storedMsg.subject + self.sequence = storedMsg.sequence + self.payload = storedMsg.payload + if let headers = storedMsg.headers, let headersStr = String(data: headers, encoding: .utf8) { + self.headers = try NatsHeaderMap(from: headersStr) + } else { + self.headers = nil + } + self.time = storedMsg.time + } +} diff --git a/Sources/Nats/Extensions/Data+Parser.swift b/Sources/Nats/Extensions/Data+Parser.swift index 9a18e6e..aa5c538 100644 --- a/Sources/Nats/Extensions/Data+Parser.swift +++ b/Sources/Nats/Extensions/Data+Parser.swift @@ -19,7 +19,7 @@ extension Data { private static let crlf = Data([cr, lf]) private static var currentNum = 0 private static var errored = false - private static let versionLinePrefix = "NATS/1.0" + internal static let versionLinePrefix = "NATS/1.0" func removePrefix(_ prefix: Data) -> Data { guard self.starts(with: prefix) else { return self } @@ -152,41 +152,10 @@ extension Data { let headersData = self[headersStartIndex.. 0 { - let statusAndDesc = versionLineSuffix.split( - separator: " ", maxSplits: 1) - guard let status = StatusCode(statusAndDesc[0]) else { - throw NatsParserError("could not parse status parameter") - } - msg.status = status - if statusAndDesc.count > 1 { - msg.description = String(statusAndDesc[1]) - } - } - - for header in headersArray.dropFirst() { - let headerParts = header.split(separator: ":") - if headerParts.count == 2 { - headers.append( - try NatsHeaderName(String(headerParts[0])), - NatsHeaderValue(String(headerParts[1]))) - } else { - logger.error("Error parsing header: \(header)") - } - } + headers = try NatsHeaderMap(from: headersString) } + msg.status = headers.status + msg.description = headers.description msg.headers = headers if var payload = payload { diff --git a/Sources/Nats/NatsHeaders.swift b/Sources/Nats/NatsHeaders.swift index fad2c30..d8d0c0f 100644 --- a/Sources/Nats/NatsHeaders.swift +++ b/Sources/Nats/NatsHeaders.swift @@ -56,17 +56,59 @@ public struct NatsHeaderName: Equatable, Hashable, CustomStringConvertible { // Example of standard headers public static let natsStream = try! NatsHeaderName("Nats-Stream") public static let natsSequence = try! NatsHeaderName("Nats-Sequence") + public static let natsTimestamp = try! NatsHeaderName("Nats-Time-Stamp") + public static let natsSubject = try! NatsHeaderName("Nats-Subject") // Add other standard headers as needed... } // Represents a NATS header map in Swift. public struct NatsHeaderMap: Equatable { private var inner: [NatsHeaderName: [NatsHeaderValue]] + internal var status: StatusCode? = nil + internal var description: String? = nil public init() { self.inner = [:] } + public init(from headersString: String) throws { + self.inner = [:] + let headersArray = headersString.split(separator: "\r\n") + let versionLine = headersArray[0] + guard versionLine.hasPrefix(Data.versionLinePrefix) else { + throw NatsParserError( + "header version line does not begin with `NATS/1.0`") + } + let versionLineSuffix = + versionLine + .dropFirst(Data.versionLinePrefix.count) + .trimmingCharacters(in: .whitespacesAndNewlines) + + // handle inlines status and description + if versionLineSuffix.count > 0 { + let statusAndDesc = versionLineSuffix.split( + separator: " ", maxSplits: 1) + guard let status = StatusCode(statusAndDesc[0]) else { + throw NatsParserError("could not parse status parameter") + } + self.status = status + if statusAndDesc.count > 1 { + self.description = String(statusAndDesc[1]) + } + } + + for header in headersArray.dropFirst() { + let headerParts = header.split(separator: ":", maxSplits: 1) + if headerParts.count == 2 { + self.append( + try NatsHeaderName(String(headerParts[0])), + NatsHeaderValue(String(headerParts[1]).trimmingCharacters(in: .whitespaces))) + } else { + logger.error("Error parsing header: \(header)") + } + } + } + var isEmpty: Bool { return inner.isEmpty } @@ -106,6 +148,11 @@ public struct NatsHeaderMap: Equatable { bytes.append(contentsOf: "\r\n".utf8) return bytes } + + // Implementing the == operator to exclude status and desc internal properties + public static func == (lhs: NatsHeaderMap, rhs: NatsHeaderMap) -> Bool { + return lhs.inner == rhs.inner + } } extension NatsHeaderMap { diff --git a/Tests/JetStreamTests/Integration/JetStreamTests.swift b/Tests/JetStreamTests/Integration/JetStreamTests.swift index a16c398..47079bb 100644 --- a/Tests/JetStreamTests/Integration/JetStreamTests.swift +++ b/Tests/JetStreamTests/Integration/JetStreamTests.swift @@ -27,6 +27,8 @@ class JetStreamTests: XCTestCase { ("testStreamConfig", testStreamConfig), ("testStreamInfo", testStreamInfo), ("testListStreams", testListStreams), + ("testGetMsg", testGetMsg), + ("testGetMsgDirect", testGetMsgDirect), ] var natsServer = NatsServer() @@ -310,4 +312,108 @@ class JetStreamTests: XCTestCase { } XCTAssertEqual(i, 0) } + + func testGetMsg() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let cfg = StreamConfig(name: "STREAM", subjects: ["foo.*"]) + let stream = try await ctx.createStream(cfg: cfg) + + var hm = NatsHeaderMap() + hm[try! NatsHeaderName("foo")] = NatsHeaderValue("bar") + hm.append(try! NatsHeaderName("foo"), NatsHeaderValue("baz")) + hm[try! NatsHeaderName("key")] = NatsHeaderValue("val") + for i in 1...100 { + let msg = "\(i)".data(using: .utf8)! + let subj = i%2 == 0 ? "foo.A" : "foo.B" + let ack = try await ctx.publish(subj, message: msg, headers: hm) + _ = try await ack.wait() + } + + // get by sequence + var msg = try await stream.getMsg(sequence: 50) + XCTAssertEqual(msg.payload, "50".data(using: .utf8)!) + + // get by sequence and subject + msg = try await stream.getMsg(sequence: 50, subject: "foo.B") + // msg with sequence 50 is on subject foo.A, so we expect the next message which should be on foo.B + XCTAssertEqual(msg.payload, "51".data(using: .utf8)!) + XCTAssertEqual(msg.headers, hm) + + // get first message from a subject + msg = try await stream.getMsg(firstForSubject: "foo.A") + XCTAssertEqual(msg.payload, "2".data(using: .utf8)!) + XCTAssertEqual(msg.headers, hm) + + // get last message from subject + msg = try await stream.getMsg(lastForSubject: "foo.B") + XCTAssertEqual(msg.payload, "99".data(using: .utf8)!) + XCTAssertEqual(msg.headers, hm) + + // message not found + do { + _ = try await stream.getMsg(sequence: 200) + } catch let err as JetStreamError { + XCTAssertEqual(err.errorCode, .noMessageFound) + } + } + + + func testGetMsgDirect() async throws { + let bundle = Bundle.module + natsServer.start( + cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath) + logger.logLevel = .debug + + let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build() + try await client.connect() + + let ctx = JetStreamContext(client: client) + + let cfg = StreamConfig(name: "STREAM", subjects: ["foo.*"], allowDirect: true) + let stream = try await ctx.createStream(cfg: cfg) + + var hm = NatsHeaderMap() + hm[try! NatsHeaderName("foo")] = NatsHeaderValue("bar") + hm.append(try! NatsHeaderName("foo"), NatsHeaderValue("baz")) + hm[try! NatsHeaderName("key")] = NatsHeaderValue("val") + for i in 1...100 { + let msg = "\(i)".data(using: .utf8)! + let subj = i%2 == 0 ? "foo.A" : "foo.B" + let ack = try await ctx.publish(subj, message: msg, headers: hm) + _ = try await ack.wait() + } + + // get by sequence + var msg = try await stream.getMsgDirect(sequence: 50) + XCTAssertEqual(msg.payload, "50".data(using: .utf8)!) + + // get by sequence and subject + msg = try await stream.getMsgDirect(sequence: 50, subject: "foo.B") + // msg with sequence 50 is on subject foo.A, so we expect the next message which should be on foo.B + XCTAssertEqual(msg.payload, "51".data(using: .utf8)!) + + // get first message from a subject + msg = try await stream.getMsgDirect(firstForSubject: "foo.A") + XCTAssertEqual(msg.payload, "2".data(using: .utf8)!) + + // get last message from subject + msg = try await stream.getMsgDirect(lastForSubject: "foo.B") + XCTAssertEqual(msg.payload, "99".data(using: .utf8)!) + + // message not found + do { + msg = try await stream.getMsgDirect(sequence: 200) + } catch let err as JetStreamDirectGetError { + XCTAssertEqual(err, .msgNotFound) + } + } }