Skip to content

Commit

Permalink
ss: mqtt: add support for retained message
Browse files Browse the repository at this point in the history
  • Loading branch information
Chunho Lee authored and lws-team committed Mar 15, 2022
1 parent 4bf39f5 commit 3af7a16
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 4 deletions.
1 change: 1 addition & 0 deletions include/libwebsockets/lws-mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ typedef struct lws_mqtt_publish_param_s {
0 */
uint8_t dup:1; /* Retried PUBLISH,
for QoS > 0 */
uint8_t retain:1; /* Retained message */
} lws_mqtt_publish_param_t;

typedef struct topic_elem {
Expand Down
1 change: 1 addition & 0 deletions include/libwebsockets/lws-secure-streams-policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ typedef struct lws_ss_policy {
uint8_t birth_qos;
uint8_t birth_retain;
uint8_t aws_iot;
uint8_t retain;

} mqtt;

Expand Down
2 changes: 1 addition & 1 deletion lib/roles/mqtt/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
* payload (if any)
*/
if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
0, pub->qos, 0)) {
pub->dup, pub->qos, pub->retain)) {
lwsl_err("%s: Failed to fill fixed header\n", __func__);
return 1;
}
Expand Down
4 changes: 4 additions & 0 deletions lib/secure-streams/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,10 @@ Set the topic this streamtype subscribes to

Set the QOS level for this streamtype

### `mqtt_retain`

Set to true if this streamtype should use MQTT's "retain" feature.

### `mqtt_keep_alive`

16-bit number representing MQTT keep alive for the stream.
Expand Down
7 changes: 7 additions & 0 deletions lib/secure-streams/policy-json.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.mqtt_topic",
"s[].*.mqtt_subscribe",
"s[].*.mqtt_qos",
"s[].*.mqtt_retain",
"s[].*.mqtt_keep_alive",
"s[].*.mqtt_clean_start",
"s[].*.mqtt_will_topic",
Expand Down Expand Up @@ -218,6 +219,7 @@ typedef enum {
LSSPPT_MQTT_TOPIC,
LSSPPT_MQTT_SUBSCRIBE,
LSSPPT_MQTT_QOS,
LSSPPT_MQTT_RETAIN,
LSSPPT_MQTT_KEEPALIVE,
LSSPPT_MQTT_CLEAN_START,
LSSPPT_MQTT_WILL_TOPIC,
Expand Down Expand Up @@ -997,6 +999,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->u.mqtt.qos = (uint8_t)atoi(ctx->buf);
break;

case LSSPPT_MQTT_RETAIN:
a->curr[LTY_POLICY].p->u.mqtt.retain =
reason == LEJPCB_VAL_TRUE;
break;

case LSSPPT_MQTT_KEEPALIVE:
a->curr[LTY_POLICY].p->u.mqtt.keep_alive = (uint16_t)atoi(ctx->buf);
break;
Expand Down
10 changes: 7 additions & 3 deletions lib/secure-streams/protocols/ss-mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ secstream_mqtt_subscribe(struct lws *wsi)
static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
const char* topic,
lws_mqtt_qos_levels_t qos, int f)
lws_mqtt_qos_levels_t qos, uint8_t retain, int f)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
size_t used_in, used_out, topic_limit;
Expand Down Expand Up @@ -188,6 +188,7 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
mqpp.packet_id = (uint16_t)(h->txord - 1);
mqpp.qos = qos;
mqpp.retain = !!retain;
mqpp.payload = buf;
if (h->writeable_len)
mqpp.payload_len = (uint32_t)h->writeable_len;
Expand Down Expand Up @@ -407,7 +408,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf + LWS_PRE,
used_out, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
}
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
Expand Down Expand Up @@ -435,7 +438,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,

return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos, f);
h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f);
}

case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,9 @@ int main(int argc, const char **argv)
if (pol->u.mqtt.aws_iot)
printf("\t\t\t.aws_iot = %u,\n",
pol->u.mqtt.aws_iot);
if (pol->u.mqtt.retain)
printf("\t\t\t.retain = %u,\n",
pol->u.mqtt.retain);

printf("\t\t}\n\t},\n");

Expand Down

0 comments on commit 3af7a16

Please sign in to comment.