Skip to content

Commit

Permalink
Dev (#39)
Browse files Browse the repository at this point in the history
refactor, restart fixes
  • Loading branch information
tezc committed May 7, 2021
1 parent e4c5a69 commit c4da599
Show file tree
Hide file tree
Showing 23 changed files with 471 additions and 272 deletions.
21 changes: 8 additions & 13 deletions lib/sc/sc_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ void sc_timer_clear(struct sc_timer *t)
{
const uint32_t cap = t->wheel * WHEEL_COUNT;

t->count = 0;
t->head = 0;

for (uint32_t i = 0; i < cap; i++) {
t->list[i].timeout = UINT64_MAX;
t->list[i].timeout = SC_TIMER_INVALID;
t->list[i].data = NULL;
}
}
Expand All @@ -89,7 +88,7 @@ static bool expand(struct sc_timer *t)
}

for (uint32_t i = 0; i < cap; i++) {
alloc[i].timeout = UINT64_MAX;
alloc[i].timeout = SC_TIMER_INVALID;
alloc[i].data = NULL;
}

Expand Down Expand Up @@ -121,7 +120,7 @@ uint64_t sc_timer_add(struct sc_timer *t, uint64_t timeout, uint64_t type,
wheel_pos = (pos * t->wheel);
for (seq = 0; seq < t->wheel; seq++) {
index = wheel_pos + seq;
if (t->list[index].timeout == UINT64_MAX) {
if (t->list[index].timeout == SC_TIMER_INVALID) {
goto out;
}
}
Expand All @@ -131,10 +130,9 @@ uint64_t sc_timer_add(struct sc_timer *t, uint64_t timeout, uint64_t type,
}

index = (pos * t->wheel) + (seq);
assert(t->list[index].timeout == UINT64_MAX);
assert(t->list[index].timeout == SC_TIMER_INVALID);

out:
t->count++;
t->list[index].timeout = timeout + t->timestamp;
t->list[index].type = type;
t->list[index].data = data;
Expand All @@ -153,11 +151,9 @@ void sc_timer_cancel(struct sc_timer *t, uint64_t *id)
return;
}

t->count--;
pos = (((uint32_t) *id) * t->wheel) + (*id >> 32u);

assert(t->list[pos].timeout != UINT64_MAX);
t->list[pos].timeout = UINT64_MAX;
t->list[pos].timeout = SC_TIMER_INVALID;
*id = SC_TIMER_INVALID;
}

