Skip to content

Commit

Permalink
Implemented soft-state timeouts for service routing entries in servd
Browse files Browse the repository at this point in the history
  • Loading branch information
Erik Nordström committed Mar 22, 2012
1 parent 64b9d48 commit 7d7fdce
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 77 deletions.
1 change: 1 addition & 0 deletions autogen.sh
Expand Up @@ -55,6 +55,7 @@
# detection behaviors. Similarly the _VERSION variables will override
# the minimum required version numbers.
#
#
# Examples:
#
# To obtain help on usage:
Expand Down
1 change: 1 addition & 0 deletions include/common/signal.h
Expand Up @@ -12,6 +12,7 @@ typedef struct signal {

int signal_init(struct signal *s);
void signal_destroy(struct signal *s);
int signal_clear_val(struct signal *s, int *val);
int signal_clear(struct signal *s);
int signal_get_fd(struct signal *s);
int signal_is_raised(const struct signal *s);
Expand Down
7 changes: 4 additions & 3 deletions include/common/timer.h
Expand Up @@ -6,20 +6,21 @@
#include <sys/time.h>
#include <pthread.h>
#include "heap.h"
#include "signal.h"

struct timer {
struct heapitem hi;
struct timespec timeout;
long expires; /* micro seconds */
int (*callback)(struct timer *t);
void (*callback)(struct timer *t);
void (*destruct)(struct timer *t);
void *data;
};

struct timer_queue {
struct heap queue;
pthread_mutex_t lock;
int pipefd[2];
struct signal signal;
pthread_t thr;
};

Expand All @@ -40,7 +41,7 @@ enum signal_result {
};

void timer_init(struct timer *t);
struct timer *timer_new_callback(int (*callback)(struct timer *t), void *data);
struct timer *timer_new_callback(void (*callback)(struct timer *t), void *data);
void timer_free(struct timer *t);
int timer_add(struct timer_queue *tq, struct timer *t);
void timer_del(struct timer_queue *tq, struct timer *t);
Expand Down
15 changes: 11 additions & 4 deletions src/common/signal.c
Expand Up @@ -49,20 +49,27 @@ void signal_destroy(struct signal *s)
close(s->fd[1]);
}

int signal_clear(struct signal *s)
int signal_clear_val(struct signal *s, int *val)
{
int val = 0, ret = 1;
int ret = 1;

while (ret > 0) {
ret = read(s->fd[0], &val, sizeof(val));
ret = read(s->fd[0], val, sizeof(*val));

if (ret == -1) {
if (errno == EWOULDBLOCK)
ret = 0;
}
}

return ret;
return ret > 0 ? 1 : ret;
}

int signal_clear(struct signal *s)
{
int val;

return signal_clear_val(s, &val);
}

int signal_get_fd(struct signal *s)
Expand Down
62 changes: 14 additions & 48 deletions src/common/timer.c
Expand Up @@ -52,7 +52,7 @@ static int heap_cmp(const struct heapitem *h1, const struct heapitem *h2)
return timespec_lt(&t1->timeout, &t2->timeout);
}

struct timer *timer_new_callback(int (*callback)(struct timer *),
struct timer *timer_new_callback(void (*callback)(struct timer *),
void *data)
{
struct timer *t = malloc(sizeof(struct timer));
Expand All @@ -69,44 +69,19 @@ struct timer *timer_new_callback(int (*callback)(struct timer *),

int timer_queue_get_signal(struct timer_queue *tq)
{
return tq->pipefd[0];
return signal_get_fd(&tq->signal);
}

int timer_queue_signal_raise(struct timer_queue *tq)
{
char s = 'w';
struct pollfd fds;
int ret = 0;

memset(&fds, 0, sizeof(fds));
fds.fd = tq->pipefd[0];
fds.events = POLLIN;
fds.revents = 0;

ret = poll(&fds, 1, 0);

if (ret == 1) {
/* Signal already raised */
return 0;
} else if (ret == 0) {
ret = write(tq->pipefd[1], &s, 1);
}

return ret;
return signal_raise(&tq->signal);
}

enum signal_result timer_queue_signal_lower(struct timer_queue *tq)
{
struct pollfd fds;
char s = 0;
int ret = 0;

memset(&fds, 0, sizeof(fds));
fds.fd = tq->pipefd[0];
fds.events = POLLIN;
fds.revents = 0;

ret = poll(&fds, 1, 0);
int val = 0, ret = 0;

ret = signal_clear_val(&tq->signal, &val);

switch (ret) {
case -1:
Expand All @@ -116,11 +91,9 @@ enum signal_result timer_queue_signal_lower(struct timer_queue *tq)
ret = TIMER_SIGNAL_NONE;
break;
default:
ret = read(tq->pipefd[0], &s, 1);

if (ret == 0)
if (val == 0)
ret = TIMER_SIGNAL_EXIT;
else if (ret == -1)
else if (val == -1)
ret = TIMER_SIGNAL_ERROR;
else
ret = TIMER_SIGNAL_SET;
Expand Down Expand Up @@ -259,23 +232,22 @@ int timer_next_timeout_timeval(struct timer_queue *tq,
int timer_handle_timeout(struct timer_queue *tq)
{
struct timer *t;
int ret = 0;

pthread_mutex_lock(&tq->lock);

if (heap_empty(&tq->queue)) {
pthread_mutex_unlock(&tq->lock);
return 0;
return -1;
}

t = heap_remove_first_entry(&tq->queue, struct timer, hi);

pthread_mutex_unlock(&tq->lock);

if (t->callback)
ret = t->callback(t);
t->callback(t);

return ret;
return 0;
}

void timer_list_destroy(struct timer_queue *tq)
Expand All @@ -302,11 +274,10 @@ int timer_queue_init(struct timer_queue *tq)

memset(tq, 0, sizeof(*tq));

ret = pipe(tq->pipefd);
ret = signal_init(&tq->signal);

if (ret == -1) {
LOG_ERR("pipe failed: %s\n",
strerror(errno));
LOG_ERR("Signal init failed\n");
}

heap_init(&tq->queue, 0, heap_cmp);
Expand All @@ -318,12 +289,7 @@ int timer_queue_init(struct timer_queue *tq)

void timer_queue_fini(struct timer_queue *tq)
{
if (tq->pipefd[0] != -1)
close(tq->pipefd[0]);

if (tq->pipefd[1] != -1)
close(tq->pipefd[1]);

signal_destroy(&tq->signal);
timer_list_destroy(tq);
heap_fini(&tq->queue);
}
5 changes: 2 additions & 3 deletions src/servd/ifa.c
Expand Up @@ -166,11 +166,10 @@ static int ifaddrs_find(void)
return ret;
}

static int ifaddrs_timer_timeout(struct timer *t)
static void ifaddrs_timer_timeout(struct timer *t)
{
ifaddrs_find();

return timer_schedule_secs(timer_q, iftimer, 5);
timer_schedule_secs(timer_q, iftimer, 5);
}

int ifaddrs_init(struct timer_queue *tq)
Expand Down

0 comments on commit 7d7fdce

Please sign in to comment.