Permalink
Browse files

Merge pull request #12 from LeResKP/feature/onetoone_relation

Improve the onetoone relation
  • Loading branch information...
2 parents af22e19 + 35430e7 commit aab98fa59e39f83b46f86539ed9236ea908f9ab8 @ralphbean ralphbean committed Jan 9, 2013
Showing with 745 additions and 93 deletions.
  1. +180 −38 tests/test_utils.py
  2. +456 −0 tests/test_widgets.py
  3. +27 −50 tw2/sqla/factory.py
  4. +60 −3 tw2/sqla/utils.py
  5. +22 −2 tw2/sqla/widgets.py
View
218 tests/test_utils.py
@@ -18,8 +18,7 @@ def test_query_from_dict_simple(self):
'id': 1,
}
e = twsu.from_dict(self.DBTestCls1.query.filter_by(**d).first(), d)
- if hasattr(self, 'session'):
- self.session.flush()
+ self.session.flush()
assert(e.id == 1)
assert(e.name == 'foo')
assert(len(e.others) == 1)
@@ -32,8 +31,7 @@ def test_query_from_dict_empty(self):
x = self.DBTestCls1()
self.session.add(x)
e = twsu.from_dict(x, d)
- if hasattr(self, 'session'):
- self.session.flush()
+ self.session.flush()
assert(e.id == 2)
assert(e.name == None)
@@ -65,31 +63,113 @@ def test_from_dict_modify_to_none(self):
}
x = self.DBTestCls1.query.filter_by(id=2).first()
e = twsu.from_dict(x, d)
- if hasattr(self, 'session'):
- self.session.flush()
+ self.session.flush()
eq_(e.id, 2)
eq_(e.name, None)
eq_(len(e.others), 0)
-##
-## Not sure if this test should even be possible, but its sure broken now
-##
-#def test_from_dict_new_many_to_one_by_id(self):
-# #d = {
-# # #'id': None,
-# # #'nick': 'bazaar',
-# # #'other_id': 1,
-# #}
-# #e = twsu.from_dict(self.DBTestCls2(), d, getattr(self, 'session', None))
-# #if hasattr(self, 'session'):
-# # #self.session.flush()
-# #assert(e.id == 3)
-# #assert(e.nick == 'bazaar')
-# #print e.id, e.nick, e.other
-# #for q in e.other.others:
-# # #print "", q.id, q.nick, q.other
-# #assert(e in e.other.others)
+ def test_from_dict_modify_many_to_many(self):
+ d = {
+ 'id': 1,
+ 'surname': 'user1',
+ 'roles': [],
+ }
+ u = twsu.from_dict(self.session.query(self.DBTestCls3).one(), d)
+ self.session.add(u)
+ self.session.flush()
+
+ assert(self.session.query(self.DBTestCls3).count() == 1)
+ assert(self.session.query(self.DBTestCls4).count() == 1)
+ assert(u.id == 1)
+ assert(u.surname == 'user1')
+ assert(u.roles == [])
+
+ def test_from_dict_modify_one_to_one(self):
+ d = {
+ 'id': None,
+ 'name': 'user1',
+ 'account': {
+ 'account_name': 'account2',
+ }
+ }
+ u = twsu.from_dict(self.session.query(self.DBTestCls6).one(), d)
+ self.session.add(u)
+ self.session.flush()
+
+ assert(u.id == 1)
+ assert(u.name == 'user1')
+ assert(u.account.account_name == 'account2')
+ assert(self.session.query(self.DBTestCls5).count() == 1)
+
+ def test_from_dict_modify_one_to_one_to_none(self):
+ d = {
+ 'id': None,
+ 'name': 'user1',
+ 'account': None
+ }
+ u = twsu.from_dict(self.session.query(self.DBTestCls6).one(), d)
+ self.session.flush()
+
+ assert(u.id == 1)
+ assert(u.name == 'user1')
+ assert(u.account == None)
+ assert(self.session.query(self.DBTestCls5).count() == 0)
+
+ def test_from_dict_new_many_to_one_by_id(self):
+ d = {
+ 'id': None,
+ 'nick': 'bazaar',
+ 'other_id': 1,
+ }
+ e = twsu.from_dict(self.DBTestCls2(), d)
+ self.session.add(e)
+ self.session.flush()
+ assert(e.id == 3)
+ assert(e.nick == 'bazaar')
+ assert(len(e.other.others) == 2)
+ assert(e in e.other.others)
+
+ def test_from_dict_new_many_to_many_by_id(self):
+ d = {
+ 'id': None,
+ 'surname': 'user1',
+ 'roles': [self.admin_role],
+ }
+ u = twsu.from_dict(self.DBTestCls3(), d)
+ self.session.add(u)
+ self.session.flush()
+ assert(u.id == 2)
+ assert(u.surname == 'user1')
+ assert(u.roles == [self.admin_role])
+
+ def test_from_dict_new_one_to_one_by_id(self):
+ d = {
+ 'id': None,
+ 'name': 'user1',
+ 'account': self.DBTestCls5(account_name='account2'),
+ }
+ u = twsu.from_dict(self.DBTestCls6(), d)
+ self.session.add(u)
+ self.session.flush()
+ assert(u.id == 2)
+ assert(u.name == 'user1')
+ assert(u.account.account_name == 'account2')
+ assert(self.session.query(self.DBTestCls5).count() == 2)
+
+ def test_from_dict_new_one_to_one_by_id_no_account(self):
+ d = {
+ 'id': None,
+ 'name': 'user1',
+ 'account': None,
+ }
+ u = twsu.from_dict(self.DBTestCls6(), d)
+ self.session.add(u)
+ self.session.flush()
+ assert(u.id == 2)
+ assert(u.name == 'user1')
+ assert(u.account == None)
+ assert(self.session.query(self.DBTestCls5).count() == 1)
def test_from_dict_old_many_to_one_by_dict_recall(self):
assert(self.DBTestCls2.query.first().nick == 'bob')
@@ -101,8 +181,7 @@ def test_from_dict_old_many_to_one_by_dict_recall(self):
}
e = twsu.from_dict(self.DBTestCls2.query.first(), d)
- if hasattr(self, 'session'):
- self.session.flush()
+ self.session.flush()
assert(self.DBTestCls2.query.first().nick == 'updated')
assert(self.DBTestCls1.query.first().others[0].nick == 'updated')
@@ -118,8 +197,7 @@ def test_from_dict_old_many_to_one_by_dict(self):
x = self.DBTestCls2()
self.session.add(x)
e = twsu.from_dict(x, d)
- if hasattr(self, 'session'):
- self.session.flush()
+ self.session.flush()
assert(e.id == 3)
assert(e.nick == 'bazaar')
assert(e.other.name == 'foo')
@@ -136,11 +214,7 @@ def test_from_dict_new_many_to_one_by_dict(self):
x = self.DBTestCls2()
self.session.add(x)
e = twsu.from_dict(x, d)
- if hasattr(self, 'session'):
- self.session.flush()
- print e.id
- print e.nick
- print e.other
+ self.session.flush()
assert(e.id == 3)
assert(e.nick == 'bazaar')
assert(e in e.other.others)
@@ -160,11 +234,7 @@ def test_from_dict_new_one_to_many_by_dict(self):
x = self.DBTestCls1()
self.session.add(x)
e = twsu.from_dict(x, d)
- if hasattr(self, 'session'):
- self.session.flush()
- print e.id
- print e.name
- print e.others
+ self.session.flush()
assert(e.id == 2)
assert(e.name == 'qatar')
assert(e.others[0].nick == 'blang')
@@ -270,16 +340,43 @@ class DBTestCls2(el.Entity):
field=other_id,
backref='others')
+ class DBTestCls3(el.Entity):
+ surname = el.Field(el.String)
+ roles = el.ManyToMany('DBTestCls4')
+
+ class DBTestCls4(el.Entity):
+ rolename = el.Field(el.String)
+ users = el.ManyToMany('DBTestCls3')
+
+ class DBTestCls5(el.Entity):
+ account_name = el.Field(el.String, required=True)
+ user = el.OneToOne('DBTestCls6', inverse='account')
+
+ class DBTestCls6(el.Entity):
+ name = el.Field(el.String)
+ account_id = el.Field(el.Integer, required=False)
+ account = el.ManyToOne(DBTestCls5, field=account_id, inverse='user', uselist=False)
+
self.DBTestCls1 = DBTestCls1
self.DBTestCls2 = DBTestCls2
+ self.DBTestCls3 = DBTestCls3
+ self.DBTestCls4 = DBTestCls4
+ self.DBTestCls5 = DBTestCls5
+ self.DBTestCls6 = DBTestCls6
el.setup_all()
el.metadata.create_all()
foo = self.DBTestCls1(id=1, name='foo')
bob = self.DBTestCls2(id=1, nick='bob', other=foo)
george = self.DBTestCls2(id=2, nick='george')
+ fred = self.DBTestCls3(id=1, surname='fred')
+ admin = self.DBTestCls4(id=1, rolename='admin')
+ fred.roles.append(admin)
+ account1 = self.DbTestCls5(id=1, account_name='account1')
+ bob1 = self.DbTestCls6(id=1, name='bob1', account_id=1)
testapi.setup()
+ self.admin_role = admin
#def tearDown(self):
# import elixir as el
@@ -318,10 +415,44 @@ class DBTestCls2(Base):
other = sa.orm.relation(DBTestCls1,
backref=sa.orm.backref('others'))
+ join_table = sa.Table('Test3_Test4', Base.metadata,
+ sa.Column('Test3', sa.Integer, sa.ForeignKey('Test3.id'), primary_key=True),
+ sa.Column('Test4', sa.Integer, sa.ForeignKey('Test4.id'), primary_key=True)
+ )
+ class DBTestCls3(Base):
+ __tablename__ = 'Test3'
+ id = sa.Column(sa.Integer, primary_key=True)
+ surname = sa.Column(sa.String(50))
+ def __unicode__(self):
+ return self.surname
+ class DBTestCls4(Base):
+ __tablename__ = 'Test4'
+ id = sa.Column(sa.Integer, primary_key=True)
+ rolename = sa.Column(sa.String(50))
+ users = sa.orm.relationship('DBTestCls3', secondary=join_table, backref='roles')
+ def __unicode__(self):
+ return self.rolename
+
+ class DBTestCls5(Base):
+ __tablename__ = 'Test5'
+ id = sa.Column(sa.Integer, primary_key=True)
+ account_name = sa.Column(sa.String(50), nullable=False)
+
+ class DBTestCls6(Base):
+ __tablename__ = 'Test6'
+ id = sa.Column(sa.Integer, primary_key=True)
+ name = sa.Column(sa.String(50))
+ account_id = sa.Column(sa.Integer, sa.ForeignKey('Test5.id'), nullable=True)
+ account = sa.orm.relation(DBTestCls5, backref=sa.orm.backref('user', uselist=False))
+
Base.metadata.create_all()
self.DBTestCls1 = DBTestCls1
self.DBTestCls2 = DBTestCls2
+ self.DBTestCls3 = DBTestCls3
+ self.DBTestCls4 = DBTestCls4
+ self.DBTestCls5 = DBTestCls5
+ self.DBTestCls6 = DBTestCls6
foo = self.DBTestCls1(id=1, name='foo')
self.session.add(foo)
@@ -332,6 +463,17 @@ class DBTestCls2(Base):
george = self.DBTestCls2(id=2, nick='george')
self.session.add(george)
+ fred = self.DBTestCls3(id=1, surname='fred')
+ admin = self.DBTestCls4(id=1, rolename='admin')
+ fred.roles.append(admin)
+ self.session.add(fred)
+ self.admin_role = admin
+
+ account1 = self.DBTestCls5(id=1, account_name='account1')
+ self.session.add(account1)
+ bob1 = self.DBTestCls6(id=1, name='bob1', account_id=1)
+ self.session.add(bob1)
+
transaction.commit()
testapi.setup()
View
456 tests/test_widgets.py
@@ -85,6 +85,22 @@ class DbTestCls10(el.Entity):
def __unicode__(self):
return self.name
+ class DbTestCls11(el.Entity):
+ account_name = el.Field(el.String, required=True)
+ account_number = el.Field(el.String, required=True)
+ user = el.OneToOne('DbTestCls12', inverse='account')
+ def __unicode__(self):
+ return self.account_name
+
+ class DbTestCls12(el.Entity):
+ name = el.Field(el.String, required=True)
+ account_id = el.Field(el.Integer, required=False)
+ account = el.ManyToOne(DbTestCls11, field=account_id,
+ inverse='user', uselist=False)
+ def __unicode__(self):
+ return self.name
+
+
self.DbTestCls1 = DbTestCls1
self.DbTestCls2 = DbTestCls2
self.DbTestCls3 = DbTestCls3
@@ -95,6 +111,8 @@ def __unicode__(self):
self.DbTestCls8 = DbTestCls8
self.DbTestCls9 = DbTestCls9
self.DbTestCls10 = DbTestCls10
+ self.DbTestCls11 = DbTestCls11
+ self.DbTestCls12 = DbTestCls12
el.setup_all()
el.metadata.create_all()
@@ -121,6 +139,11 @@ def __unicode__(self):
bob1 = self.DbTestCls9(id=1, name='bob1', account_id=2)
assert(self.DbTestCls8.query.first().user == bob1)
assert(self.DbTestCls9.query.first().account == account1)
+ account1 = self.DbTestCls11(
+ id=2, account_name='account1', account_number='number1')
+ bob1 = self.DbTestCls12(id=1, name='bob1', account_id=2)
+ assert(self.DbTestCls11.query.first().user == bob1)
+ assert(self.DbTestCls12.query.first().account == account1)
transaction.commit()
return super(ElixirBase, self).setup()
@@ -206,6 +229,23 @@ class DbTestCls10(Base):
def __unicode__(self):
return self.name
+ class DbTestCls11(Base):
+ __tablename__ = 'Test11'
+ id = sa.Column(sa.Integer, primary_key=True)
+ account_name = sa.Column(sa.String(50), nullable=False)
+ account_number = sa.Column(sa.String(50), nullable=False)
+ def __unicode__(self):
+ return self.account_name
+ class DbTestCls12(Base):
+ __tablename__ = 'Test12'
+ id = sa.Column(sa.Integer, primary_key=True)
+ name = sa.Column(sa.String(50), nullable=False)
+ account_id = sa.Column(sa.Integer, sa.ForeignKey('Test11.id'),
+ nullable=True)
+ account = sa.orm.relation(DbTestCls11, backref=sa.orm.backref('user', uselist=False))
+ def __unicode__(self):
+ return self.name
+
self.DbTestCls1 = DbTestCls1
self.DbTestCls2 = DbTestCls2
@@ -217,6 +257,8 @@ def __unicode__(self):
self.DbTestCls8 = DbTestCls8
self.DbTestCls9 = DbTestCls9
self.DbTestCls10 = DbTestCls10
+ self.DbTestCls11 = DbTestCls11
+ self.DbTestCls12 = DbTestCls12
Base.metadata.create_all()
@@ -248,6 +290,13 @@ def __unicode__(self):
self.session.add(bob1)
assert(self.DbTestCls8.query.first().user == bob1)
assert(self.DbTestCls9.query.first().account == account1)
+ account1 = self.DbTestCls11(
+ id=2, account_name='account1', account_number='number1')
+ self.session.add(account1)
+ bob1 = self.DbTestCls12(id=1, name='bob1', account_id=2)
+ self.session.add(bob1)
+ assert(self.DbTestCls11.query.first().user == bob1)
+ assert(self.DbTestCls12.query.first().account == account1)
transaction.commit()
return super(SQLABase, self).setup()
@@ -2189,6 +2238,413 @@ class TestAutoEditRelationInFromElixir(ElixirBase, AutoEditRelationInFormT): pas
class TestAutoEditRelationInFormSQLA(SQLABase, AutoEditRelationInFormT): pass
+class NonRequiredOneToOneRelationT(WidgetTest):
+
+ def setup(self):
+ self.widget = self.widget(entity=self.DbTestCls12)
+ return super(NonRequiredOneToOneRelationT, self).setup()
+
+ widget = tws.DbFormPage
+ attrs = { 'id' : 'autoedit', 'title' : 'Test',
+ 'child' : tws.AutoTableForm}
+
+ expected = """
+<html>
+<head><title>Test</title></head>
+<body id="autoedit:page"><h1>Test</h1><form method="post" id="autoedit:form" enctype="multipart/form-data">
+ <span class="error"></span>
+ <table id="autoedit">
+ <tr class="odd required" id="autoedit:name:container">
+ <th>Name</th>
+ <td>
+ <input name="dbformpage_d:name" type="text" id="autoedit:name" />
+ <span id="autoedit:name:error"></span>
+ </td>
+ </tr><tr class="even" id="autoedit:account:fieldset:container">
+ <th>Account</th>
+ <td>
+ <fieldset id="autoedit:account:fieldset">
+ <legend></legend>
+ <table id="autoedit:account">
+ <tr class="odd" id="autoedit:account:account_name:container">
+ <th>Account Name</th>
+ <td>
+ <input name="account:account_name" type="text" id="autoedit:account:account_name" />
+ <span id="autoedit:account:account_name:error"></span>
+ </td>
+ </tr>
+ <tr class="even" id="autoedit:account:account_number:container">
+ <th>Account Number</th>
+ <td>
+ <input name="account:account_number" type="text" id="autoedit:account:account_number" />
+ <span id="autoedit:account:account_number:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:account:error"></span>
+ </td></tr>
+ </table>
+ </fieldset>
+ <span id="autoedit:account:fieldset:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:error"></span>
+ </td></tr>
+ </table>
+ <input type="submit" value="Save" />
+</form></body>
+</html>
+"""
+
+ declarative = True
+ def test_request_get_edit(self):
+ environ = {'REQUEST_METHOD': 'GET', 'QUERY_STRING' :'id=1'}
+ req=Request(environ)
+ assert(req.GET)
+ r = self.widget().request(req)
+ tw2test.assert_eq_xml(r.body, """
+<html>
+<head><title>Test</title></head>
+<body id="autoedit:page"><h1>Test</h1><form method="post" id="autoedit:form" enctype="multipart/form-data">
+ <span class="error"></span>
+ <table id="autoedit">
+ <tr class="odd required" id="autoedit:name:container">
+ <th>Name</th>
+ <td>
+ <input name="dbformpage_d:name" type="text" value="bob1" id="autoedit:name" />
+ <span id="autoedit:name:error"></span>
+ </td>
+ </tr><tr class="even" id="autoedit:account:fieldset:container">
+ <th>Account</th>
+ <td>
+ <fieldset id="autoedit:account:fieldset">
+ <legend></legend>
+ <table id="autoedit:account">
+ <tr class="odd" id="autoedit:account:account_name:container">
+ <th>Account Name</th>
+ <td>
+ <input name="account:account_name" type="text" value="account1" id="autoedit:account:account_name" />
+ <span id="autoedit:account:account_name:error"></span>
+ </td>
+ </tr>
+ <tr class="even" id="autoedit:account:account_number:container">
+ <th>Account Number</th>
+ <td>
+ <input name="account:account_number" type="text" value="number1" id="autoedit:account:account_number" />
+ <span id="autoedit:account:account_number:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:account:error"></span>
+ </td></tr>
+ </table>
+ </fieldset>
+ <span id="autoedit:account:fieldset:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:error"></span>
+ </td></tr>
+ </table>
+ <input type="submit" value="Save" />
+</form></body>
+</html>
+""")
+
+ def test_request_post_redirect(self):
+ environ = {'wsgi.input': StringIO('')}
+ req=Request(environ)
+ req.method = 'POST'
+ req.body='autoedit:name=toto&autoedit:account:account_name=plop&autoedit:account:account_number=num'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ r = self.widget(redirect="/foo").request(req)
+ assert( r.status_int == 302 and r.location=="/foo" )
+
+ def test_request_get(self):
+ environ = {'REQUEST_METHOD': 'GET'}
+ req=Request(environ)
+ r = self.widget().request(req)
+ tw2test.assert_eq_xml(r.body, """
+<html>
+<head><title>Test</title></head>
+<body id="autoedit:page"><h1>Test</h1><form method="post" id="autoedit:form" enctype="multipart/form-data">
+ <span class="error"></span>
+ <table id="autoedit">
+ <tr class="odd required" id="autoedit:name:container">
+ <th>Name</th>
+ <td>
+ <input name="dbformpage_d:name" type="text" id="autoedit:name" />
+ <span id="autoedit:name:error"></span>
+ </td>
+ </tr><tr class="even" id="autoedit:account:fieldset:container">
+ <th>Account</th>
+ <td>
+ <fieldset id="autoedit:account:fieldset">
+ <legend></legend>
+ <table id="autoedit:account">
+ <tr class="odd" id="autoedit:account:account_name:container">
+ <th>Account Name</th>
+ <td>
+ <input name="account:account_name" type="text" id="autoedit:account:account_name" />
+ <span id="autoedit:account:account_name:error"></span>
+ </td>
+ </tr>
+ <tr class="even" id="autoedit:account:account_number:container">
+ <th>Account Number</th>
+ <td>
+ <input name="account:account_number" type="text" id="autoedit:account:account_number" />
+ <span id="autoedit:account:account_number:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:account:error"></span>
+ </td></tr>
+ </table>
+ </fieldset>
+ <span id="autoedit:account:fieldset:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:error"></span>
+ </td></tr>
+ </table>
+ <input type="submit" value="Save" />
+</form></body>
+</html>
+""")
+
+ def test_request_post_invalid(self):
+ environ = {'REQUEST_METHOD': 'POST',
+ 'wsgi.input': StringIO(''),
+ }
+ req=Request(environ)
+ r = self.widget().request(req)
+ tw2test.assert_eq_xml(r.body, """<html>
+<head><title>Test</title></head>
+<body id="autoedit:page"><h1>Test</h1><form method="post" id="autoedit:form" enctype="multipart/form-data">
+ <span class="error"></span>
+ <table id="autoedit">
+ <tr class="odd required error" id="autoedit:name:container">
+ <th>Name</th>
+ <td>
+ <input name="dbformpage_d:name" type="text" value="" id="autoedit:name" />
+ <span id="autoedit:name:error">Enter a value</span>
+ </td>
+ </tr>
+ <tr class="even" id="autoedit:account:fieldset:container">
+ <th>Account</th>
+ <td>
+ <fieldset id="autoedit:account:fieldset">
+ <legend></legend>
+ <table id="autoedit:account">
+ <tr class="odd" id="autoedit:account:account_name:container">
+ <th>Account Name</th>
+ <td>
+ <input name="account:account_name" type="text" value="" id="autoedit:account:account_name" />
+ <span id="autoedit:account:account_name:error"></span>
+ </td>
+ </tr><tr class="even" id="autoedit:account:account_number:container">
+ <th>Account Number</th>
+ <td>
+ <input name="account:account_number" type="text" value="" id="autoedit:account:account_number" />
+ <span id="autoedit:account:account_number:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:account:error"></span>
+ </td></tr>
+ </table>
+ </fieldset>
+ <span id="autoedit:account:fieldset:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:error"></span>
+ </td></tr>
+ </table>
+ <input type="submit" value="Save" />
+</form></body>
+</html>""")
+
+ def test_request_post_partial_onetoone_invalid(self):
+ environ = {'REQUEST_METHOD': 'POST',
+ 'wsgi.input': StringIO(''),
+ }
+ req=Request(environ)
+ req.method = 'POST'
+ req.body='autoedit:account:account_name=account2&autoedit:name=name2'
+ r = self.widget().request(req)
+ tw2test.assert_eq_xml(r.body, """<html>
+<head>
+ <title>Test</title>
+</head>
+<body id="autoedit:page"><h1>Test</h1><form method="post" id="autoedit:form" enctype="multipart/form-data">
+ <span class="error"></span>
+ <table id="autoedit">
+ <tr class="odd required" id="autoedit:name:container">
+ <th>Name</th>
+ <td>
+ <input name="dbformpage_d:name" type="text" value="name2" id="autoedit:name" />
+ <span id="autoedit:name:error"></span>
+ </td>
+ </tr><tr class="even" id="autoedit:account:fieldset:container">
+ <th>Account</th>
+ <td>
+ <fieldset id="autoedit:account:fieldset">
+ <legend></legend>
+ <table id="autoedit:account">
+ <tr class="odd" id="autoedit:account:account_name:container">
+ <th>Account Name</th>
+ <td>
+ <input name="account:account_name" type="text" value="account2" id="autoedit:account:account_name" />
+ <span id="autoedit:account:account_name:error"></span>
+ </td>
+ </tr><tr class="even error" id="autoedit:account:account_number:container">
+ <th>Account Number</th>
+ <td>
+ <input name="account:account_number" type="text" value="" id="autoedit:account:account_number" />
+ <span id="autoedit:account:account_number:error">Enter a value</span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:account:error"></span>
+ </td></tr>
+ </table>
+ </fieldset>
+ <span id="autoedit:account:fieldset:error"></span>
+ </td>
+ </tr>
+ <tr class="error"><td colspan="2">
+ <span id="autoedit:error"></span>
+ </td></tr>
+</table>
+ <input type="submit" value="Save" />
+</form></body>
+</html>""")
+
+ def test_request_post_valid(self):
+ environ = {'wsgi.input': StringIO('')}
+ req=Request(environ)
+ req.method = 'POST'
+ req.body='autoedit:name=name2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ r = self.widget().request(req)
+ assert r.body == """Form posted successfully {'account': None, 'name': u'name2'}""", r.body
+
+ def test_request_post_full_valid(self):
+ environ = {'wsgi.input': StringIO('')}
+ req=Request(environ)
+ req.method = 'POST'
+ req.body='autoedit:account:account_name=account2&autoedit:account:account_number=number2&autoedit:name=name2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ r = self.widget().request(req)
+ assert r.body == """Form posted successfully {'account': {'account_name': u'account2', 'account_number': u'number2'}, 'name': u'name2'}""", r.body
+
+ def test_request_post_counts_new(self):
+ environ = {'wsgi.input': StringIO('')}
+ req=Request(environ)
+ req.method = 'POST'
+ req.body='autoedit:account:account_name=account2&autoedit:account:account_number=number2&autoedit:name=name2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ assert(self.DbTestCls12.query.count() == 1)
+ assert(self.DbTestCls11.query.count() == 1)
+ r = self.widget().request(req)
+ assert(self.DbTestCls12.query.count() == 2)
+ assert(self.DbTestCls11.query.count() == 2)
+
+ def test_request_post_counts_new_no_onetoone(self):
+ environ = {'wsgi.input': StringIO('')}
+ req=Request(environ)
+ req.method = 'POST'
+ req.body='autoedit:name=name2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ assert(self.DbTestCls12.query.count() == 1)
+ assert(self.DbTestCls11.query.count() == 1)
+ r = self.widget().request(req)
+ assert(self.DbTestCls12.query.count() == 2)
+ assert(self.DbTestCls11.query.count() == 1)
+
+ def test_request_post_counts_update(self):
+ environ = {'wsgi.input': StringIO(''), 'QUERY_STRING': 'id=1'}
+ req=Request(environ)
+ # req.GET['id'] = '1'
+ req.method = 'POST'
+ req.body='autoedit:account:account_name=account2&autoedit:account:account_number=number2&autoedit:name=bob2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ assert(self.DbTestCls12.query.count() == 1)
+ assert(self.DbTestCls11.query.count() == 1)
+ r = self.widget().request(req)
+ assert(self.DbTestCls12.query.count() == 1)
+ assert(self.DbTestCls11.query.count() == 1)
+
+ def test_request_post_counts_update_no_onetoone(self):
+ environ = {'wsgi.input': StringIO(''), 'QUERY_STRING': 'id=1'}
+ req=Request(environ)
+ # req.GET['id'] = '1'
+ req.method = 'POST'
+ req.body='autoedit:name=bob2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ assert(self.DbTestCls12.query.count() == 1)
+ assert(self.DbTestCls11.query.count() == 1)
+ r = self.widget().request(req)
+ assert(self.DbTestCls12.query.count() == 1)
+ assert(self.DbTestCls11.query.count() == 0)
+
+ def test_request_post_content_update(self):
+ environ = {'wsgi.input': StringIO(''), 'QUERY_STRING': 'id=1'}
+ req=Request(environ)
+ # req.GET['id'] = '1'
+ req.method = 'POST'
+ req.body='autoedit:account:account_name=account2&autoedit:account:account_number=number2&autoedit:name=bob2'
+ req.environ['CONTENT_LENGTH'] = str(len(req.body))
+ req.environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
+
+ self.mw.config.debug = True
+ original = self.DbTestCls12.query.filter(self.DbTestCls12.id==1).one()
+ assert(original.name == 'bob1')
+ original = self.DbTestCls11.query.filter(self.DbTestCls11.id==2).one()
+ assert(original.account_name == 'account1')
+ r = self.widget().request(req)
+ updated = self.DbTestCls12.query.filter(self.DbTestCls12.id==1)
+ assert(updated.count() == 1)
+ updated = updated.one()
+ assert(updated.name == 'bob2')
+ updated = self.DbTestCls11.query.filter(self.DbTestCls11.id==2)
+ assert(updated.count() == 1)
+ updated = updated.one()
+ assert(updated.account_name == 'account2')
+ assert(updated.account_number == 'number2')
+
+if el:
+ class TestNonRequiredOneToOneRelationElixir(
+ ElixirBase, NonRequiredOneToOneRelationT): pass
+
+class TestNonRequiredOneToOneRelationSQLA(SQLABase,
+ NonRequiredOneToOneRelationT): pass
+
+
class AutoTableFormAsChildT(WidgetTest):
def setup(self):
self.widget = self.widget(entity=self.DbTestCls7)
View
77 tw2/sqla/factory.py
@@ -1,6 +1,13 @@
import tw2.core as twc, tw2.forms as twf, sqlalchemy as sa, sys
import sqlalchemy.types as sat, tw2.dynforms as twd
from widgets import *
+from utils import (
+ is_relation,
+ is_onetoone,
+ is_manytomany,
+ is_manytoone,
+ is_onetomany,
+)
import compat
@@ -13,53 +20,6 @@ def product(x, y):
for j in y:
yield (i,j)
-def is_relation(prop):
- return isinstance(prop, sa.orm.RelationshipProperty)
-
-def is_onetoone(prop):
- if not is_relation(prop):
- return False
-
- if prop.direction == sa.orm.interfaces.ONETOMANY:
- if not prop.uselist:
- return True
-
- if prop.direction == sa.orm.interfaces.MANYTOONE:
- lis = list(prop._reverse_property)
- assert len(lis) == 1
- if not lis[0].uselist:
- return True
-
- return False
-
-def is_manytomany(prop):
- return is_relation(prop) and \
- prop.direction == sa.orm.interfaces.MANYTOMANY
-
-def is_manytoone(prop):
- if not is_relation(prop):
- return False
-
- if not prop.direction == sa.orm.interfaces.MANYTOONE:
- return False
-
- if is_onetoone(prop):
- return False
-
- return True
-
-def is_onetomany(prop):
- if not is_relation(prop):
- return False
-
- if not prop.direction == sa.orm.interfaces.ONETOMANY:
- return False
-
- if is_onetoone(prop):
- return False
-
- return True
-
def sort_properties(localname_from_relationname, localname_creation_order):
"""Returns a function which will sort the SQLAlchemy properties
"""
@@ -163,6 +123,10 @@ class WidgetPolicy(object):
If the property does not match any of the other selectors, this is used.
If this is None then an error is raised for properties that do not match.
+ `add_edit_link`
+ Boolean. If True and tws_edit_link is defined as param on the class, we
+ add a link to edit this object.
+
Alternatively, the `factory` method can be overriden to provide completely
customised widget selection.
"""
@@ -225,11 +189,13 @@ def factory(cls, prop):
raise twc.WidgetError(
"Cannot automatically create a widget " +
"for one-to-one relation '%s'" % prop.key)
+ required = required_widget(prop)
widget = cls.onetoone_widget(
id=prop.key,
entity=prop.mapper.class_,
- required=required_widget(prop),
- reverse_property_name=get_reverse_property_name(prop)
+ required=required,
+ reverse_property_name=get_reverse_property_name(prop),
+ required_on_parent=(not required),
)
elif prop.key in cls.name_widgets:
widget = cls.name_widgets[prop.key]
@@ -369,6 +335,13 @@ def child_filter(w):
w.key not in [W.key for W in new_children]
new_children.extend(filter(child_filter, orig_children))
+
+ cls.required_children = []
+ if getattr(cls, 'required_on_parent', False):
+ for c in new_children:
+ if c.validator.required:
+ cls.required_children += [c]
+ c.validator.required = False
cls.child = cls.child(children=new_children, entity=cls.entity)
@@ -391,7 +364,11 @@ class AutoEditFieldSet(AutoContainer, twf.TableFieldSet):
def post_define(cls):
if getattr(cls, 'entity', None):
required = getattr(cls, 'required', False)
- cls.validator = RelatedOneToOneValidator(entity=cls.entity, required=required)
+ required_children = getattr(cls, 'required_children', None)
+ cls.validator = RelatedOneToOneValidator(
+ entity=cls.entity,
+ required=required,
+ required_children=required_children)
# This is assigned here and not above because of a circular dep.
ViewPolicy.onetomany_widget = DbListLinkField
View
63 tw2/sqla/utils.py
@@ -1,6 +1,58 @@
import sqlalchemy as sa
+def is_relation(prop):
+ return isinstance(prop, sa.orm.RelationshipProperty)
+
+
+def is_onetoone(prop):
+ if not is_relation(prop):
+ return False
+
+ if prop.direction == sa.orm.interfaces.ONETOMANY:
+ if not prop.uselist:
+ return True
+
+ if prop.direction == sa.orm.interfaces.MANYTOONE:
+ lis = list(prop._reverse_property)
+ assert len(lis) == 1
+ if not lis[0].uselist:
+ return True
+
+ return False
+
+
+def is_manytomany(prop):
+ return is_relation(prop) and \
+ prop.direction == sa.orm.interfaces.MANYTOMANY
+
+
+def is_manytoone(prop):
+ if not is_relation(prop):
+ return False
+
+ if not prop.direction == sa.orm.interfaces.MANYTOONE:
+ return False
+
+ if is_onetoone(prop):
+ return False
+
+ return True
+
+
+def is_onetomany(prop):
+ if not is_relation(prop):
+ return False
+
+ if not prop.direction == sa.orm.interfaces.ONETOMANY:
+ return False
+
+ if is_onetoone(prop):
+ return False
+
+ return True
+
+
def from_dict(obj, data, protect_prm_tamp=True):
"""
Update a mapped object with data from a JSON-style nested dict/list
@@ -9,16 +61,16 @@ def from_dict(obj, data, protect_prm_tamp=True):
To protect against parameter tampering attacks, primary key fields are
never overwritten.
"""
-
mapper = sa.orm.object_mapper(obj)
pk_props = set(p.key for p in mapper.primary_key)
for key, value in data.iteritems():
+ prop = mapper.get_property(key)
if isinstance(value, dict):
if hasattr(obj, key):
record = getattr(obj, key)
if not record:
- record = mapper.get_property(key).mapper.class_()
+ record = prop.mapper.class_()
setattr(obj, key, record)
from_dict(record, value, protect_prm_tamp)
else:
@@ -28,12 +80,17 @@ def from_dict(obj, data, protect_prm_tamp=True):
elif isinstance(value, list) and \
value and isinstance(value[0], dict):
from_list(
- mapper.get_property(key).mapper.class_,
+ prop.mapper.class_,
getattr(obj, key),
value,
protect_prm_tamp=protect_prm_tamp
)
elif key not in pk_props:
+ if value is None:
+ old_v = getattr(obj, key, None)
+ if is_onetoone(prop) and old_v is not None:
+ # Delete the old value from the DB
+ old_v.query.delete()
setattr(obj, key, value)
return obj
View
24 tw2/sqla/widgets.py
@@ -93,10 +93,11 @@ class RelatedOneToOneValidator(twc.Validator):
This validator should be used for the one to one relation.
"""
- def __init__(self, entity, required=False, **kw):
+ def __init__(self, entity, required=False, required_children=None, **kw):
super(RelatedOneToOneValidator, self).__init__(**kw)
self.required=required
self.entity = entity
+ self.required_children = required_children
def to_python(self, value, state=None):
"""We just validate, there is at least one value
@@ -114,10 +115,29 @@ def has_value(dic):
if v:
return True
return False
-
+
if self.required:
if not has_value(value):
raise twc.ValidationError('required', self)
+ elif self.required_children:
+ if not has_value(value):
+ # No problem, no value posted.
+ # We return None to make sure we will delete the onetoone field
+ # or not create it
+ return None
+
+ error_dict = {}
+ for c in self.required_children:
+ v = value.get(c.key)
+ if v is twc.Invalid:
+ continue
+ if not v:
+ error_dict[c.key] = self.msgs['required']
+ if error_dict:
+ e = twc.ValidationError('required', self)
+ e.error_dict = error_dict
+ raise e
+
return value
def from_python(self, value, state=None):

0 comments on commit aab98fa

Please sign in to comment.