Skip to content

Commit

Permalink
Add additional stream APIs (#73)
Browse files Browse the repository at this point in the history
* Add refreshing info on Stream
* Add listing streams and stream names

---------

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored May 15, 2024
1 parent 851f8b5 commit f4c93d6
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 0 deletions.
149 changes: 149 additions & 0 deletions Sources/JetStream/JetStreamContext+Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,153 @@ extension JetStreamContext {
struct StreamDeleteResponse: Codable {
let success: Bool
}

/// Used to list stream infos.
/// Returns an AsyncSequence allowing iteration over streams.
/// Subject can be provided to filter the response.
public func streams(subject: String? = nil) async -> Streams {
return Streams(ctx: self, subject: subject)
}

/// Used to list stream names.
/// Returns an AsyncSequence allowing iteration over streams.
/// Subject can be provided to filter the response.
public func streamNames(subject: String? = nil) async -> StreamNames {
return StreamNames(ctx: self, subject: subject)
}
}

internal struct StreamsPagedRequest: Codable {
let offset: Int
let subject: String?
}

public struct Streams: AsyncSequence {
public typealias Element = StreamInfo
public typealias AsyncIterator = StreamsIterator

private let ctx: JetStreamContext
private let subject: String?
private var buffer: [StreamInfo]
private var offset: Int
private var total: Int?

struct StreamsInfoPage: Codable {
let total: Int
let streams: [StreamInfo]?
}

init(ctx: JetStreamContext, subject: String?) {
self.ctx = ctx
self.subject = subject
self.buffer = []
self.offset = 0
}

public func makeAsyncIterator() -> StreamsIterator {
return StreamsIterator(seq: self)
}

public mutating func next() async throws -> Element? {
if let stream = buffer.first {
buffer.removeFirst()
return stream
}

if let total = self.total, self.offset >= total {
return nil
}

// poll streams
let request = StreamsPagedRequest(offset: offset, subject: subject)

let res: Response<StreamsInfoPage> = try await ctx.request(
"STREAM.LIST", message: JSONEncoder().encode(request))
switch res {
case .success(let infos):
guard let streams = infos.streams else {
return nil
}
self.offset += streams.count
self.total = infos.total
buffer.append(contentsOf: streams)
return try await self.next()
case .error(let err):
throw err.error
}

}

public struct StreamsIterator: AsyncIteratorProtocol {
var seq: Streams

public mutating func next() async throws -> Element? {
try await seq.next()
}
}
}

public struct StreamNames: AsyncSequence {
public typealias Element = String
public typealias AsyncIterator = StreamNamesIterator

private let ctx: JetStreamContext
private let subject: String?
private var buffer: [String]
private var offset: Int
private var total: Int?

struct StreamNamesPage: Codable {
let total: Int
let streams: [String]?
}

init(ctx: JetStreamContext, subject: String?) {
self.ctx = ctx
self.subject = subject
self.buffer = []
self.offset = 0
}

public func makeAsyncIterator() -> StreamNamesIterator {
return StreamNamesIterator(seq: self)
}

public mutating func next() async throws -> Element? {
if let stream = buffer.first {
buffer.removeFirst()
return stream
}

if let total = self.total, self.offset >= total {
return nil
}

// poll streams
let request = StreamsPagedRequest(offset: offset, subject: subject)

let res: Response<StreamNamesPage> = try await ctx.request(
"STREAM.NAMES", message: JSONEncoder().encode(request))
switch res {
case .success(let names):
guard let streams = names.streams else {
return nil
}
self.offset += streams.count
self.total = names.total
buffer.append(contentsOf: streams)
return try await self.next()
case .error(let err):
throw err.error
}

}

public struct StreamNamesIterator: AsyncIteratorProtocol {
var seq: StreamNames

public mutating func next() async throws -> Element? {
try await seq.next()
}
}
}
12 changes: 12 additions & 0 deletions Sources/JetStream/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ public class Stream {
self.info = info
}

public func info() async throws -> StreamInfo {
let subj = "STREAM.INFO.\(info.config.name)"
let info: Response<StreamInfo> = try await ctx.request(subj)
switch info {
case .success(let info):
self.info = info
return info
case .error(let apiResponse):
throw apiResponse.error
}
}

static func validate(name: String) throws {
guard !name.isEmpty else {
throw StreamValidationError.nameRequired
Expand Down
104 changes: 104 additions & 0 deletions Tests/JetStreamTests/Integration/JetStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,108 @@ class JetStreamTests: XCTestCase {

XCTAssertEqual(stream.info.config, cfg)
}

func testStreamInfo() async throws {
let bundle = Bundle.module
natsServer.start(
cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath)
logger.logLevel = .debug

let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build()
try await client.connect()

let ctx = JetStreamContext(client: client)

// minimal config
let cfg = StreamConfig(name: "test", subjects: ["foo"])
let stream = try await ctx.createStream(cfg: cfg)

let info = try await stream.info()
XCTAssertEqual(info.config.name, "test")

// simulate external update of stream
let updateJSON = """
{
"name": "test",
"subjects": ["foo"],
"description": "updated"
}
"""
let data = updateJSON.data(using: .utf8)!

_ = try await client.request(data, subject: "$JS.API.STREAM.UPDATE.test")

XCTAssertNil(stream.info.config.description)

let newInfo = try await stream.info()
XCTAssertEqual(newInfo.config.description, "updated")
XCTAssertEqual(stream.info.config.description, "updated")
}

func testListStreams() async throws {
let bundle = Bundle.module
natsServer.start(
cfg: bundle.url(forResource: "jetstream", withExtension: "conf")!.relativePath)
logger.logLevel = .debug

let client = NatsClientOptions().url(URL(string: natsServer.clientURL)!).build()
try await client.connect()

let ctx = JetStreamContext(client: client)

for i in 0..<260 {
let subPrefix = i % 2 == 0 ? "foo" : "bar"
let cfg = StreamConfig(name: "STREAM-\(i)", subjects: ["\(subPrefix).\(i)"])
let _ = try await ctx.createStream(cfg: cfg)
}

// list all streams
var streams = await ctx.streams()

var i = 0
for try await _ in streams {
i += 1
}
XCTAssertEqual(i, 260)

var names = await ctx.streamNames()
i = 0
for try await _ in names {
i += 1
}
XCTAssertEqual(i, 260)

// list streams with subject foo.*
streams = await ctx.streams(subject: "foo.*")

i = 0
for try await stream in streams {
XCTAssert(stream.config.subjects!.first!.starts(with: "foo."))
i += 1
}
XCTAssertEqual(i, 130)

names = await ctx.streamNames(subject: "foo.*")
i = 0
for try await _ in names {
i += 1
}
XCTAssertEqual(i, 130)

// list streams with subject not matching any
streams = await ctx.streams(subject: "baz.*")

i = 0
for try await stream in streams {
XCTFail("should return 0 streams, got: \(stream.config)")
}
XCTAssertEqual(i, 0)

names = await ctx.streamNames(subject: "baz.*")
i = 0
for try await _ in names {
i += 1
}
XCTAssertEqual(i, 0)
}
}

0 comments on commit f4c93d6

Please sign in to comment.