Skip to content

Commit

Permalink
Only fire CLOUD_CONNECTED events after having subscribed to topics
Browse files Browse the repository at this point in the history
CL: azure, gcp: Only fire CLOUD_CONNECTED events after having subscribed to topics

PUBLISHED_FROM=4500519dc7dcd38e4b581d22f6627a9497b0573f
  • Loading branch information
Deomid Ryabkov authored and cesantabot committed Jan 22, 2019
1 parent a67dda4 commit 762721b
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions src/mgos_gcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ struct jwt_printer_ctx {
};

struct gcp_state {
bool connected;
unsigned int want_acks : 8;
unsigned int have_acks : 8;
unsigned int connected : 8;
mgos_timer_id token_ttl_timer_id;
};

Expand Down Expand Up @@ -190,6 +192,13 @@ static void mgos_gcp_mqtt_connect(struct mg_connection *c,
(void) client_id;
}

static void mgos_gcp_tigger_connected(struct gcp_state *state) {
if (!state->connected || state->have_acks != state->want_acks) return;
struct mgos_cloud_arg arg = {.type = MGOS_CLOUD_GCP};
mgos_event_trigger(MGOS_EVENT_CLOUD_CONNECTED, &arg);
mgos_event_trigger(MGOS_GCP_EV_CONNECT, NULL);
}

static void mgos_gcp_mqtt_ev(struct mg_connection *nc, int ev, void *ev_data,
void *user_data) {
struct gcp_state *state = (struct gcp_state *) user_data;
Expand All @@ -198,16 +207,13 @@ static void mgos_gcp_mqtt_ev(struct mg_connection *nc, int ev, void *ev_data,
case MG_EV_MQTT_CONNACK: {
int code = ((struct mg_mqtt_message *) ev_data)->connack_ret_code;
state->connected = (code == 0);
if (state->connected) {
struct mgos_cloud_arg arg = {.type = MGOS_CLOUD_GCP};
mgos_event_trigger(MGOS_EVENT_CLOUD_CONNECTED, &arg);
mgos_event_trigger(MGOS_GCP_EV_CONNECT, NULL);
}
mgos_gcp_tigger_connected(state);
break;
}
case MG_EV_MQTT_DISCONNECT: {
if (state->connected) {
state->connected = false;
state->have_acks = 0;
struct mgos_cloud_arg arg = {.type = MGOS_CLOUD_GCP};
mgos_event_trigger(MGOS_EVENT_CLOUD_DISCONNECTED, &arg);
mgos_event_trigger(MGOS_GCP_EV_CLOSE, NULL);
Expand Down Expand Up @@ -285,40 +291,52 @@ bool mgos_gcp_send_event_subf(const char *subfolder, const char *json_fmt,
}

bool mgos_gcp_is_connected(void) {
return (s_state != NULL && s_state->connected);
if (s_state == NULL) return false;
return (s_state->connected && s_state->have_acks >= s_state->want_acks);
}

static void mgos_gcp_config_ev(struct mg_connection *nc, const char *topic,
int topic_len, const char *msg, int msg_len,
void *ud) {
static void mgos_gcp_config_ev(struct mg_connection *nc, int ev, void *ev_data,
void *user_data) {
struct gcp_state *state = (struct gcp_state *) user_data;
if (ev == MG_EV_MQTT_SUBACK) {
state->have_acks++;
mgos_gcp_tigger_connected(state);
return;
} else if (ev != MG_EV_MQTT_PUBLISH) {
return;
}
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
struct mgos_gcp_config_arg arg = {
.value = MG_MK_STR_N(msg, msg_len),
.value = mm->payload,
};
LOG(LL_DEBUG, ("Config: '%.*s'", (int) arg.value.len, arg.value.p));
mgos_event_trigger(MGOS_GCP_EV_CONFIG, &arg);
(void) nc;
(void) ud;
(void) topic;
(void) topic_len;
}

static void mgos_gcp_command_ev(struct mg_connection *nc, const char *topic,
int topic_len, const char *msg, int msg_len,
void *ud) {
static void mgos_gcp_command_ev(struct mg_connection *nc, int ev, void *ev_data,
void *user_data) {
struct gcp_state *state = (struct gcp_state *) user_data;
if (ev == MG_EV_MQTT_SUBACK) {
state->have_acks++;
mgos_gcp_tigger_connected(state);
return;
} else if (ev != MG_EV_MQTT_PUBLISH) {
return;
}
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
struct mgos_gcp_command_arg arg = {
.value = MG_MK_STR_N(msg, msg_len),
.value = mm->payload,
};
struct mg_str ts = MG_MK_STR_N(topic, topic_len);
const char *sc = mg_strstr(ts, mg_mk_str("/commands/"));
const char *sc = mg_strstr(mm->topic, mg_mk_str("/commands/"));
if (sc != NULL) {
arg.subfolder.p = sc + 10;
arg.subfolder.len = (ts.p + ts.len) - arg.subfolder.p;
arg.subfolder.len = (mm->topic.p + mm->topic.len) - arg.subfolder.p;
}
LOG(LL_DEBUG, ("Command ('%.*s'): '%.*s'", (int) arg.subfolder.len,
arg.subfolder.p, (int) arg.value.len, arg.value.p));
mgos_event_trigger(MGOS_GCP_EV_COMMAND, &arg);
(void) nc;
(void) ud;
}

bool mgos_gcp_init(void) {
Expand Down Expand Up @@ -365,14 +383,16 @@ bool mgos_gcp_init(void) {
if (mgos_sys_config_get_gcp_enable_config()) {
char *topic = NULL;
mg_asprintf(&topic, 0, "/devices/%.*s/config", (int) did.len, did.p);
mgos_mqtt_sub(topic, mgos_gcp_config_ev, NULL);
mgos_mqtt_global_subscribe(mg_mk_str(topic), mgos_gcp_config_ev, state);
free(topic);
state->want_acks++;
}
if (mgos_sys_config_get_gcp_enable_commands()) {
char *topic = NULL;
mg_asprintf(&topic, 0, "/devices/%.*s/commands/#", (int) did.len, did.p);
mgos_mqtt_sub(topic, mgos_gcp_command_ev, NULL);
mgos_mqtt_global_subscribe(mg_mk_str(topic), mgos_gcp_command_ev, state);
free(topic);
state->want_acks++;
}
return true;
}

0 comments on commit 762721b

Please sign in to comment.