Skip to content

Commit

Permalink
Converted k8055mqtt to use new version 1.x of mosquitto client library
Browse files Browse the repository at this point in the history
  • Loading branch information
njh committed Dec 22, 2012
1 parent cc1f826 commit 3c3f594
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 35 deletions.
3 changes: 3 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ AC_FUNC_REALLOC
dnl ############## Check for packages we depend upon

PKG_CHECK_MODULES(LIBUSB, libusb-1.0 >= 1.0.8)

AC_CHECK_LIB([mosquitto], [mosquitto_lib_init],, AC_ERROR([Misssing mosquitto client library]))
AC_CHECK_FUNC([mosquitto_connack_string],, AC_ERROR([Misssing the mosquitto_connack_string function]))
AC_CHECK_FUNC([mosquitto_log_callback_set],, AC_ERROR([Misssing the mosquitto_log_callback_set function]))
AC_CHECK_HEADERS([mosquitto.h], [], [AC_ERROR([Missing mosquitto header file])])


Expand Down
52 changes: 17 additions & 35 deletions src/k8055mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,28 @@ k8055_t *dev = NULL;



static void connect_callback(void *obj, int result)
static void connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
if(!result){
if(!rc){
printf("Connected to MQTT server.\n");
mqtt_connected = 1;
} else {
switch(result) {
case 0x01:
fprintf(stderr, "Connection Refused: unacceptable protocol version\n");
break;
case 0x02:
fprintf(stderr, "Connection Refused: identifier rejected\n");
break;
case 0x03:
// FIXME: if broker is unavailable, sleep and try connecting again
fprintf(stderr, "Connection Refused: broker unavailable\n");
break;
case 0x04:
fprintf(stderr, "Connection Refused: bad user name or password\n");
break;
case 0x05:
fprintf(stderr, "Connection Refused: not authorised\n");
break;
default:
fprintf(stderr, "Connection Refused: unknown reason\n");
break;
}

const char *str = mosquitto_connack_string(rc);
fprintf(stderr, "Connection Refused: %s\n", str);
mqtt_connected = 0;
}
}


static void disconnect_callback(void *obj)
static void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
mqtt_connected = 0;

// FIXME: re-establish the connection
// FIXME: keep count of re-connects
}

static void message_callback(void *userdata, const struct mosquitto_message *mesg)
static void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *mesg)
{
size_t prefix_len = strlen(mqtt_prefix);
size_t topic_len = 0;
Expand Down Expand Up @@ -115,11 +95,14 @@ static void message_callback(void *userdata, const struct mosquitto_message *mes
}
}

static void log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
{
printf("LOG: %s\n", str);
}

static mqtt_subscribe(struct mosquitto *mosq, const char* topic)
{
uint16_t mid;
int res;
int mid, res;

res = mosquitto_subscribe(mosq, &mid, topic, 0);
// FIXME: check for success
Expand All @@ -133,23 +116,22 @@ static struct mosquitto * mqtt_initialise(const char* host, int port)
int res = 0;

// Create a new MQTT client
mosq = mosquitto_new(mqtt_client_id, NULL);
mosq = mosquitto_new(mqtt_client_id, true, NULL);
if (!mosq) {
fprintf(stderr, "Failed to initialise MQTT client.\n");
return NULL;
}

// Configure logging
mosquitto_log_init(mosq, MOSQ_LOG_INFO | MOSQ_LOG_WARNING | MOSQ_LOG_ERR, MOSQ_LOG_STDOUT);

// FIXME: add support for username and password

// Setup callbacks
mosquitto_log_callback_set(mosq, log_callback);
mosquitto_connect_callback_set(mosq, connect_callback);
mosquitto_disconnect_callback_set(mosq, disconnect_callback);
mosquitto_message_callback_set(mosq, message_callback);

printf("Connecting to %s:%d...\n", host, port);
res = mosquitto_connect(mosq, host, port, DEFAULT_KEEP_ALIVE, 1);
res = mosquitto_connect(mosq, host, port, DEFAULT_KEEP_ALIVE);
if (res) {
fprintf(stderr, "Unable to connect (%d).\n", res);
mosquitto_destroy(mosq);
Expand All @@ -159,7 +141,7 @@ static struct mosquitto * mqtt_initialise(const char* host, int port)
// Wait until connected
while (!mqtt_connected) {
// FIXME: add timeout
int res = mosquitto_loop(mosq, 500);
int res = mosquitto_loop(mosq, 500, 1);
if (res != MOSQ_ERR_SUCCESS) exit(EXIT_FAILURE);
}

Expand Down Expand Up @@ -250,7 +232,7 @@ int main(int argc, char **argv)

while (keep_running) {
// Wait for network packets for a maximum of 0.5s
res = mosquitto_loop(mosq, 500);
res = mosquitto_loop(mosq, 500, 1);

// Write output changes and poll for inputs
k8055_device_poll(dev);
Expand Down

0 comments on commit 3c3f594

Please sign in to comment.