Skip to content

Commit

Permalink
libhpfeeds: corrected chunk length field to byte, completed hpclient …
Browse files Browse the repository at this point in the history
…demo utility
  • Loading branch information
tillmannw committed Nov 10, 2011
1 parent fba1076 commit 4d8d495
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 103 deletions.
2 changes: 1 addition & 1 deletion libhpfeeds/include/hpfeeds.h
Expand Up @@ -36,7 +36,7 @@ typedef struct {
} hpf_msg_t;

typedef struct {
u_int32_t len;
u_char len;
u_char data[];
} hpf_chunk_t;

Expand Down
2 changes: 2 additions & 0 deletions libhpfeeds/tools/Makefile.am
@@ -1,6 +1,8 @@
AM_CFLAGS = -I../include -Werror -Wall -g -static
AM_LDFLAGS = -lhpfeeds -L../src

bin_PROGRAMS = hpclient

hpclient_LDFLAGS = ${AM_LDFLAGS}
hpclient_LDADD = ../src/libhpfeeds.la
hpclient_SOURCES = hpclient.c util.c
Expand Down
278 changes: 247 additions & 31 deletions libhpfeeds/tools/hpclient.c
Expand Up @@ -17,50 +17,266 @@
*/

#include <arpa/inet.h>
#include <getopt.h>
#include <hpfeeds.h>
#include <netdb.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "util.h"

char *msg_type2str(u_int32_t type) {
switch (type) {
case 0: return "OP_ERROR";
case 1: return "OP_INFO";
case 2: return "OP_AUTH";
case 3: return "OP_PUBLISH";
case 4: return "OP_SUBSCRIBE";
typedef enum {
S_INIT,
S_GOT_INFO,
S_AUTH_SENT,
S_RECV_MSGS,
S_ERROR,
S_TERMINATE
} session_state_t;

session_state_t session_state; // global session state


u_char *read_msg(int s) {
u_char *buffer;
u_int32_t msglen;

if (read(s, &msglen, 4) == -1) {
perror("read()");
exit(EXIT_FAILURE);
}

if ((buffer = malloc(msglen)) == NULL) {
perror("malloc()");
exit(EXIT_FAILURE);
}
return "unknonwn";

*(u_int32_t *) buffer = msglen;

if (read(s, buffer + 4, msglen - 4) == -1) {
perror("read()");
exit(EXIT_FAILURE);
}

return buffer;
}

void sigh(int sig) {
switch (sig) {
case SIGINT:
if (session_state != S_TERMINATE) {
if (write(STDOUT_FILENO, "\rSIGINT, signal again to terminate now.\n", 40) == -1) {
perror("write()");
exit(EXIT_FAILURE);
}
session_state = S_TERMINATE;
} else {
exit(EXIT_SUCCESS);
}
break;
default:
break;
}
return;
}

int main(int argc, char *argv[]) {
hpf_msg_t *msg;
hpf_chunk_t *chunk;
u_char *data;
char *errmsg, *channel, *ident, *secret;
int s, opt;
struct hostent *he;
struct sockaddr_in host;
u_int32_t nonce = 0;
u_int32_t payload_len;

channel = ident = secret = NULL;

memset(&host, 0, sizeof(struct sockaddr_in));
host.sin_family = AF_INET;

while ((opt = getopt(argc, argv, "c:h:i:p:s:")) != -1) {
switch (opt) {
case 'c':
channel = optarg;
break;
case 'h':
if ((he = gethostbyname(optarg)) == NULL) {
perror("gethostbyname()");
exit(EXIT_FAILURE);
}

if (he->h_addrtype != AF_INET) {
fprintf(stderr, "Unsupported address type\n");
exit(EXIT_FAILURE);
}

host.sin_addr = *(struct in_addr *) he->h_addr;

break;
case 'i':
ident = optarg;
break;
case 'p':
host.sin_port = htons(strtoul(optarg, 0, 0));

msg = hpf_msg_error((u_char *) "failure", 7);
printf("message type %u (%s):\n", hpf_msg_gettype(msg), msg_type2str(hpf_msg_gettype(msg)));
hd((u_char *) msg, ntohl(msg->hdr.msglen));
if (msg) hpf_msg_delete(msg);
break;
case 's':
secret = optarg;
break;
default:
printf("Usage: %s -h host -p port -c channel -i ident -s secret\n", argv[0]);
exit(EXIT_FAILURE);
}
}

if (!channel || !ident || !secret || host.sin_addr.s_addr == INADDR_ANY || host.sin_port == 0) {
printf("Usage: %s -h host -p port -c channel -i ident -s secret\n", argv[0]);
exit(EXIT_FAILURE);
}

// install sigint handler
if (signal(SIGINT, sigh) == SIG_ERR) {
perror("signal()");
exit(EXIT_FAILURE);
}

// connect to broker
if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
perror("socket()");
exit(EXIT_FAILURE);
}
printf("connecting to %s:%u\n", inet_ntoa(host.sin_addr), ntohs(host.sin_port));
if (connect(s, (struct sockaddr *) &host, sizeof(host)) == -1) {
perror("connect()");
exit(EXIT_FAILURE);
}

session_state = S_INIT; // initial session state

// this is our little session state machine
for (;;) switch (session_state) {
case S_INIT:
// read info message
if ((data = read_msg(s)) == NULL) break;
msg = (hpf_msg_t *) data;

switch (msg->hdr.opcode) {
case OP_INFO:

chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr), ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
if (chunk == NULL) {
fprintf(stderr, "invalid message format\n");
exit(EXIT_FAILURE);
}

nonce = *(u_int32_t *) (data + sizeof(msg->hdr) + chunk->len + 1);

session_state = S_GOT_INFO;
break;
case OP_ERROR:
session_state = S_ERROR;
break;
default:
fprintf(stderr, "unknown server message (type %u)\n", msg->hdr.opcode);
exit(EXIT_FAILURE);
}

free(data);

break;
case S_GOT_INFO:
// send auth message
printf("sending authentication...\n");
msg = hpf_msg_auth(nonce, (u_char *) ident, strlen(ident), (u_char *) secret, strlen(secret));

if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
perror("write()");
exit(EXIT_FAILURE);
}
hpf_msg_delete(msg);

msg = hpf_msg_info(123456789, (u_char *) "@hp1", 4);
printf("message type %u (%s):\n", hpf_msg_gettype(msg), msg_type2str(hpf_msg_gettype(msg)));
hd((u_char *) msg, ntohl(msg->hdr.msglen));
if (msg) hpf_msg_delete(msg);

msg = hpf_msg_auth(987654321, (u_char *) "test@hp1", 8, (u_char *) "secret", 6);
printf("message type %u (%s):\n", hpf_msg_gettype(msg), msg_type2str(hpf_msg_gettype(msg)));
hd((u_char *) msg, ntohl(msg->hdr.msglen));
if (msg) hpf_msg_delete(msg);
session_state = S_AUTH_SENT;
break;
case S_AUTH_SENT:
// send subscribe message
printf("subscribing to channel...\n");
msg = hpf_msg_subscribe((u_char *) ident, strlen(ident), (u_char *) channel, strlen(channel));

if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
perror("write()");
exit(EXIT_FAILURE);
}
hpf_msg_delete(msg);

