Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

add auto finders syntax

  • Loading branch information...
commit f2f199de0d4c8707ff4f16279d7e34be1991ba34 1 parent e2353b1
@vangheem vangheem authored
View
5 develop.ini
@@ -42,6 +42,11 @@ sqlalchemy.url = sqlite:///%(here)s/test.db
mail.host = localhost
mail.port = 25
+autouserfinder = SQL
+autouserfinder.table_name = users
+autouserfinder.email_field = email
+autouserfinder.connection_string = sqlite:///%(here)s/users.db
+
[server:main]
use = egg:Paste#http
# Change to 0.0.0.0 to make public:
View
13 factored/app.py
@@ -8,6 +8,7 @@
from pyramid.httpexceptions import HTTPFound
from factored.models import DBSession
from factored.auth import getFactoredPlugins, getFactoredPlugin
+from factored.finders import getUserFinderPlugin
import os
from pyramid_mailer.mailer import Mailer
from factored.utils import get_context
@@ -64,8 +65,20 @@ def __init__(self, app, global_config, base_auth_url='/auth',
self.who = APIFactory([('auth_tkt', self.auth_tkt)],
[('auth_tkt', self.auth_tkt)], [], [],
default_request_classifier, default_challenge_decider)
+
+ # db configuration
engine = engine_from_config(settings, 'sqlalchemy.')
DBSession.configure(bind=engine)
+
+ finder_name = settings.get('autouserfinder', None)
+ if finder_name:
+ plugin = getUserFinderPlugin(finder_name)
+ if plugin:
+ self.userfinder = plugin(**get_settings(settings,
+ 'autouserfinder.'))
+ else:
+ raise Exception('User finder not found: %s', finder_name)
+
config = Configurator(settings=settings)
for plugin in getFactoredPlugins():
config.add_route(plugin.name, os.path.join(base_auth_url,
View
9 factored/auth.py
@@ -10,6 +10,7 @@
from datetime import timedelta
from factored.utils import get_google_auth_code
from factored.utils import get_context
+from factored.utils import create_user
_auth_plugins = []
@@ -74,7 +75,13 @@ def __init__(self, req):
self.send_submitted = self.validate_submitted = False
def get_user(self, username):
- return DBSession.query(User).filter_by(username=username).first()
+ user = DBSession.query(User).filter_by(username=username).first()
+ if user is None:
+ if 'userfinder' in self.req.registry['settings']:
+ finder = self.req.registry['settings']['userfinder']
+ if finder(username):
+ return create_user(username)
+ return user
def check_code(self, user):
return False
View
56 factored/finders.py
@@ -0,0 +1,56 @@
+from sqlalchemy import create_engine
+
+
+_finder_plugins = []
+
+
+def addUserFinderPlugin(plugin):
+ _finder_plugins.append(plugin)
+
+
+def getUserFinderPlugins():
+ return _finder_plugins
+
+
+def getUserFinderPlugin(name):
+ for plugin in _finder_plugins:
+ if plugin.name == name:
+ return plugin
+
+
+class SQLUserFinder(object):
+ """
+ Auto find related item from a SQL database
+ """
+ name = "SQL"
+
+ def __init__(self, table_name, email_field, connection_string):
+ self.connection_string = connection_string
+ self.table_name = table_name
+ self.email_field = email_field
+
+ @property
+ def engine(self):
+ return create_engine(self.connection_string)
+
+ def __call__(self, username):
+ res = self.engine.execute('select %s from %s where %s=?' % (
+ self.email_field, self.table_name, self.email_field),
+ username)
+ return len(res.fetchall()) > 0
+addUserFinderPlugin(SQLUserFinder)
+
+
+class EmailDomainFinderPlugin(object):
+ name = "Email Domain"
+
+ def __init__(self, valid_domains):
+ from factored import app
+ self.valid_domains = app._tolist(valid_domains)
+
+ def __call__(self, email):
+ for valid in self.valid_domains:
+ if email.endswith('@' + valid):
+ return True
+ return False
+addUserFinderPlugin(EmailDomainFinderPlugin)
View
11 factored/scripts/users.py
@@ -5,7 +5,8 @@
from pyramid.paster import get_appsettings, setup_logging
from factored.models import DBSession, User
-from factored.utils import get_barcode_image, generate_random_google_code
+from factored.utils import get_barcode_image
+from factored.utils import create_user
addparser = argparse.ArgumentParser(description='Add user')
@@ -24,12 +25,10 @@ def add():
engine = engine_from_config(settings, 'sqlalchemy.')
DBSession.configure(bind=engine)
with transaction.manager:
- secret = generate_random_google_code()
username = arguments.username
- model = User(username=username, secret=secret)
- DBSession.add(model)
- print 'barcode url:', get_barcode_image(username, secret)
- print 'secret:', secret
+ user = create_user(username)
+ print 'barcode url:', get_barcode_image(username, user.secret)
+ print 'secret:', user.secret
removeparser = argparse.ArgumentParser(description='Remove user')
removeparser.add_argument('config', help='configuration file')
View
8 factored/utils.py
@@ -48,3 +48,11 @@ def get_google_auth_code(secretkey, tm=None):
def get_context(req, **kwargs):
kwargs['static_path'] = req.registry['settings']['static_path']
return kwargs
+
+
+def create_user(username):
+ from factored.models import DBSession, User
+ secret = generate_random_google_code()
+ user = User(username=username, secret=secret)
+ DBSession.add(user)
+ return user
Please sign in to comment.
Something went wrong with that request. Please try again.