Skip to content
Browse files

Merge branch 'master' of github.com:rep/hpfeeds

  • Loading branch information...
2 parents 34be135 + 52fa4df commit f18712dd62255654c8f8d695761cce384fa3592f @rep committed May 22, 2015
Showing with 433 additions and 284 deletions.
  1. +10 −0 README.md
  2. +345 −284 appsupport/libhpfeeds/tools/hpclient.c
  3. +31 −0 broker/README.md
  4. +37 −0 broker/add_user.py
  5. +10 −0 broker/dump_users.py
View
10 README.md
@@ -10,6 +10,7 @@ This is the reference implementation repository. By now hpfeeds exists in other
- Ruby: https://github.com/fw42/hpfeedsrb
- More Ruby: https://github.com/vicvega/hpfeeds-ruby
- JS (within node.js): https://github.com/fw42/honeymap/blob/master/server/node_modules/hpfeeds/index.js
+ - C++: https://github.com/tentacool/tentacool
## About
hpfeeds is a lightweight authenticated publish-subscribe protocol that supports arbitrary binary payloads.
@@ -22,6 +23,15 @@ Access to channels is given to so-called Authkeys which essentially are pairs of
To support multiple data sources and sinks per user we manage the Authkeys in this webinterface after a quick login with a user account. User accounts are only needed for the webinterface - to use the data feed channels, only Authkeys are necessary. Different Authkeys can be granted distinct access rights for channels.
+## Installation
+
+```
+git clone https://github.com/rep/hpfeeds/
+cd hpfeeds
+python setup.py build
+python setup.py install
+```
+
## Wire Protocol
Each message carries a message header. The message types can make use of "parameters" that are being sent as (length,data) pairs.
View
629 appsupport/libhpfeeds/tools/hpclient.c
@@ -25,6 +25,8 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#include <stdbool.h>
+#include <time.h>
#define MAXLEN 1000000
#define READ_BLOCK_SIZE 32767
@@ -39,322 +41,381 @@ S_ERROR,
S_TERMINATE
} session_state_t;
-session_state_t session_state; // global session state
+session_state_t session_state; // global session state
typedef enum {
C_SUBSCRIBE,
C_PUBLISH,
C_UNKNOWN } cmd_t;
-u_char *read_msg(int s) {
- u_char *buffer;
- u_int32_t msglen;
- int len;
- int templen;
- char tempbuf[READ_BLOCK_SIZE];
+unsigned totmsgs = 0;
- if (read(s, &msglen, 4) != 4) {
- perror("read()");
- exit(EXIT_FAILURE);
- }
+u_char *read_msg(int s) {
+ u_char *buffer;
+ u_int32_t msglen;
+ int len;
+ int templen;
+ int readlen;
+ char tempbuf[READ_BLOCK_SIZE];
+
+ if (read(s, &msglen, 4) != 4) {
+ perror("read()");
+ exit(EXIT_FAILURE);
+ }
- if ((buffer = malloc(ntohl(msglen))) == NULL) {
- perror("malloc()");
- exit(EXIT_FAILURE);
- }
+ if ((buffer = malloc(ntohl(msglen))) == NULL) {
+ perror("malloc()");
+ exit(EXIT_FAILURE);
+ }
- *(u_int32_t *) buffer = msglen;
- msglen = ntohl(msglen);
+ *(u_int32_t *) buffer = msglen;
+ msglen = ntohl(msglen);
len = 4;
templen = len;
while ((templen > 0) && (len < msglen)) {
- templen = read(s, tempbuf, READ_BLOCK_SIZE);
+ readlen = (msglen - 4 < READ_BLOCK_SIZE ? msglen - 4 : READ_BLOCK_SIZE);
+ templen = read(s, tempbuf, readlen);
memcpy(buffer + len, tempbuf, templen);
len += templen;
}
- if (len != msglen) {
- perror("read()");
- exit(EXIT_FAILURE);
- }
+ if (len != msglen) {
+ perror("read()");
+ exit(EXIT_FAILURE);
+ }
- return buffer;
+ return buffer;
}
void sigh(int sig) {
- switch (sig) {
- case SIGINT:
- if (session_state != S_TERMINATE) {
- if (write(STDOUT_FILENO, "\rSIGINT, signal again to terminate now.\n", 40) == -1) {
- perror("write()");
- exit(EXIT_FAILURE);
- }
- session_state = S_TERMINATE;
- } else {
- exit(EXIT_SUCCESS);
- }
- break;
- default:
- break;
- }
- return;
+ switch (sig) {
+ case SIGINT:
+ if (session_state != S_TERMINATE) {
+ if (write(STDOUT_FILENO, "\rSIGINT, signal again to terminate now.\n", 40) == -1) {
+ perror("write()");
+ exit(EXIT_FAILURE);
+ }
+ session_state = S_TERMINATE;
+ } else {
+ exit(EXIT_SUCCESS);
+ }
+ break;
+ default:
+ break;
+ }
+ return;
}
void usage(char *argv0) {
- fprintf(stderr, "Usage: %s -h host -p port [ -S | -P ] -c channel -i ident -s secret\n", argv0);
+ fprintf(stderr, "Usage: %s -h host -p port [ -S | -P ] -c channel -i ident -s secret [-t times | -f] [-b] [-d delay]\n", argv0);
fprintf(stderr, " -S subscribe to channel, print msg to stdout\n");
fprintf(stderr, " -P publish to channel, read msg from stdin\n");
+ fprintf(stderr, " -t times repeats the message\n");
+ fprintf(stderr, " -f repeats the message forever\n");
+ fprintf(stderr, " -b run the benchmark instead of printing\n");
+ fprintf(stderr, " -d delay wait time between messages (msec)\n");
+}
+
+void print_benchmark(int signo)
+{
+ printf("\rProcessing %u msgs/s %c%c%c%c", totmsgs, 8, 8, 8, 8);
+ fflush(stdout);
+ totmsgs = 0;
+ alarm(1);
}
int main(int argc, char *argv[]) {
- cmd_t hpfdcmd;
- hpf_msg_t *msg;
- hpf_chunk_t *chunk;
- u_char *data;
- char *errmsg, *channel, *ident, *secret;
- int s, opt;
- struct hostent *he;
- struct sockaddr_in host;
- u_int32_t nonce = 0;
- u_int32_t payload_len;
- u_char* buf;
- int len;
- int templen;
- char tempbuf[READ_BLOCK_SIZE];
-
- buf = (u_char*)malloc(sizeof(u_char) * MAXLEN);
-
- hpfdcmd = C_UNKNOWN;
- channel = NULL;
- ident = NULL;
- secret = NULL;
- msg = NULL;
-
- memset(&host, 0, sizeof(struct sockaddr_in));
- host.sin_family = AF_INET;
-
- while ((opt = getopt(argc, argv, "SPc:h:i:p:s:")) != -1) {
- switch (opt) {
- case 'S':
- hpfdcmd = C_SUBSCRIBE;
- break;
- case 'P':
- hpfdcmd = C_PUBLISH;
- break;
- case 'c':
- channel = optarg;
- break;
- case 'h':
- if ((he = gethostbyname(optarg)) == NULL) {
- perror("gethostbyname()");
- exit(EXIT_FAILURE);
- }
-
- if (he->h_addrtype != AF_INET) {
- fprintf(stderr, "Unsupported address type\n");
- exit(EXIT_FAILURE);
- }
-
- host.sin_addr = *(struct in_addr *) he->h_addr;
-
- break;
- case 'i':
- ident = optarg;
- break;
- case 'p':
- host.sin_port = htons(strtoul(optarg, 0, 0));
-
- break;
- case 's':
- secret = optarg;
- break;
- default:
- usage(argv[0]);
- exit(EXIT_FAILURE);
- }
- }
-
- if (hpfdcmd == C_UNKNOWN || !channel || !ident || !secret || host.sin_addr.s_addr == INADDR_ANY || host.sin_port == 0) {
- usage(argv[0]);
- exit(EXIT_FAILURE);
- }
-
- // install sigint handler
- if (signal(SIGINT, sigh) == SIG_ERR) {
- perror("signal()");
- exit(EXIT_FAILURE);
- }
-
- // connect to broker
- if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
- perror("socket()");
- exit(EXIT_FAILURE);
- }
- fprintf(stderr, "connecting to %s:%u\n", inet_ntoa(host.sin_addr), ntohs(host.sin_port));
- if (connect(s, (struct sockaddr *) &host, sizeof(host)) == -1) {
- perror("connect()");
- exit(EXIT_FAILURE);
- }
-
- session_state = S_INIT; // initial session state
-
- // this is our little session state machine
- for (;;) switch (session_state) {
- case S_INIT:
- // read info message
- if ((data = read_msg(s)) == NULL) break;
- msg = (hpf_msg_t *) data;
-
- switch (msg->hdr.opcode) {
- case OP_INFO:
-
- chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr), ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
- if (chunk == NULL) {
- fprintf(stderr, "invalid message format\n");
- exit(EXIT_FAILURE);
- }
-
- nonce = *(u_int32_t *) (data + sizeof(msg->hdr) + chunk->len + 1);
-
- session_state = S_AUTH;
-
- free(data);
-
- break;
- case OP_ERROR:
- session_state = S_ERROR;
- break;
- default:
- fprintf(stderr, "unknown server message (type %u)\n", msg->hdr.opcode);
- exit(EXIT_FAILURE);
- }
-
- break;
- case S_AUTH:
- // send auth message
- fprintf(stderr, "sending authentication...\n");
- msg = hpf_msg_auth(nonce, (u_char *) ident, strlen(ident), (u_char *) secret, strlen(secret));
-
- if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
- perror("write()");
- exit(EXIT_FAILURE);
- }
- hpf_msg_delete(msg);
-
- if (hpfdcmd == C_SUBSCRIBE)
- session_state = S_SUBSCRIBE;
- else
- session_state = S_PUBLISH;
- break;
- case S_SUBSCRIBE:
- // send subscribe message
- fprintf(stderr, "subscribing to channel...\n");
- msg = hpf_msg_subscribe((u_char *) ident, strlen(ident), (u_char *) channel, strlen(channel));
-
- if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
- perror("write()");
- exit(EXIT_FAILURE);
- }
- hpf_msg_delete(msg);
-
- session_state = S_RECVMSGS;
- break;
- case S_RECVMSGS:
- // read server message
- if ((data = read_msg(s)) == NULL) break;
- msg = (hpf_msg_t *) data;
-
- switch (msg->hdr.opcode) {
- case OP_PUBLISH:
- // skip chunks
- payload_len = hpf_msg_getsize(msg) - sizeof(msg->hdr);
-
- chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr), ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
- if (chunk == NULL) {
- fprintf(stderr, "invalid message format\n");
- exit(EXIT_FAILURE);
- }
- payload_len -= chunk->len + 1;
-
- chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr) + chunk->len + 1, ntohl(msg->hdr.msglen) - sizeof(msg->hdr) - chunk->len - 1);
- if (chunk == NULL) {
- fprintf(stderr, "invalid message format\n");
- exit(EXIT_FAILURE);
- }
- payload_len -= chunk->len + 1;
-
- if (write(STDOUT_FILENO, data + hpf_msg_getsize(msg) - payload_len, payload_len) == -1) {
- perror("write()");
- exit(EXIT_FAILURE);
- }
- putchar('\n');
-
- free(data);
-
- // we just remain in S_SUBSCRIBED
- break;
- case OP_ERROR:
- session_state = S_ERROR;
- break;
- default:
- fprintf(stderr, "unknown server message (type %u)\n", msg->hdr.opcode);
- exit(EXIT_FAILURE);
- }
-
- break;
- case S_PUBLISH:
- // send publish message
- len = 0;
- templen = 0;
- memset(tempbuf, 0x0, READ_BLOCK_SIZE);
- while ((templen = read(STDIN_FILENO, tempbuf, READ_BLOCK_SIZE)) > 0 && len < MAXLEN) {
- memcpy(buf + len, tempbuf, templen);
- len += templen;
- if(buf[len - 1] == '\n') {
- buf[len - 1] = 0;
- len --;
- }
- }
- fprintf(stderr, "publish %d bytes to channel...\n", len);
- msg = hpf_msg_publish((u_char *) ident, strlen(ident), (u_char *) channel, strlen(channel),buf,len);
- if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
- perror("write()");
- exit(EXIT_FAILURE);
- }
- close(s);
- hpf_msg_delete(msg);
- exit(EXIT_SUCCESS);
- break;
- break;
- case S_ERROR:
- if (msg) {
- // msg is still valid
- if ((errmsg = calloc(1, msg->hdr.msglen - sizeof(msg->hdr))) == NULL) {
- perror("calloc()");
- exit(EXIT_FAILURE);
- }
- memcpy(errmsg, msg->data, ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
+ cmd_t hpfdcmd;
+ hpf_msg_t *msg;
+ hpf_chunk_t *chunk;
+ u_char *data;
+ char *errmsg, *channel, *ident, *secret;
+ int s, opt;
+ struct hostent *he;
+ struct sockaddr_in host;
+ u_int32_t nonce = 0;
+ u_int32_t payload_len;
+ u_char* buf;
+ int len;
+ int templen;
+ char tempbuf[READ_BLOCK_SIZE];
+ u_int32_t times = 1;
+ int i;
+ bool benchmark = false;
+ struct timespec delay = { .tv_sec = 0, .tv_nsec = 0 };
+ bool have_delay = false;
+ unsigned ret = 0;
+
+ buf = (u_char*)malloc(sizeof(u_char) * MAXLEN);
+
+ hpfdcmd = C_UNKNOWN;
+ channel = NULL;
+ ident = NULL;
+ secret = NULL;
+ msg = NULL;
+
+ memset(&host, 0, sizeof(struct sockaddr_in));
+ host.sin_family = AF_INET;
+
+ while ((opt = getopt(argc, argv, "SPc:h:i:p:s:t:fbd:")) != -1) {
+ switch (opt) {
+ case 'S':
+ hpfdcmd = C_SUBSCRIBE;
+ break;
+ case 'P':
+ hpfdcmd = C_PUBLISH;
+ break;
+ case 'c':
+ channel = optarg;
+ break;
+ case 'h':
+ if ((he = gethostbyname(optarg)) == NULL) {
+ perror("gethostbyname()");
+ exit(EXIT_FAILURE);
+ }
+
+ if (he->h_addrtype != AF_INET) {
+ fprintf(stderr, "Unsupported address type\n");
+ exit(EXIT_FAILURE);
+ }
+
+ host.sin_addr = *(struct in_addr *) he->h_addr;
+
+ break;
+ case 'i':
+ ident = optarg;
+ break;
+ case 'p':
+ host.sin_port = htons(strtoul(optarg, 0, 0));
+
+ break;
+ case 's':
+ secret = optarg;
+ break;
+ case 't':
+ times = strtol(optarg, NULL, 10);
+ break;
+ case 'f':
+ times = -1;
+ break;
+ case 'b':
+ benchmark = true;
+ break;
+ case 'd':
+ have_delay = true;
+ i = strtol(optarg, NULL, 10);
+ delay.tv_sec = i / 1000;
+ delay.tv_nsec = (i % 1000) * 1000;
+ printf("Setting delay: %lus %lums\n", delay.tv_sec, delay.tv_nsec / 1000);
+ break;
+ default:
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+ }
- fprintf(stderr, "server error: '%s'\n", errmsg);
- free(errmsg);
+ if (benchmark) {
+ printf("Running in benchmark mode\n");
+ signal(SIGALRM, print_benchmark);
+ alarm(1);
+ }
+
+ if (hpfdcmd == C_UNKNOWN || !channel || !ident || !secret || host.sin_addr.s_addr == INADDR_ANY || host.sin_port == 0) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ // install sigint handler
+ if (signal(SIGINT, sigh) == SIG_ERR) {
+ perror("signal()");
+ exit(EXIT_FAILURE);
+ }
- free(msg);
- }
+ // connect to broker
+ if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
+ perror("socket()");
+ exit(EXIT_FAILURE);
+ }
+ fprintf(stderr, "connecting to %s:%u\n", inet_ntoa(host.sin_addr), ntohs(host.sin_port));
+ if (connect(s, (struct sockaddr *) &host, sizeof(host)) == -1) {
+ perror("connect()");
+ exit(EXIT_FAILURE);
+ }
- session_state = S_TERMINATE;
- break;
- case S_TERMINATE:
- fprintf(stderr, "terminated.\n");
- close(s);
- return EXIT_SUCCESS;
- default:
- fprintf(stderr, "unknown session state\n");
- close(s);
- exit(EXIT_FAILURE);
- }
+ session_state = S_INIT; // initial session state
+
+ // this is our little session state machine
+ for (;;) switch (session_state) {
+ case S_INIT:
+ // read info message
+ if ((data = read_msg(s)) == NULL) break;
+ msg = (hpf_msg_t *) data;
+
+ switch (msg->hdr.opcode) {
+ case OP_INFO:
+
+ chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr), ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
+ if (chunk == NULL) {
+ fprintf(stderr, "invalid message format\n");
+ exit(EXIT_FAILURE);
+ }
+
+ nonce = *(u_int32_t *) (data + sizeof(msg->hdr) + chunk->len + 1);
+
+ session_state = S_AUTH;
+
+ free(data);
+
+ break;
+ case OP_ERROR:
+ session_state = S_ERROR;
+ break;
+ default:
+ fprintf(stderr, "unknown server message (type %u)\n", msg->hdr.opcode);
+ exit(EXIT_FAILURE);
+ }
+
+ break;
+ case S_AUTH:
+ // send auth message
+ fprintf(stderr, "sending authentication...\n");
+ msg = hpf_msg_auth(nonce, (u_char *) ident, strlen(ident), (u_char *) secret, strlen(secret));
+
+ if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
+ perror("write()");
+ exit(EXIT_FAILURE);
+ }
+ hpf_msg_delete(msg);
+
+ if (hpfdcmd == C_SUBSCRIBE)
+ session_state = S_SUBSCRIBE;
+ else
+ session_state = S_PUBLISH;
+ break;
+ case S_SUBSCRIBE:
+ // send subscribe message
+ fprintf(stderr, "subscribing to channel...\n");
+ msg = hpf_msg_subscribe((u_char *) ident, strlen(ident), (u_char *) channel, strlen(channel));
+
+ if (write(s, (u_char *) msg, ntohl(msg->hdr.msglen)) == -1) {
+ perror("write()");
+ exit(EXIT_FAILURE);
+ }
+ hpf_msg_delete(msg);
+
+ session_state = S_RECVMSGS;
+ break;
+ case S_RECVMSGS:
+ // read server message
+ if ((data = read_msg(s)) == NULL) break;
+ msg = (hpf_msg_t *) data;
+
+ switch (msg->hdr.opcode) {
+ case OP_PUBLISH:
+ // skip chunks
+ payload_len = hpf_msg_getsize(msg) - sizeof(msg->hdr);
+
+ chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr), ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
+ if (chunk == NULL) {
+ fprintf(stderr, "invalid message format\n");
+ exit(EXIT_FAILURE);
+ }
+ payload_len -= chunk->len + 1;
+
+ chunk = hpf_msg_get_chunk(data + sizeof(msg->hdr) + chunk->len + 1, ntohl(msg->hdr.msglen) - sizeof(msg->hdr) - chunk->len - 1);
+ if (chunk == NULL) {
+ fprintf(stderr, "invalid message format\n");
+ exit(EXIT_FAILURE);
+ }
+ payload_len -= chunk->len + 1;
+
+ if (!benchmark) {
+ if (write(STDOUT_FILENO, data + hpf_msg_getsize(msg) - payload_len, payload_len) == -1) {
+ perror("write()");
+ exit(EXIT_FAILURE);
+ }
+ dprintf(STDOUT_FILENO, "\n");
+ } else {
+ totmsgs++;
+ }
+ free(data);
+
+ // we just remain in S_SUBSCRIBED
+ break;
+ case OP_ERROR:
+ session_state = S_ERROR;
+ break;
+ default:
+ fprintf(stderr, "unknown server message (type %u)\n", msg->hdr.opcode);
+ exit(EXIT_FAILURE);
+ }
+
+ break;
+ case S_PUBLISH:
+ // send publish message
+ len = 0;
+ templen = 0;
+ memset(tempbuf, 0x0, READ_BLOCK_SIZE);
+ while ((templen = read(STDIN_FILENO, tempbuf, READ_BLOCK_SIZE)) > 0 && len < MAXLEN) {
+ memcpy(buf + len, tempbuf, templen);
+ len += templen;
+ if(buf[len - 1] == '\n') {
+ buf[len - 1] = 0;
+ len --;
+ }
+ }
+ fprintf(stderr, "publish %d bytes to channel for %u times...\n", len, times);
+ for (i = 0; i < times; i++) {
+ msg = hpf_msg_publish((u_char *) ident, strlen(ident), (u_char *) channel, strlen(channel),buf,len);
+ ret = write(s, (u_char *) msg, ntohl(msg->hdr.msglen));
+ if (ret == -1) {
+ perror("write()");
+ exit(EXIT_FAILURE);
+ }
+ if (ret < ntohl(msg->hdr.msglen)) {
+ if (write(s, (u_char *)msg + ret, ntohl(msg->hdr.msglen) - ret) == -1) {
+ perror("write()");
+ exit(EXIT_FAILURE);
+ }
+ hpf_msg_delete(msg);
+ if (have_delay) {
+ nanosleep(&delay, NULL);
+ }
+ }
+ totmsgs++;
+ }
+ close(s);
+ exit(EXIT_SUCCESS);
+ break;
+ case S_ERROR:
+ if (msg) {
+ // msg is still valid
+ if ((errmsg = calloc(1, msg->hdr.msglen - sizeof(msg->hdr))) == NULL) {
+ perror("calloc()");
+ exit(EXIT_FAILURE);
+ }
+ memcpy(errmsg, msg->data, ntohl(msg->hdr.msglen) - sizeof(msg->hdr));
+
+ fprintf(stderr, "server error: '%s'\n", errmsg);
+ free(errmsg);
+
+ free(msg);
+ }
+
+ session_state = S_TERMINATE;
+ break;
+ case S_TERMINATE:
+ fprintf(stderr, "terminated.\n");
+ close(s);
+ return EXIT_SUCCESS;
+ default:
+ fprintf(stderr, "unknown session state\n");
+ close(s);
+ exit(EXIT_FAILURE);
+ }
- close(s);
+ close(s);
- return EXIT_SUCCESS;
+ return EXIT_SUCCESS;
}
View
31 broker/README.md
@@ -0,0 +1,31 @@
+# hpfeeds broker
+
+## Adding a user (new sensor or new client)
+
+The hpfeeds broker stores user credentials in mongodb. The `add_user.py` and `dump_users.py` scripts assist in setting up new users and troubleshooting. `add_user.py` can also be used to update an existing user.
+
+Adding a new sensor (publish only)
+
+ $ python add_user.py dionaea.1234 6t5r4e46g7y8j8 dionaea.events,dionaea.capture ""
+ inserted {'subscribe': [], 'secret': '6t5r4e46g7y8j8', 'identifier': 'dionaea.1234', 'publish': ['dionaea.events', 'dionaea.capture']}
+
+Adding a new client (subscribe only)
+
+ $ python add_user.py webapp.4567 p0o9i8u7ycj "" dionaea.events,dionaea.capture
+ inserted {'subscribe': ['dionaea.events', 'dionaea.capture'], 'secret': 'p0o9i8u7ycj', 'identifier': 'webapp.4567', 'publish': []}
+
+Updating a user:
+
+ $ python add_user.py webapp.4567 abc12345678 "" dionaea.events,dionaea.capture,thug.files,kippo.events
+ updated {'subscribe': ['dionaea.events', 'dionaea.capture', 'thug.files', 'kippo.events'], 'secret': 'abc12345678', 'identifier': 'webapp.4567', 'publish': []}
+
+## Dumping the users from mongodb
+
+ $ python dump_users.py
+ {u'subscribe': [], u'secret': u'6t5r4e46g7y8j8', u'_id': ObjectId('5368e587b391d1d314123a33'), u'publish': [u'dionaea.events', u'dionaea.capture'], u'identifier': u'dionaea.1234'}
+ {u'subscribe': [u'dionaea.events', u'dionaea.capture'], u'secret': u'p0o9i8u7ycj', u'_id': ObjectId('5368e5d7b391d1d314123a34'), u'publish': [], u'identifier': u'webapp.4567'}
+
+## Running the Broker
+
+ $ python feedbroker.py
+
View
37 broker/add_user.py
@@ -0,0 +1,37 @@
+#!/usr/bin/python
+
+import pymongo
+import sys
+
+def handle_list(arg):
+ if arg:
+ return arg.split(",")
+ else:
+ return []
+
+if len(sys.argv) < 5:
+ print >> sys.stderr, "Usage: %s <ident> <secret> <publish> <subscribe>"%sys.argv[0]
+ sys.exit(1)
+
+ident = sys.argv[1]
+secret = sys.argv[2]
+publish = handle_list(sys.argv[3])
+subscribe = handle_list(sys.argv[4])
+
+rec = {
+ "identifier": ident,
+ "secret": secret,
+ "publish": publish,
+ "subscribe":subscribe
+}
+
+client = pymongo.MongoClient()
+res = client.hpfeeds.auth_key.update({"identifier": ident}, {"$set": rec}, upsert=True)
+client.fsync()
+client.disconnect()
+
+if res['updatedExisting']:
+ print "updated %s"%rec
+else:
+ print "inserted %s"%(rec)
+
View
10 broker/dump_users.py
@@ -0,0 +1,10 @@
+#!/usr/bin/python
+
+import pymongo
+import sys
+
+client = pymongo.MongoClient()
+for doc in client.hpfeeds.auth_key.find():
+ print doc
+
+

0 comments on commit f18712d

Please sign in to comment.
Something went wrong with that request. Please try again.