Skip to content

Commit

Permalink
Barbershop will now periodically save state to disk. On startup it wi…
Browse files Browse the repository at this point in the history
…ll also attempt to load state from disk.
  • Loading branch information
Nick committed Feb 18, 2010
1 parent a9e0139 commit d630fea
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
74 changes: 74 additions & 0 deletions src/barbershop.c
Expand Up @@ -34,13 +34,16 @@ THE SOFTWARE.
#include <err.h> #include <err.h>
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
#include <pthread.h>


#include "scores.h" #include "scores.h"
#include "bst.h" #include "bst.h"
#include "barbershop.h" #include "barbershop.h"
#include "stats.h" #include "stats.h"
#include <event.h> #include <event.h>


pthread_mutex_t scores_mutex;

static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) { static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
char *s, *e; char *s, *e;
size_t ntokens = 0; size_t ntokens = 0;
Expand Down Expand Up @@ -108,7 +111,9 @@ void on_read(int fd, short ev, void *arg) {
Position lookup = Find(item_id, items); Position lookup = Find(item_id, items);
if (lookup == NULL) { if (lookup == NULL) {
items = Insert(item_id, score, items); items = Insert(item_id, score, items);
pthread_mutex_lock(&scores_mutex);
scores = promoteItem(scores, score, item_id, -1); scores = promoteItem(scores, score, item_id, -1);
pthread_mutex_unlock(&scores_mutex);
app_stats.items += 1; app_stats.items += 1;
} else { } else {
int old_score = lookup->score; int old_score = lookup->score;
Expand All @@ -118,19 +123,25 @@ void on_read(int fd, short ev, void *arg) {
lookup->score += score; lookup->score += score;
} }
assert(lookup->score > old_score); assert(lookup->score > old_score);
pthread_mutex_lock(&scores_mutex);
scores = promoteItem(scores, lookup->score, item_id, old_score); scores = promoteItem(scores, lookup->score, item_id, old_score);
pthread_mutex_unlock(&scores_mutex);
} }
app_stats.updates += 1; app_stats.updates += 1;
reply(fd, "OK\r\n"); reply(fd, "OK\r\n");
} else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "peak") == 0) { } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "peak") == 0) {
int next; int next;
pthread_mutex_lock(&scores_mutex);
scores = PeakNext(scores, &next); scores = PeakNext(scores, &next);
pthread_mutex_unlock(&scores_mutex);
char msg[32]; char msg[32];
sprintf(msg, "%d\r\n", next); sprintf(msg, "%d\r\n", next);
reply(fd, msg); reply(fd, msg);
} else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "next") == 0) { } else if (ntokens == 2 && strcmp(tokens[COMMAND_TOKEN].value, "next") == 0) {
int next; int next;
pthread_mutex_lock(&scores_mutex);
scores = NextItem(scores, &next); scores = NextItem(scores, &next);
pthread_mutex_unlock(&scores_mutex);
if (next != -1) { if (next != -1) {
Position lookup = Find( next, items ); Position lookup = Find( next, items );
if (lookup != NULL) { if (lookup != NULL) {
Expand Down Expand Up @@ -182,11 +193,16 @@ int main(int argc, char **argv) {
items = MakeEmpty(NULL); items = MakeEmpty(NULL);
scores = NULL; scores = NULL;


load_snapshot("barbershop.snapshot");

time(&app_stats.started_at); time(&app_stats.started_at);
app_stats.version = "00.01.00"; app_stats.version = "00.01.00";
app_stats.updates = 0; app_stats.updates = 0;
app_stats.items = 0; app_stats.items = 0;
app_stats.pools = 0; app_stats.pools = 0;

pthread_t garbage_collector;
pthread_create(&garbage_collector, NULL, (void *) gc_thread, NULL);


int listen_fd; int listen_fd;
struct sockaddr_in listen_addr; struct sockaddr_in listen_addr;
Expand Down Expand Up @@ -224,3 +240,61 @@ void reply(int fd, char *buffer) {
printf("ERROR writing to socket"); printf("ERROR writing to socket");
} }
} }

void gc_thread() {
while (1) {
sleep(60);
pthread_mutex_lock(&scores_mutex);
sync_to_disk(scores, "barbershop.snapshot");
pthread_mutex_unlock(&scores_mutex);
}
pthread_exit(0);
}

void load_snapshot(char *filename) {
FILE *file_in;

file_in = fopen(filename, "r");
if (file_in == NULL) {
return;
}

char line[80];
int item_id, score;
pthread_mutex_lock(&scores_mutex);
while(fgets(line, 80, file_in) != NULL) {
sscanf(line, "%d %d", &item_id, &score);
items = Insert(item_id, score, items);
scores = promoteItem(scores, score, item_id, -1);
app_stats.items += 1;
printf("Loading %d with score %d\n", item_id, score);
}
pthread_mutex_unlock(&scores_mutex);
fclose(file_in);
}

void sync_to_disk(PoolNode *head, char *filename) {
int cur_char;
FILE *out_file;

remove(filename);

out_file = fopen(filename, "w");
if (out_file == NULL) {
fprintf(stderr, "Can not open output file\n");
exit (8);
}

MemberNode *member;
while (head) {
member = head->members;
while (member) {
fprintf(out_file, "%d %d\n", member->item, head->score);
member = member->next;
}
head = head->next;
}

fclose(out_file);
return (0);
}
4 changes: 3 additions & 1 deletion src/barbershop.h
Expand Up @@ -54,4 +54,6 @@ void on_accept(int fd, short ev, void *arg);
int main(int argc, char **argv); int main(int argc, char **argv);
int setnonblock(int fd); int setnonblock(int fd);
void reply(int fd, char *buffer); void reply(int fd, char *buffer);

void gc_thread();
void load_snapshot(char *filename);
void sync_to_disk(PoolNode *head, char *filename);
1 change: 1 addition & 0 deletions tests/check_barbershop.c
Expand Up @@ -75,6 +75,7 @@ START_TEST (test_scattered_adds) {
e = promoteItem(e, 7, 5002, -1); e = promoteItem(e, 7, 5002, -1);
e = promoteItem(e, 8, 5001, 5); e = promoteItem(e, 8, 5001, 5);
e = promoteItem(e, 1, 5003, -1); e = promoteItem(e, 1, 5003, -1);
sync_to_disk(e, "barbershop.dump");
int next = -1; int next = -1;
e = NextItem(e, &next); e = NextItem(e, &next);
fail_unless(next == 5000); fail_unless(next == 5000);
Expand Down

0 comments on commit d630fea

Please sign in to comment.