Skip to content

Commit

Permalink
Merge pull request #163 from mozilla-services/feat/159
Browse files Browse the repository at this point in the history
feat: Add percentage routing and migration status to user table
  • Loading branch information
jrconlin committed Dec 31, 2019
2 parents daf36fb + 828ff95 commit 07ef6b6
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 55 deletions.
5 changes: 4 additions & 1 deletion etc/tokenserver-dev.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ backend = tokenserver.assignment.memorynode.MemoryNodeAssignmentBackend
applications = sync-1.5
secrets_file = tokenserver/tests/secrets
service_entry = https://example.com
spanner_entry = https://spanner.example.com
spanner_node_id = 800
# this can be used to lock down the system to only existing accounts
#allow_new_users = true
migrate_new_user_percentage=0

[endpoints]
sync-1.5 = {node}/1.5/{uid}

[browserid]
backend = tokenserver.verifiers.LocalBrowserIdVerifier
audiences = https://token.services.mozilla.com

# Paster configuration for Pyramid
[filter:catcherror]
paste.filter_app_factory = mozsvc.middlewares:make_err_mdw
Expand Down
5 changes: 5 additions & 0 deletions tokenserver/assignment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
class INodeAssignment(Interface):
"""Interface definition for backend node-assignment db."""

def should_allocate_to_spanner(self, email):
"""Determine if this user is routed to spanner
"""

