Added next and stats functionality. Added garbage collection queues although actual garbage collection in items and scores is unimplemented.
1 parent 1573547 commit 8b19b0854e3a9606df63097e7ad0391be346efbd @ngerakines committed Feb 1, 2010
Showing with 330 additions and 121 deletions.
  1. +3 −3 Makefile
  2. +131 −4 README.markdown
  3. +112 −79 barbershop.c
  4. +1 −0 bst.c
  5. +60 −35 scores.c
  6. +1 −0 scores.h
  7. +22 −0 stats.h
@@ -25,9 +25,9 @@ all: barbershop
# Deps (use make dep to generate this)
client.o: client.c
-barbershop.o: barbershop.c
-bst.o: bst.c bst.h
-scores.o: scores.c scores.h
+barbershop.o: barbershop.c stats.h
+bst.o: bst.c bst.h stats.h
+scores.o: scores.c scores.h stats.h
benchmark.o: benchmark.c
barbershop: $(OBJ)
@@ -4,19 +4,146 @@ Barbershop is a fast, lightweight priority queue system for a specific
use case that is undisclosed. The goal is to create a TCP/IP based
service that uses libev(ent) to create a shifting priority queue.
-
Internally a list of items is tracked, each having a priority. Some
connections to the daemon set/update item priorities while other
connections want to pop something off of the list based on priority.
As items are added, the hash of items and priorities is updated.
+# Protocol
+
+Clients of barbershop communicate with the server through TCP connections.
+A given running barbershop server listens on some port; clients connect
+to that port, send commands to the server, read responses and eventually
+close the connection.
+
+There is no need to send any command to end the session. A client may
+just close the connection at any moment it no longer needs it. Note,
+however, that clients are encouraged to cache their connections rather
+than reopen them every time they need to store or retrieve data. Caching
+connections will eliminate the overhead associated with establishing a
+TCP connection (the overhead of preparing for a new connection on the
+server side is insignificant compared to this).
+
+Command lines are always terminated by \r\n. Unstructured data is also
+terminated by \r\n, even though \r, \n or any other 8-bit characters
+may also appear inside the data. Therefore, when a client retrieves
+data from a server, it must use the length of the data block (which it
+will be provided with) to determine where the data block ends, and not
+the fact that \r\n follows the end of the data block, even though it
+does.
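
As a rough illustration of the framing rule above, the sketch below locates the first complete command line in a receive buffer by scanning for \r\n within a known byte count, so the buffer does not have to be NUL-terminated. The function name `find_command_line` is hypothetical and is not part of this commit.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not part of barbershop): returns the length of the
 * first command line in buf, excluding the trailing "\r\n", or -1 if no
 * complete line has arrived yet. Only the first len bytes are examined,
 * so buf does not need to be NUL-terminated. */
long find_command_line(const char *buf, size_t len) {
    assert(buf != NULL);
    for (size_t i = 0; i + 1 < len; i++) {
        if (buf[i] == '\r' && buf[i + 1] == '\n') {
            return (long)i;
        }
    }
    return -1;
}
```

A caller would keep appending received bytes to its buffer until this returns a non-negative length, then hand that many bytes to the command parser.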
+
+## Items and Priority Values
+
+Items and their priority scores are unsigned 32-bit integers.
+
+## Commands
+
+There are three types of commands. The update command is used to create
+or update an item's priority. If the item given is being introduced then
+its base priority is set, else the priority is incremented by the given
+amount.
+
+The next command is used to return the next item, ordered by priority.
+
+Last, the stats command is used to get barbershop daemon stats and usage
+information.
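
The set-or-increment behaviour of the update command can be sketched with a toy in-memory table. This is only an illustration under simplifying assumptions: `toy_table`, `toy_update` and `toy_lookup` are hypothetical names, and the fixed-size array stands in for the binary search tree and score pools the server actually uses.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_CAPACITY 16

/* Hypothetical toy storage; the real server uses a tree, not an array. */
struct toy_item { uint32_t id; uint32_t priority; };
struct toy_table { struct toy_item items[TOY_CAPACITY]; size_t count; };

/* New item: the amount becomes its base priority.
 * Known item: the amount is added to its current priority.
 * Returns 0 on success, -1 if the toy table is full. */
int toy_update(struct toy_table *t, uint32_t id, uint32_t amount) {
    assert(t != NULL);
    for (size_t i = 0; i < t->count; i++) {
        if (t->items[i].id == id) {
            t->items[i].priority += amount;
            return 0;
        }
    }
    if (t->count == TOY_CAPACITY) {
        return -1;
    }
    t->items[t->count].id = id;
    t->items[t->count].priority = amount;
    t->count++;
    return 0;
}

/* Stores the item's priority and returns 1, or returns 0 if it is unknown. */
int toy_lookup(const struct toy_table *t, uint32_t id, uint32_t *priority) {
    assert(t != NULL && priority != NULL);
    for (size_t i = 0; i < t->count; i++) {
        if (t->items[i].id == id) {
            *priority = t->items[i].priority;
            return 1;
        }
    }
    return 0;
}
```

Sending `update 5000 1` and then `update 5000 2` corresponds to two `toy_update` calls that leave item 5000 with priority 3.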
+
+# Error responses
+
+Each command sent by a client may be answered with an error string
+from the server. These error strings come in three types:
+
+"ERROR\r\n"
+
+The client sent a nonexistent command name.
+
+"CLIENT_ERROR <error>\r\n"
+
+Some sort of client error in the input line, i.e. the input doesn't
+conform to the protocol in some way. <error> is a human-readable error
+string.
+
+"SERVER\_ERROR <error>\r\n"
+
+Some sort of server error prevents the server from carrying out the
+command. <error> is a human-readable error string. In cases of severe
+server errors, which make it impossible to continue serving the client
+(this shouldn't normally happen), the server will close the connection
+after sending the error line. This is the only case in which the server
+closes a connection to a client.
+
+In the descriptions of individual commands below, these error lines
+are not again specifically mentioned, but clients must allow for their
+possibility.
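
A client might tell the three error forms apart with something like the sketch below. It assumes the trailing \r\n has already been stripped from the reply line; `reply_kind` and `classify_reply` are hypothetical names, not part of this commit.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical classification of a reply line (without its "\r\n"). */
enum reply_kind {
    REPLY_OTHER,        /* "OK", an item id, a STAT line, ... */
    REPLY_ERROR,        /* "ERROR": nonexistent command name */
    REPLY_CLIENT_ERROR, /* "CLIENT_ERROR <error>" */
    REPLY_SERVER_ERROR  /* "SERVER_ERROR <error>" */
};

enum reply_kind classify_reply(const char *line) {
    static const char client_prefix[] = "CLIENT_ERROR ";
    static const char server_prefix[] = "SERVER_ERROR ";
    assert(line != NULL);
    if (strcmp(line, "ERROR") == 0) {
        return REPLY_ERROR;
    }
    if (strncmp(line, client_prefix, sizeof(client_prefix) - 1) == 0) {
        return REPLY_CLIENT_ERROR;
    }
    if (strncmp(line, server_prefix, sizeof(server_prefix) - 1) == 0) {
        return REPLY_SERVER_ERROR;
    }
    return REPLY_OTHER;
}
```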
+
+## Storage commands
+
+First, the client sends a command line which looks like this:
+
+ <command name> <item id> <priority>\r\n
+
+* <command name> is "update".
+* <item id> is the unique 32-bit integer representing the item.
+* <priority> is a 32-bit integer that the priority will either be set to or incremented by.
+
+After sending the command line the client awaits the reply which may be:
+
+* "OK\r\n", to indicate success.
+* One of the possible client or server error messages listed above.
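
For illustration, a client could build the update command line as follows. This is a minimal sketch: `format_update` is a hypothetical helper, and it assumes both fields are sent as unsigned decimal text, as the "Items and Priority Values" section suggests.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper: writes "update <item id> <priority>\r\n" into out.
 * Returns the number of bytes written (excluding the NUL terminator),
 * or -1 if the buffer is too small. */
int format_update(char *out, size_t out_size, uint32_t item_id, uint32_t priority) {
    assert(out != NULL);
    int n = snprintf(out, out_size, "update %" PRIu32 " %" PRIu32 "\r\n",
                     item_id, priority);
    if (n < 0 || (size_t)n >= out_size) {
        return -1;
    }
    return n;
}
```

The returned length is what the client would pass to `write` or `send`; the reply to wait for is then "OK\r\n" or one of the error lines.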
+
+
+## Retrieval command
+
+The retrieval command "next" operates like this:
+
+ next\r\n
+
+After sending the command line the client awaits the reply which may be:
+
+* "-1\r\n", to indicate that there are no items to act on.
+* "<32u>\r\n", the item id for the client to process.
+* One of the possible client or server error messages listed above.
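
The two non-error replies to "next" could be distinguished on the client side roughly as shown here. The sketch assumes the \r\n has been stripped and that item ids arrive as unsigned decimal text; `parse_next_reply` is a hypothetical name.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parser for the reply to "next" (line without "\r\n").
 * Returns 1 and stores the id when an item was returned,
 * 0 when the server answered "-1" (nothing to act on),
 * -1 when the line is not a valid 32-bit unsigned integer. */
int parse_next_reply(const char *line, uint32_t *item_id) {
    assert(line != NULL && item_id != NULL);
    if (strcmp(line, "-1") == 0) {
        return 0;
    }
    if (line[0] < '0' || line[0] > '9') {
        return -1; /* rejects signs, whitespace, empty lines, error strings */
    }
    char *end = NULL;
    errno = 0;
    unsigned long long value = strtoull(line, &end, 10);
    if (errno != 0 || *end != '\0' || value > UINT32_MAX) {
        return -1;
    }
    *item_id = (uint32_t)value;
    return 1;
}
```

A return value of -1 is the point at which a client would fall back to checking for the error lines described earlier.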
+
+## Statistics
+
+The command "stats" is used to query the server about statistics it
+maintains and other internal data.
+
+ stats\r\n
+
+Upon receiving the "stats" command the server sends a number of lines
+which look like this:
+
+ STAT <name> <value>\r\n
+
+The server terminates this list with the line:
+
+ END\r\n
+
+In each line of statistics, <name> is the name of this statistic, and
+<value> is the data. The following is the list of all names sent in
+response to the "stats" command, together with the type of the value
+sent for this name, and the meaning of the value.
+
+In the type column below, "32u" means a 32-bit unsigned integer, "64u"
+means a 64-bit unsigned integer. '32u:32u' means two 32-bit unsigned
+integers separated by a colon.
+
+* 'uptime' (32u) Number of seconds this server has been running.
+* 'version' (string) Version string of this server.
+* 'updates' (32u) Number of update commands received by this server.
+* 'items' (32u) Number of non-garbage collected items.
+* 'items\_gc' (32u) Number of items that would remain after garbage collection.
+* 'pools' (32u) Number of non-garbage collected pools.
+* 'pools_gc' (32u) Number of pools that would remain after garbage collection.
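
Reading the stats response amounts to consuming STAT lines until END arrives. The sketch below parses one line at a time; it assumes the \r\n has been stripped and that neither names nor values contain spaces. `stat_entry` and `parse_stat_line` are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical holder for one "STAT <name> <value>" line. */
struct stat_entry {
    char name[32];
    char value[64];
};

/* Returns 1 when line is a STAT line (out is filled in),
 * 0 when line is the terminating "END",
 * -1 for anything else, such as an error line. */
int parse_stat_line(const char *line, struct stat_entry *out) {
    assert(line != NULL && out != NULL);
    if (strcmp(line, "END") == 0) {
        return 0;
    }
    if (strncmp(line, "STAT ", 5) != 0) {
        return -1;
    }
    /* Field widths keep sscanf within the bounds of the two arrays. */
    if (sscanf(line + 5, "%31s %63s", out->name, out->value) == 2) {
        return 1;
    }
    return -1;
}
```

A client would call this in a loop, storing each entry, and stop as soon as it returns 0.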
+
# TODO
- * Add next functionality.
- * Add stats structure and expose it.
+ * Start running it through memory leak detection tools.
+ * Add more info to stats interface.
* Make fast and lightweight.
* Clean up the binary search tree interface and make it items specific.
- * Move the scores container to it's own source file.
+ * Add ability to put items into a limbo pool until a worker responds with a confirmation message that the item has been processed.
* Create a legit benchmark tool.
* Add input args to the client tool.
@@ -14,94 +14,118 @@ Copyright (c) 2010 Nick Gerakines <nick at gerakines dot net>
#include <fcntl.h>
#include <errno.h>
#include <err.h>
+#include <time.h>
#include "scores.h"
#include "bst.h"
#include "barbershop.h"
+#include "stats.h"
#include <event.h>
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
- char *s, *e;
- size_t ntokens = 0;
- for (s = e = command; ntokens < max_tokens - 1; ++e) {
- if (*e == ' ') {
- if (s != e) {
- tokens[ntokens].value = s;
- tokens[ntokens].length = e - s;
- ntokens++;
- *e = '\0';
- }
- s = e + 1;
- }
- else if (*e == '\0') {
- if (s != e) {
- tokens[ntokens].value = s;
- tokens[ntokens].length = e - s;
- ntokens++;
- }
- break; /* string end */
- }
- }
- tokens[ntokens].value = *e == '\0' ? NULL : e;
- tokens[ntokens].length = 0;
- ntokens++;
- return ntokens;
+ char *s, *e;
+ size_t ntokens = 0;
+ for (s = e = command; ntokens < max_tokens - 1; ++e) {
+ if (*e == ' ') {
+ if (s != e) {
+ tokens[ntokens].value = s;
+ tokens[ntokens].length = e - s;
+ ntokens++;
+ *e = '\0';
+ }
+ s = e + 1;
+ } else if (*e == '\0') {
+ if (s != e) {
+ tokens[ntokens].value = s;
+ tokens[ntokens].length = e - s;
+ ntokens++;
+ }
+ break;
+ }
+ }
+ tokens[ntokens].value = *e == '\0' ? NULL : e;
+ tokens[ntokens].length = 0;
+ ntokens++;
+ return ntokens;
}
+// TODO: Create an actual source file that decomposes input and
+// determines execution.
void on_read(int fd, short ev, void *arg) {
- struct client *client = (struct client *)arg;
- char buf[8196];
- int len;
- len = read(fd, buf, sizeof(buf));
- if (len == 0) {
- printf("Client disconnected.\n");
- close(fd);
- event_del(&client->ev_read);
- free(client);
- return;
- } else if (len < 0) {
- printf("Socket failure, disconnecting client: %s", strerror(errno));
- close(fd);
- event_del(&client->ev_read);
- free(client);
- return;
- }
- /* This could probably be done better. */
- char* nl;
- nl = strrchr(buf, '\r');
- if (nl) { *nl = '\0'; }
- nl = strrchr(buf, '\n');
- if (nl) { *nl = '\0'; }
- /* Figure out what they want to do. */
- token_t tokens[MAX_TOKENS];
- size_t ntokens;
- ntokens = tokenize_command((char*)buf, tokens, MAX_TOKENS);
- if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN].value, "update") == 0) {
- int item_id = atoi(tokens[KEY_TOKEN].value);
- int score = atoi(tokens[VALUE_TOKEN].value);
+ struct client *client = (struct client *)arg;
+ // TODO: Find out what a reasonable size limit is on network input.
+ char buf[8196];
+ int len = read(fd, buf, sizeof(buf));
+ if (len == 0) {
+ printf("Client disconnected.\n");
+ close(fd);
+ event_del(&client->ev_read);
+ free(client);
+ return;
+ } else if (len < 0) {
+ printf("Socket failure, disconnecting client: %s", strerror(errno));
+ close(fd);
+ event_del(&client->ev_read);
+ free(client);
+ return;
+ }
+ // TODO: Find a better way to do this {
+ char* nl;
+ nl = strrchr(buf, '\r');
+ if (nl) { *nl = '\0'; }
+ nl = strrchr(buf, '\n');
+ if (nl) { *nl = '\0'; }
+ // }
+ token_t tokens[MAX_TOKENS];
+ size_t ntokens = tokenize_command((char*)buf, tokens, MAX_TOKENS);
+ // TODO: Add support for the 'quit' command.
+ if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN].value, "update") == 0) {
+ int item_id = atoi(tokens[KEY_TOKEN].value);
+ int score = atoi(tokens[VALUE_TOKEN].value);
- Position lookup = Find( item_id, items );
- if (lookup == NULL) {
- items = Insert(item_id, score, items);
- scores = AddScoreToPool(scores, score, item_id);
- } else {
- int old_score = lookup->score;
- lookup->score += score;
- scores = PurgeThenAddScoreToPool(scores, lookup->score, item_id, old_score);
- }
- reply(fd, "OK\n");
- } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "next") == 0) {
- reply(fd, "OK\n");
- } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0) {
- printf("Dumping items tree:\n");
- DumpItems(items);
- printf("Dumping score buckets:\n");
- DumpScores(scores);
- printf("Client wants server stats.\n");
- reply(fd, "OK\n");
- } else {
- reply(fd, "NOOP\n");
- }
+ Position lookup = Find( item_id, items );
+ if (lookup == NULL) {
+ items = Insert(item_id, score, items);
+ scores = AddScoreToPool(scores, score, item_id);
+ app_stats.items += 1;
+ app_stats.items_gc += 1;
+ } else {
+ int old_score = lookup->score;
+ lookup->score += score;
+ scores = PurgeThenAddScoreToPool(scores, lookup->score, item_id, old_score);
+ }
+ app_stats.updates += 1;
+ reply(fd, "OK\r\n");
+ } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "next") == 0) {
+ int next = GetNextItem(scores);
+ if (next != -1) {
+ app_stats.items_gc -= 1;
+ }
+ char msg[32];
+ sprintf(msg, "%d\r\n", next);
+ reply(fd, msg);
+ } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0) {
+ // TODO: Find out if the stats output buffer can be reduced.
+ char out[128];
+ time_t current_time;
+ time(&current_time);
+ sprintf(out, "STAT uptime %d\r\n", (int)(current_time - app_stats.started_at)); reply(fd, out);
+ sprintf(out, "STAT version %s\r\n", app_stats.version); reply(fd, out);
+ sprintf(out, "STAT updates %d\r\n", app_stats.updates); reply(fd, out);
+ sprintf(out, "STAT items %d\r\n", app_stats.items); reply(fd, out);
+ sprintf(out, "STAT pools %d\r\n", app_stats.pools); reply(fd, out);
+ sprintf(out, "STAT pools_gc %d\r\n", app_stats.pools_gc); reply(fd, out);
+ sprintf(out, "STAT items_gc %d\r\n", app_stats.items_gc); reply(fd, out);
+ reply(fd, "END\r\n");
+ /*
+ printf("Dumping items tree:\n");
+ DumpItems(items);
+ printf("Dumping score buckets:\n");
+ DumpScores(scores);
+ */
+ } else {
+ reply(fd, "ERROR\r\n");
+ }
}
void on_accept(int fd, short ev, void *arg) {
@@ -127,8 +151,17 @@ void on_accept(int fd, short ev, void *arg) {
}
int main(int argc, char **argv) {
- items = MakeEmpty(NULL);
- scores = PrepScoreBucket(NULL);
+ items = MakeEmpty(NULL);
+ scores = PrepScoreBucket(NULL);
+
+ time(&app_stats.started_at);
+ app_stats.version = "00.01.00";
+ app_stats.updates = 0;
+ app_stats.items = 0;
+ app_stats.pools = 0;
+ app_stats.items_gc = 0;
+ app_stats.pools_gc = 0;
+
int listen_fd;
struct sockaddr_in listen_addr;
int reuseaddr_on = 1;
1 bst.c
@@ -1,4 +1,5 @@
#include "bst.h"
+#include "stats.h"
#include <stdlib.h>
#include <stdio.h>