From d630feaeab6d3e37ba86bd615602f699d2c1f2a2 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 17 Feb 2010 17:13:37 -0800 Subject: [PATCH] Barbershop will now periodically save state to disk. On startup it will also attempt to load state from disk. --- src/barbershop.c | 74 ++++++++++++++++++++++++++++++++++++++++ src/barbershop.h | 4 ++- tests/check_barbershop.c | 1 + 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/src/barbershop.c b/src/barbershop.c index 115db13..7b6c679 100644 --- a/src/barbershop.c +++ b/src/barbershop.c @@ -34,6 +34,7 @@ THE SOFTWARE. #include #include #include +#include #include "scores.h" #include "bst.h" @@ -41,6 +42,8 @@ THE SOFTWARE. #include "stats.h" #include +pthread_mutex_t scores_mutex; + static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) { char *s, *e; size_t ntokens = 0; @@ -108,7 +111,9 @@ void on_read(int fd, short ev, void *arg) { Position lookup = Find(item_id, items); if (lookup == NULL) { items = Insert(item_id, score, items); + pthread_mutex_lock(&scores_mutex); scores = promoteItem(scores, score, item_id, -1); + pthread_mutex_unlock(&scores_mutex); app_stats.items += 1; } else { int old_score = lookup->score; @@ -118,19 +123,25 @@ void on_read(int fd, short ev, void *arg) { lookup->score += score; } assert(lookup->score > old_score); + pthread_mutex_lock(&scores_mutex); scores = promoteItem(scores, lookup->score, item_id, old_score); + pthread_mutex_unlock(&scores_mutex); } app_stats.updates += 1; reply(fd, "OK\r\n"); } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "peak") == 0) { int next; + pthread_mutex_lock(&scores_mutex); scores = PeakNext(scores, &next); + pthread_mutex_unlock(&scores_mutex); char msg[32]; sprintf(msg, "%d\r\n", next); reply(fd, msg); } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "next") == 0) { int next; + pthread_mutex_lock(&scores_mutex); scores = NextItem(scores, &next); + pthread_mutex_unlock(&scores_mutex); if (next != -1) { Position lookup = Find( next, items ); if (lookup != NULL) { @@ -182,11 +193,16 @@ int main(int argc, char **argv) { items = MakeEmpty(NULL); scores = NULL; + load_snapshot("barbershop.snapshot"); + time(&app_stats.started_at); app_stats.version = "00.01.00"; app_stats.updates = 0; app_stats.items = 0; app_stats.pools = 0; + + pthread_t garbage_collector; + pthread_create(&garbage_collector, NULL, (void *) gc_thread, NULL); int listen_fd; struct sockaddr_in listen_addr; @@ -224,3 +240,61 @@ void reply(int fd, char *buffer) { printf("ERROR writing to socket"); } } + +void gc_thread() { + while (1) { + sleep(60); + pthread_mutex_lock(&scores_mutex); + sync_to_disk(scores, "barbershop.snapshot"); + pthread_mutex_unlock(&scores_mutex); + } + pthread_exit(0); +} + +void load_snapshot(char *filename) { + FILE *file_in; + + file_in = fopen(filename, "r"); + if (file_in == NULL) { + return; + } + + char line[80]; + int item_id, score; + pthread_mutex_lock(&scores_mutex); + while(fgets(line, 80, file_in) != NULL) { + sscanf(line, "%d %d", &item_id, &score); + items = Insert(item_id, score, items); + scores = promoteItem(scores, score, item_id, -1); + app_stats.items += 1; + printf("Loading %d with score %d\n", item_id, score); + } + pthread_mutex_unlock(&scores_mutex); + fclose(file_in); +} + +void sync_to_disk(PoolNode *head, char *filename) { + int cur_char; + FILE *out_file; + + remove(filename); + + out_file = fopen(filename, "w"); + if (out_file == NULL) { + fprintf(stderr, "Can not open output file\n"); + exit (8); + } + + MemberNode *member; + while (head) { + member = head->members; + while (member) { + fprintf(out_file, "%d %d\n", member->item, head->score); + member = member->next; + } + head = head->next; + } + + fclose(out_file); + return (0); +} diff --git a/src/barbershop.h b/src/barbershop.h index 221c8fc..f06b291 100644 --- a/src/barbershop.h +++ b/src/barbershop.h @@ -54,4 +54,6 @@ void on_accept(int fd, short ev, void *arg); int main(int argc, char **argv); int setnonblock(int fd); void reply(int fd, char *buffer); - +void gc_thread(); +void load_snapshot(char *filename); +void sync_to_disk(PoolNode *head, char *filename); diff --git a/tests/check_barbershop.c b/tests/check_barbershop.c index 8110d6e..4673d97 100644 --- a/tests/check_barbershop.c +++ b/tests/check_barbershop.c @@ -75,6 +75,7 @@ START_TEST (test_scattered_adds) { e = promoteItem(e, 7, 5002, -1); e = promoteItem(e, 8, 5001, 5); e = promoteItem(e, 1, 5003, -1); + sync_to_disk(e, "barbershop.dump"); int next = -1; e = NextItem(e, &next); fail_unless(next == 5000);