Skip to content

Commit

Permalink
Revert "Remove obsolete code in MQTT lib"
Browse files Browse the repository at this point in the history
This reverts commit 9822d64.
  • Loading branch information
CurlyMoo committed Sep 21, 2020
1 parent 9822d64 commit d7e0482
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 10 deletions.
21 changes: 21 additions & 0 deletions libs/pilight/core/mqtt-client.c
Expand Up @@ -106,6 +106,10 @@ static void client_close_cb(uv_poll_t *req) {
struct mqtt_client_t *client = custom_poll_data->data;
char buffer[BUFFER_SIZE] = { 0 };

if(!uv_is_closing((uv_handle_t *)client->async_req)) {
uv_close((uv_handle_t *)client->async_req, close_cb);
}

struct mqtt_pkt_t pkt;
pkt.type = MQTT_DISCONNECTED;

Expand Down Expand Up @@ -182,6 +186,17 @@ static void client_close_cb(uv_poll_t *req) {
mqtt_client_remove(req);
}

static void do_write(uv_async_t *handle) {
/*
* Make sure we execute in the main thread
*/
const uv_thread_t pth_cur_id = uv_thread_self();
assert(uv_thread_equal(&pth_main_id, &pth_cur_id));

struct mqtt_client_t *client = handle->data;
uv_custom_write(client->poll_req);
}

static void ping(uv_timer_t *handle) {
mqtt_ping(handle->data);
}
Expand Down Expand Up @@ -525,6 +540,12 @@ int mqtt_client(char *ip, int port, char *clientid, char *willtopic, char *willm
memset(&client->messages[i], 0, sizeof(struct mqtt_message_t));
}

if((client->async_req = MALLOC(sizeof(uv_async_t))) == NULL) {
OUT_OF_MEMORY /*LCOV_EXCL_LINE*/
}
client->async_req->data = client;
uv_async_init(uv_default_loop(), client->async_req, do_write);

if((client->timeout_req = MALLOC(sizeof(uv_timer_t))) == NULL) {
OUT_OF_MEMORY /*LCOV_EXCL_LINE*/
}
Expand Down
20 changes: 10 additions & 10 deletions libs/pilight/core/mqtt-common.c
Expand Up @@ -810,7 +810,7 @@ int mqtt_ping(struct mqtt_client_t *client) {
iobuf_append(&client->send_iobuf, (void *)buf, len);
FREE(buf);
}
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -832,7 +832,7 @@ int mqtt_subscribe(struct mqtt_client_t *client, char *topic, int qos) {
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -855,7 +855,7 @@ int mqtt_pubrec(struct mqtt_client_t *client, int msgid) {
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -878,7 +878,7 @@ int mqtt_pubrel(struct mqtt_client_t *client, int msgid) {
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -901,7 +901,7 @@ int mqtt_pubcomp(struct mqtt_client_t *client, int msgid) {
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -924,7 +924,7 @@ int mqtt_puback(struct mqtt_client_t *client, int msgid) {
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -944,7 +944,7 @@ int mqtt_suback(struct mqtt_client_t *client, int msgid, int qos) {
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -962,7 +962,7 @@ int mqtt_disconnect(struct mqtt_client_t *client) {
iobuf_append(&client->send_iobuf, (void *)buf, len);
FREE(buf);
}
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand All @@ -980,7 +980,7 @@ int mqtt_pingresp(struct mqtt_client_t *client) {
iobuf_append(&client->send_iobuf, (void *)buf, len);
FREE(buf);
}
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand Down Expand Up @@ -1056,7 +1056,7 @@ int mqtt_publish(struct mqtt_client_t *client, int dub, int qos, int retain, cha
FREE(buf);
}
mqtt_free(&pkt);
uv_custom_write(client->poll_req);
uv_async_send(client->async_req);
}
return 0;
}
Expand Down
24 changes: 24 additions & 0 deletions libs/pilight/core/mqtt-server.c
Expand Up @@ -144,6 +144,10 @@ static void client_close_cb(uv_poll_t *req) {
char buffer[BUFFER_SIZE] = { 0 };

if(client != NULL) {
if(!uv_is_closing((uv_handle_t *)client->async_req)) {
uv_close((uv_handle_t *)client->async_req, close_cb);
}

struct mqtt_subscription_t *subscription = NULL;
while(client->subscriptions) {
subscription = client->subscriptions;
Expand Down Expand Up @@ -338,6 +342,17 @@ static void client_write_cb(uv_poll_t *req) {
uv_custom_read(req);
}

static void do_write(uv_async_t *handle) {
/*
* Make sure we execute in the main thread
*/
const uv_thread_t pth_cur_id = uv_thread_self();
assert(uv_thread_equal(&pth_main_id, &pth_cur_id));

struct mqtt_client_t *client = handle->data;
uv_custom_write(client->poll_req);
}

static void client_read_cb(uv_poll_t *req, ssize_t *nread, char *buf) {
struct uv_custom_poll_t *custom_poll_data = req->data;
struct mqtt_client_t *client = custom_poll_data->data;
Expand Down Expand Up @@ -629,6 +644,9 @@ static void client_read_cb(uv_poll_t *req, ssize_t *nread, char *buf) {
*nread = 0;

if(ret == -1) {
if(!uv_is_closing((uv_handle_t *)client->async_req)) {
uv_close((uv_handle_t *)client->async_req, close_cb);
}
uv_custom_close(req);
} else if(disconnect == 0) {
uv_custom_write(req);
Expand Down Expand Up @@ -727,6 +745,12 @@ static void server_read_cb(uv_poll_t *req, ssize_t *nread, char *buf) {
uv_timer_start(client->timeout_req, timeout, (client->keepalive*1.5)*1000, 0);
#endif

if((client->async_req = MALLOC(sizeof(uv_async_t))) == NULL) {
OUT_OF_MEMORY /*LCOV_EXCL_LINE*/
}
client->async_req->data = client;
uv_async_init(uv_default_loop(), client->async_req, do_write);

uv_custom_poll_init(&custom_poll_data, poll_req, client);

custom_poll_data->read_cb = client_read_cb;
Expand Down
1 change: 1 addition & 0 deletions libs/pilight/core/mqtt.h
Expand Up @@ -106,6 +106,7 @@ typedef struct mqtt_subscription_t {

typedef struct mqtt_client_t {
uv_poll_t *poll_req;
uv_async_t *async_req;
uv_timer_t *ping_req;
uv_timer_t *timeout_req;

Expand Down

0 comments on commit d7e0482

Please sign in to comment.