Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Basic support for setting queue parameters.

  • Loading branch information...
commit 637a809ffd218cc2f9e86b723079fb467caeb7d0 1 parent 376b034
@postwait authored
View
2  fq.h
@@ -27,6 +27,8 @@
#define FQ_PROTO_STATUS 0x57a7
#define FQ_PROTO_STATUSREQ 0xc7a7
+#define FQ_DEFAULT_QUEUE_TYPE "mem"
+
#define MAX_RK_LEN 127
typedef struct fq_rk {
unsigned char name[MAX_RK_LEN];
View
24 fq_client.c
@@ -47,6 +47,7 @@ struct fq_conn_s {
char *user;
char *pass;
char *queue;
+ char *queue_type;
fq_rk key;
int cmd_fd;
int cmd_hb_needed;
@@ -145,11 +146,17 @@ fq_client_do_auth(fq_conn_s *conn_s) {
int len;
uint16_t cmd;
char error[1024];
+ char *queue_composed;
if(fq_write_uint16(conn_s->cmd_fd, FQ_PROTO_AUTH_CMD)) return -1;
if(fq_write_uint16(conn_s->cmd_fd, FQ_PROTO_AUTH_PLAIN)) return -2;
if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->user), conn_s->user) < 0)
return -3;
- if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->queue), conn_s->queue) < 0)
+ len = strlen(conn_s->queue) +
+ 1 + strlen(conn_s->queue_type);
+ queue_composed = alloca(len+1);
+ memcpy(queue_composed, conn_s->queue, strlen(conn_s->queue)+1); /* include null terminator */
+ memcpy(queue_composed + strlen(conn_s->queue) + 1, conn_s->queue_type, strlen(conn_s->queue_type));
+ if(fq_write_short_cmd(conn_s->cmd_fd, len, queue_composed) < 0)
return -4;
if(fq_write_short_cmd(conn_s->cmd_fd, strlen(conn_s->pass), conn_s->pass) < 0)
return -5;
@@ -554,6 +561,7 @@ fq_client_init(fq_client *conn_ptr, int peermode,
int
fq_client_creds(fq_client conn, const char *host, unsigned short port,
const char *sender, const char *pass) {
+ char qname[39];
fq_conn_s *conn_s;
conn_s = conn;
@@ -563,15 +571,23 @@ fq_client_creds(fq_client conn, const char *host, unsigned short port,
/* parse the user info */
conn_s->user = strdup(sender);
conn_s->queue = strchr(conn_s->user, '/');
- if(conn_s->queue) *conn_s->queue++ = '\0';
- if(!conn_s->queue) {
+ if(conn_s->queue) {
+ *conn_s->queue++ = '\0';
+ conn_s->queue_type = strchr(conn_s->queue, '/');
+ if(conn_s->queue_type) {
+ *conn_s->queue_type++ = '\0';
+ }
+ }
+ if(!conn_s->queue || conn_s->queue[0] == '\0') {
uuid_t out;
- char qname[39];
uuid_generate(out);
qname[0] = 'q'; qname[1] = '-';
uuid_unparse_lower(out, qname+2);
conn_s->queue = qname;
}
+ conn_s->queue_type = strdup(conn_s->queue_type ?
+ conn_s->queue_type :
+ FQ_DEFAULT_QUEUE_TYPE);
conn_s->queue = strdup(conn_s->queue);
conn_s->pass = strdup(pass);
View
2  fq_rcvr.c
@@ -49,7 +49,7 @@ int main(int argc, char **argv) {
memcpy(breq.exchange.name, "maryland", 8);
breq.exchange.len = 8;
breq.peermode = 0;
- breq.program = (char *)"prefix:\"test.prefix.\" sample(0.2)";
+ breq.program = (char *)"prefix:\"test.prefix.\" sample(1)";
fq_client_bind(c, &breq);
while(breq.out__route_id == 0) usleep(100);
View
3  fqd.h
@@ -16,6 +16,7 @@
typedef void * fqd_queue_impl_data;
typedef struct fqd_queue_impl {
+ const char *name;
fqd_queue_impl_data (*setup)(fq_rk *, uint32_t *count);
void (*enqueue)(fqd_queue_impl_data, fq_msg *);
fq_msg *(*dequeue)(fqd_queue_impl_data);
@@ -99,7 +100,7 @@ extern void fqd_remote_client_ref(remote_client *);
extern void fqd_remote_client_deref(remote_client *);
extern fq_rk *fqd_queue_name(fqd_queue *q);
-extern fqd_queue *fqd_queue_get(fq_rk *);
+extern fqd_queue *fqd_queue_get(fq_rk *, const char *, const char *);
extern uint32_t fqd_queue_get_backlog_limit(fqd_queue *);
extern void fqd_queue_set_backlog_limit(fqd_queue *, uint32_t);
extern queue_policy_t fqd_queue_get_policy(fqd_queue *);
View
30 fqd_ccs.c
@@ -20,19 +20,39 @@ fqd_ccs_auth(remote_client *client) {
if(method == 0) {
char buf[40];
unsigned char pass[10240];
+ char queue_detail[1024], *end_of_qd;
+ char *qtype = NULL, *qparams = NULL;
int len;
len = fq_read_short_cmd(client->fd, sizeof(client->user.name),
client->user.name);
if(len < 0 || len > (int)sizeof(client->user.name)) return -3;
client->user.len = len & 0xff;
- len = fq_read_short_cmd(client->fd, sizeof(queue_name.name),
- queue_name.name);
- if(len < 0 || len > (int)sizeof(queue_name.name)) return -4;
- queue_name.len = len & 0xff;
+ len = fq_read_short_cmd(client->fd, sizeof(queue_detail)-1,
+ queue_detail);
+ if(len < 0) return -4;
+ queue_detail[len] = '\0';
+ end_of_qd = memchr(queue_detail, '\0', len);
+ if(!end_of_qd) {
+ if(len < 0 || len > (int)sizeof(queue_name.name)) return -4;
+ queue_name.len = len & 0xff;
+ memcpy(queue_name.name, queue_detail, queue_name.len);
+ }
+ else if(end_of_qd - queue_detail <= 0xff) {
+ queue_name.len = end_of_qd - queue_detail;
+ memcpy(queue_name.name, queue_detail, queue_name.len);
+ qtype = end_of_qd + 1;
+ if(*qtype) qparams = strchr(qtype, ':');
+ else qtype = NULL;
+ if(qparams) *qparams++ = '\0';
+ }
+ else {
+ return -7;
+ }
len = fq_read_short_cmd(client->fd, sizeof(pass), pass);
if(len < 0 || len > (int)sizeof(queue_name.name)) return -5;
- client->queue = fqd_queue_get(&queue_name);
+ client->queue = fqd_queue_get(&queue_name, qtype, qparams);
+ if(client->queue == NULL) return -6;
/* do AUTH */
buf[0] = '\0';
View
2  fqd_config.c
@@ -390,7 +390,7 @@ fixup_config_write_context(void) {
next = (current + 1) % CONFIG_RING_SIZE;
nextnext = (current + 2) % CONFIG_RING_SIZE;
- if(!FQGC(next).dirty) return;
+ //if(!FQGC(next).dirty) return;
fq_debug(FQ_DEBUG_CONFIG, "Swapping to next running config\n");
pthread_mutex_lock(&global_config.writelock);
View
47 fqd_queue.c
@@ -29,6 +29,8 @@ struct fqd_queue {
fqd_queue_impl_data *impl_data;
};
+static void fqd_queue_free(fqd_queue *q);
+
fq_rk *
fqd_queue_name(fqd_queue *q) {
return &q->name;
@@ -131,7 +133,7 @@ fqd_queue_deref(fqd_queue *q) {
if(zero) {
fq_debug(FQ_DEBUG_CONFIG, "dropping queue(%p) %.*s\n",
(void *)q, q->name.len, q->name.name);
- free(q);
+ fqd_queue_free(q);
}
}
uint32_t
@@ -154,12 +156,43 @@ static void
fqd_queue_free(fqd_queue *q) {
pthread_mutex_destroy(&q->lock);
pthread_cond_destroy(&q->cv);
+ q->impl->dispose(q->impl_data);
free(q);
}
fqd_queue *
-fqd_queue_get(fq_rk *qname) {
+fqd_queue_get(fq_rk *qname, const char *type, const char *params) {
+ bool error = false;
fqd_queue *q = NULL;
fqd_config *config;
+ char *params_copy, *lastsep = NULL, *tok;
+
+ bool private = true;
+ queue_policy_t policy = FQ_POLICY_DROP;
+ uint32_t backlog_limit = DEFAULT_QUEUE_LIMIT;
+ fqd_queue_impl *queue_impl = &fqd_queue_mem_impl;
+
+ if(!type) type = FQ_DEFAULT_QUEUE_TYPE;
+ if(strcmp(type, "mem")) {
+ return NULL;
+ }
+ params_copy = strdup(params ? params : "");
+ tok = NULL;
+ while(NULL != (tok = strtok_r(tok ? NULL : params_copy, ":", &lastsep))) {
+ if(!strcmp(tok, "private")) private = true;
+ else if(!strcmp(tok, "public")) private = false;
+ else if(!strcmp(tok, "drop")) policy = FQ_POLICY_DROP;
+ else if(!strcmp(tok, "block")) policy = FQ_POLICY_BLOCK;
+ else if(!strncmp(tok, "backlog=", 8)) {
+ backlog_limit = atoi(tok + 8);
+ }
+ else {
+ error = true;
+ fq_debug(FQ_DEBUG_CONN, "error parsing: %s\n", tok);
+ }
+ }
+ free(params_copy);
+ if(error) return NULL;
+
config = fqd_config_get();
q = fqd_config_get_registered_queue(config, qname);
@@ -167,18 +200,16 @@ fqd_queue_get(fq_rk *qname) {
fqd_queue *nq;
nq = calloc(1, sizeof(*nq));
nq->refcnt = 0;
- nq->private = true;
- nq->policy = FQ_POLICY_DROP;
- nq->backlog_limit = DEFAULT_QUEUE_LIMIT;
+ nq->private = private;
+ nq->policy = policy;
+ nq->backlog_limit = backlog_limit;
pthread_mutex_init(&nq->lock, NULL);
pthread_cond_init(&nq->cv, NULL);
memcpy(&nq->name, qname, sizeof(*qname));
- nq->impl = &fqd_queue_mem_impl;
+ nq->impl = queue_impl;
nq->impl_data = nq->impl->setup(qname, &nq->backlog);
q = fqd_config_register_queue(nq, NULL);
if(nq != q) {
- /* race */
- nq->impl->dispose(nq->impl_data);
fqd_queue_free(nq);
}
}
View
5 fqd_queue_mem.c
@@ -22,7 +22,7 @@ static fq_msg *queue_mem_dequeue(fqd_queue_impl_data f) {
fq_msg *m;
if(ck_fifo_mpmc_dequeue(&d->q, &m, &garbage) == true) {
ck_pr_dec_uint(&d->qlen);
- free(garbage);
+ if(garbage != d->qhead) free(garbage);
return m;
}
return NULL;
@@ -42,11 +42,12 @@ static void queue_mem_dispose(fqd_queue_impl_data f) {
while(NULL != (m = queue_mem_dequeue(d))) {
fq_msg_deref(m);
}
- free(d->qhead);
+ if(d->qhead) free(d->qhead);
free(d);
}
fqd_queue_impl fqd_queue_mem_impl = {
+ .name = "mem",
.setup = queue_mem_setup,
.enqueue = queue_mem_enqueue,
.dequeue = queue_mem_dequeue,
View
4 fqd_routemgr.c
@@ -418,6 +418,7 @@ expr_free(exprnode_t *e) {
}
free(e->args);
}
+ free(e);
}
#define EAT_SPACE(p) while(*p != '\0' && isspace(*p)) (p)++
@@ -511,6 +512,7 @@ rule_parse(const char **cp, int errlen, char *err) {
(*cp)++;
EAT_SPACE(*cp); if(**cp == '\0') goto busted;
nr = calloc(1, sizeof(*nr));
+ nr->refcnt = 1;
nr->left = rule_parse(cp, errlen, err);
if(nr->left == NULL) goto busted;
EAT_SPACE(*cp); if(**cp == '\0') goto busted;
@@ -572,6 +574,7 @@ rule_parse(const char **cp, int errlen, char *err) {
if(**cp != ')') goto busted;
(*cp)++;
nr = calloc(1, sizeof(*nr));
+ nr->refcnt = 1;
nr->expr = rule_compose_expression(term, nargs, args, errlen, err);
if(!nr->expr) {
int i;
@@ -611,7 +614,6 @@ prog_compile(const char *program, int errlen, char *err) {
if(errlen>0) err[0] = '\0';
nr = rule_parse(&program, errlen, err);
EAT_SPACE(program);
- if(nr) nr->refcnt = 1;
if(*program) {
if(err && err[0] == '\0') snprintf(err, errlen, "trailing trash: %s", program);
prog_free(nr);
Please sign in to comment.
Something went wrong with that request. Please try again.