Skip to content

Commit

Permalink
feat(cluster): record and log errors causing nodes to be marked as DOWN
Browse files Browse the repository at this point in the history
This updates the `host still considered down` error logs to include the
error causing the node to be down, as well as the duration for which it
will still be considered down. E.g.:

    all hosts tried for query failed. 127.0.0.1: host still considered
    down for 1s (last error: timeout). 127.0.0.2: host still considered
    down for 1s (last error: timeout)

From #129
  • Loading branch information
thibaultcha committed May 9, 2019
1 parent ae4c614 commit a546705
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 30 deletions.
65 changes: 43 additions & 22 deletions lib/resty/cassandra/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ end
-----------------------------------------

local function set_peer(self, host, up, reconn_delay, unhealthy_at,
data_center, release_version)
data_center, connect_err, release_version)
data_center = data_center or ''
connect_err = connect_err or ''
release_version = release_version or ''

-- host status
Expand All @@ -53,8 +54,9 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at,
end

-- host info
local marshalled = fmt("%d:%d:%d:%s%s", reconn_delay, unhealthy_at,
#data_center, data_center, release_version)
local marshalled = fmt("%d:%d:%d:%d:%s%s%s", reconn_delay, unhealthy_at,
#data_center, #connect_err, data_center, connect_err,
release_version)

ok, err = self.shm:safe_set(_rec_key..host, marshalled)
if not ok then
Expand All @@ -65,7 +67,7 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at,
end

local function add_peer(self, host, data_center)
return set_peer(self, host, true, 0, 0, data_center, "")
return set_peer(self, host, true, 0, 0, data_center, nil, nil)
end

local function get_peer(self, host, status)
Expand All @@ -86,23 +88,28 @@ local function get_peer(self, host, status)
local sep_1 = find(marshalled, ":", 1, true)
local sep_2 = find(marshalled, ":", sep_1 + 1, true)
local sep_3 = find(marshalled, ":", sep_2 + 1, true)
local sep_4 = find(marshalled, ":", sep_3 + 1, true)

local reconn_delay = sub(marshalled, 1, sep_1 - 1)
local unhealthy_at = sub(marshalled, sep_1 + 1, sep_2 - 1)
local data_center_len = sub(marshalled, sep_2 + 1, sep_3 - 1)
local err_len = sub(marshalled, sep_3 + 1, sep_4 - 1)

local data_center_last = sep_3 + tonumber(data_center_len)
local data_center_last = sep_4 + tonumber(data_center_len)
local err_last = data_center_last + tonumber(err_len)

local data_center = sub(marshalled, sep_3 + 1, data_center_last)
local release_version = sub(marshalled, data_center_last + 1)
local data_center = sub(marshalled, sep_4 + 1, data_center_last)
local err_conn = sub(marshalled, data_center_last + 1, err_last)
local release_version = sub(marshalled, err_last + 1)

return {
up = status,
host = host,
data_center = data_center ~= '' and data_center or nil,
release_version = release_version ~= '' and release_version or nil,
reconn_delay = tonumber(reconn_delay),
unhealthy_at = tonumber(unhealthy_at)
unhealthy_at = tonumber(unhealthy_at),
err = err_conn,
}
end

Expand Down Expand Up @@ -130,7 +137,7 @@ local function delete_peer(self, host)
self.shm:delete(host) -- status bool
end

local function set_peer_down(self, host)
local function set_peer_down(self, host, connect_err)
if self.logging then
log(WARN, _log_prefix, 'setting host at ', host, ' DOWN')
end
Expand All @@ -139,7 +146,7 @@ local function set_peer_down(self, host)
peer = peer or empty_t -- this can be called from refresh() so no host in shm yet

return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now(),
peer.data_center, peer.release_version)
peer.data_center, connect_err, peer.release_version)
end

local function set_peer_up(self, host)
Expand All @@ -152,7 +159,7 @@ local function set_peer_up(self, host)
peer = peer or empty_t -- this can be called from refresh() so no host in shm yet

return set_peer(self, host, true, 0, 0,
peer.data_center, peer.release_version)
peer.data_center, nil, peer.release_version)
end

local function can_try_peer(self, host)
Expand All @@ -163,7 +170,8 @@ local function can_try_peer(self, host)
-- reconnection policy steps in before making a decision
local peer_rec, err = get_peer(self, host, up)
if not peer_rec then return nil, err end
return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay, nil, true
return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay,
nil, true, peer_rec
end
end

Expand Down Expand Up @@ -191,7 +199,7 @@ local function check_peer_health(self, host, coordinator_options, retry)
if not peer then return nil, err
else
peer:settimeout(self.timeout_connect)
local ok, err, maybe_down = peer:connect()
local ok, err_conn, maybe_down = peer:connect()
if ok then
-- host is healthy
if retry then
Expand All @@ -205,12 +213,12 @@ local function check_peer_health(self, host, coordinator_options, retry)
return peer
elseif maybe_down then
-- host is not (or still not) responsive
local ok, shm_err = set_peer_down(self, host)
local ok, shm_err = set_peer_down(self, host, err_conn)
if not ok then return nil, 'error setting host down: '..shm_err end

