From c4da599dbcd8bec24a807d3e641369bbc786636d Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Sat, 8 May 2021 02:15:48 +0300 Subject: [PATCH] Dev (#39) refactor, restart fixes --- lib/sc/sc_timer.c | 21 +- lib/sc/sc_timer.h | 1 - src/aux.c | 7 +- src/client.h | 19 +- src/cmd.h | 1 + src/conn.c | 8 +- src/file.c | 7 +- src/metric.c | 1 + src/node.c | 4 +- src/node.h | 1 - src/page.c | 2 + src/rs.h | 1 + src/server.c | 462 ++++++++++++++++++++----------------- src/server.h | 3 +- src/snapshot.c | 17 +- test/CMakeLists.txt | 1 + test/add_test.c | 1 - test/example/multi_test.c | 7 +- test/example/single_test.c | 9 +- test/leader_test.c | 2 - test/remove_test.c | 98 ++++++++ test/test_util.c | 65 +++++- test/test_util.h | 5 + 23 files changed, 471 insertions(+), 272 deletions(-) create mode 100644 test/remove_test.c diff --git a/lib/sc/sc_timer.c b/lib/sc/sc_timer.c index 4cec146..7fada9f 100644 --- a/lib/sc/sc_timer.c +++ b/lib/sc/sc_timer.c @@ -61,11 +61,10 @@ void sc_timer_clear(struct sc_timer *t) { const uint32_t cap = t->wheel * WHEEL_COUNT; - t->count = 0; t->head = 0; for (uint32_t i = 0; i < cap; i++) { - t->list[i].timeout = UINT64_MAX; + t->list[i].timeout = SC_TIMER_INVALID; t->list[i].data = NULL; } } @@ -89,7 +88,7 @@ static bool expand(struct sc_timer *t) } for (uint32_t i = 0; i < cap; i++) { - alloc[i].timeout = UINT64_MAX; + alloc[i].timeout = SC_TIMER_INVALID; alloc[i].data = NULL; } @@ -121,7 +120,7 @@ uint64_t sc_timer_add(struct sc_timer *t, uint64_t timeout, uint64_t type, wheel_pos = (pos * t->wheel); for (seq = 0; seq < t->wheel; seq++) { index = wheel_pos + seq; - if (t->list[index].timeout == UINT64_MAX) { + if (t->list[index].timeout == SC_TIMER_INVALID) { goto out; } } @@ -131,10 +130,9 @@ uint64_t sc_timer_add(struct sc_timer *t, uint64_t timeout, uint64_t type, } index = (pos * t->wheel) + (seq); - assert(t->list[index].timeout == UINT64_MAX); + assert(t->list[index].timeout == SC_TIMER_INVALID); out: - t->count++; t->list[index].timeout 
= timeout + t->timestamp; t->list[index].type = type; t->list[index].data = data; @@ -153,11 +151,9 @@ void sc_timer_cancel(struct sc_timer *t, uint64_t *id) return; } - t->count--; pos = (((uint32_t) *id) * t->wheel) + (*id >> 32u); - assert(t->list[pos].timeout != UINT64_MAX); - t->list[pos].timeout = UINT64_MAX; + t->list[pos].timeout = SC_TIMER_INVALID; *id = SC_TIMER_INVALID; } @@ -167,7 +163,7 @@ uint64_t sc_timer_timeout(struct sc_timer *t, uint64_t timestamp, void *arg, void (*callback)(void *, uint64_t, uint64_t, void *)) { const uint64_t time = timestamp - t->timestamp; - uint32_t wheel, base; + uint32_t wheel, base, timeout; uint32_t head = t->head; uint32_t wheels = (uint32_t) (sc_timer_min(time / TICK, WHEEL_COUNT)); struct sc_timer_data *item; @@ -187,10 +183,9 @@ uint64_t sc_timer_timeout(struct sc_timer *t, uint64_t timestamp, void *arg, item = &t->list[base + i]; if (item->timeout <= t->timestamp) { - uint64_t timeout = item->timeout; - item->timeout = UINT64_MAX; + timeout = item->timeout; + item->timeout = SC_TIMER_INVALID; - t->count--; callback(arg, timeout, item->type, item->data); // Recalculates position each time because there diff --git a/lib/sc/sc_timer.h b/lib/sc/sc_timer.h index 8ecb0af..be11aa5 100644 --- a/lib/sc/sc_timer.h +++ b/lib/sc/sc_timer.h @@ -58,7 +58,6 @@ struct sc_timer { uint64_t timestamp; uint32_t head; uint32_t wheel; - uint32_t count; struct sc_timer_data *list; }; diff --git a/src/aux.c b/src/aux.c index c2683be..aa9515c 100644 --- a/src/aux.c +++ b/src/aux.c @@ -260,6 +260,7 @@ int aux_prepare(struct aux *aux) "total_memory TEXT," "used_memory_bytes TEXT," "used_memory TEXT," + "fsync_count TEXT," "fsync_max_ms TEXT," "fsync_average_ms TEXT," "snapshot_success TEXT," @@ -328,7 +329,7 @@ int aux_prepare(struct aux *aux) sql = "INSERT OR REPLACE INTO resql_info VALUES (" "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"; + "?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?);"; rc = sqlite3_prepare_v3(aux->db, sql, -1, true, &aux->add_info, NULL); if (rc != SQLITE_OK) { goto error; @@ -471,6 +472,7 @@ int aux_write_info(struct aux *aux, struct info *n) rc |= sqlite3_bind_text(stmt, 35, sc_buf_get_str(&n->stats), -1, NULL); rc |= sqlite3_bind_text(stmt, 36, sc_buf_get_str(&n->stats), -1, NULL); rc |= sqlite3_bind_text(stmt, 37, sc_buf_get_str(&n->stats), -1, NULL); + rc |= sqlite3_bind_text(stmt, 38, sc_buf_get_str(&n->stats), -1, NULL); out: if (rc != SQLITE_OK) { goto cleanup; @@ -484,8 +486,7 @@ int aux_write_info(struct aux *aux, struct info *n) return aux_rc(rc); } -int aux_add_log(struct aux *aux, uint64_t id, const char *level, - const char *log) +int aux_add_log(struct aux *aux, uint64_t id, const char *level, const char *log) { assert(level != NULL); assert(log != NULL); diff --git a/src/client.h b/src/client.h index 076a187..15eb0a0 100644 --- a/src/client.h +++ b/src/client.h @@ -40,20 +40,19 @@ #include "sc/sc_str.h" struct client { - struct conn conn; - struct sc_list list; + bool msg_wait; // msg in-progress + bool terminated; // waiting to be deallocated char *name; - uint64_t ts; uint64_t id; - uint64_t seq; + uint64_t seq; // current sequence + uint64_t commit_index; // commit index for read request + uint64_t round_index; // round index for read request - struct msg msg; - uint64_t commit_index; - uint64_t round_index; - struct sc_list read; - bool msg_wait; - bool terminated; + struct conn conn; + struct sc_list list; // client list + struct sc_list read; // read request list + struct msg msg; // current msg }; struct client *client_create(struct conn *conn, const char *name); diff --git a/src/cmd.h b/src/cmd.h index e4694f3..b9d1867 100644 --- a/src/cmd.h +++ b/src/cmd.h @@ -58,6 +58,7 @@ struct cmd_init { struct cmd_meta { struct meta meta; }; + struct cmd_config { bool add; const char *name; diff --git a/src/conn.c b/src/conn.c index 534fb1c..def6d11 100644 --- a/src/conn.c +++ b/src/conn.c @@
-46,6 +46,11 @@ static int conn_established(struct conn *c) c->state = CONN_CONNECTED; + /** + * Make sure socket is registered for read event and not registered for + * write event. It may have registered for write event as connection + * attempts are non-blocking. + */ rc = conn_unregister(c, false, true); if (rc != RS_OK) { return rc; @@ -149,6 +154,7 @@ static void conn_cleanup_sock(struct conn *c) void conn_term(struct conn *c) { + sc_list_del(NULL, &c->list); sc_timer_cancel(&c->server->timer, &c->timer_id); conn_cleanup_sock(c); @@ -176,7 +182,7 @@ int conn_set(struct conn *c, struct conn *src) * Important to unregister both. Connection might be registered before * with any of the events. We move connection, so memory address will * change. Event trigger uses pointer address, so we must unregister - * first. + * first and register with new address. */ rc = conn_unregister(c, true, true); if (rc != RS_OK) { diff --git a/src/file.c b/src/file.c index 70223dd..43fcf19 100644 --- a/src/file.c +++ b/src/file.c @@ -86,14 +86,17 @@ int file_open(struct file *f, const char *path, const char *mode) fp = fopen(path, mode); if (fp == NULL) { - sc_log_error("file : %s, fopen : %s \n", path, strerror(errno)); - return errno == ENOSPC ? RS_FULL : RS_ERROR; + goto err; } f->fp = fp; sc_str_set(&f->path, path); return RS_OK; + +err: + sc_log_error("file : %s, fopen : %s \n", path, strerror(errno)); + return errno == ENOSPC ? RS_FULL : RS_ERROR; } int file_close(struct file *f) diff --git a/src/metric.c b/src/metric.c index 4ff816b..a046ff8 100644 --- a/src/metric.c +++ b/src/metric.c @@ -356,6 +356,7 @@ void metric_encode(struct metric *m, struct sc_buf *buf) sc_buf_put_fmt(buf, "%s", sc_bytes_to_size(b, sizeof(b), (uint64_t) sz)); + sc_buf_put_fmt(buf, "%" PRIu64, m->fsync_count); sc_buf_put_fmt(buf, "%f", ((double) m->fsync_max) / 1000000); div = (m->fsync_count ? 
m->fsync_count : 1); diff --git a/src/node.c b/src/node.c index 4b3458a..3fd7bdd 100644 --- a/src/node.c +++ b/src/node.c @@ -128,10 +128,10 @@ int node_try_connect(struct node *n) struct sc_uri *uri; if (n->conn.state != CONN_CONNECTED) { - n->interval = sc_min(32 * 1024, n->interval * 2); + n->interval = sc_min(16 * 1024, n->interval * 2); } - timeout = (rs_rand() % 256) + n->interval; + timeout = (rs_rand() % 512) + n->interval; n->conn_timer = sc_timer_add(n->timer, timeout, SERVER_TIMER_CONNECT, n); if (n->conn.state == CONN_CONNECTED) { diff --git a/src/node.h b/src/node.h index a5527ff..304a267 100644 --- a/src/node.h +++ b/src/node.h @@ -60,7 +60,6 @@ struct node { uint64_t msg_inflight; int id; - bool known; bool voted; enum meta_role role; const char* status; diff --git a/src/page.c b/src/page.c index 9ccc8db..9a9464e 100644 --- a/src/page.c +++ b/src/page.c @@ -45,6 +45,7 @@ #include #include +#include #define PAGE_ENTRY_MAX_SIZE (2u * 1024 * 1024 * 1024) #define PAGE_END_MARK 0 @@ -309,6 +310,7 @@ void page_fsync(struct page *p, uint64_t index) } last = p->flush_pos; + (void) last; ts = sc_time_mono_ns(); rc = sc_mmap_msync(&p->map, (last & ~(4095)), (pos - last)); diff --git a/src/rs.h b/src/rs.h index af1789a..1279a5e 100644 --- a/src/rs.h +++ b/src/rs.h @@ -121,6 +121,7 @@ enum rs_rc RS_FATAL = -15, RS_FAIL = -16, RS_SQL_ERROR = -17, + RS_SNAPSHOT = -18 }; #define rs_exp(fmt, ...) 
fmt, __VA_ARGS__ diff --git a/src/server.c b/src/server.c index 6922970..5015969 100644 --- a/src/server.c +++ b/src/server.c @@ -232,7 +232,15 @@ int server_close(struct server *s) sc_str_destroy(&s->voted_for); + if (s->own) { + node_destroy(s->own); + } + sc_array_foreach (&s->nodes, node) { + if (s->own == node) { + continue; + } + node_destroy(node); } @@ -287,10 +295,10 @@ int server_close(struct server *s) s->cluster_up = false; s->last_ts = 0; s->role = SERVER_ROLE_FOLLOWER; + s->leader = NULL; s->commit = 0; s->round = 0; - s->round_prev = 0; s->round_match = 0; return ret; @@ -385,6 +393,104 @@ void server_buf_free(struct server *s, struct sc_buf buf) sc_queue_add_first(&s->cache, buf); } +static void server_create_node(struct server *s, const char *name, + struct sc_array_ptr *uris) +{ + struct node *n; + + if (strcmp(name, s->conf.node.name) == 0) { + return; + } + + sc_array_foreach (&s->nodes, n) { + if (strcmp(n->name, name) == 0) { + return; + } + } + + n = node_create(name, s, true); + node_add_uris(n, uris); + node_clear_indexes(n, s->store.last_index); + sc_array_add(&s->nodes, n); +} + +static void server_update_connections(struct server *s) +{ + struct node *n; + struct meta_node m; + struct sc_array_ptr n_nodes; + struct sc_array_ptr n_unknown; + + sc_array_init(&n_nodes); + sc_array_init(&n_unknown); + + sc_array_foreach (&s->nodes, n) { + if (n == s->own) { + continue; + } + + if (meta_exists(&s->meta, n->name)) { + sc_array_add(&n_nodes, n); + } else { + sc_list_del(&s->connected_nodes, &n->list); + sc_array_add(&n_unknown, n); + } + } + + sc_array_foreach (&s->unknown_nodes, n) { + if (meta_exists(&s->meta, n->name)) { + sc_array_add(&n_nodes, n); + sc_list_add_tail(&s->connected_nodes, &n->list); + } else { + sc_array_add(&n_unknown, n); + } + } + + sc_array_term(&s->nodes); + sc_array_term(&s->unknown_nodes); + + s->nodes = n_nodes; + s->unknown_nodes = n_unknown; + + sc_array_foreach (&s->meta.nodes, m) { + server_create_node(s, m.name, 
&m.uris); + } + + if (s->in_cluster) { + sc_array_add(&s->nodes, s->own); + } +} + +static void server_become_follower(struct server *s, struct node *leader) +{ + if (s->role == SERVER_ROLE_FOLLOWER && s->leader == leader) { + return; + } + + s->role = SERVER_ROLE_FOLLOWER; + s->leader = leader; + s->prevote_count = 0; + s->prevote_term = 0; + s->vote_count = 0; + + snapshot_clear(&s->ss); + + if (leader != NULL) { + meta_set_leader(&s->meta, leader->name); + } +} + +void server_meta_change(struct server *s) +{ + s->in_cluster = meta_exists(&s->meta, s->conf.node.name); + server_update_connections(s); + + if (!s->in_cluster && s->role == SERVER_ROLE_LEADER && + s->meta.prev == NULL) { + server_become_follower(s, NULL); + } +} + int server_prepare_cluster(struct server *s) { int rc; @@ -401,8 +507,6 @@ int server_prepare_cluster(struct server *s) s->leader = NULL; s->own = node_create(s->conf.node.name, s, false); - sc_array_add(&s->nodes, s->own); - state_init(&s->state, cb, path, s->conf.cluster.name); rc = snapshot_init(&s->ss, s); @@ -552,11 +656,6 @@ int server_read_meta(struct server *s) meta_print(&s->meta, &s->tmp); sc_log_info(sc_buf_rbuf(&s->tmp)); - exist = meta_exists(&s->meta, s->conf.node.name); - if (exist) { - s->in_cluster = true; - } - rc = state_open(&s->state, s->conf.node.in_memory); if (rc != RS_OK) { return rc; @@ -575,6 +674,12 @@ int server_read_meta(struct server *s) s->commit = s->state.index; + if (s->state.meta.index > s->meta.index) { + meta_copy(&s->meta, &s->state.meta); + } + + server_meta_change(s); + return RS_OK; } @@ -639,7 +744,7 @@ static int server_on_task(struct server *s) static int server_on_signal(struct server *s) { int size; - uint64_t val; + unsigned char val; size = sc_sock_pipe_read(&s->sigfd, &val, sizeof(val)); if (size != sizeof(val)) { @@ -793,7 +898,7 @@ static int server_write_meta_cmd(struct server *s) { bool b; - assert(s->leader == s->own); + assert(s->role == SERVER_ROLE_LEADER); b = meta_exists(&s->meta, 
s->conf.node.name); if (b) { @@ -848,32 +953,6 @@ static int server_on_client_disconnect(struct server *s, struct client *c, return server_create_entry(s, true, 0, 0, CMD_DISCONNECT, &s->tmp); } -static int server_finalize_client_connection(struct server *s, struct client *c) -{ - int rc; - struct sc_buf *b = conn_out(&c->conn); - - msg_create_connect_resp(b, MSG_OK, c->seq, s->meta.term, s->meta.uris); - - rc = conn_flush(&c->conn); - if (rc != RS_OK) { - goto err; - } - - rc = client_processed(c); - if (rc != RS_OK) { - goto err; - } - - sc_map_put_64v(&s->vclients, c->id, c); - sc_log_debug("Client connected : %s \n", c->name); - - return RS_OK; - -err: - return server_on_client_disconnect(s, c, MSG_ERR); -} - static int server_on_client_connect_req(struct server *s, struct conn *in, struct msg_connect_req *msg) { @@ -941,7 +1020,6 @@ static int server_on_node_connect_req(struct server *s, struct conn *pending, if (!found) { n = node_create(msg->name, s, false); - n->known = false; sc_array_add(&s->unknown_nodes, n); } @@ -960,11 +1038,6 @@ static int server_on_node_connect_req(struct server *s, struct conn *pending, rc = conn_flush(&n->conn); if (rc != RS_OK) { node_disconnect(n); - return RS_OK; - } - - if (s->role == SERVER_ROLE_LEADER) { - return server_write_meta_cmd(s); } return RS_OK; @@ -1005,6 +1078,8 @@ static int server_on_connect_resp(struct server *s, struct sc_sock_fd *fd) conn_set_type(&node->conn, SERVER_FD_NODE_RECV); sc_list_add_tail(&s->connected_nodes, &node->list); + sc_log_debug("Connected to node[%s] \n", node->name); + return RS_OK; disconnect: @@ -1032,6 +1107,8 @@ static int server_on_outgoing_conn(struct server *s, struct sc_sock_fd *fd) return RS_OK; } + sc_log_debug("TCP connection is successful to node[%s] \n", node->name); + buf = conn_out(&node->conn); msg_create_connect_req(buf, MSG_NODE, c->cluster.name, c->node.name); @@ -1043,10 +1120,17 @@ static int server_on_outgoing_conn(struct server *s, struct sc_sock_fd *fd) return 
RS_OK; } -static void server_schedule_election(struct server *s) +static void server_schedule_election(struct server *s, bool fast) { const uint64_t type = SERVER_TIMER_ELECTION; - uint64_t t = s->conf.advanced.heartbeat + (rs_rand() % 2048); + uint64_t t; + + if (fast) { + t = (rs_rand() % 256) + 50; + } else { + t = s->conf.advanced.heartbeat + ((rs_rand() % 1024) + 150); + } + s->election_timer = sc_timer_add(&s->timer, t, type, NULL); } @@ -1091,21 +1175,6 @@ static int server_become_leader(struct server *s) return RS_OK; } -static void server_become_follower(struct server *s, struct node *leader) -{ - if (s->role == SERVER_ROLE_FOLLOWER && s->leader == leader) { - return; - } - - s->role = SERVER_ROLE_FOLLOWER; - s->leader = leader; - snapshot_clear(&s->ss); - - if (leader != NULL) { - meta_set_leader(&s->meta, leader->name); - } -} - static int server_check_prevote_count(struct server *s) { int rc; @@ -1158,7 +1227,7 @@ static int server_on_election_timeout(struct server *s) struct sc_buf *buf; uint64_t timeout = s->conf.advanced.heartbeat; - if (s->leader == s->own) { + if (s->role == SERVER_ROLE_LEADER) { return RS_OK; } @@ -1300,6 +1369,18 @@ static void server_on_full_disk(struct server *s) s->full_timer = sc_timer_add(&s->timer, 10000, SERVER_TIMER_FULL, NULL); } +static int server_restart(struct server *s) +{ + int rc; + + rc = server_close(s); + if (rc != RS_OK && rc != RS_FULL) { + rs_exit("server close : %d", rc); + } + + return server_prepare_start(s); +} + static void server_on_full_timer(struct server *s) { assert(s->full); @@ -1323,13 +1404,15 @@ static void server_on_full_timer(struct server *s) if (dir_size > limit) { rc = server_prepare_start(s); - if (rc == RS_ERROR) { - rs_exit("prepare_start : %d .", rc); - } - if (rc == RS_OK) { + switch (rc) { + case RS_OK: s->full = false; return; + case RS_FULL: + break; + default: + rs_exit("prepare_start : %d .", rc); } } @@ -1338,8 +1421,7 @@ static void server_on_full_timer(struct server *s) 
sc_bytes_to_size(cur, sizeof(cur), dir_size); sc_bytes_to_size(req, sizeof(req), limit); - sc_log_error("Free space : %s, need : %s. Retry in 10 seconds \n", cur, - req); + sc_log_error("Free space : %s, required : %s. \n", cur, req); } void server_timeout(void *arg, uint64_t timeout, uint64_t type, void *data) @@ -1362,7 +1444,7 @@ void server_timeout(void *arg, uint64_t timeout, uint64_t type, void *data) break; case SERVER_TIMER_ELECTION: rc = server_on_election_timeout(s); - server_schedule_election(s); + server_schedule_election(s, false); break; case SERVER_TIMER_INFO: rc = server_on_info_timer(s); @@ -1435,7 +1517,8 @@ static int server_on_reqvote_resp(struct server *s, struct node *n, } s->prevote_count = 0; - s->role = SERVER_ROLE_FOLLOWER; + server_become_follower(s, NULL); + return RS_OK; } @@ -1537,7 +1620,7 @@ static int server_update_commit(struct server *s, uint64_t commit) } } - s->commit = commit; + s->commit = min; } if (!s->ss_inprogress && s->commit >= store_ss_index(&s->store)) { @@ -1551,68 +1634,6 @@ static int server_update_commit(struct server *s, uint64_t commit) return RS_OK; } -void server_create_node(struct server *s, const char *name, - struct sc_array_ptr *uris) -{ - struct node *n; - - if (strcmp(name, s->conf.node.name) == 0) { - return; - } - - sc_array_foreach (&s->nodes, n) { - if (strcmp(n->name, name) == 0) { - return; - } - } - - n = node_create(name, s, true); - node_add_uris(n, uris); - node_clear_indexes(n, s->store.last_index); - n->known = true; - sc_array_add(&s->nodes, n); -} - -static void server_update_connections(struct server *s) -{ - struct node *n; - struct meta_node m; - struct sc_array_ptr n_nodes; - struct sc_array_ptr n_unknown; - - sc_array_init(&n_nodes); - sc_array_init(&n_unknown); - - sc_array_foreach (&s->nodes, n) { - if (meta_exists(&s->meta, n->name)) { - sc_array_add(&n_nodes, n); - } else { - sc_list_del(&s->connected_nodes, &n->list); - sc_array_add(&n_unknown, n); - } - } - - sc_array_foreach 
(&s->unknown_nodes, n) { - if (meta_exists(&s->meta, n->name)) { - sc_array_add(&n_nodes, n); - sc_list_add_tail(&s->connected_nodes, &n->list); - n->known = true; - } else { - sc_array_add(&n_unknown, n); - } - } - - sc_array_term(&s->nodes); - sc_array_term(&s->unknown_nodes); - - s->nodes = n_nodes; - s->unknown_nodes = n_unknown; - - sc_array_foreach (&s->meta.nodes, m) { - server_create_node(s, m.name, &m.uris); - } -} - static int server_store_entries(struct server *s, uint64_t index, unsigned char *buf, uint32_t len) { @@ -1634,7 +1655,7 @@ static int server_store_entries(struct server *s, uint64_t index, if (entry_term(curr) != term) { store_remove_after(&s->store, index - 1); meta_rollback(&s->meta, index - 1); - server_update_connections(s); + server_meta_change(s); } else { index++; continue; @@ -1643,9 +1664,7 @@ static int server_store_entries(struct server *s, uint64_t index, if (entry_flags(e) == CMD_META) { meta_replace(&s->meta, data, data_len); - s->in_cluster = - meta_exists(&s->meta, s->conf.node.name); - server_update_connections(s); + server_meta_change(s); } retry: rc = store_put_entry(&s->store, index, e); @@ -1804,30 +1823,10 @@ const char *server_shutdown(void *arg, const char *node) return "Shutdown in progress."; } -static void server_replace_snapshot(struct server *s) -{ - struct state_cb cb = { - .arg = s, - .add_node = server_add_node, - .remove_node = server_remove_node, - .shutdown = server_shutdown, - }; - - state_term(&s->state); - store_term(&s->store); - - state_init(&s->state, cb, s->conf.node.dir, s->conf.cluster.name); - state_open(&s->state, s->conf.node.in_memory); - store_init(&s->store, s->conf.node.dir, s->state.term, s->state.index); - snapshot_open(&s->ss, s->state.ss_path, s->state.term, s->state.index); - - s->commit = s->state.index; -} - int server_on_snapshot_req(struct server *s, struct node *n, struct msg *msg) { bool success = true; - int rc; + int rc = RS_OK; struct msg_snapshot_req *req = &msg->snapshot_req; 
struct sc_buf *buf; @@ -1855,10 +1854,6 @@ int server_on_snapshot_req(struct server *s, struct node *n, struct msg *msg) rc = snapshot_recv(&s->ss, req->ss_term, req->ss_index, req->done, req->offset, req->buf, req->len); - if (rc == RS_DONE) { - server_replace_snapshot(s); - } - if (rc == RS_ERROR) { success = false; } @@ -1866,7 +1861,7 @@ int server_on_snapshot_req(struct server *s, struct node *n, struct msg *msg) buf = conn_out(&n->conn); msg_create_snapshot_resp(buf, s->meta.term, success, req->done); - return RS_OK; + return rc; } int server_on_snapshot_resp(struct server *s, struct node *n, struct msg *msg) @@ -2055,10 +2050,7 @@ int server_on_client_recv(struct server *s, struct sc_sock_fd *fd, uint32_t ev) req = &c->msg.client_req; if (req->readonly) { - if (s->round_prev == s->round) { - s->round++; - } - + s->pending_readreq = true; c->round_index = s->round; c->commit_index = s->store.last_index; sc_list_add_tail(&s->read_reqs, &c->read); @@ -2100,8 +2092,7 @@ static int server_prepare_start(struct server *s) return rc; } - server_update_connections(s); - server_schedule_election(s); + server_schedule_election(s, true); s->info_timer = sc_timer_add(&s->timer, 0, SERVER_TIMER_INFO, NULL); @@ -2110,18 +2101,20 @@ static int server_prepare_start(struct server *s) static int server_on_meta(struct server *s, struct meta *meta) { - if (meta->index < s->meta.index) { + if (s->meta.index > meta->index) { return RS_OK; } if (meta->index == s->meta.index && s->meta.prev != NULL) { meta_remove_prev(&s->meta); - sc_buf_clear(&s->tmp); - meta_print(&s->meta, &s->tmp); - sc_log_info(sc_buf_rbuf(&s->tmp)); } else if (meta->index > s->meta.index) { meta_copy(&s->meta, meta); - s->in_cluster = meta_exists(&s->meta, s->conf.node.name); + } + + server_meta_change(s); + + if (!s->in_cluster && s->role == SERVER_ROLE_LEADER) { + server_become_follower(s, NULL); } return server_write_meta(s); @@ -2143,21 +2136,42 @@ static int server_on_term_start(struct server *s, struct 
meta *m) static int server_on_client_connect_applied(struct server *s, struct session *sess) { - struct client **c; + int rc; + struct client *c; + struct sc_buf *b; if (s->role != SERVER_ROLE_LEADER) { return RS_OK; } - c = (struct client **) sc_map_get_sv(&s->clients, sess->name); - if (c == NULL) { + c = sc_map_get_sv(&s->clients, sess->name); + if (!sc_map_found(&s->clients)) { return RS_OK; } - (*c)->id = sess->id; - (*c)->seq = sess->seq; + c->id = sess->id; + c->seq = sess->seq; + + b = conn_out(&c->conn); + msg_create_connect_resp(b, MSG_OK, c->seq, s->meta.term, s->meta.uris); - return server_finalize_client_connection(s, *c); + rc = conn_flush(&c->conn); + if (rc != RS_OK) { + goto err; + } + + rc = client_processed(c); + if (rc != RS_OK) { + goto err; + } + + sc_map_put_64v(&s->vclients, c->id, c); + sc_log_debug("Client connected : %s \n", c->name); + + return RS_OK; + +err: + return server_on_client_disconnect(s, c, MSG_ERR); } static int server_on_applied_client_req(struct server *s, uint64_t cid, @@ -2279,16 +2293,39 @@ static int server_process_readonly(struct server *s, struct client *c) static int server_check_commit(struct server *s) { int rc; - uint64_t match, round_index; - uint32_t index = s->meta.voter / 2; + uint64_t match = s->store.last_index; + uint64_t round_index = s->round; + const uint32_t index = (s->meta.voter / 2); struct node *node; struct client *c; struct sc_list *n, *it; + s->own->round = s->round; + sc_array_sort(&s->nodes, server_sort_matches); node = sc_array_at(&s->nodes, index); match = node->match; + if (s->in_cluster && s->own->match != s->store.last_index) { + uint64_t tmp_match = s->own->match; + s->own->match = s->store.last_index; + + sc_array_sort(&s->nodes, server_sort_matches); + node = sc_array_at(&s->nodes, index); + + if (node->match > match) { + match = node->match; + + if (s->conf.advanced.fsync) { + store_flush(&s->store); + } + s->own->match = s->store.last_index; + s->own->next = s->store.last_index + 1; + 
} else { + s->own->match = tmp_match; + } + } + rc = server_update_commit(s, match); if (rc != RS_OK) { return rc; @@ -2521,7 +2558,7 @@ static int server_flush_nodes(struct server *s) } if (n->msg_inflight > 0 || (n->next > s->store.last_index && - s->round == s->round_prev)) { + !s->pending_readreq)) { goto flush; } @@ -2555,6 +2592,8 @@ static int server_flush_nodes(struct server *s) } } + s->pending_readreq = false; + return RS_OK; } @@ -2577,21 +2616,12 @@ static int server_flush(struct server *s) return rc; } - if (s->own->next <= s->store.last_index) { - if (s->conf.advanced.fsync) { - store_flush(&s->store); - } - s->own->match = s->store.last_index; - s->own->next = s->store.last_index + 1; - } - s->own->round = s->round; - rc = server_check_commit(s); if (rc != RS_OK) { return rc; } - s->round_prev = s->round; + s->round++; return server_handle_jobs(s); } @@ -2656,6 +2686,25 @@ static int server_on_connect_req(struct server *s, struct sc_sock_fd *fd) return RS_OK; } +void server_handle_rc(struct server *s, int rc) +{ + switch (rc) { + case RS_OK: + break; + case RS_FULL: + server_on_full_disk(s); + break; + case RS_SNAPSHOT: + rc = server_restart(s); + if (rc != RS_SNAPSHOT) { + server_handle_rc(s, rc); + } + break; + default: + rs_exit("FATAL : err : %d ", rc); + } +} + static void *server_run(void *arg) { int rc; @@ -2673,14 +2722,14 @@ static void *server_run(void *arg) } rc = server_prepare_start(s); - if (rc != RS_OK) { - if (rc == RS_ERROR) { - rs_exit("Failed to start server, shutting down. \n"); - } - - if (rc == RS_FULL) { - server_on_full_disk(s); - } + switch (rc) { + case RS_OK: + break; + case RS_FULL: + server_on_full_disk(s); + break; + default: + rs_exit("Failed to start server (%d), shutting down.. 
\n", rc); } if (conf.cmdline.systemd) { @@ -2713,8 +2762,6 @@ static void *server_run(void *arg) rs_exit("poll : %s \n", sc_sock_poll_err(&s->poll)); } - rc = RS_OK; - for (int i = 0; i < events; i++) { retry = 100; @@ -2751,22 +2798,13 @@ static void *server_run(void *arg) } if (rc != RS_OK) { + server_handle_rc(s, rc); break; } } - if (rc == RS_FULL) { - server_on_full_disk(s); - } else if (rc == RS_ERROR) { - rs_exit("failed with code : %d ", rc); - } - rc = server_flush(s); - if (rc == RS_FULL) { - server_on_full_disk(s); - } else if (rc == RS_ERROR) { - rs_exit("failed with code : %d ", rc); - } + server_handle_rc(s, rc); } sc_log_info("Resql[%s] is shutting down \n", s->conf.node.name); diff --git a/src/server.h b/src/server.h index 324fbfc..aebf92f 100644 --- a/src/server.h +++ b/src/server.h @@ -126,6 +126,7 @@ struct server { bool cluster_up; bool full; bool in_cluster; + bool pending_readreq; int timer_rc; // Cluster management @@ -139,7 +140,6 @@ struct server { uint64_t vote_timestamp; uint64_t prevote_term; uint64_t round_match; - uint64_t round_prev; uint64_t round; uint64_t commit; uint64_t timestamp; @@ -147,6 +147,7 @@ struct server { uint64_t info_timer; uint64_t full_timer; uint64_t last_ts; + uint64_t last_quorum; }; struct server *server_start(struct conf *c); diff --git a/src/snapshot.c b/src/snapshot.c index 0cd1d8f..594a9cd 100644 --- a/src/snapshot.c +++ b/src/snapshot.c @@ -117,6 +117,8 @@ int snapshot_term(struct snapshot *ss) int rc, ret = RS_OK; struct snapshot_task task = {.stop = true}; + snapshot_clear(ss); + if (!ss->init) { return RS_OK; } @@ -239,32 +241,27 @@ int snapshot_recv(struct snapshot *ss, uint64_t term, uint64_t index, bool done, ss->tmp = file_create(); rc = file_open(ss->tmp, ss->recv_path, "w+"); if (rc != RS_OK) { - sc_log_error("Open file failed: %s \n", ss->recv_path); - return RS_ERROR; + return rc; } } rc = file_write_at(ss->tmp, offset, data, len); if (rc != RS_OK) { - sc_log_error("snapshot_recv write_at : %s 
\n", strerror(errno)); return rc; } if (done) { rc = file_flush(ss->tmp); if (rc != RS_OK) { - sc_log_error("snapshot_recv flush : %s \n", - strerror(errno)); return rc; } file_destroy(ss->tmp); ss->tmp = NULL; - rc = rename(ss->recv_path, ss->path); - if (rc != 0) { - sc_log_error("snapshot_recv rename : %s \n", - strerror(errno)); + rc = file_rename(ss->recv_path, ss->path); + if (rc != RS_OK) { + return rc; } ss->term = 0; @@ -272,7 +269,7 @@ int snapshot_recv(struct snapshot *ss, uint64_t term, uint64_t index, bool done, snapshot_close(ss); - return RS_DONE; + return RS_SNAPSHOT; } return RS_OK; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0500b0f..fb1035c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -134,6 +134,7 @@ resql_test(leader_test.c) resql_test(meta_test.c) resql_test(msg_test.c) resql_test(page_test.c) +resql_test(remove_test.c) resql_test(restart_test.c) resql_test(session_test.c) resql_test(single_test.c) diff --git a/test/add_test.c b/test/add_test.c index 12290e3..c42ce64 100644 --- a/test/add_test.c +++ b/test/add_test.c @@ -46,7 +46,6 @@ void test_one() test_server_create(0, 1); test_server_add_auto(); - sleep(3); c = test_client_create(); resql_put_sql(c, "CREATE TABLE test (key INTEGER PRIMARY KEY, val INTEGER, blob BLOB);"); diff --git a/test/example/multi_test.c b/test/example/multi_test.c index d8d38fc..6e3ab51 100644 --- a/test/example/multi_test.c +++ b/test/example/multi_test.c @@ -38,18 +38,17 @@ static void multi() { + test_server_create(0, 3); test_server_create(1, 3); - // test_server_create(2, 3); - sleep(3); - - //test_server_create(2, 3); + test_server_create(2, 3); pause(); } int main(void) { + rs_global_init(); test_execute(multi); return 0; diff --git a/test/example/single_test.c b/test/example/single_test.c index 58a57aa..c0b8f36 100644 --- a/test/example/single_test.c +++ b/test/example/single_test.c @@ -36,14 +36,9 @@ static void single() { - struct conf conf; - char* params[] = {"", 
"--node-directory=."}; - conf_init(&conf); - conf_read_config(&conf, false, 2, params); - - test_server_create_conf(&conf, 0); - sleep(60); + test_server_create_auto(1); + pause(); } int main(void) diff --git a/test/leader_test.c b/test/leader_test.c index 6e149b3..8443b6e 100644 --- a/test/leader_test.c +++ b/test/leader_test.c @@ -136,8 +136,6 @@ void restart_test() rc = resql_exec(c, false, &rs); client_assert(c, rc == RESQL_OK); - printf("Client created tables after restart \n"); - for (int i = 0; i < 1000; i++) { snprintf(tmp, sizeof(tmp), "%d", i); diff --git a/test/remove_test.c b/test/remove_test.c new file mode 100644 index 0000000..919f91d --- /dev/null +++ b/test/remove_test.c @@ -0,0 +1,98 @@ +/* + * BSD-3-Clause + * + * Copyright 2021 Ozan Tezcan + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "resql.h" +#include "test_util.h" + +#include +#include + +void test_one() +{ + int rc; + int64_t key, value; + char blob[64]; + resql *c; + resql_result *rs; + struct resql_column *row; + + test_server_create(0, 2); + test_server_create(1, 2); + c = test_client_create(); + + resql_put_sql(c, "CREATE TABLE test (key INTEGER PRIMARY KEY, val INTEGER, blob BLOB);"); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + resql_put_sql(c, "SELECT random(), random(), randomblob(64);"); + rc = resql_exec(c, true, &rs); + client_assert(c, rc == RESQL_OK); + + resql_put_sql(c, "INSERT INTO test VALUES(random(), random(), randomblob(64));"); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + resql_put_sql(c, "SELECT * FROM test;"); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + row = resql_row(rs); + key = row[0].intval; + value = row[1].intval; + memcpy(blob, row[2].blob, row[2].len); + + test_server_add(2, 2); + test_server_remove(0); + test_server_remove(2); + + resql_put_sql(c, "SELECT * FROM test;"); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + rs_assert(key == row[0].intval); + rs_assert(value == row[1].intval); + rs_assert(memcmp(blob, row[2].blob, row[2].len) == 0); + + resql_put_sql(c, "SELECT * FROM resql_info;"); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + rs_assert(resql_row_count(rs) == 
1); + row = resql_row(rs); + rs_assert(strcmp(row[0].text, "node1") == 0); +} + +int main() +{ + test_execute(test_one); + + return 0; +} diff --git a/test/test_util.c b/test/test_util.c index 204d449..3f2db5a 100644 --- a/test/test_util.c +++ b/test/test_util.c @@ -40,6 +40,7 @@ #include #include +#include int init; static int count; @@ -273,12 +274,13 @@ struct server *test_server_create(int id, int cluster_size) return s; } -struct server * test_server_add_auto() +struct server *test_server_add_auto() { int i; int rc; resql *c; resql_result *rs; + struct server *s; for (i = 0; i < 9; i++) { if (cluster[i] == NULL) { @@ -292,7 +294,66 @@ struct server * test_server_add_auto() rc = resql_exec(c, false, &rs); client_assert(c, rc == RESQL_OK); - return test_server_create(i, 1); + s = test_server_create(i, 1); + test_wait_until_size(count); + + return s; +} + +void test_wait_until_size(int size) +{ + int i; + int rc; + resql *c; + struct resql_column *row; + resql_result *rs; + + for (i = 0; i < 9; i++) { + if (cluster[i] == NULL) { + break; + } + } + + c = test_client_create(); +retry: + resql_put_sql(c, "SELECT count(*) FROM resql_info;"); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + row = resql_row(rs); + if (row[0].intval != size) { + sleep(1); + goto retry; + } +} + +void test_server_add(int id, int cluster_size) +{ + int rc; + resql *c; + resql_result *rs; + + c = test_client_create(); + resql_put_sql(c, "SELECT resql('add-node', :url);"); + resql_bind_param_text(c, ":url", urls[id]); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + test_server_create(id, cluster_size); +} + +void test_server_remove(int id) +{ + int rc; + resql *c; + resql_result *rs; + + c = test_client_create(); + resql_put_sql(c, "SELECT resql('remove-node', :name);"); + resql_bind_param_text(c, ":name", names[id]); + rc = resql_exec(c, false, &rs); + client_assert(c, rc == RESQL_OK); + + test_server_destroy(id); } void 
test_server_destroy(int id) diff --git a/test/test_util.h b/test/test_util.h index 92fd4ac..1f55209 100644 --- a/test/test_util.h +++ b/test/test_util.h @@ -57,6 +57,11 @@ struct server *test_server_create(int id, int cluster_size); struct server *test_server_start_auto(int cluster_size); struct server *test_server_start(int id, int cluster_size); struct server *test_server_add_auto(); + +void test_wait_until_size(int size); +void test_server_add(int id, int cluster_size); +void test_server_remove(int id); + void test_server_destroy(int id); void test_server_destroy_all(); void test_server_destroy_leader();