diff --git a/CHANGES.rst b/CHANGES.rst
index 3d5de33..2a06f65 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -6,6 +6,8 @@ Changelog
 
 - Add test coverage computation.
 
+- Various small Python 3 compatibility changes.
+
 3.0.1 (2017-10-18)
 ------------------
 
diff --git a/src/Shared/DC/ZRDB/Aqueduct.py b/src/Shared/DC/ZRDB/Aqueduct.py
index 1d257a0..efe4815 100644
--- a/src/Shared/DC/ZRDB/Aqueduct.py
+++ b/src/Shared/DC/ZRDB/Aqueduct.py
@@ -15,7 +15,6 @@
 import os
 import re
 from six import StringIO
-import string
 
 from Acquisition import Implicit
 from App.Common import package_home
@@ -137,7 +136,7 @@ def manage_test(self, REQUEST):
 
         result = self(REQUEST)
         report = HTML(custom_default_report(self.id, result))
-        return apply(report, (self, REQUEST), {self.id: result})
+        return report(*(self, REQUEST), **{self.id: result})
 
     def index_html(self, URL1):
         " "
@@ -188,7 +187,7 @@ def default_input_form(id, arguments, action='query', tabs=''):
                 'Enter query parameters:'
                 '\n'
                 % (id, tabs, action, id),
-                string.joinfields(
+                '\n'.join(
                     map(
                         lambda a:
                         ('\n'
@@ -204,8 +203,7 @@ def default_input_form(id, arguments, action='query', tabs=''):
                             'default' in a[1] and a[1]['default'] or ''
                             ))
                         , items
-                        ),
-                    '\n'),
+                        )),
                 '\n'
@@ -248,11 +246,10 @@ def custom_default_report(id, result, action='', no_table=0,
     columns = result._searchable_result_columns()
     __traceback_info__ = columns
     heading = ('\n%s ' %
-               string.joinfields(
+               ''.join(
                    map(lambda c:
                        '  %s\n' % nicify(c['name']),
                        columns),
-                   ''
+                   columns)
                ))
 
     if no_table:
@@ -269,7 +266,7 @@ def custom_default_report(id, result, action='', no_table=0,
                    % (td, n, c['type'] != 's' and ' null=""' or '', _td))
 
     row = ('  %s\n%s\n  %s' % (
-        tr, string.joinfields(row, delim), _tr))
+        tr, delim.join(row), _tr))
 
     return custom_default_report_src(
         id=id, heading=heading, row=row, action=action, no_table=no_table)
@@ -281,11 +278,10 @@ def custom_default_zpt_report(id, result, action='', no_table=0,
     columns = result._searchable_result_columns()
     __traceback_info__ = columns
     heading = ('\n%s ' %
-               string.joinfields(
+               ''.join(
                    map(lambda c:
                        '  %s\n' % nicify(c['name']),
                        columns),
-                   ''
+                   columns)
                ))
 
     if no_table:
@@ -300,7 +296,7 @@ def custom_default_zpt_report(id, result, action='', no_table=0,
                    % (td, n, n, _td))
 
     row = ('  %s\n%s\n  %s' % (
-        tr, string.joinfields(row, delim), _tr))
+        tr, delim.join(row), _tr))
 
     return custom_default_zpt_report_src(
         id=id, heading=heading, row=row, action=action, no_table=no_table)
@@ -425,13 +421,13 @@ def quotedHTML(text,
                    ('"', '&quot;'))):
 
     for char, name in character_entities:
-        text = string.replace(text, char, name)
+        text = text.replace(char, name)
 
     return text
 
 
 def nicify(name):
-    name = string.replace(name.strip(), '_', ' ')
+    name = name.strip().replace('_', ' ')
     return name[:1].upper() + name[1:]
diff --git a/src/Shared/DC/ZRDB/Connection.py b/src/Shared/DC/ZRDB/Connection.py
index 6686e70..a5e2605 100644
--- a/src/Shared/DC/ZRDB/Connection.py
+++ b/src/Shared/DC/ZRDB/Connection.py
@@ -15,7 +15,6 @@
 from cgi import escape
 from logging import getLogger
 from six import StringIO
