From a546705cb561b2b1915ec0edbc4b5dacb69f07c2 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Thu, 9 May 2019 12:46:13 -0700 Subject: [PATCH] feat(cluster) record and log errors causing nodes to be marked as DOWN This updates the `host still considered down` error logs to include the error causing the node to be down, as well as the duration for which it will still be considered down. E.g.: all hosts tried for query failed. 127.0.0.1: host still considered down for 1s (last error: timeout). 127.0.0.2: host still considered down for 1s (last error: timeout) From #129 --- lib/resty/cassandra/cluster.lua | 65 ++++++++++++++++++++++----------- t/06-cluster.t | 62 +++++++++++++++++++++++++++---- 2 files changed, 97 insertions(+), 30 deletions(-) diff --git a/lib/resty/cassandra/cluster.lua b/lib/resty/cassandra/cluster.lua index 38d51ec..8f5f43e 100644 --- a/lib/resty/cassandra/cluster.lua +++ b/lib/resty/cassandra/cluster.lua @@ -42,8 +42,9 @@ end ----------------------------------------- local function set_peer(self, host, up, reconn_delay, unhealthy_at, - data_center, release_version) + data_center, connect_err, release_version) data_center = data_center or '' + connect_err = connect_err or '' release_version = release_version or '' -- host status @@ -53,8 +54,9 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at, end -- host info - local marshalled = fmt("%d:%d:%d:%s%s", reconn_delay, unhealthy_at, - #data_center, data_center, release_version) + local marshalled = fmt("%d:%d:%d:%d:%s%s%s", reconn_delay, unhealthy_at, + #data_center, #connect_err, data_center, connect_err, + release_version) ok, err = self.shm:safe_set(_rec_key..host, marshalled) if not ok then @@ -65,7 +67,7 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at, end local function add_peer(self, host, data_center) - return set_peer(self, host, true, 0, 0, data_center, "") + return set_peer(self, host, true, 0, 0, data_center, nil, nil) end local function get_peer(self, host, status) @@ -86,15 +88,19 @@ local function get_peer(self, host, status) local sep_1 = find(marshalled, ":", 1, true) local sep_2 = find(marshalled, ":", sep_1 + 1, true) local sep_3 = find(marshalled, ":", sep_2 + 1, true) + local sep_4 = find(marshalled, ":", sep_3 + 1, true) local reconn_delay = sub(marshalled, 1, sep_1 - 1) local unhealthy_at = sub(marshalled, sep_1 + 1, sep_2 - 1) local data_center_len = sub(marshalled, sep_2 + 1, sep_3 - 1) + local err_len = sub(marshalled, sep_3 + 1, sep_4 - 1) - local data_center_last = sep_3 + tonumber(data_center_len) + local data_center_last = sep_4 + tonumber(data_center_len) + local err_last = data_center_last + tonumber(err_len) - local data_center = sub(marshalled, sep_3 + 1, data_center_last) - local release_version = sub(marshalled, data_center_last + 1) + local data_center = sub(marshalled, sep_4 + 1, data_center_last) + local err_conn = sub(marshalled, data_center_last + 1, err_last) + local release_version = sub(marshalled, err_last + 1) return { up = status, @@ -102,7 +108,8 @@ local function get_peer(self, host, status) data_center = data_center ~= '' and data_center or nil, release_version = release_version ~= '' and release_version or nil, reconn_delay = tonumber(reconn_delay), - unhealthy_at = tonumber(unhealthy_at) + unhealthy_at = tonumber(unhealthy_at), + err = err_conn, } end @@ -130,7 +137,7 @@ local function delete_peer(self, host) self.shm:delete(host) -- status bool end -local function set_peer_down(self, host) +local function set_peer_down(self, host, connect_err) if self.logging then log(WARN, _log_prefix, 'setting host at ', host, ' DOWN') end @@ -139,7 +146,7 @@ local function set_peer_down(self, host) peer = peer or empty_t -- this can be called from refresh() so no host in shm yet return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now(), - peer.data_center, peer.release_version) + peer.data_center, connect_err, peer.release_version) end local function set_peer_up(self, host) @@ -152,7 +159,7 @@ local function set_peer_up(self, host) peer = peer or empty_t -- this can be called from refresh() so no host in shm yet return set_peer(self, host, true, 0, 0, - peer.data_center, peer.release_version) + peer.data_center, nil, peer.release_version) end local function can_try_peer(self, host) @@ -163,7 +170,8 @@ local function can_try_peer(self, host) -- reconnection policy steps in before making a decision local peer_rec, err = get_peer(self, host, up) if not peer_rec then return nil, err end - return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay, nil, true + return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay, + nil, true, peer_rec end end @@ -191,7 +199,7 @@ local function check_peer_health(self, host, coordinator_options, retry) if not peer then return nil, err else peer:settimeout(self.timeout_connect) - local ok, err, maybe_down = peer:connect() + local ok, err_conn, maybe_down = peer:connect() if ok then -- host is healthy if retry then @@ -205,12 +213,12 @@ local function check_peer_health(self, host, coordinator_options, retry) return peer elseif maybe_down then -- host is not (or still not) responsive - local ok, shm_err = set_peer_down(self, host) + local ok, shm_err = set_peer_down(self, host, err_conn) if not ok then return nil, 'error setting host down: '..shm_err end - return nil, 'host seems unhealthy, considering it down ('..err..')' + return nil, 'host seems unhealthy, considering it down ('..err_conn..')' else - return nil, err + return nil, err_conn end end end @@ -419,7 +427,7 @@ local function next_coordinator(self, coordinator_options) local errors = {} for _, peer_rec in self.lb_policy:iter() do - local ok, err, retry = can_try_peer(self, peer_rec.host) + local ok, err, retry, peer_state = can_try_peer(self, peer_rec.host) if ok then local peer, err = check_peer_health(self, peer_rec.host, coordinator_options, retry) if peer then @@ -433,7 +441,19 @@ local function next_coordinator(self, coordinator_options) elseif err then return nil, err else - errors[peer_rec.host] = 'host still considered down' + local s = 'host still considered down' + if peer_state then + local waited = get_now() - peer_state.unhealthy_at + s = s .. ' for ' .. (peer_state.reconn_delay - waited) / 1000 .. 's' + + if peer_state.err and peer_state.err ~= '' then + s = s .. ' (last error: ' .. peer_state.err .. ')' + else + s = s .. ' (last error: not recorded)' + end + end + + errors[peer_rec.host] = s end end @@ -540,7 +560,8 @@ function _Cluster:refresh() end local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at, - rows[i].data_center, rows[i].release_version) + rows[i].data_center, nil, + rows[i].release_version) if not ok then return nil, err end end end @@ -755,9 +776,9 @@ local function handle_error(self, err, cql_code, coordinator, request) end else -- host seems down? - local ok, err = set_peer_down(self, coordinator.host) - if not ok then return nil, err end - return self:send_retry(request, 'coordinator seems down') + local ok, err2 = set_peer_down(self, coordinator.host, err) + if not ok then return nil, err2 end + return self:send_retry(request, 'coordinator seems down (' .. err .. ')') end return nil, err, cql_code diff --git a/t/06-cluster.t b/t/06-cluster.t index a07d8f1..0116b2c 100644 --- a/t/06-cluster.t +++ b/t/06-cluster.t @@ -426,8 +426,8 @@ init: true end -- insert fake peers - cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', '0.0') - cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', '0.0') + cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', nil, '0.0') + cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', nil, '0.0') local ok, err = cluster:refresh() if not ok then @@ -482,7 +482,7 @@ info: no host details for 127.0.0.254 end -- insert previous peers with some infos - cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '') + cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '', '') local ok, err = cluster:refresh() if not ok then @@ -987,6 +987,52 @@ coordinator 3: 127.0.0.1 end end + ngx.sleep(0.2) + + local coordinator, err = cluster:next_coordinator() + if not coordinator then + ngx.say(err) + end + } + } +--- request +GET /t +--- response_body_like chomp +all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\) +--- no_error_log +[error] + + + +=== TEST 23: next_coordinator() returns no host available errors with recorded errors +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local Cluster = require 'resty.cassandra.cluster' + local cluster, err = Cluster.new() + if not cluster then + ngx.log(ngx.ERR, err) + end + + local ok, err = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, err) + end + + local peers, err = cluster:get_peers() + if not peers then + ngx.log(ngx.ERR, err) + end + + for i = 1, #peers do + local ok, err = cluster:set_peer_down(peers[i].host, "timeout") + if not ok then + ngx.log(ngx.ERR, err) + return + end + end + local coordinator, err = cluster:next_coordinator() if not coordinator then ngx.say(err) @@ -996,13 +1042,13 @@ coordinator 3: 127.0.0.1 --- request GET /t --- response_body_like chomp -all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down\. 127\.0\.0\.\d+: host still considered down\. 127\.0\.0\.\d+: host still considered down +all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\) --- no_error_log [error] -=== TEST 23: next_coordinator() avoids down hosts +=== TEST 24: next_coordinator() avoids down hosts --- http_config eval: $::HttpConfig --- config location /t { @@ -1046,7 +1092,7 @@ GET /t -=== TEST 24: next_coordinator() marks nodes as down +=== TEST 25: next_coordinator() marks nodes as down --- http_config eval qq { lua_socket_log_errors off; @@ -1114,7 +1160,7 @@ can try peer 255.255.255.253: false -=== TEST 25: next_coordinator() retries down host as per reconnection policy and ups them back +=== TEST 26: next_coordinator() retries down host as per reconnection policy and ups them back --- http_config eval: $::HttpConfig --- config location /t { @@ -1189,7 +1235,7 @@ GET /t -=== TEST 26: next_coordinator() sets coordinator keyspace on connect +=== TEST 27: next_coordinator() sets coordinator keyspace on connect --- http_config eval: $::HttpConfig --- config location /t {