Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync subscription next #21

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/experimental/scoped_connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ function with_connection(f, nc::Connection)
with(f, sconnection => nc)
end

function subscribe(
subject::String;
queue_group::Union{String, Nothing} = nothing,
channel_size = parse(Int64, get(ENV, "NATS_SUBSCRIPTION_CHANNEL_SIZE", string(DEFAULT_SUBSCRIPTION_CHANNEL_SIZE))),
monitoring_throttle_seconds = parse(Float64, get(ENV, "NATS_SUBSCRIPTION_ERROR_THROTTLING_SECONDS", string(DEFAULT_SUBSCRIPTION_ERROR_THROTTLING_SECONDS)))
)
subscribe(scoped_connection(), subject; queue_group, channel_size, monitoring_throttle_seconds)
end

function subscribe(
f,
subject::String;
Expand Down Expand Up @@ -129,3 +138,19 @@ function request(
)
request(T, scoped_connection(), subject, data; timer)
end

function next(sub::Sub; no_wait = false, no_throw = false)::Union{Msg, Nothing}
next(scoped_connection(), sub::Sub; no_wait = false, no_throw = false)
end

function next(T::Type, sub::Sub; no_wait = false, no_throw = false)::Union{T, Nothing}
next(T, scoped_connection(), sub::Sub; no_wait = false, no_throw = false)
end

function next(sub::Sub, batch::Integer; no_wait = false, no_throw = false)::Vector{Msg}
next(scoped_connection(), sub, batch; no_wait = false, no_throw = false)
end

function next(T::Type, sub::Sub, batch::Integer; no_wait = false, no_throw = false)::Vector{T}
next(T, scoped_connection(), sub, batch; no_wait = false, no_throw = false)
end
37 changes: 36 additions & 1 deletion src/pubsub/subscribe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
subject::String;
queue_group::Union{String, Nothing} = nothing,
channel_size::Int64 = parse(Int64, get(ENV, "NATS_SUBSCRIPTION_CHANNEL_SIZE", string(DEFAULT_SUBSCRIPTION_CHANNEL_SIZE))),
monitoring_throttle_seconds::Float64 = parse(Float64, get(ENV, "NATS_SUBSCRIPTION_ERROR_THROTTLING_SECONDS", string(DEFAULT_SUBSCRIPTION_ERROR_THROTTLING_SECONDS)))
)
sid = new_sid(connection)
sub = Sub(subject, queue_group, sid)
Expand All @@ -73,6 +74,19 @@
connection.sub_data[sid] = SubscriptionData(sub, subscription_channel, sub_stats, false, ReentrantLock())
end
send(connection, sub)
subscription_monitoring_task = Threads.@spawn :interactive disable_sigint() do
while isopen(subscription_channel) || Base.n_avail(subscription_channel) > 0
sleep(monitoring_throttle_seconds)
# Warn if subscription channel is too small.
level = Base.n_avail(subscription_channel) / subscription_channel.sz_max
if level > 0.8 # TODO: add to config.
stats = NATS.stats(connection, sub)
@warn "Subscription on $subject channel is full in $(100 * level) %, dropped messages: $(stats.msgs_dropped)"

Check warning on line 84 in src/pubsub/subscribe.jl

View check run for this annotation

Codecov / codecov/patch

src/pubsub/subscribe.jl#L83-L84

Added lines #L83 - L84 were not covered by tests
end
end
# @debug "Subscription monitoring task finished" subject
end
errormonitor(subscription_monitoring_task)
sub
end

