mqtt breaks easily...sending in process #146

Closed · openSoftMan opened this issue Jan 29, 2015 · 13 comments

@openSoftMan

It's easy to break mqtt. Set up mqtt, connect to the broker, and running the following reboots the module with "sending in process":

count = 0
tmr.alarm(1, 10000, 1, function()
    count = count + 1
    print("timer1:"..count)
    m:publish("/mytopic", "timer1:"..count, 0, 0, function(conn) end)
end)

count2 = 0
tmr.alarm(2, 30000, 1, function()
    count2 = count2 + 1
    print("timer2:"..count2)
    m:publish("/mytopic", "timer2:"..count2, 0, 0, function(conn) end)
end)

@Suxsem

Suxsem commented Jan 30, 2015

+1, this has to be fixed.

@openSoftMan
Author

The above is just a quick example to show the reboot issue. A single timer that publishes every 30 seconds runs with no problem, but as soon as you add another event-based trigger (an interrupt or another timer event), a reboot with "sending in process" is inevitable.

@kmpm
Contributor

kmpm commented Feb 7, 2015

+1
It would be nice to have some kind of queue for outbound sends.
This one caught me:

m:on('connect', function (conn)
    m:subscribe('/inbound', 0)
    m:publish('/outbound', 'foo', 0, 0) -- fires while the subscribe is still in flight
end)
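
A minimal workaround for this specific case is to chain the publish off the subscribe callback (the same callback tuanpmt's example below uses), so only one send is in flight at a time; a sketch:

m:on('connect', function (conn)
    m:subscribe('/inbound', 0, function(conn)
        -- subscribe acknowledged, now it is safe to publish
        m:publish('/outbound', 'foo', 0, 0)
    end)
end)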

@tuanpmt
Contributor

tuanpmt commented Feb 8, 2015

you can do it with this: https://github.com/tuanpmt/lua-async

async.waterfall({
    function (callback)
        m:connect("192.168.11.118", 1880, 0, function(conn) 
            print("connected")
            callback()
        end)
    end,
    function (callback)
        m:subscribe('/inbound', 0, function(conn)
            callback()
        end)
    end,
    function(callback)
        m:publish('/outbound', 'foo', 0, 0, function(conn)
            callback(nil, "done")
        end)
    end
}, function (err, result)
   -- result now equals "done"
   print(result)
end)

@kmpm
Contributor

kmpm commented Feb 8, 2015

lua-async can help in some situations, even though in the connect case it would be easier to just use the callback.
lua-async will fail if, for example, there are 2 different timers or other kinds of events that don't know of each other (see the initial comment on this issue).
Using some kind of semaphore or lock to check whether the connection is occupied could be a solution, but that doesn't fit the event-driven programming style that I like and that the client supports.

If possible in this memory-constrained environment, I would like to see some kind of buffer for outbound messages.
Then you could do things like the following without risking a "sending in process" restart.

m:on('message', function(conn, topic, data)
    if data ~= nil then
        --do something with data and reply
        m:publish('/outbound', data, 0, 0)
    end
end)

tmr.alarm(2, 5000, 1, function()
    temp = ds.read()
    m:publish('/temperature', temp, 0, 0)   
end)
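
A user-level version of that buffer can be sketched in plain Lua, assuming nothing beyond the m:publish callback used above; safe_publish, queue, and busy are illustrative names, not part of any module:

local queue = {}    -- pending {topic, message} pairs
local busy = false  -- true while a publish is in flight

local function drain()
    local item = table.remove(queue, 1)
    if item == nil then
        busy = false
        return
    end
    busy = true
    m:publish(item[1], item[2], 0, 0, function(conn)
        drain() -- previous send acknowledged, send the next one
    end)
end

function safe_publish(topic, message)
    table.insert(queue, {topic, message})
    if not busy then drain() end
end

Handlers would then call safe_publish instead of m:publish, so overlapping events only ever queue.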

@Diaoul

Diaoul commented Feb 13, 2015

+1 for the semaphore solution. Event-based is cool, but so far MQTT can only be single-event based: as soon as multiple events can trigger m:publish at the same time, you'll end up with a "sending in process" error.

@Diaoul

Diaoul commented Feb 15, 2015

I had a quick look at the implementation in nodemcu and I don't see how it can be solved easily there, so I worked on a pure Lua solution:

publisher = coroutine.create(
  function()
    -- define timeout to 6 seconds
    local timeout = 6000000

    -- reference the current coroutine
    local self = coroutine.running()

    -- create a queue
    local queue = {}

    -- init status to OK
    local status = "OK"

    -- init start_time to now
    local start_time = tmr.now()

    -- main loop
    while true do
      -- wait for status while accepting new items
      repeat
        local value1, value2 = coroutine.yield()

        if value2 == nil then  -- status
          if value1 == "CHECK" then
            if #queue > 0 and tmr.now() - start_time > timeout then
              status = "TIMEOUT"
            end
          else
            status = value1
          end
        else  -- topic + message
          table.insert(queue, {value1, value2})
        end
      until status ~= "PENDING"

      -- consume the queue
      local item = table.remove(queue)
      if item ~= nil then
        local topic, message = unpack(item)
        start_time = tmr.now()
        status = "PENDING"

        -- publish and set status back to OK in the callback
        m:publish(topic, message, 0, 0, function(conn)
          coroutine.resume(self, "OK")
        end)
      end
    end
  end
)

coroutine.resume(publisher)

tmr.alarm(1, 1000, 1, function()
  coroutine.resume(publisher, "CHECK")
end)

Then anywhere in your code you can call:

coroutine.resume(publisher, "topic", "message")

This is non-blocking: it queues all the messages and sends them one by one, with the timeout set to 6 seconds.
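
For example, the two timers from the opening comment could route their messages through the publisher instead of calling m:publish directly (a sketch; timer ids 2 and 3 are used here because timer 1 is taken by the CHECK alarm above):

count = 0
tmr.alarm(2, 10000, 1, function()
    count = count + 1
    coroutine.resume(publisher, "/mytopic", "timer1:"..count)
end)

count2 = 0
tmr.alarm(3, 30000, 1, function()
    count2 = count2 + 1
    coroutine.resume(publisher, "/mytopic", "timer2:"..count2)
end)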

HTH

@crapthings

sending in process


@dvv
Contributor

dvv commented Mar 19, 2015

@crapthings

@dvv great job.

@Diaoul

Diaoul commented Mar 19, 2015

@dvv: what happens if the MQTT server is down when you publish? I had a quick look at your helper, but it seems it gets stuck in sending and doesn't send anything else even once the server is back up. Also, the queue doesn't empty on timeout, so you'll eventually run out of memory.

@dvv
Contributor

dvv commented Mar 19, 2015

The latter case should reboot the node, and that's fine (just let it die and respawn afresh).
The former is harder, but my point is to program for normal conditions.
In either event, this helper combined with your timeouts should be what the mqtt module exposes per se, without the need for such patching at the user level, imho.

@nodemcu
Collaborator

nodemcu commented Apr 5, 2015

Queue added, this is fixed.

@nodemcu nodemcu closed this as completed Apr 5, 2015