Expand All @@ -167,7 +163,7 @@ uint64_t sc_timer_timeout(struct sc_timer *t, uint64_t timestamp, void *arg,
void (*callback)(void *, uint64_t, uint64_t, void *))
{
const uint64_t time = timestamp - t->timestamp;
uint32_t wheel, base;
uint32_t wheel, base, timeout;
uint32_t head = t->head;
uint32_t wheels = (uint32_t) (sc_timer_min(time / TICK, WHEEL_COUNT));
struct sc_timer_data *item;
Expand All @@ -187,10 +183,9 @@ uint64_t sc_timer_timeout(struct sc_timer *t, uint64_t timestamp, void *arg,
item = &t->list[base + i];

if (item->timeout <= t->timestamp) {
uint64_t timeout = item->timeout;
item->timeout = UINT64_MAX;
timeout = item->timeout;
item->timeout = SC_TIMER_INVALID;

t->count--;
callback(arg, timeout, item->type, item->data);

// Recalculates position each time because there
Expand Down
1 change: 0 additions & 1 deletion lib/sc/sc_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ struct sc_timer {
uint64_t timestamp;
uint32_t head;
uint32_t wheel;
uint32_t count;
struct sc_timer_data *list;
};

Expand Down
7 changes: 4 additions & 3 deletions src/aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ int aux_prepare(struct aux *aux)
"total_memory TEXT,"
"used_memory_bytes TEXT,"
"used_memory TEXT,"
"fsync_count TEXT,"
"fsync_max_ms TEXT,"
"fsync_average_ms TEXT,"
"snapshot_success TEXT,"
Expand Down Expand Up @@ -328,7 +329,7 @@ int aux_prepare(struct aux *aux)

sql = "INSERT OR REPLACE INTO resql_info VALUES ("
"?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,"
"?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";
"?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";
rc = sqlite3_prepare_v3(aux->db, sql, -1, true, &aux->add_info, NULL);
if (rc != SQLITE_OK) {
goto error;
Expand Down Expand Up @@ -471,6 +472,7 @@ int aux_write_info(struct aux *aux, struct info *n)
rc |= sqlite3_bind_text(stmt, 35, sc_buf_get_str(&n->stats), -1, NULL);
rc |= sqlite3_bind_text(stmt, 36, sc_buf_get_str(&n->stats), -1, NULL);
rc |= sqlite3_bind_text(stmt, 37, sc_buf_get_str(&n->stats), -1, NULL);
rc |= sqlite3_bind_text(stmt, 37, sc_buf_get_str(&n->stats), -1, NULL);
out:
if (rc != SQLITE_OK) {
goto cleanup;
Expand All @@ -484,8 +486,7 @@ int aux_write_info(struct aux *aux, struct info *n)
return aux_rc(rc);
}

int aux_add_log(struct aux *aux, uint64_t id, const char *level,
const char *log)
int aux_add_log(struct aux *aux, uint64_t id, const char *level, const char *log)
{
assert(level != NULL);
assert(log != NULL);
Expand Down
19 changes: 9 additions & 10 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,19 @@
#include "sc/sc_str.h"

struct client {
struct conn conn;
struct sc_list list;
bool msg_wait; // msg in-progress
bool terminated; // waiting to be deallocated

char *name;
uint64_t ts;
uint64_t id;
uint64_t seq;
uint64_t seq; // current sequence
uint64_t commit_index; // round index for read request
uint64_t round_index; // round index for read request

struct msg msg;
uint64_t commit_index;
uint64_t round_index;
struct sc_list read;
bool msg_wait;
bool terminated;
struct conn conn;
struct sc_list list; // client list
struct sc_list read; // read request list
struct msg msg; // current msg
};

struct client *client_create(struct conn *conn, const char *name);
Expand Down
1 change: 1 addition & 0 deletions src/cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct cmd_init {
struct cmd_meta {
struct meta meta;
};

struct cmd_config {
bool add;
const char *name;
Expand Down
8 changes: 7 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ static int conn_established(struct conn *c)

c->state = CONN_CONNECTED;

/**
* Make sure socket is registered for read event and not registered for
* write event. It may have registered for write event as connection
* attempts are non-blocking.
*/
rc = conn_unregister(c, false, true);
if (rc != RS_OK) {
return rc;
Expand Down Expand Up @@ -149,6 +154,7 @@ static void conn_cleanup_sock(struct conn *c)

void conn_term(struct conn *c)
{
sc_list_del(NULL, &c->list);
sc_timer_cancel(&c->server->timer, &c->timer_id);
conn_cleanup_sock(c);

Expand Down Expand Up @@ -176,7 +182,7 @@ int conn_set(struct conn *c, struct conn *src)
* Important to unregister both. Connection might be registered before
* with any of the events. We move connection, so memory address will
* change. Event trigger uses pointer address, so we must unregister
* first.
* first and register with new address.
*/
rc = conn_unregister(c, true, true);
if (rc != RS_OK) {
Expand Down
7 changes: 5 additions & 2 deletions src/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,17 @@ int file_open(struct file *f, const char *path, const char *mode)

fp = fopen(path, mode);
if (fp == NULL) {
sc_log_error("file : %s, fopen : %s \n", path, strerror(errno));
return errno == ENOSPC ? RS_FULL : RS_ERROR;
goto err;
}

f->fp = fp;
sc_str_set(&f->path, path);

return RS_OK;

err:
sc_log_error("file : %s, fopen : %s \n", path, strerror(errno));
return errno == ENOSPC ? RS_FULL : RS_ERROR;
}

int file_close(struct file *f)
Expand Down
1 change: 1 addition & 0 deletions src/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ void metric_encode(struct metric *m, struct sc_buf *buf)
sc_buf_put_fmt(buf, "%s",
sc_bytes_to_size(b, sizeof(b), (uint64_t) sz));

sc_buf_put_fmt(buf, "%" PRIu64, m->fsync_count);
sc_buf_put_fmt(buf, "%f", ((double) m->fsync_max) / 1000000);

div = (m->fsync_count ? m->fsync_count : 1);
Expand Down
4 changes: 2 additions & 2 deletions src/node.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ int node_try_connect(struct node *n)
struct sc_uri *uri;

if (n->conn.state != CONN_CONNECTED) {
n->interval = sc_min(32 * 1024, n->interval * 2);
n->interval = sc_min(16 * 1024, n->interval * 2);
}

timeout = (rs_rand() % 256) + n->interval;
timeout = (rs_rand() % 512) + n->interval;
n->conn_timer = sc_timer_add(n->timer, timeout, SERVER_TIMER_CONNECT, n);

if (n->conn.state == CONN_CONNECTED) {
Expand Down
1 change: 0 additions & 1 deletion src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ struct node {
uint64_t msg_inflight;

int id;
bool known;
bool voted;
enum meta_role role;
const char* status;
Expand Down
2 changes: 2 additions & 0 deletions src/page.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#include <errno.h>
#include <inttypes.h>
#include <unistd.h>

#define PAGE_ENTRY_MAX_SIZE (2u * 1024 * 1024 * 1024)
#define PAGE_END_MARK 0
Expand Down Expand Up @@ -309,6 +310,7 @@ void page_fsync(struct page *p, uint64_t index)
}

last = p->flush_pos;
(void) last;

ts = sc_time_mono_ns();
rc = sc_mmap_msync(&p->map, (last & ~(4095)), (pos - last));
Expand Down
1 change: 1 addition & 0 deletions src/rs.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ enum rs_rc
RS_FATAL = -15,
RS_FAIL = -16,
RS_SQL_ERROR = -17,
RS_SNAPSHOT = -18
};

#define rs_exp(fmt, ...) fmt, __VA_ARGS__
Expand Down
Loading

0 comments on commit c4da599

Please sign in to comment.