Merge pull request #18 from cabrera/fix_governor
fix(governor): rate limiting gets 'stuck'
balajiiyer committed Mar 7, 2014
2 parents 3bc74ce + 2c80830 commit 583c2ec
Showing 2 changed files with 83 additions and 164 deletions.
eom/governor.py (200 changes: 65 additions & 135 deletions)
@@ -14,28 +14,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division
import collections
import logging
import re
import time

from oslo.config import cfg
import simplejson as json

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

OPT_GROUP_NAME = 'eom:governor'
OPTIONS = [
cfg.StrOpt('rates_file'),
cfg.IntOpt('node_count', default=1),
cfg.IntOpt('period_sec', default=5),
cfg.FloatOpt('max_sleep_sec', default=0.5),
cfg.FloatOpt('sleep_threshold', default=0.1),
cfg.FloatOpt('sleep_offset', default=0.99),
]

CONF.register_opts(OPTIONS, group=OPT_GROUP_NAME)

LOG = logging.getLogger(__name__)


class Rate(object):
"""Represents an individual rate configuration."""
@@ -57,30 +59,35 @@ def __init__(self, document, period_sec, node_count):
:param dict document:
"""
self.name = document['name']
if 'route' in document:
self.route = re.compile(document['route'] + '$')
else:
self.route = None

if 'methods' in document:
self.methods = set(document['methods'])
else:
self.methods = None
self.route = (
re.compile(document['route'] + '$')
if 'route' in document else None
)
self.methods = (
set(document['methods']) if 'methods' in document
else None
)

self.hard_limit = document['hard_limit'] / node_count
self.soft_limit = document['soft_limit'] / node_count
self.target = float(self.soft_limit) / period_sec
self.target = self.soft_limit / period_sec

if self.hard_limit <= self.soft_limit:
raise ValueError('hard limit must be > soft limit')

if not period_sec > 0:
raise ValueError('period_sec must be > 0')

def applies_to(self, method, path):
"""Determines whether this rate applies to a given request.
:param str method: HTTP method, such as GET or POST
:param str path: URL path, such as "/v1/queues"
"""
if self.route is not None and not self.route.match(path):
if self.methods is not None and method not in self.methods:
return False

if self.methods is not None and method not in self.methods:
if self.route is not None and not self.route.match(path):
return False

return True
@@ -106,153 +113,83 @@ def _get_counter_key(project_id, bucket)
return project_id + ':bucket:' + bucket


def _get_throttle_key(project_id):
return project_id + ':throttle_until'


# TODO(kgriffs): Consider converting to closure-style
class Cache(object):
__slots__ = ('store',)

def __init__(self):
self.store = {}
self.store = collections.defaultdict(int)

def __repr__(self):
return str(self.store)

def inc_counter(self, project_id, bucket):
key = _get_counter_key(project_id, bucket)
try:
count = self.store[key] + 1
self.store[key] = count
except KeyError:
count = 1
self.store[key] = 1

return count
self.store[key] += 1
return self.store[key]

def get_counter(self, project_id, bucket):
key = _get_counter_key(project_id, bucket)
try:
return self.store[key]
except KeyError:
return 0
return self.store[key]

def set_counter(self, project_id, bucket, val):
key = _get_counter_key(project_id, bucket)
self.store[key] = val

def reset_counter(self, project_id, bucket):
key = _get_counter_key(project_id, bucket)
self.store[key] = 0

def set_throttle(self, project_id, period_sec):
key = _get_throttle_key(project_id)
self.store[key] = time.time() + period_sec

def is_throttled(self, project_id):
key = _get_throttle_key(project_id)
if key not in self.store:
return False

throttle_until = self.store[key]
now = time.time()

return now < throttle_until


def _create_calc_sleep(period_sec, cache, sleep_threshold, sleep_offset):
"""Creates a closure with the given params for convenience and perf."""

ctx = {'last_bucket': None}

def calc_sleep(project_id, rate):
# Alternate between two buckets of
# counters using a time function.
now = time.time()
normalized = now % (period_sec * 2)

if normalized < period_sec:
current_bucket = 'a'
previous_bucket = 'b'
else:
current_bucket = 'b'
previous_bucket = 'a'
normalized = int(time.time()) % (period_sec * 2)

current_bucket, previous_bucket = (
('a', 'b') if normalized < period_sec else ('b', 'a')
)

if ctx['last_bucket'] != current_bucket:
cache.reset_counter(project_id, current_bucket)
ctx['last_bucket'] = current_bucket

current_count = cache.inc_counter(project_id, current_bucket)
cache.inc_counter(project_id, current_bucket)
previous_count = cache.get_counter(project_id, previous_bucket)

if previous_count > rate.hard_limit:
cache.set_counter(project_id, previous_bucket, rate.hard_limit - 1)
raise HardLimitError()

if previous_count > rate.soft_limit:
# If they had been doing requests at rate.soft_limit then how
# long would it have taken for them to submit the same
# number of requests?
normalized_sec = float(previous_count) / rate.target

# Slow them down so they can only do rate.soft_limit during
# period_sec. Do this by delaying each request so that
# taken together, all requests will take the amount of
# time they should have taken had they followed the
# limit during the last time period.
extra_sec = normalized_sec - period_sec
sleep_per_request = extra_sec / previous_count

# Allow the rate to slightly exceed the limit so
# that when we cross over to the next time epoch,
# we will continue throttling. Otherwise, we can
# thrash between throttling and not throttling.
sleep_per_request *= sleep_offset

# Now, the per-request pause may be too small to sleep
# on, so we chunk it up over multiple requests. If
# the sleep time is too small, it will be less
# accurate as well as introducing too much context-
# switching overhead that could affect other requests
# not related to this project ID.
if sleep_per_request < sleep_threshold and False:
batch_size = int(sleep_threshold / sleep_per_request)

# Only sleep every N requests
if current_count % batch_size == 0:
return sleep_per_request * batch_size

else:
# Sleep on every request
return sleep_per_request
sleep_sec = (
(
(previous_count / rate.target) - period_sec
)
/ previous_count
) * sleep_offset
return sleep_sec if sleep_sec >= 0 else 0

return 0

return calc_sleep


def _log(level, message, **vars):
"""Logs at the given level with short-circuiting."""
if LOG.getEffectiveLevel() != level:
return

LOG.log(level, message % vars)


def _http_429(start_response):
"""Responds with HTTP 429."""
start_response('429 Too Many Requests', [('Content-Length', '0')])

# TODO(kgriffs): Return a helpful message in JSON or XML, depending
# on the accept header.
return []


def _http_400(start_response):
"""Responds with HTTP 400."""
start_response('400 Bad Request', [('Content-Length', '0')])

# TODO(kgriffs): Return a helpful message in JSON or XML, depending
# on the accept header.
return []


# NOTE(kgriffs): Using a functional style since it is more
# performant than an object-oriented one (middleware should
# introduce as little overhead as possible.)
def wrap(app):
"""Wrap a WSGI app with ACL middleware.
@@ -265,7 +202,6 @@ def wrap(app):

node_count = group['node_count']
period_sec = group['period_sec']
max_sleep_sec = group['max_sleep_sec']
sleep_threshold = group['sleep_threshold']
sleep_offset = group['sleep_offset']

@@ -276,7 +212,6 @@ def wrap(app):
calc_sleep = _create_calc_sleep(period_sec, cache,
sleep_threshold, sleep_offset)

# WSGI callable
def middleware(env, start_response):
path = env['PATH_INFO']
method = env['REQUEST_METHOD']
@@ -291,40 +226,35 @@ def middleware(env, start_response):
try:
project_id = env['HTTP_X_PROJECT_ID']
except KeyError:
LOG.error(_('Request headers did not include X-Project-ID'))
LOG.debug(_('Request headers did not include X-Project-ID'))
return _http_400(start_response)

try:
sleep_sec = calc_sleep(project_id, rate)
except HardLimitError:
message = _('Hit hard limit of %(rate)d per sec. for '
'project %(project_id)s according to '
'rate rule "%(name)s"')
message = _('Hit hard limit of {rate} per sec. for '
'project {project_id} according to '
'rate rule "{name}"')

hard_rate = rate.hard_limit / period_sec
_log(logging.DEBUG, message, rate=hard_rate, project_id=project_id,
name=rate.name)
LOG.debug(message,
{'rate': hard_rate,
'project_id': project_id,
'name': rate.name})

return _http_429(start_response)

if sleep_sec > max_sleep_sec:
message = _('Sleep time of %(sleep_sec)f sec. for '
'project %(project_id)s exceeded max'
'sleep time of %(max_sleep_sec)f sec.')

_log(logging.DEBUG, message, sleep_sec=sleep_sec,
project_id=project_id, max_sleep_sec=max_sleep_sec)

return _http_429(start_response)

if sleep_sec != 0:
message = _('Sleeping %(sleep_sec)f sec. for '
'project %(project_id)s to limit '
'rate to %(limit)d according to '
'rate rule "%(name)s"')

_log(logging.DEBUG, message, sleep_sec=sleep_sec,
project_id=project_id, limit=rate.soft_limit, name=rate.name)
if sleep_sec > 0:
message = _('Sleeping {sleep_sec} sec. for '
'project {project_id} to limit '
'rate to {limit} according to '
'rate rule "{name}"')

LOG.debug(message,
{'sleep_sec': sleep_sec,
'project_id': project_id,
'limit': rate.soft_limit,
'name': rate.name})

# Keep calm...
time.sleep(sleep_sec)
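For context, the Rate class touched by this diff consumes rate documents with a name, an optional route and method list, and soft/hard limits, presumably loaded from the configured rates_file (simplejson is imported for that purpose). A hypothetical document might look like the sketch below; the route, methods, and limit values are illustrative only, not taken from the project's actual rates file.

```python
# Hypothetical rate document -- field names follow Rate.__init__ in this diff;
# the route, methods, and numbers are made up for illustration.
example_rate_doc = {
    'name': 'queue-listing',
    'route': '/v1/queues',          # optional; compiled with a trailing '$'
    'methods': ['GET', 'HEAD'],     # optional; any method matches if omitted
    'soft_limit': 100,              # requests per period before throttling
    'hard_limit': 200,              # requests per period before HTTP 429
}

# With node_count=2 and period_sec=5, Rate would halve both limits per node
# (soft 50, hard 100) and derive target = 50 / 5 = 10 requests/sec per node.
```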

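The new per-request sleep in this diff works as follows: counters alternate between two buckets ('a' and 'b') every period_sec, and once the previous bucket's count exceeds the soft limit, the extra time those requests should have taken at the target rate is spread across subsequent requests, scaled by sleep_offset so throttling carries slightly into the next window instead of thrashing on and off. A minimal standalone sketch of that math, simplified from calc_sleep (the helper function and its error type below are stand-ins, not the module's actual API):

```python
# Minimal sketch of the new throttling math in eom/governor.py's calc_sleep.
# PERIOD_SEC and SLEEP_OFFSET mirror the period_sec and sleep_offset options;
# everything else here is a simplification for illustration.

PERIOD_SEC = 5        # counting window, seconds
SLEEP_OFFSET = 0.99   # keep throttling slightly past the limit to avoid thrashing


def calc_sleep(previous_count, soft_limit, hard_limit):
    """Seconds to pause the next request, given the previous bucket's count."""
    target = soft_limit / PERIOD_SEC  # allowed requests per second

    if previous_count > hard_limit:
        # The real middleware raises HardLimitError and answers with HTTP 429.
        raise RuntimeError('hard limit exceeded')

    if previous_count > soft_limit:
        # Time previous_count requests should have taken at the target rate,
        # minus the period they actually had, spread evenly over those requests.
        sleep_sec = ((previous_count / target) - PERIOD_SEC) / previous_count
        return max(sleep_sec * SLEEP_OFFSET, 0.0)

    return 0.0


# Example: soft_limit=100 and 150 requests landed in the last 5-second bucket.
# target = 20/sec, so 150 requests "should" have taken 7.5 s; the extra 2.5 s
# spread over 150 requests is roughly 16.5 ms of sleep per request.
print(calc_sleep(150, soft_limit=100, hard_limit=200))
```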