Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fix a publish bug

  • Loading branch information...
commit c56420ae5c73a4c86eb3c3bba9cedd8bc0802031 1 parent a242edd
@yilun authored
Showing with 39 additions and 41 deletions.
  1. +39 −41 MQTTClient.js
View
80 MQTTClient.js
@@ -1,6 +1,6 @@
/**
*This is a simple MQTT cient on node.js
- *Author: Fan Yilun @CEIT @10 FEB 2011
+ *Author: Fan Yilun @CEIT @14 FEB 2011
*/
var sys = require('sys');
var net = require('net');
@@ -12,7 +12,6 @@ var MQTTSUBSCRIBE = 0x80; //8<<4;
var KEEPALIVE = 15000;
-//Testing
//var client = Client(1883, '127.0.0.1', 'mirror');
function MQTTClient(port, host, clientID) {
@@ -176,42 +175,43 @@ MQTTClient.prototype.subscribe = function (sub_topic) {
MQTTClient.prototype.publish = function (pub_topic, payload) {
if(this.connected){
- var i = 0, n = 0;
- var var_header = new Buffer(3+pub_topic.length);
-
- //Variable header
- //Assume payload length no longer than 128
- var_header[i++] = 0;
- var_header[i++] = pub_topic.length;
- for (n = 0; n < pub_topic.length; n++) {
- var_header[i++] = pub_topic.charCodeAt(n);
- }
- var_header[i++] = 0;
-
- i = 0;
- var buffer = new Buffer(2+var_header.length+payload.length);
-
- //Fix header
- buffer[i++] = MQTTPUBLISH;
- buffer[i++] = payload.length + var_header.length;
-
- for (n = 0; n < var_header.length; n++) {
- buffer[i++] = var_header[n];
- }
- for (n = 0; n < payload.length; n++) { //Insert payloads
- buffer[i++] = payload.charCodeAt(n);
- }
-
- sys.puts("||Publish|| "+pub_topic+' : '+payload);
-
- this.conn.write(buffer, encoding="ascii");
-
- //reset timer
- var cc = this;
- clearTimeout(this.timeout);
- this.timeout = setTimeout(function() {
- cc.timeUp();
- }, 25000);
+ var i = 0, n = 0;
+ var var_header = new Buffer(2+pub_topic.length);
+
+ //Variable header
+ //Assume payload length no longer than 128
+ var_header[i++] = 0;
+ var_header[i++] = pub_topic.length;
+ for (n = 0; n < pub_topic.length; n++) {
+ var_header[i++] = pub_topic.charCodeAt(n);
+ }
+ //QoS 1&2
+ //var_header[i++] = 0;
+ //var_header[i++] = 0x03;
+
+ i = 0;
+ var buffer = new Buffer(2+var_header.length+payload.length);
+
+ //Fix header
+ buffer[i++] = MQTTPUBLISH;
+ buffer[i++] = payload.length + var_header.length;
+ for (n = 0; n < var_header.length; n++) {
+ buffer[i++] = var_header[n];
+ }
+ for (n = 0; n < payload.length; n++) { //Insert payloads
+ buffer[i++] = payload.charCodeAt(n);
+ }
+
+ sys.puts("||Publish|| "+pub_topic+' : '+payload);
+
+ this.conn.write(buffer, encoding="ascii");
+
+ //reset timer
+ var cc = this;
+ clearTimeout(this.timeout);
+ this.timeout = setTimeout(function() {
+ cc.timeUp();
+ }, 25000);
}
};
@@ -223,16 +223,14 @@ MQTTClient.prototype.onData = function(data){
//sys.puts('3:'+data[3]);
if (type == 3) { // PUBLISH
var tl = data[3]+data[2]; //<<4
- sys.puts(tl);
var topic = new Buffer(tl);
for(var i = 0; i < tl; i++){
topic[i] = data[i+4];
}
if(tl+4 <= data.length){
- var payload = data.slice(tl+5, data.length);
+ var payload = data.slice(tl+4, data.length);
sys.puts("Receive on Topic:"+topic);
sys.puts("Payload:"+payload+'\n');
-
this.emit("mqttData", topic, payload);
}
} else if (type == 12) { // PINGREG -- Ask for alive
Please sign in to comment.
Something went wrong with that request. Please try again.