Expand All @@ -83,7 +97,7 @@
- `no_wait`: do not wait for next message, return `nothing` if buffer is empty
- `no_throw`: do not throw exception, returns `nothing` if cannot get next message
"""
function next(connection, sub; no_wait = false, no_throw = false)
function next(connection::Connection, sub::Sub; no_wait = false, no_throw = false)::Union{Msg, Nothing}
sub_data = @lock connection.lock get(connection.sub_data, sub.sid, nothing)
if isnothing(sub_data)
no_throw && return nothing
Expand Down Expand Up @@ -130,6 +144,27 @@
msg
end

function next(T::Type, connection::Connection, sub::Sub; no_wait = false, no_throw = false)::Union{T, Nothing}
find_msg_conversion_or_throw(T)
msg = next(connection, sub; no_wait, no_throw)
isnothing(msg) ? nothing : convert(T, msg) #TODO: invokelatest
end

function next(connection::Connection, sub::Sub, batch::Integer; no_wait = false, no_throw = false)::Vector{Msg}
msgs = []
for i in 1:batch
msg = next(connection, sub; no_wait, no_throw)
isnothing(msg) && break
push!(msgs, msg)
end
msgs
end

function next(T::Type, connection::Connection, sub::Sub, batch::Integer; no_wait = false, no_throw = false)::Vector{T}
find_msg_conversion_or_throw(T)
convert.(T, next(connection, sub, batch; no_wait, no_throw)) #TODO: invokelatest
end

@kwdef mutable struct SubscriptionMonitoringData
last_error::Union{Any, Nothing} = nothing
last_error_msg::Union{Msg, Nothing} = nothing
Expand Down
29 changes: 29 additions & 0 deletions test/experimental.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NATS, Test
using JSON3

@testset "Scoped connections" begin
@test_throws ErrorException publish("some.random.subject")
Expand Down Expand Up @@ -45,3 +46,31 @@ using NATS, Test
end
drain(sc)
end

@testset "Scoped connections sync subscriptions" begin
sc = NATS.connect()

with_connection(sc) do
sub = subscribe("subject_1")

publish("subject_1", "test")
msg = next(sub)
@test msg isa NATS.Msg

publish("subject_1", "{}")
msg = next(JSON3.Object, sub)
@test msg isa JSON3.Object

publish("subject_1", "test")
msgs = next(sub, 1)
@test msgs isa Vector{NATS.Msg}
@test length(msgs) == 1

publish("subject_1", "{}")
jsons = next(JSON3.Object, sub, 1)
@test jsons isa Vector{JSON3.Object}
@test length(jsons) == 1

drain(sub)
end
end
53 changes: 53 additions & 0 deletions test/pubsub.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Test
using NATS
using Random
using JSON3

@testset "Publish subscribe" begin
c = Channel()
Expand Down Expand Up @@ -100,6 +101,58 @@ NATS.status()
end
end

@testset "Synchronous subscriptions" begin
subject = randstring(8)
sub = subscribe(nc, subject)

msg = next(nc, sub; no_wait = true)
@test isnothing(msg)

@async begin
for i in 1:100
sleep(0.01)
publish(nc, subject, """{"x": 1}""")
end
sleep(0.2)
unsubscribe(nc, sub)
end

msg = next(nc, sub)
@test msg isa NATS.Msg

json = next(JSON3.Object, nc, sub)
@test json.x == 1

msgs = next(nc, sub, 10)
@test msgs isa Vector{NATS.Msg}
@test length(msgs) == 10

jsons = next(JSON3.Object, nc, sub, 10)
@test length(jsons) == 10

sleep(2)

msgs = next(nc, sub, 78)
@test msgs isa Vector{NATS.Msg}
@test length(msgs) == 78

msgs = next(nc, sub, 100; no_wait = true, no_throw = true)
@test msgs isa Vector{NATS.Msg}
@test length(msgs) == 0

jsons = next(JSON3.Object, nc, sub, 100; no_throw = true, no_wait = true)
@test msgs isa Vector{NATS.Msg}
@test length(jsons) == 0

@test_throws "Client unsubscribed" next(nc, sub)
@test_throws "Client unsubscribed" next(JSON3.Object, nc, sub)
@test_throws "Client unsubscribed" next(nc, sub; no_wait = true)
@test_throws "Client unsubscribed" next(JSON3.Object, nc, sub)
@test_throws "Client unsubscribed" next(nc, sub, 2)
@test_throws "Client unsubscribed" next(JSON3.Object, nc, sub, 2)
@test isnothing(next(nc, sub; no_throw = true, no_wait = true))
end

@testset "10k subscriptions" begin
n_subs = 10000
n_pubs = 10
Expand Down
Loading