def get_user(self, service, email):
"""Returns the user record for the given service and email.
Expand Down
16 changes: 3 additions & 13 deletions tokenserver/assignment/memorynode.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@

from pyramid.threadlocal import get_current_registry
from zope.interface import implements

from tokenserver.assignment import INodeAssignment
Expand All @@ -16,18 +14,10 @@ class MemoryNodeAssignmentBackend(object):
implements(INodeAssignment)

def __init__(self, service_entry=None, **kw):
self._service_entry = service_entry
self.service_entry = service_entry
self._users = {}
self._next_uid = 1

@property
def service_entry(self):
"""Implement this as a property to have the context when looking for
the value of the setting"""
if self._service_entry is None:
settings = get_current_registry().settings
self._service_entry = settings.get('tokenserver.service_entry')
return self._service_entry
self.settings = kw or {}

def clear(self):
self._users.clear()
Expand All @@ -53,7 +43,7 @@ def allocate_user(self, service, email, generation=0, client_state='',
'keys_changed_at': keys_changed_at,
'client_state': client_state,
'old_client_states': {},
'first_seen_at': get_timestamp()
'first_seen_at': get_timestamp(),
}
self._users[(service, email)] = user
self._next_uid += 1
Expand Down
107 changes: 72 additions & 35 deletions tokenserver/assignment/sqlnode/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import math
import traceback
import hashlib
from mozsvc.exceptions import BackendError

from sqlalchemy.sql import select, update, and_
Expand Down Expand Up @@ -188,12 +189,19 @@ class SQLNodeAssignment(object):
def __init__(self, sqluri, create_tables=False, pool_size=100,
pool_recycle=60, pool_timeout=30, max_overflow=10,
pool_reset_on_return='rollback', capacity_release_rate=0.1,
spanner_node_id=None, migrate_new_user_percentage=0,
**kw):
self._cached_service_ids = {}
self.sqluri = sqluri
if pool_reset_on_return.lower() in ('', 'none'):
pool_reset_on_return = None

self._spanner_node_id = spanner_node_id
self.migrate_new_user_percentage = migrate_new_user_percentage
if self.migrate_new_user_percentage and self._spanner_node_id is None:
raise ValueError("Must set spanner_node_id when migrating "
"users to spanner")

# Use production-ready pool settings for the MySQL backend.
# We also need to work around mysql using "LEAST(a,b)" and
# sqlite using "MIN(a,b)" in expressions.
Expand All @@ -208,9 +216,11 @@ def __init__(self, sqluri, create_tables=False, pool_size=100,
logging_name='tokenserver.assignment.sqlnode'
)
self._sqlfunc_min = sqlfunc.least
self._sqlfunc_max = sqlfunc.greatest
else:
self._engine = create_engine(sqluri, poolclass=NullPool)
self._sqlfunc_min = sqlfunc.min
self._sqlfunc_max = sqlfunc.max

self._engine.echo = kw.get('echo', False)
self.capacity_release_rate = capacity_release_rate
Expand Down Expand Up @@ -280,7 +290,7 @@ def get_user(self, service, email):
'keys_changed_at': cur_row.keys_changed_at or 0,
'client_state': cur_row.client_state,
'old_client_states': {},
'first_seen_at': cur_row.created_at
'first_seen_at': cur_row.created_at,
}
# If the current row is marked as replaced or is missing a node,
# and they haven't been retired, then assign them a new node.
Expand All @@ -304,18 +314,32 @@ def get_user(self, service, email):
finally:
res.close()

def should_allocate_to_spanner(self, email):
    """Decide whether a user should be provisioned on the spanner node.

    Uses a simple, reproducible hash of the email so the same user always
    gets the same answer. It does not need to be secure, only to select
    a stable, configurable percentage of users.

    :param email: the user's email address (text or bytes).
    :returns: True if the first byte of the SHA-1 digest of the email
              falls below the ``migrate_new_user_percentage`` share of
              the 0-255 range, False otherwise (always False when the
              percentage is unset or zero).
    """
    percentage = self.migrate_new_user_percentage
    if not percentage:
        return False
    # Hash the raw bytes; encode text explicitly as UTF-8 so non-ASCII
    # addresses do not fail with the default codec.
    if not isinstance(email, bytes):
        email = email.encode('utf-8')
    # Indexing a bytearray yields an int on both Python 2 and Python 3,
    # whereas ord(digest[0]) raises TypeError on Python 3.
    pick = bytearray(hashlib.sha1(email).digest())[0]
    return pick < (256 * (percentage * .01))

def allocate_user(self, service, email, generation=0, client_state='',
keys_changed_at=0, node=None, timestamp=None):
if timestamp is None:
timestamp = get_timestamp()
if node is None:
nodeid, node = self.get_best_node(service)
nodeid, node = self.get_best_node(service, email)
else:
nodeid = self.get_node_id(service, node)
params = {
'service': service, 'email': email, 'nodeid': nodeid,
'generation': generation, 'keys_changed_at': keys_changed_at,
'client_state': client_state, 'timestamp': timestamp
'service': service,
'email': email,
'nodeid': nodeid,
'generation': generation,
'keys_changed_at': keys_changed_at,
'client_state': client_state,
'timestamp': timestamp
}
res = self._safe_execute(_CREATE_USER_RECORD, **params)
res.close()
Expand All @@ -327,7 +351,7 @@ def allocate_user(self, service, email, generation=0, client_state='',
'keys_changed_at': keys_changed_at,
'client_state': client_state,
'old_client_states': {},
'first_seen_at': timestamp
'first_seen_at': timestamp,
}

def update_user(self, service, user, generation=None, client_state=None,
Expand All @@ -341,7 +365,7 @@ def update_user(self, service, user, generation=None, client_state=None,
'service': service,
'email': user['email'],
'generation': generation,
'keys_changed_at': keys_changed_at,
'keys_changed_at': keys_changed_at
}
res = self._safe_execute(_UPDATE_USER_RECORD_IN_PLACE, **params)
res.close()
Expand Down Expand Up @@ -521,12 +545,16 @@ def add_node(self, service, node, capacity, **kwds):
available = math.ceil(capacity * self.capacity_release_rate)
res = self._safe_execute(sqltext(
"""
insert into nodes (service, node, available, capacity,
current_load, downed, backoff)
values (:service, :node, :available, :capacity,
insert into nodes (id, service, node, available, capacity,
current_load, downed, backoff)
values (:nodeid, :service, :node, :available, :capacity,
:current_load, :downed, :backoff)
"""),
service=service, node=node, capacity=capacity, available=available,
nodeid=kwds.get('nodeid', None),
service=service,
node=node,
capacity=capacity,
available=available,
current_load=kwds.get('current_load', 0),
downed=kwds.get('downed', 0),
backoff=kwds.get('backoff', 0),
Expand Down Expand Up @@ -596,33 +624,38 @@ def unassign_node(self, service, node, timestamp=None, nodeid=None):
)
res.close()

def get_best_node(self, service):
def get_best_node(self, service, email):
"""Returns the 'least loaded' node currently available, increments the
active count on that node, and decrements the slots currently available
"""
nodes = self._get_nodes_table(service)
service = self._get_service_id(service)

# Pick the least-loaded node that has available slots.
where = [nodes.c.service == service,
nodes.c.available > 0,
nodes.c.capacity > nodes.c.current_load,
nodes.c.downed == 0,
nodes.c.backoff == 0]

query = select([nodes]).where(and_(*where))

if self._is_sqlite:
# sqlite doesn't have the 'log' funtion, and requires
# coercion to a float for the sorting to work.
query = query.order_by(nodes.c.current_load * 1.0 /
nodes.c.capacity)
query = select([nodes])
send_to_spanner = self.should_allocate_to_spanner(email)
if send_to_spanner:
query = query.where(nodes.c.id == self._spanner_node_id)
else:
# using log() increases floating-point precision on mysql
# and thus makes the sorting more accurate.
query = query.order_by(sqlfunc.log(nodes.c.current_load) /
sqlfunc.log(nodes.c.capacity))
query = query.limit(1)
# Pick the least-loaded node that has available slots.
query = query.where(and_(
nodes.c.service == service,
nodes.c.available > 0,
nodes.c.capacity > nodes.c.current_load,
nodes.c.downed == 0,
nodes.c.backoff == 0
))
if self._is_sqlite:
# sqlite doesn't have the 'log' function, and requires
# coercion to a float for the sorting to work.
query = query.order_by(
nodes.c.current_load * 1.0 /
nodes.c.capacity)
else:
# using log() increases floating-point precision on mysql
# and thus makes the sorting more accurate.
query = query.order_by(
sqlfunc.log(nodes.c.current_load) /
sqlfunc.log(nodes.c.capacity))
query = query.limit(1)

# We may have to re-try the query if we need to release more capacity.
# This loop allows a maximum of five retries before bailing out.
Expand All @@ -647,6 +680,8 @@ def get_best_node(self, service):
res.close()
if res.rowcount == 0:
break
else:
break

# Did we succeed in finding a node?
if row is None:
Expand All @@ -656,11 +691,13 @@ def get_best_node(self, service):
node = str(row.node)

# Update the node to reflect the new assignment.
# This is a little racy with concurrent assignments, but no big deal.
# This is a little racy with concurrent assignments, but no big
# deal.
where = [nodes.c.service == service, nodes.c.node == node]
where = and_(*where)
fields = {'available': nodes.c.available - 1,
'current_load': nodes.c.current_load + 1}
fields = {'current_load': nodes.c.current_load + 1}
if not send_to_spanner:
fields['available'] = self._sqlfunc_max(nodes.c.available - 1, 0)
query = update(nodes, where, fields)
con = self._safe_execute(query, close=True)
con.close()
Expand Down
9 changes: 5 additions & 4 deletions tokenserver/scripts/allocate_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
logger = logging.getLogger("tokenserver.scripts.allocate_user")


def allocate_user(config_file, service, email, node=None):
def allocate_user(config, service, email, node=None):
logger.info("Allocating node for user %s", email)
logger.debug("Using config file %r", config_file)
config = tokenserver.scripts.load_configurator(config_file)
config.begin()
try:
backend = config.registry.getUtility(INodeAssignment)
Expand Down Expand Up @@ -68,14 +66,17 @@ def main(args=None):
tokenserver.scripts.configure_script_logging(opts)

config_file = os.path.abspath(args[0])
logger.debug("Using config file %r", config_file)
config = tokenserver.scripts.load_configurator(config_file)

service = args[1]
email = args[2]
if len(args) == 3:
node_name = None
else:
node_name = args[3]

allocate_user(config_file, service, email, node_name)
allocate_user(config, service, email, node_name)
return 0


Expand Down
23 changes: 22 additions & 1 deletion tokenserver/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,9 @@ def test_node_type_is_returned_in_response(self):

class TestServiceWithSQLBackend(TestService):

spanner_node = "https://spanner.example.com"
mysql_node = "https://example.com"

def get_ini(self):
return os.path.join(os.path.dirname(__file__),
'test_sql.ini')
Expand All @@ -806,8 +809,14 @@ def setUp(self):
self.backend._safe_execute('delete from users')
# Ensure the necessary service exists in the db.
self.backend.add_service('sync-1.1', '{node}/1.1/{uid}')
self.backend.add_service('sync-1.5', '{node}/1.5/{uid}')
# Ensure we have a node with enough capacity to run the tests.
self.backend.add_node('sync-1.1', 'https://example.com', 100)
self.backend.add_node('sync-1.1', self.mysql_node, 100)
self.backend.add_node('sync-1.5', self.mysql_node, 100)
# Ensure we have a spanner node, but give it no capacity
# so users are not assigned to it except under special
# circumstances.
self.backend.add_node('sync-1.5', self.spanner_node, 0, nodeid=800)

def tearDown(self):
# And clean up at the end, for good measure.
Expand All @@ -816,6 +825,18 @@ def tearDown(self):
self.backend._safe_execute('delete from users')
super(TestServiceWithSQLBackend, self).tearDown()

def test_assign_new_users_to_spanner(self):
    """With a 1% migration rate, only hash-selected users go to spanner."""
    self.backend.migrate_new_user_percentage = 1
    # These emails are carefully selected so that the first is assigned
    # to spanner, but the second will not be.
    EMAIL0 = "abO-test@example.com"
    EMAIL1 = "abT-test@example.com"

    user0 = self.backend.allocate_user("sync-1.5", EMAIL0)
    user1 = self.backend.allocate_user("sync-1.5", EMAIL1)
    # assertEqual rather than the deprecated assertEquals alias, which
    # was removed in Python 3.12.
    self.assertEqual(user0['node'], self.spanner_node)
    self.assertEqual(user1['node'], self.mysql_node)


class TestServiceWithNoBackends(unittest.TestCase):

Expand Down
2 changes: 2 additions & 0 deletions tokenserver/tests/test_sql.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ secrets.master_secrets = "abcdef"
"123456"
node_type_patterns =
example:*example.com
spanner_node_id = 800
migrate_new_user_percentage=0

[endpoints]
sync-1.1 = {node}/1.1/{uid}
Expand Down
2 changes: 1 addition & 1 deletion tokenserver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def return_token(request):
- **id** -- a signed authorization token, containing the
user's id for the application and the node.
- **secret** -- a secret derived from the shared secret
- **uid** -- the user id for this servic
- **uid** -- the user id for this service
- **api_endpoint** -- the root URL for the user for the service.
"""
# at this stage, we are sure that the credentials, application and version
Expand Down

0 comments on commit 07ef6b6

Please sign in to comment.