-import string
 import sys
 
 from AccessControl.Permissions import view_management_screens
@@ -157,7 +156,7 @@ def manage_test(self, query, REQUEST=None):
             ''
             % (r, query))
 
-        report = apply(report,(self,REQUEST),{self.id:result})
+        report = report(*(self,REQUEST), **{self.id:result})
 
         return report
diff --git a/src/Shared/DC/ZRDB/DA.py b/src/Shared/DC/ZRDB/DA.py
index 0713ffb..7106df5 100644
--- a/src/Shared/DC/ZRDB/DA.py
+++ b/src/Shared/DC/ZRDB/DA.py
@@ -16,7 +16,6 @@
 import os
 import re
 from six import StringIO
-import string
 import sys
 from time import time
 
@@ -430,11 +429,11 @@ def manage_advanced(self, max_rows, max_cache, cache_time,
         """
         # paranoid type checking
         if not isinstance(max_rows, int):
-            max_rows = string.atoi(max_rows)
+            max_rows = int(max_rows)
         if not isinstance(max_cache, int):
-            max_cache = string.atoi(max_cache)
+            max_cache = int(max_cache)
         if not isinstance(cache_time, int):
-            cache_time = string.atoi(cache_time)
+            cache_time = int(cache_time)
         class_name = str(class_name)
         class_file = str(class_file)
 
@@ -537,7 +536,7 @@ def manage_test(self, REQUEST):
                 '' % (r, html_quote(src)))
 
-            report = apply(report, (self, REQUEST), {self.id: result})
+            report = report(*(self, REQUEST), **{self.id: result})
 
             if tb is not None:
                 self.raise_standardErrorMessage(
                     None, REQUEST, t, v, tb, None, report)
@@ -809,7 +808,7 @@ def __bobo_traverse__(self, REQUEST, key):
         if results:
             if len(results) > 1:
                 try:
-                    return results[string.atoi(key)].__of__(da)
+                    return results[int(key)].__of__(da)
                 except Exception:
                     raise KeyError(key)
             else:
diff --git a/src/Shared/DC/ZRDB/RDB.py b/src/Shared/DC/ZRDB/RDB.py
index 65d531e..360a19f 100644
--- a/src/Shared/DC/ZRDB/RDB.py
+++ b/src/Shared/DC/ZRDB/RDB.py
@@ -14,7 +14,6 @@
 
 import array
 import re
-import string
 
 from Acquisition import Implicit
 from DateTime import DateTime
diff --git a/src/Shared/DC/ZRDB/THUNK.py b/src/Shared/DC/ZRDB/THUNK.py
index ddb4940..4599e93 100644
--- a/src/Shared/DC/ZRDB/THUNK.py
+++ b/src/Shared/DC/ZRDB/THUNK.py
@@ -11,13 +11,13 @@
 #
 ##############################################################################
 
-import thread
+from six.moves._thread import allocate_lock
 
-import TM
-from TM import Surrogate
+from . import TM
+from .TM import Surrogate
 import transaction
 
-thunk_lock = thread.allocate_lock()
+thunk_lock = allocate_lock()
 
 class THUNKED_TM(TM.TM):
     """A big heavy hammer for handling non-thread safe DAs
diff --git a/src/Shared/DC/ZRDB/tests/testResult.py b/src/Shared/DC/ZRDB/tests/testResult.py
index 78559e2..f0cced2 100644
--- a/src/Shared/DC/ZRDB/tests/testResult.py
+++ b/src/Shared/DC/ZRDB/tests/testResult.py
@@ -20,22 +20,22 @@ def test_results(self):
                       ((1, 2), (3, 4))),
                      brains=Brain,
                      parent=Parent)
-        self.assertEquals(len(r), 2)
+        self.assertEqual(len(r), 2)
         row = r[0]
-        self.assertEquals(row[0], 1)
-        self.assertEquals(row[1], 2)
-        self.assertEquals(row.foo, 1)
-        self.assertEquals(row.bar, 2)
-        self.assertEquals(row.FOO, 1)
-        self.assertEquals(row.BAR, 2)
+        self.assertEqual(row[0], 1)
+        self.assertEqual(row[1], 2)
+        self.assertEqual(row.foo, 1)
+        self.assertEqual(row.bar, 2)
+        self.assertEqual(row.FOO, 1)
+        self.assertEqual(row.BAR, 2)
         row = r[1]
-        self.assertEquals(row[0], 3)
-        self.assertEquals(row[1], 4)
-        self.assertEquals(row.foo, 3)
-        self.assertEquals(row.bar, 4)
-        self.assertEquals(row.FOO, 3)
-        self.assertEquals(row.BAR, 4)
-        self.failUnless(isinstance(row, Brain))
+        self.assertEqual(row[0], 3)
+        self.assertEqual(row[1], 4)
+        self.assertEqual(row.foo, 3)
+        self.assertEqual(row.bar, 4)
+        self.assertEqual(row.FOO, 3)
+        self.assertEqual(row.BAR, 4)
+        self.assertTrue(isinstance(row, Brain))
 
     def test_rdb_file(self):
         infile = StringIO("""\
@@ -47,22 +47,22 @@ def test_rdb_file(self):
         r = RDB.File(infile,
                      brains=Brain,
                      parent=Parent)
-        self.assertEquals(len(r), 2)
+        self.assertEqual(len(r), 2)
         row = r[0]
-        self.assertEquals(row[0], 1)
-        self.assertEquals(row[1], 2)
-        self.assertEquals(row.foo, 1)
-        self.assertEquals(row.bar, 2)
-        self.assertEquals(row.FOO, 1)
-        self.assertEquals(row.BAR, 2)
+        self.assertEqual(row[0], 1)
+        self.assertEqual(row[1], 2)
+        self.assertEqual(row.foo, 1)
+        self.assertEqual(row.bar, 2)
+        self.assertEqual(row.FOO, 1)
+        self.assertEqual(row.BAR, 2)
         row = r[1]
-        self.assertEquals(row[0], 3)
-        self.assertEquals(row[1], 4)
-        self.assertEquals(row.foo, 3)
-        self.assertEquals(row.bar, 4)
-        self.assertEquals(row.FOO, 3)
-        self.assertEquals(row.BAR, 4)
-        self.failUnless(isinstance(row, Brain))
+        self.assertEqual(row[0], 3)
+        self.assertEqual(row[1], 4)
+        self.assertEqual(row.foo, 3)
+        self.assertEqual(row.bar, 4)
+        self.assertEqual(row.FOO, 3)
+        self.assertEqual(row.BAR, 4)
+        self.assertTrue(isinstance(row, Brain))
 
 def test_suite():
     return TestSuite((makeSuite(TestResults),))
diff --git a/src/Shared/DC/ZRDB/tests/testTM.py b/src/Shared/DC/ZRDB/tests/testTM.py
index e3d105f..c0e0a6c 100644
--- a/src/Shared/DC/ZRDB/tests/testTM.py
+++ b/src/Shared/DC/ZRDB/tests/testTM.py
@@ -20,10 +20,10 @@ def test_sortKey(self):
         tm = TM()
         # the default Transaction Manager should have .sortKey() of 1 for
         # backward compatibility
-        self.assertEquals(tm.sortKey(), 1)
+        self.assertEqual(tm.sortKey(), 1)
         # but the sortKey() should be adjustable
         tm.setSortKey(())
-        self.assertEquals(tm.sortKey(), ())
+        self.assertEqual(tm.sortKey(), ())
 
 def test_suite():
     return TestSuite((makeSuite(TestTM),))
diff --git a/src/Shared/DC/ZRDB/tests/test_results.py b/src/Shared/DC/ZRDB/tests/test_results.py
index f35b588..ffdafd2 100644
--- a/src/Shared/DC/ZRDB/tests/test_results.py
+++ b/src/Shared/DC/ZRDB/tests/test_results.py
@@ -47,11 +47,11 @@ def test_len(self):
     def test_getitem(self):
         ob = self._makeOne((self.columns, self.data))
         row = ob[0]
-        self.assertEquals(row[0], 'string1')
-        self.assertEquals(row[1], 1)
+        self.assertEqual(row[0], 'string1')
+        self.assertEqual(row[1], 1)
         row = ob[1]
-        self.assertEquals(row[0], 'string2')
-        self.assertEquals(row[1], 2)
+        self.assertEqual(row[0], 'string2')
+        self.assertEqual(row[1], 2)
 
     def test_getattr_and_aliases(self):
         ob = self._makeOne((self.columns, self.data))
@@ -69,12 +69,12 @@ def test_getattr_and_aliases(self):
     def test_suppliedbrain(self):
         ob = self._makeOne((self.columns, self.data), brains=Brain)
         row = ob[0]
-        self.failUnless(isinstance(row, Brain))
+        self.assertTrue(isinstance(row, Brain))
 
     def test_suppliedparent(self):
         ob = self._makeOne((self.columns, self.data), parent=Parent)
         row = ob[0]
-        self.failUnless(aq_parent(row) is Parent)
+        self.assertTrue(aq_parent(row) is Parent)
 
     def test_tuples(self):
         ob = self._makeOne((self.columns, self.data))
@@ -142,7 +142,7 @@ def test_record_of(self):
     def test_record_hash(self):
         ob = self._makeOne((self.columns, self.data))
         row = ob[0]
-        self.assert_(isinstance(hash(row), int))
+        self.assertTrue(isinstance(hash(row), int))
 
     def test_record_len(self):
         ob = self._makeOne((self.columns, self.data))
diff --git a/src/Shared/DC/ZRDB/tests/test_sqlgroup.py b/src/Shared/DC/ZRDB/tests/test_sqlgroup.py
index 0d19f8b..0895bff 100644
--- a/src/Shared/DC/ZRDB/tests/test_sqlgroup.py
+++ b/src/Shared/DC/ZRDB/tests/test_sqlgroup.py
@@ -29,42 +29,42 @@ def _makeOne(self, *args, **kw):
     def test_ctor_empty_args(self):
         group = self._makeOne([('sqlgroup', '', None)])
         self.assertEqual(group.__name__, 'sqlgroup ')
-        self.failIf(group.required)
-        self.failIf(group.where)
-        self.failIf(group.set)
-        self.failIf(group.noparens)
+        self.assertFalse(group.required)
+        self.assertFalse(group.where)
+        self.assertFalse(group.set)
+        self.assertFalse(group.noparens)

     def test_ctor_required(self):
         group = self._makeOne([('sqlgroup', 'required', None)])
         self.assertEqual(group.__name__, 'sqlgroup required')
-        self.failUnless(group.required)
-        self.failIf(group.where)
-        self.failIf(group.set)
-        self.failIf(group.noparens)
+        self.assertTrue(group.required)
+        self.assertFalse(group.where)
+        self.assertFalse(group.set)
+        self.assertFalse(group.noparens)

     def test_ctor_where(self):
         group = self._makeOne([('sqlgroup', 'where', None)])
         self.assertEqual(group.__name__, 'sqlgroup where')
-        self.failIf(group.required)
-        self.failUnless(group.where)
-        self.failIf(group.set)
-        self.failIf(group.noparens)
+        self.assertFalse(group.required)
+        self.assertTrue(group.where)
+        self.assertFalse(group.set)
+        self.assertFalse(group.noparens)

     def test_ctor_noparens(self):
         group = self._makeOne([('sqlgroup', 'noparens', None)])
         self.assertEqual(group.__name__, 'sqlgroup noparens')
-        self.failIf(group.required)
-        self.failIf(group.where)
-        self.failIf(group.set)
-        self.failUnless(group.noparens)
+        self.assertFalse(group.required)
+        self.assertFalse(group.where)
+        self.assertFalse(group.set)
+        self.assertTrue(group.noparens)

     def test_ctor_set(self):
         group = self._makeOne([('sqlgroup', 'set', None)])
         self.assertEqual(group.__name__, 'sqlgroup set')
-        self.failIf(group.required)
-        self.failIf(group.where)
-        self.failUnless(group.set)
-        self.failIf(group.noparens)
+        self.assertFalse(group.required)
+        self.assertFalse(group.where)
+        self.assertTrue(group.set)
+        self.assertFalse(group.noparens)

     def test_render_empty_optional(self):
         group = self._makeOne([('sqlgroup', '', lambda x, y:'')])