1
1
#include " YunMQTTClient.h"
2
2
3
- // void messageArrived(MQTT::MessageData& messageData) {
4
- // MQTT::Message &message = messageData.message;
5
- //
6
- // // null terminate topic to create String object
7
- // int len = messageData.topicName.lenstring.len;
8
- // char topic[len+1];
9
- // memcpy(topic, messageData.topicName.lenstring.data, (size_t)len);
10
- // topic[len] = '\0';
11
- //
12
- // // null terminate payload
13
- // char * payload = (char *)message.payload;
14
- // payload[message.payloadlen] = '\0';
15
- // messageReceived(String(topic), String(payload), (char*)message.payload, (unsigned int)message.payloadlen);
16
- // }
17
-
18
3
YunMQTTClient::YunMQTTClient (const char * _hostname, int _port) {
19
- // this->client = new MQTT::Client<Network, Timer, MQTT_BUFFER_SIZE, 0>(this->network);
20
- // this->network.setClient(&_client);
21
- // this->client->setDefaultMessageHandler(messageArrived);
22
- // this->hostname = _hostname;
23
- // this->port = _port;
4
+ this ->hostname = _hostname;
5
+ this ->port = _port;
24
6
}
25
7
26
8
boolean YunMQTTClient::connect (const char * clientId) {
27
- // return this->connect(clientId, "", "");
9
+ return this ->connect (clientId, " " , " " );
28
10
}
29
11
30
12
boolean YunMQTTClient::connect (const char * clientId, const char * username, const char * password) {
31
- // if(!this->network.connect((char*)this->hostname, this->port)) {
32
- // return false;
33
- // }
34
- //
35
- // MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
36
- // data.clientID.cstring = (char*)clientId;
37
- // if(username && password) {
38
- // data.username.cstring = (char*)username;
39
- // data.password.cstring = (char*)password;
40
- // }
41
- //
42
- // return this->client->connect(data) == 0;
13
+ this ->process .begin (" python" );
14
+ this ->process .addParameter (" -u" );
15
+ this ->process .addParameter (" /usr/client.py" );
16
+ this ->process .runAsynchronously ();
17
+ this ->process .setTimeout (10000 );
18
+
19
+ // wait for script to launch
20
+ this ->process .readStringUntil (' \n ' );
21
+
22
+ // send connect request
23
+ this ->process .print (" c:" );
24
+ this ->process .print (this ->hostname );
25
+ this ->process .print (' :' );
26
+ this ->process .print (this ->port );
27
+ if (strlen (username) > 0 && strlen (password) > 0 ) {
28
+ this ->process .print (' :' );
29
+ this ->process .print (username);
30
+ this ->process .print (' :' );
31
+ this ->process .print (password);
32
+ }
33
+ this ->process .print (' \n ' );
34
+
35
+ // wait for answer
36
+ String ret = this ->process .readStringUntil (' \n ' );
37
+ return ret.equals (" ca" );
43
38
}
44
39
45
40
void YunMQTTClient::publish (String topic) {
@@ -51,43 +46,62 @@ void YunMQTTClient::publish(String topic, String payload) {
51
46
}
52
47
53
48
void YunMQTTClient::publish (const char * topic, String payload) {
54
- // this->publish(topic, payload.c_str());
49
+ this ->publish (topic, payload.c_str ());
55
50
}
56
51
57
52
void YunMQTTClient::publish (const char * topic, const char * payload) {
58
- // MQTT::Message message;
59
- // message.qos = MQTT::QOS0;
60
- // message.retained = false;
61
- // message.dup = false;
62
- // message.payload = (char*)payload;
63
- // message.payloadlen = strlen(payload);
64
- // client->publish(topic, message);
53
+ // send publish request
54
+ this ->process .print (" p:" );
55
+ this ->process .print (topic);
56
+ this ->process .print (' :' );
57
+ this ->process .print (payload);
58
+ this ->process .print (' \n ' );
65
59
}
66
60
67
61
void YunMQTTClient::subscribe (String topic) {
68
62
this ->subscribe (topic.c_str ());
69
63
}
70
64
71
65
void YunMQTTClient::subscribe (const char * topic) {
72
- // client->subscribe(topic, MQTT::QOS0, NULL);
66
+ // send subscribe request
67
+ this ->process .print (" s:" );
68
+ this ->process .print (topic);
69
+ this ->process .print (' \n ' );
73
70
}
74
71
75
72
void YunMQTTClient::unsubscribe (String topic) {
76
73
this ->unsubscribe (topic.c_str ());
77
74
}
78
75
79
76
void YunMQTTClient::unsubscribe (const char * topic) {
80
- // client->unsubscribe(topic);
77
+ // send unsubscribe request
78
+ this ->process .print (" u:" );
79
+ this ->process .print (topic);
80
+ this ->process .print (' \n ' );
81
81
}
82
82
83
83
void YunMQTTClient::loop () {
84
- // this->client->yield();
84
+ int av = this ->process .available ();
85
+ if (av > 0 ) {
86
+ String ret = process.readStringUntil (' \n ' );
87
+
88
+ if (ret.charAt (0 ) == ' m' ) {
89
+ int startTopic = 2 ;
90
+ int endTopic = ret.indexOf (' :' , startTopic + 1 );
91
+ int startPayload = endTopic + 1 ;
92
+ int endPayload = ret.indexOf (' :' , startPayload + 1 );
93
+ String topic = ret.substring (startTopic, endTopic);
94
+ String payload = ret.substring (startPayload, endPayload);
95
+ messageReceived (topic, payload, (char *)payload.c_str (), payload.length ());
96
+ }
97
+ }
85
98
}
86
99
87
100
boolean YunMQTTClient::connected () {
88
- // return this->client->isConnected();
101
+ // TODO: fix!
102
+ return true ;
89
103
}
90
104
91
105
void YunMQTTClient::disconnect () {
92
- // this->client->disconnect();
106
+ // TODO: implement!
93
107
}
0 commit comments