Skip to content

Fix broken logic when appending in async#51

Merged
ThomasTJdev merged 1 commit intomasterfrom
asyncAddWork
Mar 20, 2026
Merged

Fix broken logic when appending in async#51
ThomasTJdev merged 1 commit intomasterfrom
asyncAddWork

Conversation

@python36
Copy link
Copy Markdown
Owner

Test code:

import nmqtt
import asyncdispatch

let localCtx = newMqttCtx("test_async")
localCtx.setHost("localhost", 1883)
waitFor localCtx.start()

proc test(ind: string) {.async.} =
  for i in 1..2:
    await localCtx.publish("test/" & ind & "/" & $i, "test", 0, false)
    await sleepAsync 1

proc main() {.async.} =
  asyncCheck test("1")
  asyncCheck test("2")

asyncCheck main()

runForever()

Run listener:

mosquitto_sub -p 1883 -t "test/#" -v

Result:

test/1/1 test
test/2/1 test
test/1/2 test

Skipped test/2/2 test!


Problem (nmqtt.nim):
Since the table is copied before the loop, if a new Work is added while the loop is being processed, it will be ignored.

proc work(ctx: MqttCtx) {.async.} =

# ...

    for msgId, work in workQueue:

      #when defined(broker):
      if work.typ in [ConnAck, SubAck, UnsubAck, PingResp]:
        if await ctx.sendWork(work):
          delWork.add msgId
          continue

      if work.wk == PubWork and work.state == WorkNew:
        if work.typ == Publish and work.qos == 0:
          if await ctx.sendWork(work): delWork.add msgId # <--- at this point

        elif work.typ == PubAck and work.qos == 1:
          if await ctx.sendWork(work): delWork.add msgId # <--- at this point

        elif work.typ == PubComp and work.qos == 2:
          if await ctx.sendWork(work): delWork.add msgId # <--- at this point

        else:
          if await ctx.sendWork(work): work.state = WorkSent # <--- at this point

      #when not defined(broker):
      elif work.wk == SubWork and work.state == WorkNew:
        if work.typ == Subscribe:
          if await ctx.sendWork(work): work.state = WorkSent # <--- at this point

        elif work.typ == Unsubscribe:
          if await ctx.sendWork(work): # <--- at this point
            work.state = WorkSent
            ctx.pubCallbacks.del work.topic

# ...

Solution: After looping through the table, check whether new Works have been added. Result after changes:

test/1/1 test
test/2/1 test
test/1/2 test
test/2/2 test

test/2/2 test not skipped!

@ThomasTJdev ThomasTJdev merged commit a586782 into python36:master Mar 20, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants