Permalink
Browse files

Remove all forms code in node module.

  • Loading branch information...
1 parent 339c708 commit 0d38f464d04324e3b23328c15df7004bbda9514b @malthe malthe committed Aug 5, 2011
Showing with 225 additions and 426 deletions.
  1. +22 −131 lumin/models/user.py
  2. +81 −295 lumin/node.py
  3. +122 −0 lumin/schema.py
View
153 lumin/models/user.py
@@ -1,145 +1,36 @@
-import colander
-from colander import Float
-from colander import SchemaNode
-from colander import String
-
-import deform
-
from pyramid.security import authenticated_userid
from pyramid.security import Allow
from pyramid.security import Everyone
+from pyramid.security import has_permission
from lumin.node import ContextById
+from lumin.node import Collection
-@colander.deferred
-def deferred_username_validator(node, kw):
- request = kw['request']
- def validate_username(node, value):
- if len(value) < 4 or len(value) > 24:
- raise colander.Invalid(node,
- "Length of user name must be between 4 and \
- 24 lowercase alphanumeric characters")
- if not value.replace('_', '').isalnum() or not value.islower():
- raise colander.Invalid(node,
- "Only lowercase numbers, letters and \
- underscores are permitted")
- if not value[0].isalpha():
- raise colander.Invalid(node,
- "The username must start with a \
- letter")
- collection = request.context.collection
- available = collection.find({'_id': value}).count()==0
- if not available:
- raise colander.Invalid(node, "Username is not available")
- return validate_username
-
-email_widget = deform.widget.CheckedInputWidget(
- subject="Email",
- confirm_subject="Confirm Email",
- size=40
- )
-
-class EmailSchema(colander.Schema):
- email = SchemaNode(String(),
- title="email",
- description='Type your email address and confirm it',
- validator=colander.Email(),
- widget=email_widget)
+class UserManagement(Collection):
+ __acl__ = (
+ (Allow, Everyone, 'join'),
+ (Allow, 'group:managers', ('add', 'delete')),
+ )
+ collection = 'users'
-class PasswordSchema(colander.Schema):
- password = SchemaNode(String(),
- validator=colander.Length(min=6),
- widget = deform.widget.CheckedPasswordWidget(size=40),
- description="Type your password and confirm it")
+class User(ContextById):
+ __acl__ = (
+ (Allow, 'group:managers', 'view', 'edit'),
+ )
-class UserSchema(colander.MappingSchema):
- _id = SchemaNode(String(),
- title="Username",
- description="The name of the participant",
- validator=deferred_username_validator)
- given_name = SchemaNode(String(), missing='',
- title="Given Name")
- surname = SchemaNode(String(), missing='',
- title="Surname")
- street_address = SchemaNode(String(), missing='',
- title="Street Address",
- description='Address info (number, street, unit)')
- locality = SchemaNode(String(), missing='',
- title='City',
- description="City or township name")
- ## TODO: There must be an ISO list for this
- region = SchemaNode(String(), missing='',
- title='Locality',
- description='State, Province, Township or equivalent')
- postal_code = SchemaNode(String(), missing='',
- title='Postal Code',
- description='ZIP or postal code')
- ## TODO: make this oneOf ISO countries
- country_name = SchemaNode(String(), missing='',
- title='Country',
- description='Country')
- telephone = SchemaNode(String(), missing='',
- title='Telephone Number')
- fax = SchemaNode(String(), missing='',
- title='Fax number')
- website_url = SchemaNode(String(), missing='',
- title='Website URL',
- description='I.e. http://example.com')
- latitude = SchemaNode(Float(), missing=colander.null,
- title='Latitude')
- longitude = SchemaNode(Float(), missing=colander.null,
- title='Longitude')
- email = SchemaNode(String(),
- title="email",
- description='Type your email address and confirm it',
- validator=colander.Email(),
- widget=email_widget)
- password = SchemaNode(String(),
- validator=colander.Length(min=6),
- widget = deform.widget.CheckedPasswordWidget(size=40),
- description="Type your password and confirm it")
-
-class SimpleUserSchema(colander.MappingSchema):
- _id = SchemaNode(String(),
- title="Username",
- description="The name of the participant",
- validator=deferred_username_validator)
- display_name = SchemaNode(String(), missing=colander.null,
- title="Display Name",
- widget=deform.widget.TextInputWidget(size=40))
- email = SchemaNode(String(),
- title="email",
- description='Type your email address and confirm it',
- validator=colander.Email(),
- widget=email_widget)
- password = SchemaNode(String(),
- validator=colander.Length(min=6),
- widget = deform.widget.CheckedPasswordWidget(size=40),
- description="Type your password and confirm it")
-
+ collection = 'users'
+ def __init__(self, request, **kwargs):
+ super(User, self).__init__(request, **kwargs)
-class User(ContextById):
- __acl__ = [
- (Allow, Everyone, 'view'), ## Really?
- (Allow, Everyone, ('add')),
- (Allow, 'group:users', ('add', 'edit')),
- (Allow, 'group:managers', ('add', 'edit', 'delete')),
- ]
- __parent__ = __collection__ = 'users'
- __schema__ = UserSchema
- button_name = 'Create User'
+ if self._id == authenticated_userid(request):
+ permissions = tuple(
+ permission for permission in ('view', 'edit') if
+ not has_permission(permission, self, request)
+ )
- def __init__(self, request):
- super(User, self).__init__(request)
- self.logged_in = authenticated_userid(request)
- self._id = request.matchdict.get('slug')
- if self._id == self.logged_in:
- if (Allow, self._id, ('edit', 'delete')) not in self.__acl__:
- self.__acl__.append((Allow, self._id, ('edit', 'delete')))
- if self._id != self.logged_in:
- if (Allow, self.logged_in, ('edit', 'delete')) in self.__acl__:
- self.__acl__.remove((Allow, self.logged_in, ('edit', 'delete')))
+ if permission:
+ self.__acl__ += ((Allow, self._id, permissions),)
View
376 lumin/node.py
@@ -1,368 +1,154 @@
import datetime
from pymongo.errors import DuplicateKeyError
-
-import colander
-import deform
-
from webob.exc import HTTPInternalServerError
from pyramid.exceptions import NotFound
from pyramid.security import Allow
from pyramid.security import Everyone
from lumin.util import TS_FORMAT
-from lumin.util import cancel
from lumin.util import normalize
-class RootFactory(object):
- __acl__ = [ (Allow, Everyone, 'view'),]
- __name__ = __parent__ = None
- __collection__ = None
- def __init__(self, request, collection=None):
- self.db = request.db
- self.fs = request.fs
- if request.get('mc', None):
- self.mc = request.mc
-
-
-class ContextById(RootFactory):
+class Factory(object):
+ """Pyramid context factory base class."""
- __acl__ = [] ## this should become _default__acl__
+ __acl__ = [
+ (Allow, Everyone, 'view'),
+ ]
- #: the collection name we will use in the DB
- __collection__ = None #'root'
- __name__ = __parent__ = None
- __schema__ = colander.Schema
- button_name = "Submit"
+ __name__ = __parent__ = None
- def __init__(self, request, _id=None):
- super(ContextById, self).__init__(request)
+ def __init__(self, request):
self.request = request
- self.environ = request.environ
- self.data={}
- ## These next two can prolly use the setters below, maybe...
- ## but this way you can set it as a class variable and then
- ## override it live with another coll/schema and then get the
- ## original back by self.property = self.__property__
- ## This is perhaps desirable for our two schemas one form
- ## dilemma. Use a non-validating (all colander.null) schema
- ## while filling shit out then self.schema = ValidatingSchema
- ## when finalizing and submitting.
- self._collection = self.db[self.__collection__]
- self._schema = self.__schema__().bind(request=self.request)
- self._id = _id if _id else request.matchdict.get('slug')
- if self._id:
- cursor = self.collection.find(
- {'_id' : self._id}
- )
- try:
- assert cursor.count() < 2
- self.data = cursor.next()
- except StopIteration:
- raise NotFound
- except AssertionError:
- raise HTTPInternalServerError
- @property
- def __name__(self):
- return self._id
- @property
- def collection(self):
- """
- returns the :term:`context` factory's :term:`collection` name
- """
- return self._collection
+class Collection(Factory):
+ """Represents a collection context."""
- @collection.setter
- def collection(self, coll):
- """
- sets the context factory's collection
+ # Database collection name
+ collection = None
- :param coll: The :term:`collection` name as ``unicode``, ``str``
- """
- if not isinstance(coll, (unicode, str)):
- raise TypeError("{} is not unicode, str")
- self._collection = self.db[coll]
+ def __init__(self, request):
+ super(Collection, self).__init__(request)
+ self._collection = request.db[self.collection]
@property
- def schema(self):
- """
- returns the context factory's schema
- """
- return self._schema
-
- @schema.setter
- def schema(self, schema, bind=True):
- """
- sets the context factory's schema
-
- :param schema: an instance of ``colander.MappingSchema``
- :param bind: whether the request should be bound to the
- schema, defaults to True. This is necessary for
- colander.deferred to work with the db which is attached to the
- request.
- """
- if not issubclass(schema, colander.Schema):
- raise TypeError("{} is not a colander.MappingSchema")
- if bind:
- self._schema = schema().bind(request=self.request)
- else:
- self._schema = schema()
-
-
-
- def add_form(self):
- """
- :rtype: a tuple consisting of the form and and required static resources
-
- This form is for adding a new that does not yet have a
- :term:`context`. This isn't entirely true. We have generated a
- context here, but is isn't in the DB yet and it has no data
- yet. It is essentially a context shell at this point.
- """
- buttons = (deform.form.Button(name = "submit",
- title = self.button_name
- ),
- cancel)
- form = deform.Form(self.schema, buttons=buttons)
- resources = form.get_widget_resources()
- return (form, resources)
-
- def edit_form(self):
- """
- :rtype: a tuple consisting of the form and and required static resources.
+ def __name__(self):
+ return self.collection
- This form is for editing an existing item represented by this :term:`context`
+ def find(self, **kwargs):
+ return self._collection.find(**kwargs)
- TODO: can these two forms be the same form?
+ def insert(self, doc, title_or_id, increment=True, seperator=u'-'):
"""
- buttons = (deform.form.Button(name = "submit",
- title = "Update"
- ),
- cancel)
- form = deform.Form(self.schema, buttons=buttons)
- resources = form.get_widget_resources()
- return (form, resources)
-
-
- def insert(self,
- doc,
- title_or_id,
- increment=True,
- seperator=u'-'):
- """
- Insert the item this ``context`` represents into the
- :term:`collection`.
+ Insert ``doc`` into the :term:`collection`.
:param doc: A dictionary to be stored in the DB
:param title_or_id: a string to be normalized for a URL and used as the _id for the document.
:param increment: Whether to increment ``title_or_id`` if it already exists in the DB. **Default: ``True``**
:param seperator: carachter to separate ``title_or_id`` incremental id. **Default: ``u"-"``**
"""
+
ctime = mtime = datetime.datetime.utcnow().strftime(TS_FORMAT)
doc['ctime'] = ctime
doc['mtime'] = mtime
doc['_id'] = normalize(title_or_id)
+
if increment:
- suffix=0
+ suffix = 0
_id = doc['_id']
while True:
try:
- oid=self.collection.insert(doc, safe=True)
+ oid = self._collection.insert(doc, safe=True)
break
- except DuplicateKeyError as e:
- suffix+=1
+ except DuplicateKeyError:
+ suffix += 1
_id_suffixed = u','.join([_id, unicode(suffix)])
doc['_id'] = _id_suffixed
else:
- oid = self.collection.insert(doc, safe=True)
- return oid
+ oid = self._collection.insert(doc, safe=True)
- def update(self):
- """
- Update the item this ``context`` represents in its
- :term:`collection`
- """
- self.data['mtime'] = datetime.datetime.utcnow().strftime(TS_FORMAT)
- result = self.collection.update({"_id" : self.data["_id"] },
- self.data,
- manipulate=True,
- safe=True)
- return result
+ return oid
def delete(self, safe=False):
"""
Remove the entry represented by this ``context`` from this
:term:`collection`
"""
- result = self.collection.remove(self.data["_id"],
- safe=safe)
+ result = self._collection.remove(self.data["_id"], safe=safe)
if safe and result['err']:
raise result['err']
-class ContextBySpec(RootFactory):
- """
- Like ContextById but takes a *spec*ifying dictionary instead.
- :param request: A pyramid request object
- :param spec: A dictionary to use to extract the desired item from the DB
- :param unique: Should this context be a single item
- """
- _default__acl__ = __acl__ = []
+class ContextById(Collection):
+ def __init__(self, request, _id=None):
+ super(ContextById, self).__init__(request)
- #: the collection name we will use in the DB
- __collection__ = None
- __name__ = __parent__ = None
- __schema__ = colander.Schema
- button_name = "Submit"
+ # We get the object id from the request slug; the ``_id``
+ # keyword argument is just for testing purposes
+ self._id = _id if _id is not None else \
+ request.matchdict['slug']
- def __init__(self, request, spec=None, unique=True):
- super(ContextBySpec, self).__init__(request)
- self.request = request
- self.environ = request.environ
- self.spec = spec
- self.unique = unique
- self.data={}
- self._collection = self.db[self.__collection__]
- self._schema = self.__schema__().bind(request=self.request)
- for item in self._default__acl__:
- if item not in self.__acl__:
- self.__acl__.append(item)
- if self.spec:
- cursor = self.collection.find(spec)
- if self.unique:
- try:
- assert cursor.count() < 2
- self.data = cursor.next()
- self._id = self.data['_id']
- except StopIteration:
- raise NotFound
- except AssertionError:
- raise HTTPInternalServerError("More than one result "
- + "matched the spec")
- acl = self.data.get('__acl__', None)
- if acl:
- self.__acl__.extend(acl)
+ cursor = self._collection.find({'_id': self._id})
+ if cursor.count() > 1:
+ raise HTTPInternalServerError(
+ "Duplicate object found for '%s'." % self._id
+ )
+
+ try:
+ self.data = cursor.next()
+ except StopIteration:
+ raise NotFound(self._id)
@property
def __name__(self):
- ## this is probably wrong, but maybe not need to think.
return self._id
- @property
- def collection(self):
- """
- returns the :term:`context` factory's :term:`collection` name
- """
- return self._collection
-
- @collection.setter
- def collection(self, coll):
- """
- sets the context factory's collection
-
- :param coll: The :term:`collection` name as ``unicode``, ``str``
- """
- if not isinstance(coll, (unicode, str)):
- raise TypeError("{} is not unicode, str")
- self._collection = self.db[coll]
-
- @property
- def schema(self):
- """
- returns the context factory's schema
- """
- return self._schema
-
- @schema.setter
- def schema(self, schema, bind=True):
- """
- sets the context factory's schema
+ def save(self):
+ return self.update(self.data)
- :param schema: an instance of ``colander.MappingSchema``
- :param bind: whether the request should be bound to the
- schema, defaults to True. This is necessary for
- colander.deferred to work with the db which is attached to the
- request.
+ def update(self, data):
"""
- if not issubclass(schema, colander.Schema):
- raise TypeError("{} is not a colander.MappingSchema")
- if bind:
- self._schema = schema().bind(request=self.request)
- else:
- self._schema = schema()
-
-
-
- def add_form(self):
+ Update the item this ``context`` represents in its
+ :term:`collection`.
"""
- :rtype: a tuple consisting of the form and and required static resources
- This form is for adding a new that does not yet have a
- :term:`context`. This isn't entirely true. We have generated a
- context here, but is isn't in the DB yet and it has no data
- yet. It is essentially a context shell at this point.
- """
- buttons = (deform.form.Button(name = "submit",
- title = self.button_name
- ),
- cancel)
- form = deform.Form(self.schema, buttons=buttons)
- resources = form.get_widget_resources()
- return (form, resources)
-
- def edit_form(self):
- """
- :rtype: a tuple consisting of the form and and required static resources.
+ self.data = data
+ self.data['mtime'] = datetime.datetime.utcnow().strftime(TS_FORMAT)
- This form is for editing an existing item represented by this :term:`context`
+ return self._collection.update(
+ {"_id": self._id},
+ self.data,
+ manipulate=True,
+ safe=True
+ )
- TODO: can these two forms be the same form?
- """
- buttons = (deform.form.Button(name = "submit",
- title = "Update"
- ),
- cancel)
- form = deform.Form(self.schema, buttons=buttons)
- resources = form.get_widget_resources()
- return (form, resources)
+class ContextBySpec(Collection):
+ """
+ Like ContextById but takes a *spec*ifying dictionary instead.
- def insert(self, doc):
- """
- Insert the item this ``context`` represents into the
- :term:`collection`. It generates the _id since we don't want
- to ask for docs by this attribute. The OID is returned.
+ :param request: A pyramid request object
+ :param spec: A dictionary to use to extract the desired item from the DB
+ :param unique: Should this context be a single item
+ """
- :param doc: A dictionary to be stored in the DB
- """
- ctime = mtime = datetime.datetime.utcnow().strftime(TS_FORMAT)
- doc['ctime'] = ctime
- doc['mtime'] = mtime
- oid = self.collection.insert(doc, safe=True)
- return oid
+ def __init__(self, request, spec=None, unique=True):
+ super(ContextBySpec, self).__init__(request)
- def update(self):
- """
- Update the item this ``context`` represents in its
- :term:`collection`
- """
- self.data['mtime'] = datetime.datetime.utcnow().strftime(TS_FORMAT)
- oid = self.collection.update({"_id" : self.data["_id"] },
- self.data,
- manipulate=True,
- safe=True)
- return oid
+ cursor = self._collection.find(spec)
+ if unique:
+ if cursor.count() > 1:
+ raise HTTPInternalServerError(
+ "Multiple objects found for specification: '%s'." % \
+ self._id
+ )
- def delete(self, safe=False):
- """
- Remove the entry represented by this ``context`` from this
- :term:`collection`
- """
- result = self.collection.remove(self.data["_id"],
- safe=safe)
- if safe and result['err']:
- raise result['err']
+ try:
+ self.data = cursor.next()
+ except StopIteration:
+ raise NotFound(spec)
+ else:
+ self.data = tuple(cursor)
View
122 lumin/schema.py
@@ -0,0 +1,122 @@
+import colander
+from colander import Float
+from colander import SchemaNode
+from colander import String
+
+import deform
+
+
+@colander.deferred
+def deferred_username_validator(node, kw):
+ request = kw['request']
+
+ def validate_username(node, value):
+ if len(value) < 4 or len(value) > 24:
+ raise colander.Invalid(node,
+ "Length of user name must be between 4 and \
+ 24 lowercase alphanumeric characters")
+ if not value.replace('_', '').isalnum() or not value.islower():
+ raise colander.Invalid(node,
+ "Only lowercase numbers, letters and \
+ underscores are permitted")
+ if not value[0].isalpha():
+ raise colander.Invalid(node,
+ "The username must start with a \
+ letter")
+
+ query = request.context.find(_id=value)
+
+ if query.count() > 0:
+ raise colander.Invalid(node, "Username is not available")
+
+ return validate_username
+
+
+email_widget = deform.widget.CheckedInputWidget(
+ subject="Email",
+ confirm_subject="Confirm Email",
+ size=40
+ )
+
+
+class EmailSchema(colander.Schema):
+ email = SchemaNode(String(),
+ title="email",
+ description='Type your email address and confirm it',
+ validator=colander.Email(),
+ widget=email_widget)
+
+
+class PasswordSchema(colander.Schema):
+ password = SchemaNode(String(),
+ validator=colander.Length(min=6),
+ widget=deform.widget.CheckedPasswordWidget(size=40),
+ description="Type your password and confirm it")
+
+
+class UserSchema(colander.MappingSchema):
+ _id = SchemaNode(String(),
+ title="Username",
+ description="The name of the participant",
+ validator=deferred_username_validator)
+ given_name = SchemaNode(String(), missing='',
+ title="Given Name")
+ surname = SchemaNode(String(), missing='',
+ title="Surname")
+ street_address = SchemaNode(
+ String(), missing='',
+ title="Street Address",
+ description='Address info (number, street, unit)')
+ locality = SchemaNode(String(), missing='',
+ title='City',
+ description="City or township name")
+ ## TODO: There must be an ISO list for this
+ region = SchemaNode(String(), missing='',
+ title='Locality',
+ description='State, Province, Township or equivalent')
+ postal_code = SchemaNode(String(), missing='',
+ title='Postal Code',
+ description='ZIP or postal code')
+ ## TODO: make this oneOf ISO countries
+ country_name = SchemaNode(String(), missing='',
+ title='Country',
+ description='Country')
+ telephone = SchemaNode(String(), missing='',
+ title='Telephone Number')
+ fax = SchemaNode(String(), missing='',
+ title='Fax number')
+ website_url = SchemaNode(String(), missing='',
+ title='Website URL',
+ description='I.e. http://example.com')
+ latitude = SchemaNode(Float(), missing=colander.null,
+ title='Latitude')
+ longitude = SchemaNode(Float(), missing=colander.null,
+ title='Longitude')
+ email = SchemaNode(String(),
+ title="email",
+ description='Type your email address and confirm it',
+ validator=colander.Email(),
+ widget=email_widget)
+ password = SchemaNode(String(),
+ validator=colander.Length(min=6),
+ widget=deform.widget.CheckedPasswordWidget(size=40),
+ description="Type your password and confirm it")
+
+
+class SimpleUserSchema(colander.MappingSchema):
+ _id = SchemaNode(String(),
+ title="Username",
+ description="The name of the participant",
+ validator=deferred_username_validator)
+ display_name = SchemaNode(String(), missing=colander.null,
+ title="Display Name",
+ widget=deform.widget.TextInputWidget(size=40))
+ email = SchemaNode(String(),
+ title="email",
+ description='Type your email address and confirm it',
+ validator=colander.Email(),
+ widget=email_widget)
+ password = SchemaNode(String(),
+ validator=colander.Length(min=6),
+ widget=deform.widget.CheckedPasswordWidget(size=40),
+ description="Type your password and confirm it")

0 comments on commit 0d38f46

Please sign in to comment.