return nil, 'host seems unhealthy, considering it down ('..err..')'
return nil, 'host seems unhealthy, considering it down ('..err_conn..')'
else
return nil, err
return nil, err_conn
end
end
end
Expand Down Expand Up @@ -419,7 +427,7 @@ local function next_coordinator(self, coordinator_options)
local errors = {}

for _, peer_rec in self.lb_policy:iter() do
local ok, err, retry = can_try_peer(self, peer_rec.host)
local ok, err, retry, peer_state = can_try_peer(self, peer_rec.host)
if ok then
local peer, err = check_peer_health(self, peer_rec.host, coordinator_options, retry)
if peer then
Expand All @@ -433,7 +441,19 @@ local function next_coordinator(self, coordinator_options)
elseif err then
return nil, err
else
errors[peer_rec.host] = 'host still considered down'
local s = 'host still considered down'
if peer_state then
local waited = get_now() - peer_state.unhealthy_at
s = s .. ' for ' .. (peer_state.reconn_delay - waited) / 1000 .. 's'

if peer_state.err and peer_state.err ~= '' then
s = s .. ' (last error: ' .. peer_state.err .. ')'
else
s = s .. ' (last error: not recorded)'
end
end

errors[peer_rec.host] = s
end
end

Expand Down Expand Up @@ -540,7 +560,8 @@ function _Cluster:refresh()
end

local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at,
rows[i].data_center, rows[i].release_version)
rows[i].data_center, nil,
rows[i].release_version)
if not ok then return nil, err end
end
end
Expand Down Expand Up @@ -755,9 +776,9 @@ local function handle_error(self, err, cql_code, coordinator, request)
end
else
-- host seems down?
local ok, err = set_peer_down(self, coordinator.host)
if not ok then return nil, err end
return self:send_retry(request, 'coordinator seems down')
local ok, err2 = set_peer_down(self, coordinator.host, err)
if not ok then return nil, err2 end
return self:send_retry(request, 'coordinator seems down (' .. err .. ')')
end

return nil, err, cql_code
Expand Down
62 changes: 54 additions & 8 deletions t/06-cluster.t
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ init: true
end
-- insert fake peers
cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', '0.0')
cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', '0.0')
cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', nil, '0.0')
cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', nil, '0.0')
local ok, err = cluster:refresh()
if not ok then
Expand Down Expand Up @@ -482,7 +482,7 @@ info: no host details for 127.0.0.254
end
-- insert previous peers with some infos
cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '')
cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '', '')
local ok, err = cluster:refresh()
if not ok then
Expand Down Expand Up @@ -987,6 +987,52 @@ coordinator 3: 127.0.0.1
end
end
ngx.sleep(0.2)
local coordinator, err = cluster:next_coordinator()
if not coordinator then
ngx.say(err)
end
}
}
--- request
GET /t
--- response_body_like chomp
all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)
--- no_error_log
[error]
=== TEST 23: next_coordinator() returns no host available errors with recorded errors
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new()
if not cluster then
ngx.log(ngx.ERR, err)
end
local ok, err = cluster:refresh()
if not ok then
ngx.log(ngx.ERR, err)
end
local peers, err = cluster:get_peers()
if not peers then
ngx.log(ngx.ERR, err)
end
for i = 1, #peers do
local ok, err = cluster:set_peer_down(peers[i].host, "timeout")
if not ok then
ngx.log(ngx.ERR, err)
return
end
end
local coordinator, err = cluster:next_coordinator()
if not coordinator then
ngx.say(err)
Expand All @@ -996,13 +1042,13 @@ coordinator 3: 127.0.0.1
--- request
GET /t
--- response_body_like chomp
all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down\. 127\.0\.0\.\d+: host still considered down\. 127\.0\.0\.\d+: host still considered down
all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)
--- no_error_log
[error]
=== TEST 23: next_coordinator() avoids down hosts
=== TEST 24: next_coordinator() avoids down hosts
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -1046,7 +1092,7 @@ GET /t
=== TEST 24: next_coordinator() marks nodes as down
=== TEST 25: next_coordinator() marks nodes as down
--- http_config eval
qq {
lua_socket_log_errors off;
Expand Down Expand Up @@ -1114,7 +1160,7 @@ can try peer 255.255.255.253: false
=== TEST 25: next_coordinator() retries down host as per reconnection policy and ups them back
=== TEST 26: next_coordinator() retries down host as per reconnection policy and ups them back
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -1189,7 +1235,7 @@ GET /t
=== TEST 26: next_coordinator() sets coordinator keyspace on connect
=== TEST 27: next_coordinator() sets coordinator keyspace on connect
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down

0 comments on commit a546705

Please sign in to comment.