Skip to content

Commit

Permalink
Add JetStream API request
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Apr 29, 2024
1 parent 604f629 commit 0f2fe1d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 9 deletions.
27 changes: 27 additions & 0 deletions Sources/JetStream/context.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ extension JetStreamContext {
try await self.client.publish(message, subject: subject, reply: inbox)
return AckFuture(sub: sub, timeout: self.timeout)
}

internal func request<T: Codable>(_ subject: String, message: Data) async throws -> Response<T> {
let response = try await self.client.request(message, subject: "\(self.prefix).\(subject)", timeout: self.timeout)

let decoder = JSONDecoder()
// maybe empty is ok if the response type is nil and we can skip this check?
guard let payload = response.payload else {
throw JetStreamRequestError("empty response payload")
}

return try decoder.decode(Response<T>.self, from: payload)
}
}

struct AckFuture {
Expand Down Expand Up @@ -117,6 +129,13 @@ struct JetStreamPublishError: NatsError {
}
}

struct JetStreamRequestError: NatsError {
var description: String
init(_ description: String) {
self.description = description
}
}

struct Ack: Codable {
var stream: String
var seq: UInt64
Expand Down Expand Up @@ -145,3 +164,11 @@ struct Ack: Codable {
duplicate = try container.decodeIfPresent(Bool.self, forKey: .duplicate) ?? false
}
}

/// contains info about the `JetStream` usage from the current account.
public struct AccountInfo: Codable {
public let memory: Int64
public let storage: Int64
public let streams: Int64
public let consumers: Int64
}
39 changes: 30 additions & 9 deletions Sources/JetStream/errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@

import Foundation

struct JetStreamError: Codable {
var code: uint
public struct JetStreamAPIResponse: Codable {
let type: String
let error: JetStreamError
}

public struct JetStreamError: Codable {
var code: UInt
//FIXME(jrm): This should be mapped to predefined JetStream errors from the server.
var errorCode: ErrorCode
var description: String?
Expand All @@ -26,7 +31,7 @@ struct JetStreamError: Codable {
}
}

struct ErrorCode: Codable {
struct ErrorCode: Codable, Equatable {
let rawValue: UInt64
/// Peer not a member
static let clusterPeerNotMember = ErrorCode(rawValue: 10040)
Expand Down Expand Up @@ -483,12 +488,27 @@ struct ErrorCode: Codable {

}

enum Response<T: Codable>: Codable {
case success(T)
case error(JetStreamError)
extension ErrorCode {
// Encoding
func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(rawValue)
}

// Decoding
init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
let decodedValue = try container.decode(UInt64.self)
self = ErrorCode(rawValue: decodedValue)
}
}

public enum Response<T: Codable>: Codable {
case success(T)
case error(JetStreamAPIResponse)

public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()

// Try to decode the expected success type T first
if let successResponse = try? container.decode(T.self) {
Expand All @@ -497,11 +517,12 @@ enum Response<T: Codable>: Codable {
}

// If that fails, try to decode ErrorResponse
let errorResponse = try container.decode(JetStreamError.self)
let errorResponse = try container.decode(JetStreamAPIResponse.self)
self = .error(errorResponse)
return
}

func encode(to encoder: Encoder) throws {
public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
switch self {
case .success(let successData):
Expand All @@ -513,5 +534,5 @@ enum Response<T: Codable>: Codable {
}

func test() {
JetStreamError(code: 400, errorCode: ErrorCode.accountResourcesExceeded, description: nil)
// JetStreamError(code: 400, errorCode: ErrorCode.accountResourcesExceeded, description: nil)
}
41 changes: 41 additions & 0 deletions Tests/JetStreamTests/Integration/JetStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,45 @@ class JetStreamTests: XCTestCase {

try await client.close()
}

func testRequest() async throws {

let bundle = Bundle.module
natsServer.start(
cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath)
logger.logLevel = .debug

let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build()
try await client.connect()

var ctx = JetStreamContext(client: client)

let stream = """
{
"name": "FOO",
"subjects": ["foo"]
}
"""
let data = stream.data(using: .utf8)!

var resp = try await client.request(data, subject: "$JS.API.STREAM.CREATE.FOO")

let info: Response<AccountInfo> = try await ctx.request("INFO", message: Data())

guard case .success(let info) = info else {
XCTFail("request should be successful")
return
}

XCTAssertEqual(info.streams, 1)
let badInfo: Response<AccountInfo> = try await ctx.request("STREAM.INFO.BAD", message: Data())
guard case .error(let jetStreamAPIResponse) = badInfo else {
XCTFail("should get error")
return
}

XCTAssertEqual(ErrorCode.streamNotFound, jetStreamAPIResponse.error.errorCode)


}
}

0 comments on commit 0f2fe1d

Please sign in to comment.