Skip to content

Commit

Permalink
redefined variables and finish all commands
Browse files Browse the repository at this point in the history
  • Loading branch information
yerstd committed May 9, 2020
1 parent 9b2685d commit 4740064
Show file tree
Hide file tree
Showing 14 changed files with 410 additions and 248 deletions.
13 changes: 13 additions & 0 deletions .editorconfig
@@ -0,0 +1,13 @@
#editorconfig.org
root = true

[*.c,*.h]
indent_style = space
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_trailing_newline = true

[*.md]
trim_trailing_whitespace = false
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -4,3 +4,4 @@ test-nsqd
test-lookupd
*.o
*.dSYM
.DS_Store
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -35,7 +35,7 @@ test-nsqd.o: test.c
test-nsqd: test-nsqd.o libnsq.a
$(CC) -o $@ $^ $(LIBS)

test-lookupd: test.o libnsq.a
test-lookupd: test-nsqd.o libnsq.a
$(CC) -o $@ $^ $(LIBS)

clean:
Expand Down
217 changes: 191 additions & 26 deletions command.c
@@ -1,49 +1,214 @@
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <evbuffsock.h>
#include <stdlib.h>

const static char * NEW_LINE = "\n";
#include "nsq.h"

const static char * NEW_LINE = "\n";
const static char * NEW_SPACE = " ";
const static int MAX_BUF_SIZE = 128;
const static int MIN_BUF_SIZE = 64;
const static int BUF_DELTER = 2;

void nsq_subscribe(struct Buffer *buf, const char *topic, const char *channel)
void *nsq_buf_malloc(size_t buf_size, size_t n, size_t l)
{
char b[MAX_BUF_SIZE];
size_t n;
if (buf_size - n >= MIN_BUF_SIZE || buf_size - l >= MIN_BUF_SIZE) {
return NULL;
}

void *buf = NULL;
while (1) {
if (buf_size - n < MIN_BUF_SIZE || buf_size - l < MIN_BUF_SIZE) {
buf_size *= BUF_DELTER;
continue;
}
buf = malloc(buf_size * sizeof(char));
assert(NULL != buf);
break;
}

return buf;
}

void nsq_buffer_add(nsqBuf *buf, const char *name, const nsqCmdParams params[], size_t psize, const char *body, const size_t body_length)
{
size_t buf_size = MAX_BUF_SIZE;
char *b = malloc(buf_size * sizeof(char));
char *nb = NULL;
assert(NULL != b);
size_t n = 0;
size_t l = 0;

l = strlen(name);
memcpy(b, name, l);
n += l;

n = sprintf(b, "SUB %s %s%s", topic, channel, NEW_LINE);
if (NULL != params) {
for (int i = 0; i < psize; i++) {
memcpy(b+n, NEW_SPACE, 1);
n += 1;

switch (params[i].t) {
case NSQ_PARAM_TYPE_INT:
l = sprintf(b+n, "%d", *((int *)params[i].v));
break;
case NSQ_PARAM_TYPE_CHAR:
nb = nsq_buf_malloc(buf_size, n, strlen(params[i].v));
if (NULL != nb) {
memcpy(nb, b, n);
free(b);
b = nb;
}
l = strlen((char *)params[i].v);
memcpy(b+n, (char *)params[i].v, l);
break;
}
n += l;
}
}
memcpy(b+n, NEW_LINE, 1);
n += 1;

if (NULL != body) {
uint32_t vv = htonl((uint32_t)body_length);
memcpy(b+n, &vv, 4);
n += 4;

nb = nsq_buf_malloc(buf_size, n, body_length);
if (NULL != nb) {
memcpy(nb, b, n);
free(b);
b = nb;
}
memcpy(b+n, body, body_length);
n += body_length;
}

buffer_add(buf, b, n);
}

void nsq_ready(struct Buffer *buf, int count)
void nsq_subscribe(nsqBuf *buf, const char *topic, const char *channel)
{
char b[MAX_BUF_SIZE];
size_t n;
const char *name = "SUB";
const nsqCmdParams params[2] = {
{(void *)topic, NSQ_PARAM_TYPE_CHAR},
{(void *)channel, NSQ_PARAM_TYPE_CHAR},
};
nsq_buffer_add(buf, name, params, 2, NULL, 0);
}

