From 47400646f9b265e08e3c8f723213b292af8c7886 Mon Sep 17 00:00:00 2001 From: stackerzzq Date: Sun, 10 May 2020 00:03:54 +0800 Subject: [PATCH] redefined variables and finish all commands --- .editorconfig | 13 +++ .gitignore | 1 + Makefile | 2 +- command.c | 217 ++++++++++++++++++++++++++++++++++++++++------ http.c | 57 ++++++------ http.h | 16 ++-- json.h | 2 +- message.c | 14 +-- nsq.h | 162 ++++++++++++++++++---------------- nsqd_connection.c | 58 ++++++------- nsqlookupd.c | 23 ++--- reader.c | 68 +++++++-------- test.c | 24 +++-- utlist.h | 1 - 14 files changed, 410 insertions(+), 248 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..6d58508 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,13 @@ +#editorconfig.org +root = true + +[*.c,*.h] +indent_style = space +indent_size = 4 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_trailing_newline = true + +[*.md] +trim_trailing_whitespace = false diff --git a/.gitignore b/.gitignore index e0f4959..5a3eeb2 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ test-nsqd test-lookupd *.o *.dSYM +.DS_Store diff --git a/Makefile b/Makefile index f409176..3348e61 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ test-nsqd.o: test.c test-nsqd: test-nsqd.o libnsq.a $(CC) -o $@ $^ $(LIBS) -test-lookupd: test.o libnsq.a +test-lookupd: test-nsqd.o libnsq.a $(CC) -o $@ $^ $(LIBS) clean: diff --git a/command.c b/command.c index 5a9a50b..ac25a4f 100644 --- a/command.c +++ b/command.c @@ -1,49 +1,214 @@ +#include +#include #include -#include +#include -const static char * NEW_LINE = "\n"; +#include "nsq.h" + +const static char * NEW_LINE = "\n"; +const static char * NEW_SPACE = " "; const static int MAX_BUF_SIZE = 128; +const static int MIN_BUF_SIZE = 64; +const static int BUF_DELTER = 2; -void nsq_subscribe(struct Buffer *buf, const char *topic, const char *channel) +void *nsq_buf_malloc(size_t buf_size, size_t n, size_t l) { - char b[MAX_BUF_SIZE]; - size_t n; + if (buf_size - n >= MIN_BUF_SIZE || buf_size - l >= MIN_BUF_SIZE) { + return NULL; + } + + void *buf = NULL; + while (1) { + if (buf_size - n < MIN_BUF_SIZE || buf_size - l < MIN_BUF_SIZE) { + buf_size *= BUF_DELTER; + continue; + } + buf = malloc(buf_size * sizeof(char)); + assert(NULL != buf); + break; + } + + return buf; +} + +void nsq_buffer_add(nsqBuf *buf, const char *name, const nsqCmdParams params[], size_t psize, const char *body, const size_t body_length) +{ + size_t buf_size = MAX_BUF_SIZE; + char *b = malloc(buf_size * sizeof(char)); + char *nb = NULL; + assert(NULL != b); + size_t n = 0; + size_t l = 0; + + l = strlen(name); + memcpy(b, name, l); + n += l; - n = sprintf(b, "SUB %s %s%s", topic, channel, NEW_LINE); + if (NULL != params) { + for (int i = 0; i < psize; i++) { + memcpy(b+n, NEW_SPACE, 1); + n += 1; + + switch (params[i].t) { + case NSQ_PARAM_TYPE_INT: + l = sprintf(b+n, "%d", *((int *)params[i].v)); + break; + case NSQ_PARAM_TYPE_CHAR: + nb = nsq_buf_malloc(buf_size, n, strlen(params[i].v)); + if (NULL != nb) { + memcpy(nb, b, n); + free(b); + b = nb; + } + l = strlen((char *)params[i].v); + memcpy(b+n, (char *)params[i].v, l); + break; + } + n += l; + } + } + memcpy(b+n, NEW_LINE, 1); + n += 1; + + if (NULL != body) { + uint32_t vv = htonl((uint32_t)body_length); + memcpy(b+n, &vv, 4); + n += 4; + + nb = nsq_buf_malloc(buf_size, n, body_length); + if (NULL != nb) { + memcpy(nb, b, n); + free(b); + b = nb; + } + memcpy(b+n, body, body_length); + n += body_length; + } + buffer_add(buf, b, n); } -void nsq_ready(struct Buffer *buf, int count) +void nsq_subscribe(nsqBuf *buf, const char *topic, const char *channel) { - char b[MAX_BUF_SIZE]; - size_t n; + const char *name = "SUB"; + const nsqCmdParams params[2] = { + {(void *)topic, NSQ_PARAM_TYPE_CHAR}, + {(void *)channel, NSQ_PARAM_TYPE_CHAR}, + }; + nsq_buffer_add(buf, name, params, 2, NULL, 0); +} - n = sprintf(b, "RDY %d%s", count, NEW_LINE); - buffer_add(buf, b, n); +void nsq_ready(nsqBuf *buf, int count) +{ + const char *name = "RDY"; + const nsqCmdParams params[1] = { + {&count, NSQ_PARAM_TYPE_INT}, + }; + nsq_buffer_add(buf, name, params, 1, NULL, 0); } -void nsq_finish(struct Buffer *buf, const char *id) +void nsq_finish(nsqBuf *buf, const char *id) { - char b[MAX_BUF_SIZE]; - size_t n; + const char *name = "FIN"; + const nsqCmdParams params[1] = { + {(void *)id, NSQ_PARAM_TYPE_CHAR}, + }; + nsq_buffer_add(buf, name, params, 1, NULL, 0); +} - n = sprintf(b, "FIN %s%s", id, NEW_LINE); - buffer_add(buf, b, n); +void nsq_requeue(nsqBuf *buf, const char *id, int timeout_ms) +{ + const char *name = "REQ"; + const nsqCmdParams params[2] = { + {(void *)id, NSQ_PARAM_TYPE_CHAR}, + {&timeout_ms, NSQ_PARAM_TYPE_INT}, + }; + nsq_buffer_add(buf, name, params, 2, NULL, 0); } -void nsq_requeue(struct Buffer *buf, const char *id, int timeout_ms) +void nsq_nop(nsqBuf *buf) { - char b[MAX_BUF_SIZE]; - size_t n; + nsq_buffer_add(buf, "NOP", NULL, 0, NULL, 0); +} - n = sprintf(b, "REQ %s %d%s", id, timeout_ms, NEW_LINE); - buffer_add(buf, b, n); +void nsq_publish(nsqBuf *buf, const char *topic, const char *body) +{ + const char *name = "PUB"; + const nsqCmdParams params[1] = { + {(void *)topic, NSQ_PARAM_TYPE_CHAR}, + }; + nsq_buffer_add(buf, name, params, 1, body, strlen(body)); } -void nsq_nop(struct Buffer *buf) +void nsq_defer_publish(nsqBuf *buf, const char *topic, const char *body, int defer_time_sec) { - char b[MAX_BUF_SIZE]; - size_t n; - n = sprintf(b, "NOP%s", NEW_LINE); - buffer_add(buf, b, n); + const char *name = "DPUB"; + const nsqCmdParams params[2] = { + {(void *)topic, NSQ_PARAM_TYPE_CHAR}, + {&defer_time_sec, NSQ_PARAM_TYPE_INT}, + }; + nsq_buffer_add(buf, name, params, 2, body, strlen(body)); +} + +void nsq_multi_publish(nsqBuf *buf, const char *topic, const char **body, const size_t body_size) +{ + const char *name = "MPUB"; + const nsqCmdParams params[1] = { + {(void *)topic, NSQ_PARAM_TYPE_CHAR}, + }; + + size_t s = 4; + for (int i = 0; i -#ifdef DEBUG -#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) -#else -#define _DEBUG(...) do {;} while (0) -#endif +#include "http.h" static void timer_cb(EV_P_ struct ev_timer *w, int revents); static int multi_timer_cb(CURLM *multi, long timeout_ms, void *arg) { - struct HttpClient *httpc = (struct HttpClient *)arg; + httpClient *httpc = (httpClient *)arg; ev_timer_stop(httpc->loop, &httpc->timer_event); if (timeout_ms > 0) { @@ -25,14 +20,14 @@ static int multi_timer_cb(CURLM *multi, long timeout_ms, void *arg) return 0; } -static void check_multi_info(struct HttpClient *httpc) +static void check_multi_info(httpClient *httpc) { char *eff_url; CURLMsg *msg; int msgs_left; int status_code; - struct HttpRequest *req; - struct HttpResponse *resp; + httpRequest *req; + httpResponse *resp; CURL *easy; while ((msg = curl_multi_info_read(httpc->multi, &msgs_left))) { @@ -51,7 +46,7 @@ static void check_multi_info(struct HttpClient *httpc) static size_t write_cb(char *ptr, size_t size, size_t nmemb, void *arg) { - struct HttpRequest *req = (struct HttpRequest *)arg; + httpRequest *req = (httpRequest *)arg; size_t realsize = size * nmemb; buffer_add(req->data, ptr, realsize); @@ -61,7 +56,7 @@ static size_t write_cb(char *ptr, size_t size, size_t nmemb, void *arg) static void event_cb(EV_P_ struct ev_io *w, int revents) { - struct HttpClient *httpc = (struct HttpClient *)w->data; + httpClient *httpc = (httpClient *)w->data; CURLMcode rc; int action = (revents & EV_READ ? CURL_POLL_IN : 0) | (revents & EV_WRITE ? CURL_POLL_OUT : 0); @@ -74,7 +69,7 @@ static void event_cb(EV_P_ struct ev_io *w, int revents) static void timer_cb(EV_P_ struct ev_timer *w, int revents) { - struct HttpClient *httpc = (struct HttpClient *)w->data; + httpClient *httpc = (httpClient *)w->data; CURLMcode rc; rc = curl_multi_socket_action(httpc->multi, CURL_SOCKET_TIMEOUT, 0, &httpc->still_running); @@ -86,8 +81,8 @@ static void timer_cb(EV_P_ struct ev_timer *w, int revents) static int sock_cb(CURL *e, curl_socket_t s, int what, void *arg, void *sock_arg) { - struct HttpClient *httpc = (struct HttpClient *)arg; - struct HttpSocket *sock = (struct HttpSocket *)sock_arg; + httpClient *httpc = (httpClient *)arg; + httpSocket *sock = (httpSocket *)sock_arg; int kind = (what & CURL_POLL_IN ? EV_READ : 0) | (what & CURL_POLL_OUT ? EV_WRITE : 0); if (what == CURL_POLL_REMOVE) { @@ -100,7 +95,7 @@ static int sock_cb(CURL *e, curl_socket_t s, int what, void *arg, void *sock_arg } else { int with_new = 0; if (!sock) { - sock = (struct HttpSocket *)calloc(1, sizeof(struct HttpSocket)); + sock = (httpSocket *)calloc(1, sizeof(httpSocket)); with_new = 1; } @@ -124,11 +119,11 @@ static int sock_cb(CURL *e, curl_socket_t s, int what, void *arg, void *sock_arg return 0; } -struct HttpClient *new_http_client(struct ev_loop *loop) +httpClient *new_http_client(struct ev_loop *loop) { - struct HttpClient *httpc; + httpClient *httpc; - httpc = (struct HttpClient *)malloc(sizeof(struct HttpClient)); + httpc = (httpClient *)malloc(sizeof(httpClient)); httpc->loop = loop; httpc->multi = curl_multi_init(); ev_timer_init(&httpc->timer_event, timer_cb, 0., 0.); @@ -141,7 +136,7 @@ struct HttpClient *new_http_client(struct ev_loop *loop) return httpc; } -void free_http_client(struct HttpClient *httpc) +void free_http_client(httpClient *httpc) { if (httpc) { curl_multi_cleanup(httpc->multi); @@ -150,12 +145,12 @@ void free_http_client(struct HttpClient *httpc) } } -struct HttpRequest *new_http_request(const char *url, - void (*callback)(struct HttpRequest *req, struct HttpResponse *resp, void *arg), void *cb_arg) +httpRequest *new_http_request(const char *url, + void (*callback)(httpRequest *req, httpResponse *resp, void *arg), void *cb_arg) { - struct HttpRequest *req; + httpRequest *req; - req = (struct HttpRequest *)calloc(1, sizeof(struct HttpRequest)); + req = (httpRequest *)calloc(1, sizeof(httpRequest)); req->data = new_buffer(4096, 0); req->easy = curl_easy_init(); if (!req->easy) { @@ -176,7 +171,7 @@ struct HttpRequest *new_http_request(const char *url, return req; } -void free_http_request(struct HttpRequest *req) +void free_http_request(httpRequest *req) { if (req) { curl_multi_remove_handle(req->httpc->multi, req->easy); @@ -187,25 +182,25 @@ void free_http_request(struct HttpRequest *req) } } -struct HttpResponse *new_http_response(int status_code, void *data) +httpResponse *new_http_response(int status_code, void *data) { - struct HttpResponse *resp; + httpResponse *resp; - resp = (struct HttpResponse *)malloc(sizeof(struct HttpResponse)); + resp = (httpResponse *)malloc(sizeof(httpResponse)); resp->status_code = status_code; resp->data = data; return resp; } -void free_http_response(struct HttpResponse *resp) +void free_http_response(httpResponse *resp) { if (resp) { free(resp); } } -int http_client_get(struct HttpClient *httpc, struct HttpRequest *req) +int http_client_get(httpClient *httpc, httpRequest *req) { CURLMcode rc; diff --git a/http.h b/http.h index 1bac097..2f83a31 100644 --- a/http.h +++ b/http.h @@ -4,19 +4,19 @@ #include #include -struct HttpClient { +typedef struct HttpClient { CURLM *multi; struct ev_loop *loop; struct ev_timer timer_event; int still_running; -}; +} httpClient; -struct HttpResponse { +typedef struct HttpResponse { int status_code; struct Buffer *data; -}; +} httpResponse; -struct HttpRequest { +typedef struct HttpRequest { CURL *easy; char *url; struct HttpClient *httpc; @@ -24,9 +24,9 @@ struct HttpRequest { struct Buffer *data; void (*callback)(struct HttpRequest *req, struct HttpResponse *resp, void *arg); void *cb_arg; -}; +} httpRequest; -struct HttpSocket { +typedef struct HttpSocket { curl_socket_t sockfd; CURL *easy; int action; @@ -34,7 +34,7 @@ struct HttpSocket { struct ev_io ev; int evset; struct HttpClient *httpc; -}; +} httpSocket; struct HttpClient *new_http_client(struct ev_loop *loop); void free_http_client(struct HttpClient *httpc); diff --git a/json.h b/json.h index 58bb922..9a09311 100644 --- a/json.h +++ b/json.h @@ -15,7 +15,7 @@ typedef void nsq_json_tokener_t; #include typedef struct json_object nsq_json_t; -typedef int nsq_json_size_t; +typedef size_t nsq_json_size_t; typedef int32_t nsq_json_int_t; typedef struct json_tokener nsq_json_tokener_t; diff --git a/message.c b/message.c index 288c265..f216ff8 100644 --- a/message.c +++ b/message.c @@ -1,11 +1,5 @@ #include "nsq.h" -#ifdef DEBUG -#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) -#else -#define _DEBUG(...) do {;} while (0) -#endif - uint64_t ntoh64(const uint8_t *data) { return (uint64_t)(data[7]) | (uint64_t)(data[6])<<8 | (uint64_t)(data[5])<<16 | (uint64_t)(data[4])<<24 | @@ -13,12 +7,12 @@ uint64_t ntoh64(const uint8_t *data) { (uint64_t)(data[1])<<48 | (uint64_t)(data[0])<<56; } -struct NSQMessage *nsq_decode_message(const char *data, size_t data_length) +nsqMsg *nsq_decode_message(const char *data, size_t data_length) { - struct NSQMessage *msg; + nsqMsg *msg; size_t body_length; - msg = (struct NSQMessage *)malloc(sizeof(struct NSQMessage)); + msg = (nsqMsg *)malloc(sizeof(nsqMsg)); msg->timestamp = (int64_t)ntoh64((uint8_t *)data); msg->attempts = ntohs(*(uint16_t *)(data+8)); memcpy(&msg->id, data+10, 16); @@ -30,7 +24,7 @@ struct NSQMessage *nsq_decode_message(const char *data, size_t data_length) return msg; } -void free_nsq_message(struct NSQMessage *msg) +void free_nsq_message(nsqMsg *msg) { if (msg) { free(msg->body); diff --git a/nsq.h b/nsq.h index 73fd252..68bc686 100644 --- a/nsq.h +++ b/nsq.h @@ -1,20 +1,53 @@ #ifndef __nsq_h #define __nsq_h -#include -#include -#include -#include #include #include -#include "utlist.h" +#ifdef DEBUG +#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) +#else +#define _DEBUG(...) do {;} while (0) +#endif typedef enum {NSQ_FRAME_TYPE_RESPONSE, NSQ_FRAME_TYPE_ERROR, NSQ_FRAME_TYPE_MESSAGE} frame_type; -struct NSQDConnection; -struct NSQMessage; +typedef enum {NSQ_PARAM_TYPE_INT, NSQ_PARAM_TYPE_CHAR} nsq_cmd_param_type; +typedef struct Buffer nsqBuf; +typedef struct BufferedSocket nsqBufdSock; +typedef struct NSQCmdParams { + void *v; + int t; +} nsqCmdParams; -struct NSQReaderCfg { +typedef struct NSQMessage { + int64_t timestamp; + uint16_t attempts; + char id[16+1]; + size_t body_length; + char *body; +} nsqMsg; +typedef struct NSQLookupdEndpoint { + char *address; + int port; + struct NSQLookupdEndpoint *next; +} nsqLE; +typedef struct NSQDConnection { + char *address; + int port; + struct BufferedSocket *bs; + struct Buffer *command_buf; + uint32_t current_msg_size; + uint32_t current_frame_type; + char *current_data; + struct ev_loop *loop; + ev_timer *reconnect_timer; + void (*connect_callback)(struct NSQDConnection *conn, void *arg); + void (*close_callback)(struct NSQDConnection *conn, void *arg); + void (*msg_callback)(struct NSQDConnection *conn, nsqMsg *msg, void *arg); + void *arg; + struct NSQDConnection *next; +} nsqdConn; +typedef struct NSQReaderCfg { ev_tstamp lookupd_interval; size_t command_buf_len; size_t command_buf_capacity; @@ -22,91 +55,70 @@ struct NSQReaderCfg { size_t read_buf_capacity; size_t write_buf_len; size_t write_buf_capacity; -}; +} nsqRdrCfg; -struct NSQReader { +typedef struct NSQReader { char *topic; char *channel; void *ctx; //context for call back int max_in_flight; - struct NSQDConnection *conns; + nsqdConn *conns; struct NSQDConnInfo *infos; - struct NSQLookupdEndpoint *lookupd; + nsqLE *lookupd; struct ev_timer lookupd_poll_timer; struct ev_loop *loop; - struct NSQReaderCfg *cfg; + nsqRdrCfg *cfg; void *httpc; - void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn); - void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn); - void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx); -}; + void (*connect_callback)(struct NSQReader *rdr, nsqdConn *conn); + void (*close_callback)(struct NSQReader *rdr, nsqdConn *conn); + void (*msg_callback)(struct NSQReader *rdr, nsqdConn *conn, nsqMsg *msg, void *ctx); +} nsqRdr; -struct NSQReader *new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx, - struct NSQReaderCfg *cfg, - void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn), - void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn), - void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx)); -void free_nsq_reader(struct NSQReader *rdr); -int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port); -int nsq_reader_connect_to_nsqlookupd(struct NSQReader *rdr); -int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *address, int port); -void nsq_reader_set_loop(struct NSQReader *rdr, struct ev_loop *loop); +nsqRdr *new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx, + nsqRdrCfg *cfg, + void (*connect_callback)(nsqRdr *rdr, nsqdConn *conn), + void (*close_callback)(nsqRdr *rdr, nsqdConn *conn), + void (*msg_callback)(nsqRdr *rdr, nsqdConn *conn, nsqMsg *msg, void *ctx)); +void free_nsq_reader(nsqRdr *rdr); +int nsq_reader_connect_to_nsqd(nsqRdr *rdr, const char *address, int port); +int nsq_reader_connect_to_nsqlookupd(nsqRdr *rdr); +int nsq_reader_add_nsqlookupd_endpoint(nsqRdr *rdr, const char *address, int port); +void nsq_reader_set_loop(nsqRdr *rdr, struct ev_loop *loop); void nsq_run(struct ev_loop *loop); -struct NSQDConnection { - char *address; - int port; - struct BufferedSocket *bs; - struct Buffer *command_buf; - uint32_t current_msg_size; - uint32_t current_frame_type; - char *current_data; - struct ev_loop *loop; - ev_timer *reconnect_timer; - void (*connect_callback)(struct NSQDConnection *conn, void *arg); - void (*close_callback)(struct NSQDConnection *conn, void *arg); - void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg); - void *arg; - struct NSQDConnection *next; -}; - -struct NSQDConnection *new_nsqd_connection(struct ev_loop *loop, const char *address, int port, - void (*connect_callback)(struct NSQDConnection *conn, void *arg), - void (*close_callback)(struct NSQDConnection *conn, void *arg), - void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg), +nsqdConn *new_nsqd_connection(struct ev_loop *loop, const char *address, int port, + void (*connect_callback)(nsqdConn *conn, void *arg), + void (*close_callback)(nsqdConn *conn, void *arg), + void (*msg_callback)(nsqdConn *conn, nsqMsg *msg, void *arg), void *arg); -void free_nsqd_connection(struct NSQDConnection *conn); -int nsqd_connection_connect(struct NSQDConnection *conn); -void nsqd_connection_disconnect(struct NSQDConnection *conn); +void free_nsqd_connection(nsqdConn *conn); +int nsqd_connection_connect(nsqdConn *conn); +void nsqd_connection_disconnect(nsqdConn *conn); -void nsqd_connection_init_timer(struct NSQDConnection *conn, +void nsqd_connection_init_timer(nsqdConn *conn, void (*reconnect_callback)(EV_P_ ev_timer *w, int revents)); -void nsqd_connection_stop_timer(struct NSQDConnection *conn); +void nsqd_connection_stop_timer(nsqdConn *conn); -void nsq_subscribe(struct Buffer *buf, const char *topic, const char *channel); -void nsq_ready(struct Buffer *buf, int count); -void nsq_finish(struct Buffer *buf, const char *id); -void nsq_requeue(struct Buffer *buf, const char *id, int timeout_ms); -void nsq_nop(struct Buffer *buf); +void *nsq_buf_malloc(size_t buf_size, size_t n, size_t l); +void nsq_buffer_add(nsqBuf *buf, const char *name, const nsqCmdParams params[], size_t psize, const char *body, const size_t body_length); -struct NSQMessage { - int64_t timestamp; - uint16_t attempts; - char id[16+1]; - size_t body_length; - char *body; -}; - -struct NSQMessage *nsq_decode_message(const char *data, size_t data_length); -void free_nsq_message(struct NSQMessage *msg); +void nsq_subscribe(nsqBuf *buf, const char *topic, const char *channel); +void nsq_ready(nsqBuf *buf, int count); +void nsq_finish(nsqBuf *buf, const char *id); +void nsq_requeue(nsqBuf *buf, const char *id, int timeout_ms); +void nsq_nop(nsqBuf *buf); +void nsq_publish(nsqBuf *buf, const char *topic, const char *body); +void nsq_multi_publish(nsqBuf *buf, const char *topic, const char **body, const size_t body_size); +void nsq_defer_publish(nsqBuf *buf, const char *topic, const char *body, int defer_time_sec); +void nsq_touch(nsqBuf *buf, const char *id); +void nsq_cleanly_close_connection(nsqBuf *buf); +void nsq_auth(nsqBuf *buf, const char *secret); +void nsq_identify(nsqBuf *buf, const char *json_body); -struct NSQLookupdEndpoint { - char *address; - int port; - struct NSQLookupdEndpoint *next; -}; +nsqMsg *nsq_decode_message(const char *data, size_t data_length); +void free_nsq_message(nsqMsg *msg); -struct NSQLookupdEndpoint *new_nsqlookupd_endpoint(const char *address, int port); -void free_nsqlookupd_endpoint(struct NSQLookupdEndpoint *nsqlookupd_endpoint); +nsqLE *new_nsqlookupd_endpoint(const char *address, int port); +void free_nsqlookupd_endpoint(nsqLE *nsqlookupd_endpoint); #endif diff --git a/nsqd_connection.c b/nsqd_connection.c index 76f74f7..b100f1e 100644 --- a/nsqd_connection.c +++ b/nsqd_connection.c @@ -1,17 +1,11 @@ #include "nsq.h" -#ifdef DEBUG -#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) -#else -#define _DEBUG(...) do {;} while (0) -#endif +static void nsqd_connection_read_size(nsqBufdSock *buffsock, void *arg); +static void nsqd_connection_read_data(nsqBufdSock *buffsock, void *arg); -static void nsqd_connection_read_size(struct BufferedSocket *buffsock, void *arg); -static void nsqd_connection_read_data(struct BufferedSocket *buffsock, void *arg); - -static void nsqd_connection_connect_cb(struct BufferedSocket *buffsock, void *arg) +static void nsqd_connection_connect_cb(nsqBufdSock *buffsock, void *arg) { - struct NSQDConnection *conn = (struct NSQDConnection *)arg; + nsqdConn *conn = (nsqdConn *)arg; _DEBUG("%s: %p\n", __FUNCTION__, arg); @@ -26,9 +20,9 @@ static void nsqd_connection_connect_cb(struct BufferedSocket *buffsock, void *ar buffered_socket_read_bytes(buffsock, 4, nsqd_connection_read_size, conn); } -static void nsqd_connection_read_size(struct BufferedSocket *buffsock, void *arg) +static void nsqd_connection_read_size(nsqBufdSock *buffsock, void *arg) { - struct NSQDConnection *conn = (struct NSQDConnection *)arg; + nsqdConn *conn = (nsqdConn *)arg; uint32_t *msg_size_be; _DEBUG("%s: %p\n", __FUNCTION__, arg); @@ -44,10 +38,10 @@ static void nsqd_connection_read_size(struct BufferedSocket *buffsock, void *arg buffered_socket_read_bytes(buffsock, conn->current_msg_size, nsqd_connection_read_data, conn); } -static void nsqd_connection_read_data(struct BufferedSocket *buffsock, void *arg) +static void nsqd_connection_read_data(nsqBufdSock *buffsock, void *arg) { - struct NSQDConnection *conn = (struct NSQDConnection *)arg; - struct NSQMessage *msg; + nsqdConn *conn = (nsqdConn *)arg; + nsqMsg *msg; conn->current_frame_type = ntohl(*((uint32_t *)buffsock->read_buf->data)); buffer_drain(buffsock->read_buf, 4); @@ -78,9 +72,9 @@ static void nsqd_connection_read_data(struct BufferedSocket *buffsock, void *arg buffered_socket_read_bytes(buffsock, 4, nsqd_connection_read_size, conn); } -static void nsqd_connection_close_cb(struct BufferedSocket *buffsock, void *arg) +static void nsqd_connection_close_cb(nsqBufdSock *buffsock, void *arg) { - struct NSQDConnection *conn = (struct NSQDConnection *)arg; + nsqdConn *conn = (nsqdConn *)arg; _DEBUG("%s: %p\n", __FUNCTION__, arg); @@ -89,23 +83,23 @@ static void nsqd_connection_close_cb(struct BufferedSocket *buffsock, void *arg) } } -static void nsqd_connection_error_cb(struct BufferedSocket *buffsock, void *arg) +static void nsqd_connection_error_cb(nsqBufdSock *buffsock, void *arg) { - struct NSQDConnection *conn = (struct NSQDConnection *)arg; + nsqdConn *conn = (nsqdConn *)arg; _DEBUG("%s: conn %p\n", __FUNCTION__, conn); } -struct NSQDConnection *new_nsqd_connection(struct ev_loop *loop, const char *address, int port, - void (*connect_callback)(struct NSQDConnection *conn, void *arg), - void (*close_callback)(struct NSQDConnection *conn, void *arg), - void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg), +nsqdConn *new_nsqd_connection(struct ev_loop *loop, const char *address, int port, + void (*connect_callback)(nsqdConn *conn, void *arg), + void (*close_callback)(nsqdConn *conn, void *arg), + void (*msg_callback)(nsqdConn *conn, nsqMsg *msg, void *arg), void *arg) { - struct NSQDConnection *conn; - struct NSQReader *rdr = (struct NSQReader *)arg; + nsqdConn *conn; + nsqRdr *rdr = (nsqRdr *)arg; - conn = (struct NSQDConnection *)malloc(sizeof(struct NSQDConnection)); + conn = (nsqdConn *)malloc(sizeof(nsqdConn)); conn->address = strdup(address); conn->port = port; conn->command_buf = new_buffer(rdr->cfg->command_buf_len, rdr->cfg->command_buf_capacity); @@ -127,7 +121,7 @@ struct NSQDConnection *new_nsqd_connection(struct ev_loop *loop, const char *add return conn; } -void free_nsqd_connection(struct NSQDConnection *conn) +void free_nsqd_connection(nsqdConn *conn) { if (conn) { nsqd_connection_stop_timer(conn); @@ -138,26 +132,26 @@ void free_nsqd_connection(struct NSQDConnection *conn) } } -int nsqd_connection_connect(struct NSQDConnection *conn) +int nsqd_connection_connect(nsqdConn *conn) { return buffered_socket_connect(conn->bs); } -void nsqd_connection_disconnect(struct NSQDConnection *conn) +void nsqd_connection_disconnect(nsqdConn *conn) { buffered_socket_close(conn->bs); } -void nsqd_connection_init_timer(struct NSQDConnection *conn, +void nsqd_connection_init_timer(nsqdConn *conn, void (*reconnect_callback)(EV_P_ ev_timer *w, int revents)) { - struct NSQReader *rdr = (struct NSQReader *)conn->arg; + nsqRdr *rdr = (nsqRdr *)conn->arg; conn->reconnect_timer = (ev_timer *)malloc(sizeof(ev_timer)); ev_timer_init(conn->reconnect_timer, reconnect_callback, rdr->cfg->lookupd_interval, rdr->cfg->lookupd_interval); conn->reconnect_timer->data = conn; } -void nsqd_connection_stop_timer(struct NSQDConnection *conn) +void nsqd_connection_stop_timer(nsqdConn *conn) { if (conn && conn->reconnect_timer) { ev_timer_stop(conn->loop, conn->reconnect_timer); diff --git a/nsqlookupd.c b/nsqlookupd.c index 5b469bf..c562130 100644 --- a/nsqlookupd.c +++ b/nsqlookupd.c @@ -1,19 +1,14 @@ +#include "http.h" #include "json.h" #include "nsq.h" -#include "http.h" - -#ifdef DEBUG -#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) -#else -#define _DEBUG(...) do {;} while (0) -#endif +#include "utlist.h" -void nsq_lookupd_request_cb(struct HttpRequest *req, struct HttpResponse *resp, void *arg) +void nsq_lookupd_request_cb(httpRequest *req, httpResponse *resp, void *arg) { - struct NSQReader *rdr = (struct NSQReader *)arg; + nsqRdr *rdr = (nsqRdr *)arg; nsq_json_t *jsobj, *data, *producers, *producer, *broadcast_address_obj, *tcp_port_obj; nsq_json_tokener_t *jstok; - struct NSQDConnection *conn; + nsqdConn *conn; const char *broadcast_address; int i, found, tcp_port; @@ -81,11 +76,11 @@ void nsq_lookupd_request_cb(struct HttpRequest *req, struct HttpResponse *resp, free_http_request(req); } -struct NSQLookupdEndpoint *new_nsqlookupd_endpoint(const char *address, int port) +nsqLE *new_nsqlookupd_endpoint(const char *address, int port) { - struct NSQLookupdEndpoint *nsqlookupd_endpoint; + nsqLE *nsqlookupd_endpoint; - nsqlookupd_endpoint = (struct NSQLookupdEndpoint *)malloc(sizeof(struct NSQLookupdEndpoint)); + nsqlookupd_endpoint = (nsqLE *)malloc(sizeof(nsqLE)); nsqlookupd_endpoint->address = strdup(address); nsqlookupd_endpoint->port = port; nsqlookupd_endpoint->next = NULL; @@ -93,7 +88,7 @@ struct NSQLookupdEndpoint *new_nsqlookupd_endpoint(const char *address, int port return nsqlookupd_endpoint; } -void free_nsqlookupd_endpoint(struct NSQLookupdEndpoint *nsqlookupd_endpoint) +void free_nsqlookupd_endpoint(nsqLE *nsqlookupd_endpoint) { if (nsqlookupd_endpoint) { free(nsqlookupd_endpoint->address); diff --git a/reader.c b/reader.c index 14141fc..b403b3b 100644 --- a/reader.c +++ b/reader.c @@ -1,12 +1,8 @@ +#include + +#include "http.h" #include "nsq.h" #include "utlist.h" -#include "http.h" - -#ifdef DEBUG -#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) -#else -#define _DEBUG(...) do {;} while (0) -#endif #define DEFAULT_LOOKUPD_INTERVAL 5. #define DEFAULT_COMMAND_BUF_LEN 4096 @@ -16,9 +12,9 @@ #define DEFAULT_WRITE_BUF_LEN 16 * 1024 #define DEFAULT_WRITE_BUF_CAPACITY 16 * 1024 -static void nsq_reader_connect_cb(struct NSQDConnection *conn, void *arg) +static void nsq_reader_connect_cb(nsqdConn *conn, void *arg) { - struct NSQReader *rdr = (struct NSQReader *)arg; + nsqRdr *rdr = (nsqRdr *)arg; _DEBUG("%s: %p\n", __FUNCTION__, rdr); @@ -37,9 +33,9 @@ static void nsq_reader_connect_cb(struct NSQDConnection *conn, void *arg) buffered_socket_write_buffer(conn->bs, conn->command_buf); } -static void nsq_reader_msg_cb(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg) +static void nsq_reader_msg_cb(nsqdConn *conn, nsqMsg *msg, void *arg) { - struct NSQReader *rdr = (struct NSQReader *)arg; + nsqRdr *rdr = (nsqRdr *)arg; _DEBUG("%s: %p %p\n", __FUNCTION__, msg, rdr); @@ -49,9 +45,9 @@ static void nsq_reader_msg_cb(struct NSQDConnection *conn, struct NSQMessage *ms } } -static void nsq_reader_close_cb(struct NSQDConnection *conn, void *arg) +static void nsq_reader_close_cb(nsqdConn *conn, void *arg) { - struct NSQReader *rdr = (struct NSQReader *)arg; + nsqRdr *rdr = (nsqRdr *)arg; _DEBUG("%s: %p\n", __FUNCTION__, rdr); @@ -69,12 +65,12 @@ static void nsq_reader_close_cb(struct NSQDConnection *conn, void *arg) } } -void nsq_lookupd_request_cb(struct HttpRequest *req, struct HttpResponse *resp, void *arg); +void nsq_lookupd_request_cb(httpRequest *req, httpResponse *resp, void *arg); static void nsq_reader_reconnect_cb(EV_P_ struct ev_timer *w, int revents) { - struct NSQDConnection *conn = (struct NSQDConnection *)w->data; - struct NSQReader *rdr = (struct NSQReader *)conn->arg; + nsqdConn *conn = (nsqdConn *)w->data; + nsqRdr *rdr = (nsqRdr *)conn->arg; if (rdr->lookupd == NULL) { _DEBUG("%s: There is no lookupd, try to reconnect to nsqd directly\n", __FUNCTION__); @@ -86,9 +82,9 @@ static void nsq_reader_reconnect_cb(EV_P_ struct ev_timer *w, int revents) static void nsq_reader_lookupd_poll_cb(EV_P_ struct ev_timer *w, int revents) { - struct NSQReader *rdr = (struct NSQReader *)w->data; - struct NSQLookupdEndpoint *nsqlookupd_endpoint; - struct HttpRequest *req; + nsqRdr *rdr = (nsqRdr *)w->data; + nsqLE *nsqlookupd_endpoint; + httpRequest *req; int i, idx, count = 0; char buf[256]; @@ -117,16 +113,16 @@ static void nsq_reader_lookupd_poll_cb(EV_P_ struct ev_timer *w, int revents) ev_timer_again(rdr->loop, &rdr->lookupd_poll_timer); } -struct NSQReader *new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx, - struct NSQReaderCfg *cfg, - void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn), - void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn), - void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx)) +nsqRdr *new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx, + nsqRdrCfg *cfg, + void (*connect_callback)(nsqRdr *rdr, nsqdConn *conn), + void (*close_callback)(nsqRdr *rdr, nsqdConn *conn), + void (*msg_callback)(nsqRdr *rdr, nsqdConn *conn, nsqMsg *msg, void *ctx)) { - struct NSQReader *rdr; + nsqRdr *rdr; - rdr = (struct NSQReader *)malloc(sizeof(struct NSQReader)); - rdr->cfg = (struct NSQReaderCfg *)malloc(sizeof(struct NSQReaderCfg)); + rdr = (nsqRdr *)malloc(sizeof(nsqRdr)); + rdr->cfg = (nsqRdrCfg *)malloc(sizeof(nsqRdrCfg)); if (cfg == NULL) { rdr->cfg->lookupd_interval = DEFAULT_LOOKUPD_INTERVAL; rdr->cfg->command_buf_len = DEFAULT_COMMAND_BUF_LEN; @@ -160,10 +156,10 @@ struct NSQReader *new_nsq_reader(struct ev_loop *loop, const char *topic, const return rdr; } -void free_nsq_reader(struct NSQReader *rdr) +void free_nsq_reader(nsqRdr *rdr) { - struct NSQDConnection *conn; - struct NSQLookupdEndpoint *nsqlookupd_endpoint; + nsqdConn *conn; + nsqLE *nsqlookupd_endpoint; if (rdr) { // TODO: this should probably trigger disconnections and then keep @@ -181,10 +177,10 @@ void free_nsq_reader(struct NSQReader *rdr) } } -int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *address, int port) +int nsq_reader_add_nsqlookupd_endpoint(nsqRdr *rdr, const char *address, int port) { - struct NSQLookupdEndpoint *nsqlookupd_endpoint; - struct NSQDConnection *conn; + nsqLE *nsqlookupd_endpoint; + nsqdConn *conn; if (rdr->lookupd == NULL) { // Stop reconnect timers, use lookupd timer instead @@ -203,9 +199,9 @@ int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *addres return 1; } -int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port) +int nsq_reader_connect_to_nsqd(nsqRdr *rdr, const char *address, int port) { - struct NSQDConnection *conn; + nsqdConn *conn; int rc; conn = new_nsqd_connection(rdr->loop, address, port, @@ -224,6 +220,6 @@ int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int p void nsq_run(struct ev_loop *loop) { - srand(time(NULL)); + srand((unsigned)time(NULL)); ev_loop(loop, 0); } diff --git a/test.c b/test.c index 3cab312..c87b5e5 100644 --- a/test.c +++ b/test.c @@ -1,12 +1,6 @@ #include "nsq.h" -#ifdef DEBUG -#define _DEBUG(...) fprintf(stdout, __VA_ARGS__) -#else -#define _DEBUG(...) do {;} while (0) -#endif - -static void message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx) +static void message_handler(nsqRdr *rdr, nsqdConn *conn, nsqMsg *msg, void *ctx) { _DEBUG("%s: %lld, %d, %s, %lu, %.*s\n", __FUNCTION__, msg->timestamp, msg->attempts, msg->id, msg->body_length, (int)msg->body_length, msg->body); @@ -32,18 +26,22 @@ static void message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, int main(int argc, char **argv) { - struct NSQReader *rdr; + if (argc < 4) { + printf("not enough args from command line\n"); + return 1; + } + nsqRdr *rdr; struct ev_loop *loop; void *ctx = NULL; //(void *)(new TestNsqMsgContext()); loop = ev_default_loop(0); - rdr = new_nsq_reader(loop, "test", "ch", (void *)ctx, - NULL, NULL, NULL, message_handler); + rdr = new_nsq_reader(loop, argv[2], argv[3], ctx, NULL, NULL, NULL, message_handler); + #ifdef NSQD_STANDALONE - nsq_reader_connect_to_nsqd(rdr, "127.0.0.1", 4150); - nsq_reader_connect_to_nsqd(rdr, "127.0.0.1", 14150); + nsq_reader_connect_to_nsqd(rdr, argv[1], 4150); +// nsq_reader_connect_to_nsqd(rdr, "127.0.0.1", 14150); #else - nsq_reader_add_nsqlookupd_endpoint(rdr, "127.0.0.1", 4161); + nsq_reader_add_nsqlookupd_endpoint(rdr, argv[1], 4161); #endif nsq_run(loop); diff --git a/utlist.h b/utlist.h index 6bccec7..ac0c736 100644 --- a/utlist.h +++ b/utlist.h @@ -725,4 +725,3 @@ do { } while (0) \ #endif /* UTLIST_H */ -