Skip to content

Commit

Permalink
feat: reduce auth requests (api7#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
spacewander committed Dec 8, 2020
1 parent 5c0a268 commit 962dbe2
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 8 deletions.
61 changes: 53 additions & 8 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
local typeof = require("typeof")
local cjson = require("cjson.safe")
local setmetatable = setmetatable
local random = math.random
local clear_tab = require("table.clear")
local utils = require("resty.etcd.utils")
local tab_nkeys = require("table.nkeys")
Expand All @@ -19,6 +20,7 @@ local decode_json = cjson.decode
local encode_json = cjson.encode
local encode_base64 = ngx.encode_base64
local decode_base64 = ngx.decode_base64
local semaphore = require("ngx.semaphore")
local INIT_COUNT_RESIZE = 2e8

local _M = {}
Expand All @@ -29,6 +31,7 @@ local mt = { __index = _M }
local refresh_jwt_token

local function _request_uri(self, method, uri, opts, timeout, ignore_auth)
utils.log_info("v3 request uri: ", uri, ", timeout: ", timeout)

local body
if opts and opts.body and tab_nkeys(opts.body) > 0 then
Expand All @@ -44,7 +47,7 @@ local function _request_uri(self, method, uri, opts, timeout, ignore_auth)
if self.is_auth then
if not ignore_auth then
-- authentication reqeust not need auth request
local _, err = refresh_jwt_token(self)
local _, err = refresh_jwt_token(self, timeout)
if err then
return nil, err
end
Expand Down Expand Up @@ -162,8 +165,15 @@ function _M.new(opts)
})
end

local sema, err = semaphore.new()
if not sema then
return nil, err
end

