Skip to content

Commit 8a257c8

Browse files
committed
Merge pull request 256dpi#14 from 256dpi/size-prefix
Size Prefix
2 parents 89a4b44 + 2b5aa46 commit 8a257c8

File tree

4 files changed

+79
-64
lines changed

4 files changed

+79
-64
lines changed

src/YunMQTTClient.cpp

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ boolean YunMQTTClient::connect(const char * clientId, const char * username, con
5151
this->process.readStringUntil('\n');
5252

5353
// set will if available
54-
if(this->willTopic != NULL) {
54+
if(strlen(this->willTopic) > 0) {
5555
this->process.print("w:");
5656
this->process.print(this->willTopic);
57-
if(this->willPayload != NULL) {
58-
this->process.print(':');
59-
this->process.print(this->willPayload);
60-
}
57+
this->process.print(':');
58+
this->process.print(strlen(this->willPayload));
59+
this->process.print(';');
60+
this->process.print(this->willPayload);
6161
this->process.print('\n');
6262
}
6363

@@ -74,11 +74,11 @@ boolean YunMQTTClient::connect(const char * clientId, const char * username, con
7474
this->process.print(':');
7575
this->process.print(password);
7676
}
77-
this->process.print('\n');
77+
this->process.print(";\n");
7878

7979
// wait for answer
8080
String ret = this->process.readStringUntil('\n');
81-
this->alive = ret.equals("a");
81+
this->alive = ret.equals("a;");
8282

8383
if(!this->alive) {
8484
this->process.close();
@@ -105,6 +105,8 @@ void YunMQTTClient::publish(const char * topic, const char * payload) {
105105
this->process.print("p:");
106106
this->process.print(topic);
107107
this->process.print(':');
108+
this->process.print(strlen(payload));
109+
this->process.print(';');
108110
this->process.print(payload);
109111
this->process.print('\n');
110112
}
@@ -117,7 +119,7 @@ void YunMQTTClient::subscribe(const char * topic) {
117119
// send subscribe request
118120
this->process.print("s:");
119121
this->process.print(topic);
120-
this->process.print('\n');
122+
this->process.print(";\n");
121123
}
122124

123125
void YunMQTTClient::unsubscribe(String topic) {
@@ -128,26 +130,34 @@ void YunMQTTClient::unsubscribe(const char * topic) {
128130
// send unsubscribe request
129131
this->process.print("u:");
130132
this->process.print(topic);
131-
this->process.print('\n');
133+
this->process.print(";\n");
132134
}
133135

134136
void YunMQTTClient::loop() {
135137
int av = this->process.available();
136138
if(av > 0) {
137-
String ret = process.readStringUntil('\n');
139+
String ret = process.readStringUntil(';');
138140

139141
if(ret.startsWith("m")) {
140142
int startTopic = 2;
141143
int endTopic = ret.indexOf(':', startTopic + 1);
142-
int startPayload = endTopic + 1;
143-
int endPayload = ret.indexOf(':', startPayload + 1);
144144
String topic = ret.substring(startTopic, endTopic);
145-
String payload = ret.substring(startPayload, endPayload);
146-
messageReceived(topic, payload, (char*)payload.c_str(), payload.length());
145+
146+
int startPayloadLength = endTopic + 1;
147+
int endPayloadLength = ret.indexOf(':', startPayloadLength + 1);
148+
int payloadLength = ret.substring(startPayloadLength, endPayloadLength).toInt();
149+
150+
char buf[payloadLength+1];
151+
process.readBytes(buf, payloadLength);
152+
buf[payloadLength] = '\0';
153+
154+
messageReceived(topic, String(buf), buf, payloadLength);
147155
} else if(ret.startsWith("e")) {
148156
this->alive = false;
149157
this->process.close();
150158
}
159+
160+
process.readStringUntil('\n');
151161
}
152162
}
153163

@@ -157,7 +167,7 @@ boolean YunMQTTClient::connected() {
157167

158168
void YunMQTTClient::disconnect() {
159169
// send disconnect request
160-
this->process.print("d\n");
170+
this->process.print("d;\n");
161171
}
162172

163173
#endif //ARDUINO_AVR_YUN

src/YunMQTTClient.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ class YunMQTTClient {
1313
Process process;
1414
const char * hostname;
1515
int port;
16-
const char * willTopic = NULL;
17-
const char * willPayload = NULL;
16+
const char * willTopic = "";
17+
const char * willPayload = "";
1818
boolean alive = false;
1919
boolean updateBridge();
2020
public:

yun/abi.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@
55
The following commands are exchanged between the python script and the arduino library:
66

77
| Command | Format
8-
---|-------------|-----------------------------------------
9-
-> | will | `w:topic:(payload)`
10-
-> | connect | `c:host:port:id:(user):(pass)`
11-
<- | approved | `a`
12-
<- | rejected | `r`
13-
-> | subscribe | `s:(topic)`
14-
-> | unsubscribe | `u:(topic)`
15-
-> | publish | `p:(topic):(data)`
16-
<- | message | `m:(topic):(data)`
17-
-> | disconnect | `d`
18-
<- | closed | `e`
8+
---|-------------|------------------------------------
9+
<- | boot | `b;`
10+
-> | will | `w:topic:payload_len;(payload)`
11+
-> | connect | `c:host:port:id:(user):(pass);`
12+
<- | approved | `a;`
13+
<- | rejected | `r;`
14+
-> | subscribe | `s:topic;`
15+
-> | unsubscribe | `u:topic;`
16+
-> | publish | `p:topic:payload_len;(payload)`
17+
<- | message | `m:topic:payload_len;(payload)`
18+
-> | disconnect | `d;`
19+
<- | closed | `e;`
1920

2021
All commands end with a standard line break `\n`.

yun/bridge.py

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
import mqtt
33

44
class Bridge:
5+
COMMAND_END = b"\n"
6+
ARGUMENTS_SEPERATOR = b":"
7+
PAYLOAD_SEPERATOR = b";"
8+
59
# Constructor
610
def __init__(self):
711
self.client = None
@@ -10,19 +14,16 @@ def __init__(self):
1014

1115
# Bridge Callbacks
1216
def on_connect(self, _, __, ___, rc):
13-
if rc == 0:
14-
self.send_command("a")
15-
else:
16-
self.send_command("r")
17+
self.send_command("a;" if rc == 0 else "r;")
1718
def on_message(self, _, __, msg):
18-
self.send_command("m:" + msg.topic + ":" + str(msg.payload))
19+
self.send_command("m:" + msg.topic + ":" + str(len(msg.payload)) + ";" + str(msg.payload))
1920
def on_disconnect(self):
2021
self.client.loop_stop()
21-
self.send_command("e")
22+
self.send_command("e;")
2223

2324
# Command Helpers
2425
def parse_command(self, line):
25-
segments = line.split(":")
26+
segments = line.split(self.ARGUMENTS_SEPERATOR)
2627
cmd = segments[0]
2728
remaining = segments[1:]
2829
if cmd == 'w':
@@ -37,51 +38,54 @@ def parse_command(self, line):
3738
self.do_publish(remaining)
3839
elif cmd == 'd':
3940
self.do_disconnect()
41+
self.read_until(self.COMMAND_END)
4042
def send_command(self, line):
41-
sys.stdout.write(line + "\n")
43+
sys.stdout.write(line + self.COMMAND_END)
4244

4345
# Command Handlers
4446
def do_will(self, args):
4547
self.will_topic = args[0]
46-
if len(args) >= 2:
47-
self.will_payload = args[1]
48+
self.will_payload = self.read_chunk(int(args[1]))
4849

4950
def do_connect(self, args):
50-
if len(args) >= 3:
51-
self.client = mqtt.Client(args[2])
52-
self.client.on_connect = self.on_connect
53-
self.client.on_message = self.on_message
54-
if len(args) >= 5:
55-
self.client.username_pw_set(args[3], args[4])
56-
if len(self.will_topic) > 0:
57-
if len(self.will_payload) > 0:
58-
self.client.will_set(self.will_topic, self.will_payload)
59-
else:
60-
self.client.will_set(self.will_topic)
61-
try:
62-
self.client.connect(args[0], int(args[1]))
63-
self.client.loop_start()
64-
except:
65-
self.send_command("r")
51+
self.client = mqtt.Client(args[2])
52+
self.client.on_connect = self.on_connect
53+
self.client.on_message = self.on_message
54+
if len(args) >= 5:
55+
self.client.username_pw_set(args[3], args[4])
56+
if len(self.will_topic) > 0:
57+
self.client.will_set(self.will_topic, self.will_payload)
58+
try:
59+
self.client.connect(args[0], int(args[1]))
60+
self.client.loop_start()
61+
except:
62+
self.send_command("r;")
6663

6764
def do_subscribe(self, args):
68-
if self.client and len(args) >= 1:
69-
self.client.subscribe(args[0])
65+
self.client.subscribe(args[0])
7066
def do_unsubscribe(self, args):
71-
if self.client and len(args) >= 1:
72-
self.client.unsubscribe(args[0])
67+
self.client.unsubscribe(args[0])
7368
def do_publish(self, args):
74-
if self.client and len(args) >= 2:
75-
self.client.publish(args[0], args[1])
69+
self.client.publish(args[0], self.read_chunk(int(args[1])))
7670
def do_disconnect(self):
77-
if self.client:
78-
self.client.disconnect()
71+
self.client.disconnect()
7972

8073
# Main
8174
def run(self):
82-
self.send_command("ok")
75+
self.send_command("b;")
8376
while True:
84-
self.parse_command(sys.stdin.readline()[0:-1])
77+
self.parse_command(self.read_until(self.PAYLOAD_SEPERATOR))
78+
79+
# Low Level Helpers
80+
def read_until(self, end):
81+
s = b""
82+
c = sys.stdin.read(1)
83+
while c not in end:
84+
s += c
85+
c = sys.stdin.read(1)
86+
return s
87+
def read_chunk(self, length):
88+
return sys.stdin.read(length)
8589

8690
# Main Loop
8791
Bridge().run()

0 commit comments

Comments
 (0)