include lua-resty-global-throttle as hard dependency for now

ElvinEfendi committed Dec 29, 2020
1 parent 54dbd84 commit 3bfed52
Showing 4 changed files with 380 additions and 0 deletions.
181 changes: 181 additions & 0 deletions rootfs/etc/nginx/lua/resty/global_throttle/sliding_window.lua
@@ -0,0 +1,181 @@
local tostring = tostring
local string_format = string.format
local math_floor = math.floor
local ngx_now = ngx.now
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local setmetatable = setmetatable

local _M = {}
local mt = { __index = _M }

-- uniquely identifies the window associated with the given time,
-- e.g. with window_size = 2 (seconds), time 1609286450.123 maps to
-- window id "804643225" (math_floor(1609286450.123 / 2))
local function get_id(self, time)
return tostring(math_floor(time / self.window_size))
end

-- The counter key is composed of the namespace of the sliding window
-- instance, the sample, and the identifier of the current window,
-- e.g. "login.10.0.0.1.804643225.counter". This makes the key unique
-- per sample within the given window and instance.
local function get_counter_key(self, sample, time)
local id = get_id(self, time)
return string_format("%s.%s.%s.counter", self.namespace, sample, id)
end

local function get_last_rate(self, sample, now)
local a_window_ago_from_now = now - self.window_size
local last_counter_key = get_counter_key(self, sample, a_window_ago_from_now)

local last_count, err = self.store:get(last_counter_key)
if err then
return nil, err
end
if not last_count then
-- NOTE(elvinefendi): returning 0 as a default value here means
-- we will allow a spike in the first window, or in a window that
-- has no immediate previous window with samples.
-- What if we default to self.limit here?
last_count = 0
end
if last_count > self.limit then
-- in process_sample we also reactively check for the limit being
-- exceeded after incrementing the counter. So even though the counter
-- can be higher than the limit as a result of racy behaviour, we would
-- still throttle anyway. That is why it is important to cap the last
-- count here, to avoid over-punishment.
last_count = self.limit
end

return last_count / self.window_size
end

function _M.new(namespace, store, limit, window_size)
if not namespace then
return nil, "'namespace' parameter is missing"
end

if not store then
return nil, "'store' parameter is missing"
end
if not store.incr then
return nil, "'store' has to implement 'incr' function"
end
if not store.get then
return nil, "'store' has to implement 'get' function"
end

return setmetatable({
namespace = namespace,
store = store,
limit = limit,
window_size = window_size
}, mt), nil
end

local function get_desired_delay(self, remaining_time, last_rate, count)
if last_rate == 0 then
return remaining_time
end

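-- A sketch of the reasoning behind the formula below: we want the
-- extrapolated final count to land exactly on the limit after delaying
-- for desired_delay seconds:
--   count + last_rate * (remaining_time - desired_delay) = limit
-- Solving for desired_delay gives the expression below.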
local desired_delay = remaining_time - (self.limit - count) / last_rate

if desired_delay == 0 then
-- no delay
return nil
end

if desired_delay < 0 or desired_delay > self.window_size then
ngx_log(ngx_ERR, "unexpected value for delay: ", desired_delay,
", when remaining_time = ", remaining_time,
" last_rate = ", last_rate,
" count = ", count,
" limit = ", self.limit,
" window_size = ", self.window_size)
return nil
end

return desired_delay
end

-- process_sample first checks whether the limit is already exceeded for the
-- given sample. If so, it calculates for how long this sample
-- should be delayed/rejected, and returns the estimated total count for
-- the current window for this sample along with the suggested delay time to
-- bring the rate down below the limit.
-- If the limit is not exceeded yet, it increments the counter corresponding
-- to the sample in the current window. Finally it checks whether the limit is
-- exceeded again. This check is necessary because between the first check and
-- the increment, other sliding window instances might have processed enough
-- occurrences of this sample to exceed the limit. Therefore if this check
-- shows that the limit is exceeded, we again calculate the necessary delay.
--
-- Return values: estimated_count, delay, err
-- `estimated_count` - the number of occurrences the algorithm expects to see
-- for the sample by the end of the current window, excluding the current
-- occurrence of the sample. It is calculated based
-- on the rate from the previous window and extrapolated to the current window.
-- If estimated_count is bigger than the configured limit, then the function
-- will also return delay > 0 to suggest that the sample has to be throttled.
-- `delay` - this is either strictly bigger than 0 when the limit is
-- exceeded, or nil when the rate of occurrences of the sample is under the
-- limit. The unit is seconds.
-- `err` - when there is a problem with processing the sample,
-- this will be a string explaining the problem. In all other cases it is nil.
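--
-- A minimal usage sketch (hypothetical names; assumes a store instance
-- created via resty.global_throttle.store):
--
--   local sw = _M.new("login", store, 10, 2) -- at most 10 per 2s window
--   local estimated_count, delay, err = sw:process_sample(ngx.var.remote_addr)
--   if err then
--     ngx.log(ngx.ERR, "failed to process sample: ", err)
--   elseif delay then
--     ngx.exit(429) -- or ngx.sleep(delay) to delay instead of rejecting
--   end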
function _M.process_sample(self, sample)
local now = ngx_now()
local counter_key = get_counter_key(self, sample, now)
local remaining_time = self.window_size - now % self.window_size

local count, err = self.store:get(counter_key)
if err then
return nil, nil, err
end
if not count then
count = 0
end
if count >= self.limit then
-- count can be over the limit because of the racy nature of the store.
-- When it is at/over the limit, we know for sure what the final count
-- and desired delay for the current window are, so no need to proceed.
return count, remaining_time, nil
end

local last_rate
last_rate, err = get_last_rate(self, sample, now)
if err then
return nil, nil, err
end

local estimated_final_count = last_rate * remaining_time + count
if estimated_final_count >= self.limit then
local desired_delay =
get_desired_delay(self, remaining_time, last_rate, count)
return estimated_final_count, desired_delay, nil
end

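-- keep the counter around for one extra window so that get_last_rate
-- can still read the previous window's count during the next window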
local expiry = self.window_size * 2
local new_count
new_count, err = self.store:incr(counter_key, 1, expiry)
if err then
return nil, nil, err
end

-- The limit check below only exists to cope with the racy behaviour where
-- the counter for the given sample is incremented at the same time by
-- multiple sliding_window instances. That is, we re-adjust the new count by
-- ignoring the current occurrence of the sample. Otherwise the limit would
-- unnecessarily be exceeded.
local new_adjusted_count = new_count - 1

if new_adjusted_count >= self.limit then
-- the incr above might take long enough to make a difference, so
-- we recalculate the time-dependent variables.
remaining_time = self.window_size - ngx_now() % self.window_size

return new_adjusted_count, remaining_time, nil
end

return estimated_final_count, nil, nil
end

return _M
44 changes: 44 additions & 0 deletions rootfs/etc/nginx/lua/resty/global_throttle/store.lua
@@ -0,0 +1,44 @@
local require = require
local string_format = string.format

-- Providers are lazily loaded based on the given options.
-- Every store provider should implement `:incr(key, delta, expiry)`,
-- which returns the new value and an error, and `:get(key)`, which returns
-- the value corresponding to the given `key` and an error if there is any.
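--
-- A minimal in-memory provider sketch satisfying this contract (hypothetical,
-- for illustration only; a real provider needs increments that are atomic
-- across workers, plus expiry, and it would have to be reachable at
-- resty.global_throttle.store.<provider>):
--
--   local my_provider = {}
--   local my_mt = { __index = my_provider }
--
--   function my_provider.new(options)
--     return setmetatable({ data = {} }, my_mt), nil
--   end
--
--   function my_provider:incr(key, delta, expiry)
--     self.data[key] = (self.data[key] or 0) + delta
--     return self.data[key], nil
--   end
--
--   function my_provider:get(key)
--     return self.data[key], nil
--   end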
local providers = {}

local _M = {}

function _M.new(options)
if not options then
return nil, "'options' param is missing"
end

if not options.provider then
return nil, "'provider' attribute is missing"
end

if not providers[options.provider] then
local provider_implementation_path =
string_format("resty.global_throttle.store.%s", options.provider)
-- require() raises when the module cannot be found, so wrap it in pcall
-- to return a descriptive error instead of throwing
local ok, provider_implementation =
pcall(require, provider_implementation_path)

if not ok then
return nil,
string_format("given 'store' implementation was not found in: '%s'",
provider_implementation_path)
end

providers[options.provider] = provider_implementation
end

local provider_implementation_instance, err =
providers[options.provider].new(options)
if not provider_implementation_instance then
return nil, err
end

return provider_implementation_instance, nil
end

return _M
105 changes: 105 additions & 0 deletions rootfs/etc/nginx/lua/resty/global_throttle/store/memcached.lua
@@ -0,0 +1,105 @@
local memcached = require("resty.memcached")

local string_format = string.format
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local setmetatable = setmetatable
local tonumber = tonumber

local _M = {}
local mt = { __index = _M }

function _M.new(options)
if not options.host or not options.port then
return nil, "'host' and 'port' options are required"
end

return setmetatable({
options = options,
}, mt), nil
end
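
-- Expected options (a sketch based on lua-resty-memcached's API; only
-- host and port are required, the rest are assumptions for illustration):
--   {
--     host = "127.0.0.1", port = 11211,
--     connect_timeout = 50,      -- milliseconds, passed to memc:set_timeout
--     max_idle_timeout = 10000,  -- milliseconds; together with pool_size
--     pool_size = 100,           --   enables keepalive via set_keepalive
--   }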

local function with_client(self, action)
local memc, err = memcached:new()
if not memc then
return nil, string_format("failed to instantiate memcached: %s", err)
end

if self.options.connect_timeout and self.options.connect_timeout > 0 then
local ok
ok, err = memc:set_timeout(self.options.connect_timeout)
if not ok then
return nil, string_format("error setting connect timeout: %s", err)
end
end

local ok
ok, err = memc:connect(self.options.host, self.options.port)
if not ok then
return nil, string_format("failed to connect: %s", err)
end

local ret1, ret2 = action(memc)

if self.options.max_idle_timeout and self.options.pool_size then
ok, err =
memc:set_keepalive(self.options.max_idle_timeout, self.options.pool_size)
else
ok, err = memc:close()
end
if not ok then
ngx_log(ngx_ERR, err)
end

return ret1, ret2
end

function _M.incr(self, key, delta, expiry)
return with_client(self, function(memc)
local err_pattern =
string_format("%%s failed for key '%s', expiry '%s': %%s", key, expiry)
local new_value, err = memc:incr(key, delta)
if err then
if err ~= "NOT_FOUND" then
return nil, string_format(err_pattern, "increment", err)
end

local ok
ok, err = memc:add(key, delta, expiry)
if ok then
new_value = delta
elseif err == "NOT_STORED" then
-- possibly another worker added the key in the meantime, so attempt to incr again
new_value, err = memc:incr(key, delta)
if err then
return nil, string_format(err_pattern, "increment", err)
end
else
return nil, string_format(err_pattern, "add", err)
end
end

return tonumber(new_value), nil
end)
end

function _M.get(self, key)
return with_client(self, function(memc)
local value, flags, err = memc:get(key)
if err then
return nil, string_format("'get' failed for '%s': %s", key, err)
end
if value == nil and flags == nil then
-- cache miss, not an error (err was already handled above)
return nil, nil
end
return tonumber(value), nil
end)
end

function _M.__flush_all(self)
return with_client(self, function(memc)
return memc:flush_all()
end)
end

return _M
50 changes: 50 additions & 0 deletions rootfs/etc/nginx/lua/resty/global_throttle/store/shared_dict.lua
@@ -0,0 +1,50 @@
local ngx = ngx
local ngx_log = ngx.log
local ngx_WARN = ngx.WARN
local string_format = string.format
local setmetatable = setmetatable

local _M = {}
local mt = { __index = _M }

function _M.new(options)
if not options.name then
return nil, "shared dictionary name is mandatory"
end

local dict = ngx.shared[options.name]
if not dict then
return nil,
string_format("shared dictionary with name \"%s\" is not configured",
options.name)
end

return setmetatable({
dict = dict,
}, mt), nil
end
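
-- The backing dictionary has to be declared in nginx.conf, e.g. (the name
-- "my_throttle_dict" here is an assumption for illustration):
--
--   http {
--     lua_shared_dict my_throttle_dict 10m;
--   }
--
-- and can then be referenced with _M.new({ name = "my_throttle_dict" }).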

function _M.incr(self, key, delta, expiry)
local new_value, err, forcible = self.dict:incr(key, delta, 0, expiry)
if err then
return nil, err
end

if forcible then
ngx_log(ngx_WARN,
"shared dictionary is full, removed valid key(s) to store the new one")
end

return new_value, nil
end

function _M.get(self, key)
local value = self.dict:get(key)
if value == nil then
-- cache miss; like the memcached provider, return (nil, nil) rather
-- than an error so that callers can treat a missing key as zero
return nil, nil
end

return value, nil
end

return _M