return setmetatable({
last_auth_time = now(), -- save last Authentication time
last_refresh_jwt_err = nil,
sema = sema,
jwt_token = nil, -- last Authentication token
is_auth = not not (user and password),
user = user,
Expand Down Expand Up @@ -195,34 +205,69 @@ local function choose_endpoint(self)
return endpoints[pos]
end


local function wake_up_everyone(self)
local count = -self.sema:count()
if count > 0 then
self.sema:post(count)
end
end


-- return refresh_is_ok, error
function refresh_jwt_token(self)
function refresh_jwt_token(self, timeout)
-- token exist and not expire
-- default is 5min, we use 3min
-- default is 5min, we use 3min plus random seconds to smooth the refresh across workers
-- https://github.com/etcd-io/etcd/issues/8287
if self.jwt_token and now() - self.last_auth_time < 60 * 3 then
if self.jwt_token and now() - self.last_auth_time < 60 * 3 + random(0, 60) then
return true, nil
end

if self.requesting_token then
self.sema:wait(timeout)
if self.jwt_token and now() - self.last_auth_time < 60 * 3 + random(0, 60) then
return true, nil
end

if self.last_refresh_jwt_err then
utils.log_info("v3 refresh jwt last err: ", self.last_refresh_jwt_err)
return nil, self.last_refresh_jwt_err
end

-- something unexpected happened, try again
utils.log_info("v3 try auth after waiting, timeout: ", timeout)
end

self.last_refresh_jwt_err = nil
self.requesting_token = true

local opts = {
body = {
name = self.user,
password = self.password,
}
}
local res, err = _request_uri(self, 'POST',
choose_endpoint(self).full_prefix .. "/auth/authenticate",
opts, 5, true) -- default authenticate timeout 5 second
choose_endpoint(self).full_prefix .. "/auth/authenticate",
opts, timeout, true)
self.requesting_token = false

if err then
self.last_refresh_jwt_err = err
wake_up_everyone(self)
return nil, err
end

if not res or not res.body or not res.body.token then
return nil, 'authenticate refresh token fail'
err = 'authenticate refresh token fail'
self.last_refresh_jwt_err = err
wake_up_everyone(self)
return nil, err
end

self.jwt_token = res.body.token
self.last_auth_time = now()
wake_up_everyone(self)

return true, nil
end
Expand Down Expand Up @@ -469,7 +514,7 @@ local function request_chunk(self, method, scheme, host, port, path, opts, timeo
local headers = {}
if self.is_auth then
-- authentication reqeust not need auth request
_, err = refresh_jwt_token(self)
_, err = refresh_jwt_token(self, timeout)
if err then
return nil, err
end
Expand Down
213 changes: 213 additions & 0 deletions t/v3/auth.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
use Test::Nginx::Socket::Lua;

log_level('info');
no_long_string();
repeat_each(1);

my $etcd_version = `etcd --version`;
if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.[123]./) {
plan(skip_all => "etcd is too old");
} else {
my $enable_tls = $ENV{ETCD_ENABLE_TLS};
if ((defined $enable_tls) && $enable_tls eq "TRUE") {
plan(skip_all => "skip test cases with auth when TLS is enabled");
} else {
plan 'no_plan';
}
}

our $HttpConfig = <<'_EOC_';
lua_socket_log_errors off;
lua_package_path 'lib/?.lua;/usr/local/share/lua/5.3/?.lua;/usr/share/lua/5.1/?.lua;;';
init_by_lua_block {
local cjson = require("cjson.safe")
function check_res(data, err, val, status)
if err then
ngx.log(ngx.ERR, "err: ", err)
return
end
if val then
if data.body.kvs==nil then
ngx.exit(404)
end
if data.body.kvs and val ~= data.body.kvs[1].value then
ngx.say("failed to check value")
ngx.log(ngx.ERR, "failed to check value, got: ", data.body.kvs[1].value,
", expect: ", val)
ngx.exit(200)
else
ngx.say("checked val as expect: ", val)
end
end
if status and status ~= data.status then
ngx.exit(data.status)
end
end
}
_EOC_

run_tests();

__DATA__
=== TEST 1: share same etcd auth token
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require "resty.etcd" .new({
protocol = "v3",
user = 'root',
password = 'abc123',
timeout = 3,
http_host = {
"http://127.0.0.1:12379",
},
})
check_res(etcd, err)
local t = {}
for i = 1, 3 do
local th = assert(ngx.thread.spawn(function(i)
local res, err = etcd:set("/test", { a='abc'})
check_res(res, err)
ngx.sleep(0.1)
res, err = etcd:delete("/test")
check_res(res, err)
end))
table.insert(t, th)
end
for i, th in ipairs(t) do
ngx.thread.wait(th)
end
ngx.say('ok')
}
}
--- request
GET /t
--- no_error_log
[error]
--- response_body
ok
--- grep_error_log eval
qr/uri: .+, timeout: \d+/
--- grep_error_log_out
uri: http://127.0.0.1:12379/v3/kv/put, timeout: 3
uri: http://127.0.0.1:12379/v3/auth/authenticate, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/put, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/put, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/deleterange, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/deleterange, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/deleterange, timeout: 3
=== TEST 2: share same etcd auth token, auth failed
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require "resty.etcd" .new({
protocol = "v3",
user = 'root',
password = '123',
timeout = 3,
http_host = {
"http://127.0.0.1:12379",
},
})
check_res(etcd, err)
local t = {}
for i = 1, 3 do
local th = assert(ngx.thread.spawn(function(i)
local res, err = etcd:set("/test", { a='abc'})
if not res then
ngx.log(ngx.ERR, err)
end
end))
table.insert(t, th)
end
for i, th in ipairs(t) do
ngx.thread.wait(th)
end
ngx.say('ok')
}
}
--- request
GET /t
--- response_body
ok
--- grep_error_log eval
qr/(uri: .+, timeout: \d+|v3 refresh jwt last err: [^,]+|authenticate refresh token fail)/
--- grep_error_log_out
uri: http://127.0.0.1:12379/v3/kv/put, timeout: 3
uri: http://127.0.0.1:12379/v3/auth/authenticate, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/put, timeout: 3
uri: http://127.0.0.1:12379/v3/kv/put, timeout: 3
authenticate refresh token fail
v3 refresh jwt last err: authenticate refresh token fail
authenticate refresh token fail
v3 refresh jwt last err: authenticate refresh token fail
authenticate refresh token fail
=== TEST 3: share same etcd auth token, failed to connect
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require "resty.etcd" .new({
protocol = "v3",
user = 'root',
password = '123',
timeout = 3,
})
check_res(etcd, err)
-- hack to inject 'connection refused' error
etcd.endpoints = {{
full_prefix = "http://127.0.0.1:1997/v3",
scheme = "http",
host = "127.0.0.1",
port = "1997",
}}
local t = {}
for i = 1, 3 do
local th = assert(ngx.thread.spawn(function(i)
local res, err = etcd:set("/test", { a='abc'})
if not res then
ngx.log(ngx.ERR, err)
end
end))
table.insert(t, th)
end
for i, th in ipairs(t) do
ngx.thread.wait(th)
end
ngx.say('ok')
}
}
--- request
GET /t
--- response_body
ok
--- grep_error_log eval
qr/(uri: .+, timeout: \d+|v3 refresh jwt last err: [^,]+|connection refused)/
--- grep_error_log_out
uri: http://127.0.0.1:1997/v3/kv/put, timeout: 3
uri: http://127.0.0.1:1997/v3/auth/authenticate, timeout: 3
uri: http://127.0.0.1:1997/v3/kv/put, timeout: 3
uri: http://127.0.0.1:1997/v3/kv/put, timeout: 3
connection refused
v3 refresh jwt last err: connection refused
connection refused
v3 refresh jwt last err: connection refused
connection refused

0 comments on commit 962dbe2

Please sign in to comment.