n = sprintf(b, "RDY %d%s", count, NEW_LINE);
buffer_add(buf, b, n);
void nsq_ready(nsqBuf *buf, int count)
{
const char *name = "RDY";
const nsqCmdParams params[1] = {
{&count, NSQ_PARAM_TYPE_INT},
};
nsq_buffer_add(buf, name, params, 1, NULL, 0);
}

void nsq_finish(struct Buffer *buf, const char *id)
void nsq_finish(nsqBuf *buf, const char *id)
{
char b[MAX_BUF_SIZE];
size_t n;
const char *name = "FIN";
const nsqCmdParams params[1] = {
{(void *)id, NSQ_PARAM_TYPE_CHAR},
};
nsq_buffer_add(buf, name, params, 1, NULL, 0);
}

n = sprintf(b, "FIN %s%s", id, NEW_LINE);
buffer_add(buf, b, n);
void nsq_requeue(nsqBuf *buf, const char *id, int timeout_ms)
{
const char *name = "REQ";
const nsqCmdParams params[2] = {
{(void *)id, NSQ_PARAM_TYPE_CHAR},
{&timeout_ms, NSQ_PARAM_TYPE_INT},
};
nsq_buffer_add(buf, name, params, 2, NULL, 0);
}

void nsq_requeue(struct Buffer *buf, const char *id, int timeout_ms)
void nsq_nop(nsqBuf *buf)
{
char b[MAX_BUF_SIZE];
size_t n;
nsq_buffer_add(buf, "NOP", NULL, 0, NULL, 0);
}

n = sprintf(b, "REQ %s %d%s", id, timeout_ms, NEW_LINE);
buffer_add(buf, b, n);
void nsq_publish(nsqBuf *buf, const char *topic, const char *body)
{
const char *name = "PUB";
const nsqCmdParams params[1] = {
{(void *)topic, NSQ_PARAM_TYPE_CHAR},
};
nsq_buffer_add(buf, name, params, 1, body, strlen(body));
}

void nsq_nop(struct Buffer *buf)
void nsq_defer_publish(nsqBuf *buf, const char *topic, const char *body, int defer_time_sec)
{
char b[MAX_BUF_SIZE];
size_t n;
n = sprintf(b, "NOP%s", NEW_LINE);
buffer_add(buf, b, n);
const char *name = "DPUB";
const nsqCmdParams params[2] = {
{(void *)topic, NSQ_PARAM_TYPE_CHAR},
{&defer_time_sec, NSQ_PARAM_TYPE_INT},
};
nsq_buffer_add(buf, name, params, 2, body, strlen(body));
}

void nsq_multi_publish(nsqBuf *buf, const char *topic, const char **body, const size_t body_size)
{
const char *name = "MPUB";
const nsqCmdParams params[1] = {
{(void *)topic, NSQ_PARAM_TYPE_CHAR},
};

size_t s = 4;
for (int i = 0; i<body_size; i++) {
s += strlen(body[i])+4;
}
char *b = malloc(s * sizeof(char));
assert(NULL != b);

size_t n = 0;
uint32_t v = 0;
v = htonl((uint32_t)body_size);
memcpy(b+n, &v, 4);
n += 4;

size_t l = 0;
for (int i = 0; i < body_size; i++) {
l = strlen(body[i]);
v = htonl((uint32_t)l);
memcpy(b+n, &v, 4);
n += 4;

l = strlen(body[i]);
memcpy(b+n, body[i], l);
n += l;
}

nsq_buffer_add(buf, name, params, 1, b, s);
}

void nsq_touch(nsqBuf *buf, const char *id)
{
const char *name = "TOUCH";
const nsqCmdParams params[1] = {
{(void *)id, NSQ_PARAM_TYPE_CHAR},
};
nsq_buffer_add(buf, name, params, 1, NULL, 0);
}

void nsq_cleanly_close_connection(nsqBuf *buf)
{
const char *name = "CLS";
nsq_buffer_add(buf, name, NULL, 0, NULL, 0);
}

void nsq_auth(nsqBuf *buf, const char *secret)
{
const char *name = "AUTH";
nsq_buffer_add(buf, name, NULL, 0, secret, strlen(secret));
}

//TODO: should handle object to json string
void nsq_identify(nsqBuf *buf, const char *json_body)
{
const char *name = "IDENTIFY";
nsq_buffer_add(buf, name, NULL, 1, json_body, strlen(json_body));
}

0 comments on commit 4740064

Please sign in to comment.