msg = hpf_msg_publish((u_char *) "test@hp1", 8, (u_char *) "testchannel", 11, (u_char *) "this is a test", 14);
printf("message type %u (%s):\n", hpf_msg_gettype(msg), msg_type2str(hpf_msg_gettype(msg)));
hd((u_char *) msg, ntohl(msg->hdr.msglen));
if (msg) hpf_msg_delete(msg);

msg = hpf_msg_subscribe((u_char *) "test@hp1", 8, (u_char *) "testchannel", 11);
printf("message type %u (%s):\n", hpf_msg_gettype(msg), msg_type2str(hpf_msg_gettype(msg)));
hd((u_char *) msg, ntohl(msg->hdr.msglen));
if (msg) hpf_msg_delete(msg);
session_state = S_RECV_MSGS;
break;
case S_RECV_MSGS:
// read server message
if ((data = read_msg(s)) == NULL) break;
msg = (hpf_msg_t *) data;

switch (msg->hdr.opcode) {
case OP_PUBLISH:
// skip chunks
payload_len = hpf_msg_getsize(msg) - sizeof(msg->hdr);

chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr), ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
if (chunk == NULL) {
fprintf(stderr, "invalid message format\n");
exit(EXIT_FAILURE);
}
payload_len -= chunk->len + 1;

chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr) + chunk->len + 1, ntohl(msg->hdr.msglen) - sizeof(msg->hdr) - chunk->len - 1);
if (chunk == NULL) {
fprintf(stderr, "invalid message format\n");
exit(EXIT_FAILURE);
}
payload_len -= chunk->len + 1;

if (write(STDOUT_FILENO, data + hpf_msg_getsize(msg) - payload_len, payload_len) == -1) {
perror("write()");
exit(EXIT_FAILURE);
}
putchar('\n');

// we just remain in S_SUBSCRIBED
break;
case OP_ERROR:
if ((errmsg = calloc(1, msg->hdr.msglen - sizeof(msg->hdr))) == NULL) {
perror("calloc()");
exit(EXIT_FAILURE);
}
memcpy(errmsg, msg->data, msg->hdr.msglen - sizeof(msg->hdr));

fprintf(stderr, "server error: '%s'\n", errmsg);
free(errmsg);

session_state = S_TERMINATE;
break;
default:
fprintf(stderr, "unknown server message (type %u)\n", msg->hdr.opcode);
exit(EXIT_FAILURE);
}

free(data);

break;
case S_TERMINATE:
printf("terminated.\n");
close(s);
return EXIT_SUCCESS;
default:
fprintf(stderr, "unknown session state\n");
close(s);
exit(EXIT_FAILURE);
}

close(s);

return EXIT_SUCCESS;
}
47 changes: 0 additions & 47 deletions libhpfeeds/tools/util.c

This file was deleted.

24 changes: 0 additions & 24 deletions libhpfeeds/tools/util.h

This file was deleted.

0 comments on commit 4d8d495

Please sign in to comment.