Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Webpy 0.23 #208

Open
wants to merge 29 commits into from

2 participants

@noQ

No description provided.

anandology added some commits
@anandology anandology Aaron's htmlunquote (pulled from webpy.dev) c791305
@anandology anandology fix indentation in ThreadedDict (pulled from webpy.dev) 0c9d268
@anandology anandology New functions: sendmail, emailerrors (pulled from webpy.dev) ee1c499
@anandology anandology smarter web.sendmail (WARNING: backwards incompatible API change) (pu…
…lled from webpy.dev)
0c8631e
@anandology anandology update debugerror to use new API (pulled from webpy.dev) bdcd373
@anandology anandology typo in web.sendmail (pulled from webpy.dev) 9162d39
@anandology anandology reorganized directory structure. (pulled from webpy.dev) 195056a
@anandology anandology typo in web.sendmail (pulled from webpy.dev) 719005d
@anandology anandology support for multiple recipients and smtp authentication in sendmail (…
…pulled from webpy.dev)
44845e3
@anandology anandology support for secure cookies (pulled from webpy.dev) 7223775
@anandology anandology Added support for https. tx ulrik. d042fdc
@anandology anandology fix in web.reparam: making a copy of dictionary because eval mangles it 62d9019
@anandology anandology fix: unicode support in db.sqllist. (bug#137042) 175c8b6
@anandology anandology support for cc and bcc in web.sendmail 247d501
@anandology anandology fix: transaction error in with statement a9df657
@anandology anandology only uppercase HTTP methods are allowed. (fix for bug@#176415) be46bf1
@anandology anandology fix: web.profile behavior in python 2.5 (Bug#133080) 90433b7
@anandology anandology web.py test suite. e3b2276
@anandology anandology fix for postgres unicode bug (Bug#177265) 2fb8483
@anandology anandology fix for web.background gotcha (Bug#133079) 07ae511
@anandology anandology fix for postgres unicode bug (Bug#177265) when using with polling. dd2fbdb
@anandology anandology fix for Bug#180134 - Error in one query makes subsequent queries fail 534f7ab
@anandology anandology fixed unicode issue in template.py (Bug#180510) and converted tests t…
…o doctests.
d74123c
@anandology anandology more tests 74146b4
@anandology anandology fixed error with unicode message in sendmail (Bug#180549) 4ab0b84
@anandology anandology fixed unicode issues in form.py (Bug#180653) 3c82303
@anandology anandology fixed another unicode issue in template.py (Bug#180510) 2eb1b9c
@anandology anandology changed version to 0.23 dc06d89
@anandology anandology test.py is not used any more. 7cf8fff
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 15, 2007
  1. @anandology
  2. @anandology
  3. @anandology
  4. @anandology
  5. @anandology
  6. @anandology
  7. @anandology
  8. @anandology
  9. @anandology
  10. @anandology
  11. @anandology
  12. @anandology
  13. @anandology
  14. @anandology
  15. @anandology
Commits on Dec 19, 2007
  1. @anandology
Commits on Dec 24, 2007
  1. @anandology
  2. @anandology

    web.py test suite.

    anandology authored
Commits on Dec 27, 2007
  1. @anandology
  2. @anandology
Commits on Jan 3, 2008
  1. @anandology
  2. @anandology
Commits on Jan 5, 2008
  1. @anandology
  2. @anandology

    more tests

    anandology authored
  3. @anandology
Commits on Jan 6, 2008
  1. @anandology
Commits on Jan 19, 2008
  1. @anandology
  2. @anandology

    changed version to 0.23

    anandology authored
  3. @anandology
This page is out of date. Refresh to see the latest.
View
27 README.tests
@@ -0,0 +1,27 @@
+# web.py unit tests
+
+## Setup
+
+All databases expect a database with name `webpy` with username `scott` and password `tiger`.
+
+## Running all tests
+
+To run all tests:
+
+ $ python test/alltests.py
+
+## Running individual tests
+
+To run all tests in a file:
+
+ $ python test/db.py
+
+To run all tests in a class:
+
+ $ python test/db.py SqliteTest
+
+To run a single test:
+
+ $ python test/db.py SqliteTest.testUnicode
+
+
View
0  trunk/experimental/pwt.py → experimental/pwt.py
File renamed without changes
View
0  trunk/experimental/untwisted.py → experimental/untwisted.py
File renamed without changes
View
2  trunk/setup.py → setup.py
@@ -5,7 +5,7 @@
from distutils.core import setup
setup(name='web.py',
- version='0.22',
+ version='0.23',
description='web.py: makes web apps',
author='Aaron Swartz',
author_email='me@aaronsw.com',
View
0  test/__init__.py
No changes.
View
8 test/alltests.py
@@ -0,0 +1,8 @@
+import webtest
+
+def suite():
+ modules = ["doctests", "db", "form"]
+ return webtest.suite(modules)
+
+if __name__ == "__main__":
+ webtest.main()
View
90 test/db.py
@@ -0,0 +1,90 @@
+"""DB test"""
+import webtest
+import web
+
+class DBTest(webtest.TestCase):
+ dbname = 'postgres'
+
+ def setUp(self):
+ webtest.setup_database(self.dbname)
+ web.query("CREATE TABLE person (name text, email text)")
+
+ def tearDown(self):
+ # there might be some error with the current connection, delete from a new connection
+ webtest.setup_database(self.dbname)
+ web.query('DROP TABLE person')
+
+ def testUnicode(self):
+ """Bug#177265: unicode queries throw errors"""
+ web.select('person', where='name=$name', vars={'name': u'\xf4'})
+
+ def testWrongQuery(self):
+ # It should be possible to run a correct query after getting an error from a wrong query.
+ try:
+ web.select('wrong_table')
+ except:
+ pass
+ web.select('person')
+
+ def assertRows(self, n):
+ result = web.select('person')
+ self.assertEquals(len(list(result)), n)
+
+ def testCommit(self):
+ web.transact()
+ web.insert('person', False, name='user1')
+ web.commit()
+
+ web.transact()
+ web.insert('person', False, name='user2')
+ web.insert('person', False, name='user3')
+ web.commit()
+ self.assertRows(3)
+
+ def testRollback(self):
+ web.transact()
+ web.insert('person', False, name='user1')
+ web.insert('person', False, name='user2')
+ web.insert('person', False, name='user3')
+ web.rollback()
+ self.assertRows(0)
+
+ def testNestedTransactions(self):
+ web.transact()
+ web.insert('person', False, name='user1')
+ self.assertRows(1)
+
+ web.transact()
+ web.insert('person', False, name='user2')
+ self.assertRows(2)
+ web.rollback()
+ self.assertRows(1)
+
+ web.transact()
+ web.insert('person', False, name='user3')
+ web.commit()
+
+ self.assertRows(2)
+ web.commit()
+
+
+
+
+class SqliteTest(DBTest):
+ dbname = "sqlite"
+
+ def testNestedTransactions(self):
+ # nested transactions are not supported in sqlite
+ pass
+
+class MySQLTest(DBTest):
+ dbname = "mysql"
+
+ def setUp(self):
+ web.config._hasPooling = False
+ webtest.setup_database(self.dbname)
+ # In mysql, transactions are supported only with INNODB engine.
+ web.query("CREATE TABLE person (name text, email text) ENGINE InnoDB")
+
+if __name__ == '__main__':
+ webtest.main()
View
10 test/doctests.py
@@ -0,0 +1,10 @@
+"""Run all doctests in web.py.
+"""
+import webtest
+
+def suite():
+ modules = ["web.utils", "web.db", "web.net", "web.wsgi", "web.http", "web.webapi", "web.request", "web.template"]
+ return webtest.doctest_suite(modules)
+
+if __name__ == "__main__":
+ webtest.main()
View
16 test/form.py
@@ -0,0 +1,16 @@
+import webtest
+from web import form
+
+class FormTest(webtest.TestCase):
+ def testUnicode(self):
+ t = form.Textbox('first_name', maxlength='255', description=u'\u1234', value=u'\u1234')
+ f = form.Form(t)
+ f.render() # this fails if unicode is not supported.
+
+ t = form.Textbox('first_name', maxlength='255', description=u'\u1234', value=u'\u1234')
+ t.note = u'\u1234'
+ f = form.Form(t)
+ f.render() # this fails if unicode is not supported.
+
+if __name__ == "__main__":
+ webtest.main()
View
58 test/webtest.py
@@ -0,0 +1,58 @@
+"""webtest: test utilities.
+"""
+import unittest
+from unittest import TestCase
+import sys, os
+
+# adding current directory to path to make sure local copy of web module is used.
+sys.path.insert(0, '.')
+
+import web
+
+def runTests(suite):
+ runner = unittest.TextTestRunner()
+ return runner.run(suite)
+
+def main(suite=None):
+ if not suite:
+ main_module = __import__('__main__')
+ suite = module_suite(main_module, sys.argv[1:] or None)
+
+ result = runTests(suite)
+ sys.exit(not result.wasSuccessful())
+
+def load_modules(names):
+ return [__import__(name, None, None, "x") for name in names]
+
+def module_suite(module, classnames=None):
+ """Makes a suite from a module."""
+ if hasattr(module, 'suite'):
+ return module.suite()
+ elif classnames:
+ return unittest.TestLoader().loadTestsFromNames(classnames, module)
+ else:
+ return unittest.TestLoader().loadTestsFromModule(module)
+
+def doctest_suite(module_names):
+ """Makes a test suite from doctests."""
+ import doctest
+ suite = unittest.TestSuite()
+ for mod in load_modules(module_names):
+ suite.addTest(doctest.DocTestSuite(mod))
+ return suite
+
+def suite(module_names):
+ """Creates a suite from multiple modules."""
+ suite = unittest.TestSuite()
+ for mod in load_modules(module_names):
+ suite.addTest(module_suite(mod))
+ return suite
+
+def setup_database(dbname):
+ if dbname == 'sqlite':
+ web.config.db_parameters = dict(dbn=dbname, db='webpy.db')
+ else:
+ web.config.db_parameters = dict(dbn=dbname, db='webpy', user='scott', pw='tiger')
+
+ #web.config.db_printing=True
+ web.load()
View
0  _makedoc.py → tools/_makedoc.py
File renamed without changes
View
0  markdown.py → tools/markdown.py
File renamed without changes
View
58 trunk/test.py
@@ -1,58 +0,0 @@
-"""
-web.py test suite.
-"""
-
-import sys
-import web
-import unittest
-
-def commit(value):
- web.transact()
- web.insert("test", seqname=False, value=value)
- web.commit()
-
-def rollback(value):
- web.transact()
- web.insert("test", seqname=False, value=value)
- web.rollback()
-
-def getrow(value):
- d = web.select('test', where='value=$value', vars=locals())
- a = (d and d[0].value) or None
- return a
-
-def assert_presence(*values):
- for v in values:
- assert getrow(v) is not None
-
-def assert_absence(*values):
- for v in values:
- assert getrow(v) is None
-
-class PostgresTest(unittest.TestCase):
- parameters = dict(dbn="postgres", db="webtest", user="postgres", pw="")
-
- def setUp(self):
- web.config.db_parameters = self.parameters
- #web.config.db_printing = True
- web.load()
- web.delete("test", where="1=1")
-
- def tearDown(self):
- sys.stdout.flush()
- sys.stderr.flush()
-
- def testCommit(self):
- commit(1)
- assert_presence(1)
-
- def testRollback(self):
- rollback(1)
- assert_absence(1)
-
-class MySQLTest(PostgresTest):
- parameters = dict(dbn="mysql", db="webtest", user="webpy", pw="")
-
-if __name__ == "__main__":
- unittest.main()
-
View
2  trunk/web/__init__.py → web/__init__.py
@@ -2,7 +2,7 @@
from __future__ import generators
"""web.py: makes web apps (http://webpy.org)"""
-__version__ = "0.22"
+__version__ = "0.23"
__revision__ = "$Rev$"
__author__ = "Aaron Swartz <me@aaronsw.com>"
__license__ = "public domain"
View
0  trunk/web/cheetah.py → web/cheetah.py
File renamed without changes
View
37 trunk/web/db.py → web/db.py
@@ -147,6 +147,8 @@ def reparam(string_, dictionary):
>>> reparam("s = $s", dict(s=True))
<sql: "s = 't'">
"""
+ # making a copy of dictionary because eval mangles it
+ dictionary = dictionary.copy()
vals = []
result = []
for live, chunk in _interpolate(string_):
@@ -258,6 +260,10 @@ def connect(dbn, **keywords):
if dbn == "postgres":
try:
import psycopg2 as db
+
+ # fix for Bug#177265
+ import psycopg2.extensions
+ psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
except ImportError:
try:
import psycopg as db
@@ -322,7 +328,19 @@ def db_cursor():
globals()['db'] = _PooledDB(db, keywords)
web.ctx.db = globals()['db'].connection()
else:
- web.ctx.db = db.connect(**keywords)
+ web.ctx.db = db.connect(**keywords)
+
+ # fix for Bug#177265
+ if web.ctx.get('db_name') == "postgres":
+ try:
+ if web.config._hasPooling:
+ # hack to call a function in actual db connection
+ web.ctx.db._con._con.set_client_encoding('UTF8')
+ else:
+ web.ctx.db.set_client_encoding('UTF8')
+ except Exception, e:
+ print >> web.debug, 'Error in setting utf-8 encoding:', str(e), '(ignored)'
+
return web.ctx.db.cursor()
web.ctx.db_cursor = db_cursor
@@ -340,7 +358,7 @@ def db_execute(cur, sql_query, dorollback=True):
except:
if web.config.get('db_printing'):
print >> web.debug, 'ERR:', str(sql_query)
- if dorollback: rollback(care=False)
+ if dorollback and not web.ctx.db_transaction: web.ctx.db.rollback()
raise
if web.config.get('db_printing'):
@@ -360,11 +378,13 @@ class transaction:
end.
"""
def __enter__(self):
+ self.db_transaction = web.ctx.db_transaction
transact()
def __exit__(self, exctype, excvalue, traceback):
if exctype is not None:
- rollback()
+ while self.db_transaction < web.ctx.db_transaction:
+ rollback()
else:
commit()
@@ -398,7 +418,7 @@ def rollback(care=True):
"""Rolls back a transaction."""
web.ctx.db_transaction -= 1
if web.ctx.db_transaction < 0:
- web.db_transaction = 0
+ web.ctx.db_transaction = 0
if care:
raise TransactionError, "not in a transaction"
else:
@@ -460,11 +480,12 @@ def sqllist(lst):
>>> sqllist(['a', 'b'])
'a, b'
- >>> sqllist('a')
- 'a'
-
+ >>> sqllist('foo')
+ 'foo'
+ >>> sqllist(u'foo')
+ u'foo'
"""
- if isinstance(lst, str):
+ if isinstance(lst, (str, unicode)):
return lst
else:
return ', '.join(lst)
View
48 trunk/web/debugerror.py → web/debugerror.py
@@ -2,17 +2,18 @@
pretty debug errors
(part of web.py)
-adapted from Django <djangoproject.com>
+portions adapted from Django <djangoproject.com>
Copyright (c) 2005, the Lawrence Journal-World
Used under the modified BSD license:
http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
"""
-__all__ = ["debugerror", "djangoerror"]
+__all__ = ["debugerror", "djangoerror", "emailerrors"]
-import sys, urlparse, pprint
+import sys, urlparse, pprint, traceback
from net import websafe
from template import Template
+from utils import sendmail
import webapi as web
import os, os.path
@@ -303,6 +304,47 @@ def debugerror():
web.ctx.headers = [('Content-Type', 'text/html')]
web.ctx.output = djangoerror()
+def emailerrors(email_address, olderror):
+ """
+ Wraps the old `internalerror` handler (pass as `olderror`) to
+ additionally email all errors to `email_address`, to aid in
+ debugging production websites.
+
+ Emails contain a normal text traceback as well as an
+ attachment containing the nice `debugerror` page.
+ """
+ def emailerrors_internal():
+ olderror()
+ tb = sys.exc_info()
+ error_name = tb[0]
+ error_value = tb[1]
+ tb_txt = ''.join(traceback.format_exception(*tb))
+ path = web.ctx.path
+ request = web.ctx.method+' '+web.ctx.home+web.ctx.fullpath
+ eaddr = email_address
+ text = ("""\
+------here----
+Content-Type: text/plain
+Content-Disposition: inline
+
+%(request)s
+
+%(tb_txt)s
+
+------here----
+Content-Type: text/html; name="bug.html"
+Content-Disposition: attachment; filename="bug.html"
+
+""" % locals()) + str(djangoerror())
+ sendmail(
+ "your buggy site <%s>" % eaddr,
+ "the bugfixer <%s>" % eaddr,
+ "bug: %(error_name)s: %(error_value)s (%(path)s)" % locals(),
+ text,
+ headers={'Content-Type': 'multipart/mixed; boundary="----here----"'})
+
+ return emailerrors_internal
+
if __name__ == "__main__":
urls = (
'/', 'index'
View
4 trunk/web/form.py → web/form.py
@@ -29,14 +29,14 @@ def render(self):
out += self.rendernote(self.note)
out += '<table>\n'
for i in self.inputs:
- out += ' <tr><th><label for="%s">%s</label></th>' % (i.id, i.description)
+ out += ' <tr><th><label for="%s">%s</label></th>' % (i.id, net.websafe(i.description))
out += "<td>"+i.pre+i.render()+i.post+"</td>"
out += '<td id="note_%s">%s</td></tr>\n' % (i.id, self.rendernote(i.note))
out += "</table>"
return out
def rendernote(self, note):
- if note: return '<strong class="wrong">%s</strong>' % note
+ if note: return '<strong class="wrong">%s</strong>' % net.websafe(note)
else: return ""
def validates(self, source=None, _validate=True, **kw):
View
12 trunk/web/http.py → web/http.py
@@ -164,7 +164,12 @@ def url(path=None, **kw):
return out
def background(func):
- """A function decorator to run a long-running function as a background thread."""
+ """A function decorator to run a long-running function as a background thread.
+
+ GOTCHA in postgres: Until the foreground task ends, any db access by background
+ task will necessarily get old data from before the foreground task started
+ because psycopg2 begins a transaction in the foreground task until it quits.
+ """
def internal(*a, **kw):
web.data() # cache it
@@ -173,6 +178,11 @@ def internal(*a, **kw):
def newfunc():
web._context[threading.currentThread()] = tmpctx
+ # Create new db cursor if there is one else background thread
+ # overwrites foreground cursor causing rubbish data into dbase
+ if web.config.get('db_parameters'):
+ import db
+ db.connect(**web.config.db_parameters)
func(*a, **kw)
myctx = web._context[threading.currentThread()]
for k in myctx.keys():
View
0  trunk/web/httpserver.py → web/httpserver.py
File renamed without changes
View
16 trunk/web/net.py → web/net.py
@@ -7,7 +7,7 @@
"validipaddr", "validipport", "validip", "validaddr",
"urlquote",
"httpdate", "parsehttpdate",
- "htmlquote", "websafe",
+ "htmlquote", "htmlunquote", "websafe",
]
import urllib, time
@@ -132,6 +132,20 @@ def htmlquote(text):
text = text.replace('"', "&quot;")
return text
+def htmlunquote(text):
+ """
+ Decodes `text` that's HTML quoted.
+
+ >>> htmlunquote('&lt;&#39;&amp;&quot;&gt;')
+ '<\\'&">'
+ """
+ text = text.replace("&quot;", '"')
+ text = text.replace("&#39;", "'")
+ text = text.replace("&gt;", ">")
+ text = text.replace("&lt;", "<")
+ text = text.replace("&amp;", "&") # Must be done last!
+ return text
+
def websafe(val):
"""
Converts `val` so that it's safe for use in UTF-8 HTML.
View
0  trunk/web/request.py → web/request.py
File renamed without changes
View
179 trunk/web/template.py → web/template.py
@@ -607,7 +607,7 @@ def h_getattr(self, i):
def h_call(self, i):
call = self.h(i[THING])
args = [self.h(x) for x in i[ARGS]]
- kw = dict([(x, self.h(y)) for (x, y) in i[KWARGS]])
+ kw = dict([(str(x), self.h(y)) for (x, y) in i[KWARGS]])
return call(*args, **kw)
def h_getitem(self, i):
@@ -677,9 +677,8 @@ def h_var(self, i):
def h_line(self, i):
out = []
for x in i[THING]:
- #@@ what if x is unicode
- if isinstance(x, str):
- out.append(x)
+ if isinstance(x, (unicode, str)):
+ out.append(utf8(x))
elif x[WHAT] == 'itpl':
o = self.h(x[NAME])
if x[FILTER]:
@@ -797,82 +796,100 @@ def frender(fn, *a, **kw):
return Template(open(fn).read(), *a, **kw)
def test():
- import sys
- verbose = '-v' in sys.argv
- def assertEqual(a, b):
- if a == b:
- if verbose:
- sys.stderr.write('.')
- sys.stderr.flush()
- else:
- assert a == b, "\nexpected: %s\ngot: %s" % (repr(b), repr(a))
-
- from utils import storage, group
-
- class t:
- def __init__(self, text):
- self.text = text
-
- def __call__(self, *a, **kw):
- return TestResult(self.text, Template(self.text)(*a, **kw))
-
- class TestResult:
- def __init__(self, source, value):
- self.source = source
- self.value = value
-
- def __eq__(self, other):
- if self.value == other:
- if verbose:
- sys.stderr.write('.')
- else:
- print >> sys.stderr, 'FAIL:', repr(self.source), 'expected', repr(other), ', got', repr(self.value)
- sys.stderr.flush()
-
- t('1')() == '1\n'
- t('$def with ()\n1')() == '1\n'
- t('$def with (a)\n$a')(1) == '1\n'
- t('$def with (a=0)\n$a')(1) == '1\n'
- t('$def with (a=0)\n$a')(a=1) == '1\n'
- t('$if 1: 1')() == '1\n'
- t('$if 1:\n 1')() == '1\n'
- t('$if 0: 0\n$elif 1: 1')() == '1\n'
- t('$if 0: 0\n$elif None: 0\n$else: 1')() == '1\n'
- t('$if (0 < 1) and (1 < 2): 1')() == '1\n'
- t('$for x in [1, 2, 3]: $x')() == '1\n2\n3\n'
- t('$for x in []: 0\n$else: 1')() == '1\n'
- t('$def with (a)\n$while a and a.pop(): 1')([1, 2, 3]) == '1\n1\n1\n'
- t('$while 0: 0\n$else: 1')() == '1\n'
- t('$ a = 1\n$a')() == '1\n'
- t('$# 0')() == ''
- t('$def with (d)\n$for k, v in d.iteritems(): $k')({1: 1}) == '1\n'
- t('$def with (a)\n$(a)')(1) == '1\n'
- t('$def with (a)\n$a')(1) == '1\n'
- t('$def with (a)\n$a.b')(storage(b=1)) == '1\n'
- t('$def with (a)\n$a[0]')([1]) == '1\n'
- t('${0 or 1}')() == '1\n'
- t('$ a = [1]\n$a[0]')() == '1\n'
- t('$ a = {1: 1}\n$a.keys()[0]')() == '1\n'
- t('$ a = []\n$if not a: 1')() == '1\n'
- t('$ a = {}\n$if not a: 1')() == '1\n'
- t('$ a = -1\n$a')() == '-1\n'
- t('$ a = "1"\n$a')() == '1\n'
- t('$if 1 is 1: 1')() == '1\n'
- t('$if not 0: 1')() == '1\n'
- t('$if 1:\n $if 1: 1')() == '1\n'
- t('$ a = 1\n$a')() == '1\n'
- t('$ a = 1.\n$a')() == '1.0\n'
- t('$({1: 1}.keys()[0])')() == '1\n'
- t('$for x in [1, 2, 3]:\n\t$x')() == ' 1\n 2\n 3\n'
- t('$def with (a)\n$:a')(1) == '1\n'
- t('$def with (a)\n$a')(u'\u203d') == '\xe2\x80\xbd\n'
- t(u'$def with (f)\n$:f("x")')(lambda x: x) == 'x\n'
-
- j = Template("$var foo: bar")()
- assertEqual(str(j), '')
- assertEqual(j.foo, 'bar\n')
- if verbose: sys.stderr.write('\n')
-
+ r"""Doctest for testing template module.
+
+ >>> t = Template
+ >>> t('1')()
+ '1\n'
+ >>> t('$def with ()\n1')()
+ '1\n'
+ >>> t('$def with (a)\n$a')(1)
+ '1\n'
+ >>> t('$def with (a=0)\n$a')(1)
+ '1\n'
+ >>> t('$def with (a=0)\n$a')(a=1)
+ '1\n'
+ >>> t('$if 1: 1')()
+ '1\n'
+ >>> t('$if 1:\n 1')()
+ '1\n'
+ >>> t('$if 0: 0\n$elif 1: 1')()
+ '1\n'
+ >>> t('$if 0: 0\n$elif None: 0\n$else: 1')()
+ '1\n'
+ >>> t('$if (0 < 1) and (1 < 2): 1')()
+ '1\n'
+ >>> t('$for x in [1, 2, 3]: $x')()
+ '1\n2\n3\n'
+ >>> t('$for x in []: 0\n$else: 1')()
+ '1\n'
+ >>> t('$def with (a)\n$while a and a.pop(): 1')([1, 2, 3])
+ '1\n1\n1\n'
+ >>> t('$while 0: 0\n$else: 1')()
+ '1\n'
+ >>> t('$ a = 1\n$a')()
+ '1\n'
+ >>> t('$# 0')()
+ ''
+ >>> t('$def with (d)\n$for k, v in d.iteritems(): $k')({1: 1})
+ '1\n'
+ >>> t('$def with (a)\n$(a)')(1)
+ '1\n'
+ >>> t('$def with (a)\n$a')(1)
+ '1\n'
+ >>> t('$def with (a)\n$a.b')(storage(b=1))
+ '1\n'
+ >>> t('$def with (a)\n$a[0]')([1])
+ '1\n'
+ >>> t('${0 or 1}')()
+ '1\n'
+ >>> t('$ a = [1]\n$a[0]')()
+ '1\n'
+ >>> t('$ a = {1: 1}\n$a.keys()[0]')()
+ '1\n'
+ >>> t('$ a = []\n$if not a: 1')()
+ '1\n'
+ >>> t('$ a = {}\n$if not a: 1')()
+ '1\n'
+ >>> t('$ a = -1\n$a')()
+ '-1\n'
+ >>> t('$ a = "1"\n$a')()
+ '1\n'
+ >>> t('$if 1 is 1: 1')()
+ '1\n'
+ >>> t('$if not 0: 1')()
+ '1\n'
+ >>> t('$if 1:\n $if 1: 1')()
+ '1\n'
+ >>> t('$ a = 1\n$a')()
+ '1\n'
+ >>> t('$ a = 1.\n$a')()
+ '1.0\n'
+ >>> t('$({1: 1}.keys()[0])')()
+ '1\n'
+ >>> t('$for x in [1, 2, 3]:\n\t$x')()
+ ' 1\n 2\n 3\n'
+ >>> t('$def with (a)\n$:a')(1)
+ '1\n'
+ >>> t('$def with (a)\n$a')(u'\u203d')
+ '\xe2\x80\xbd\n'
+ >>> t(u'$def with (a)\n$a $:a')(u'\u203d')
+ '\xe2\x80\xbd \xe2\x80\xbd\n'
+ >>> t(u'$def with ()\nfoo')()
+ 'foo\n'
+ >>> def f(x): return x
+ ...
+ >>> t(u'$def with (f)\n$:f("x")')(f)
+ 'x\n'
+ >>> t(u'$def with (f)\n$:f(x="x")')(f)
+ 'x\n'
+ >>> x = t('$var foo: bar')()
+ >>> str(x)
+ ''
+ >>> x.foo
+ 'bar\n'
+ """
if __name__ == "__main__":
- test()
+ import doctest
+ doctest.testmod()
View
128 trunk/web/utils.py → web/utils.py
@@ -20,10 +20,11 @@
"ThreadedDict",
"autoassign",
"to36",
- "safemarkdown"
+ "safemarkdown",
+ "sendmail"
]
-import re, sys, time, threading
+import re, sys, time, threading, os
try: import datetime
except ImportError: pass
@@ -52,16 +53,16 @@ def __getattr__(self, key):
return self[key]
except KeyError, k:
raise AttributeError, k
-
+
def __setattr__(self, key, value):
self[key] = value
-
+
def __delattr__(self, key):
try:
del self[key]
except KeyError, k:
raise AttributeError, k
-
+
def __repr__(self):
return '<Storage ' + dict.__repr__(self) + '>'
@@ -623,14 +624,18 @@ def __call__(self, *args): ##, **kw): kw unused
stime = time.time()
result = prof.runcall(self.func, *args)
stime = time.time() - stime
-
prof.close()
- stats = hotshot.stats.load(temp.name)
- stats.strip_dirs()
- stats.sort_stats('time', 'calls')
+
+ def print_stats():
+ stats = hotshot.stats.load(temp.name)
+ stats.strip_dirs()
+ stats.sort_stats('time', 'calls')
+ stats.print_stats(40)
+ stats.print_callers()
+
x = '\n\ntook '+ str(stime) + ' seconds\n'
- x += capturestdout(stats.print_stats)(40)
- x += capturestdout(stats.print_callers)()
+ x += capturestdout(print_stats)()
+
return result, x
profile = Profile
@@ -698,31 +703,31 @@ class ThreadedDict:
"""
def __init__(self, dictionary):
self.__dict__['_ThreadedDict__d'] = dictionary
-
+
def __getattr__(self, attr):
return getattr(self.__d[threading.currentThread()], attr)
-
+
def __getitem__(self, item):
return self.__d[threading.currentThread()][item]
-
+
def __setattr__(self, attr, value):
if attr == '__doc__':
self.__dict__[attr] = value
else:
return setattr(self.__d[threading.currentThread()], attr, value)
-
+
def __delattr__(self, item):
try:
del self.__d[threading.currentThread()][item]
except KeyError, k:
raise AttributeError, k
-
+
def __delitem__(self, item):
del self.__d[threading.currentThread()][item]
-
+
def __setitem__(self, item, value):
self.__d[threading.currentThread()][item] = value
-
+
def __hash__(self):
return hash(self.__d[threading.currentThread()])
@@ -790,6 +795,93 @@ def safemarkdown(text):
text = markdown(text)
return text
+def sendmail(from_address, to_address, subject, message, headers=None, **kw):
+ """
+ Sends the email message `message` with mail and envelope headers
+ for from `from_address_` to `to_address` with `subject`.
+ Additional email headers can be specified with the dictionary
+ `headers.
+
+ If `web.config.smtp_server` is set, it will send the message
+ to that SMTP server. Otherwise it will look for
+ `/usr/lib/sendmail`, the typical location for the sendmail-style
+ binary.
+ """
+ try:
+ import webapi
+ except ImportError:
+ webapi = Storage(config=Storage())
+
+ if headers is None: headers = {}
+
+ cc = kw.get('cc', [])
+ bcc = kw.get('bcc', [])
+
+ def listify(x):
+ if not isinstance(x, list):
+ return [x]
+ else:
+ return x
+
+ to_address = listify(to_address)
+ cc = listify(cc)
+ bcc = listify(bcc)
+
+ recipients = to_address + cc + bcc
+
+ headers = dictadd({
+ 'MIME-Version': '1.0',
+ 'Content-Type': 'text/plain; charset=UTF-8',
+ 'Content-Disposition': 'inline',
+ 'From': from_address,
+ 'To': ", ".join(to_address),
+ 'Subject': subject
+ }, headers)
+
+ if cc:
+ headers['Cc'] = ", ".join(cc)
+
+ import email.Utils
+ from_address = email.Utils.parseaddr(from_address)[1]
+ recipients = [email.Utils.parseaddr(r)[1] for r in recipients]
+ message = ('\n'.join(['%s: %s' % x for x in headers.iteritems()])
+ + "\n\n" + message)
+ message = utf8(message)
+
+ if webapi.config.get('smtp_server'):
+ server = webapi.config.get('smtp_server')
+ port = webapi.config.get('smtp_port', 0)
+ username = webapi.config.get('smtp_username')
+ password = webapi.config.get('smtp_password')
+ debug_level = webapi.config.get('smtp_debuglevel', None)
+ starttls = webapi.config.get('smtp_starttls', False)
+
+ import smtplib
+ smtpserver = smtplib.SMTP(server, port)
+
+ if debug_level:
+ smtpserver.set_debuglevel(debug_level)
+
+ if starttls:
+ smtpserver.ehlo()
+ smtpserver.starttls()
+ smtpserver.ehlo()
+
+ if username and password:
+ smtpserver.login(username, password)
+
+ smtpserver.sendmail(from_address, recipients, message)
+ smtpserver.quit()
+ else:
+ assert not from_address.startswith('-'), 'security'
+ for r in recipients:
+ assert not r.startswith('-'), 'security'
+
+ i, o = os.popen2(["/usr/lib/sendmail", '-f', from_address] + recipients)
+ i.write(message)
+ i.close()
+ o.close()
+ del i, o
if __name__ == "__main__":
import doctest
View
12 trunk/web/webapi.py → web/webapi.py
@@ -123,13 +123,15 @@ def data():
ctx.data = ctx.env['wsgi.input'].read(cl)
return ctx.data
-def setcookie(name, value, expires="", domain=None):
+def setcookie(name, value, expires="", domain=None, secure=False):
"""Sets a cookie."""
if expires < 0:
expires = -1000000000
kargs = {'expires': expires, 'path':'/'}
if domain:
kargs['domain'] = domain
+ if secure:
+ kargs['secure'] = secure
# @@ should we limit cookies to a different path?
cookie = Cookie.SimpleCookie()
cookie[name] = value
@@ -259,7 +261,8 @@ def _load(env):
ctx.output = ''
ctx.environ = ctx.env = env
ctx.host = env.get('HTTP_HOST')
- ctx.homedomain = 'http://' + env.get('HTTP_HOST', '[unknown]')
+ ctx.protocol = env.get('HTTPS') and 'https' or 'http'
+ ctx.homedomain = ctx.protocol + '://' + env.get('HTTP_HOST', '[unknown]')
ctx.homepath = os.environ.get('REAL_SCRIPT_NAME', env.get('SCRIPT_NAME', ''))
ctx.home = ctx.homedomain + ctx.homepath
ctx.ip = env.get('REMOTE_ADDR')
@@ -300,6 +303,11 @@ def wsgifunc(func, *middleware):
def wsgifunc(env, start_resp):
_load(env)
+
+ # allow uppercase methods only
+ if ctx.method.upper() != ctx.method:
+ return notfound()
+
try:
result = func()
except StopIteration:
View
0  trunk/web/wsgi.py → web/wsgi.py
File renamed without changes
View
0  trunk/web/wsgiserver/LICENSE.txt → web/wsgiserver/LICENSE.txt
File renamed without changes
View
0  trunk/web/wsgiserver/__init__.py → web/wsgiserver/__init__.py
File renamed without changes
Something went wrong with that request. Please try again.