From 7ff6450b3f9f4c2cab353f49d2bec0c86826bb7b Mon Sep 17 00:00:00 2001 From: Date: Mon, 4 Jul 2005 16:59:17 +0000 Subject: [PATCH] added some z3 interfaces for catalog related classes --- ZCatalog.py | 18 +- interfaces.py | 27 ++ tests/testCatalog.py | 800 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 838 insertions(+), 7 deletions(-) create mode 100644 interfaces.py create mode 100644 tests/testCatalog.py diff --git a/ZCatalog.py b/ZCatalog.py index 8f1a112a..1e680c1c 100644 --- a/ZCatalog.py +++ b/ZCatalog.py @@ -16,11 +16,10 @@ """ from warnings import warn -import urllib, time, sys, string,logging +import urllib, time, sys, string, logging from Globals import DTMLFile, MessageDialog import Globals - from OFS.Folder import Folder from OFS.ObjectManager import ObjectManager from DateTime import DateTime @@ -29,19 +28,23 @@ from DocumentTemplate.DT_Util import InstanceDict, TemplateDict from DocumentTemplate.DT_Util import Eval from AccessControl.Permission import name_trans -from Catalog import Catalog, CatalogError from AccessControl.DTML import RestrictedDTML from AccessControl.Permissions import \ manage_zcatalog_entries, manage_zcatalog_indexes, search_zcatalog -from ZCatalogIndexes import ZCatalogIndexes from ZODB.POSException import ConflictError import transaction from Products.PluginIndexes.common.PluggableIndex \ import PluggableIndexInterface from Products.PluginIndexes.TextIndex import Splitter -from IZCatalog import IZCatalog +from zLOG import LOG +from zope.interface import implements + +from Catalog import Catalog, CatalogError +from interfaces import IZCatalog as z3IZCatalog +from IZCatalog import IZCatalog as z2IZCatalog from ProgressHandler import ZLogHandler -from zLOG import LOG, INFO +from ZCatalogIndexes import ZCatalogIndexes + LOG = logging.getLogger('Zope.ZCatalog') @@ -79,7 +82,8 @@ class is that it is not Zope specific. You can use it in any Python program to catalog objects. """ - __implements__ = IZCatalog + __implements__ = z2IZCatalog + implements(z3IZCatalog) meta_type = "ZCatalog" icon='misc_/ZCatalog/ZCatalog.gif' diff --git a/interfaces.py b/interfaces.py new file mode 100644 index 00000000..159c5e07 --- /dev/null +++ b/interfaces.py @@ -0,0 +1,27 @@ +############################################################################## +# +# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""ZCatalog z3 interfaces. + +$Id$ +""" + +# create IZCatalog +from Products.Five.fiveconfigure import createZope2Bridge +from IZCatalog import IZCatalog as z2IZCatalog +import interfaces + +createZope2Bridge(z2IZCatalog, interfaces, 'IZCatalog') + +del createZope2Bridge +del z2IZCatalog +del interfaces diff --git a/tests/testCatalog.py b/tests/testCatalog.py new file mode 100644 index 00000000..5ee6fad1 --- /dev/null +++ b/tests/testCatalog.py @@ -0,0 +1,800 @@ +############################################################################## +# +# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +""" Unittests for Catalog. + +$Id$ +""" + +import unittest +import Testing +import Zope2 +Zope2.startup() + +from itertools import chain +import random + +import ExtensionClass +import OFS.Application +from AccessControl.SecurityManagement import setSecurityManager +from AccessControl.SecurityManagement import noSecurityManager +from AccessControl import Unauthorized +from Products.ZCatalog import Vocabulary +from Products.ZCatalog.Catalog import Catalog +from Products.ZCatalog.Catalog import CatalogError +from ZODB.DB import DB +from ZODB.DemoStorage import DemoStorage +import transaction + +from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex +from Products.PluginIndexes.KeywordIndex.KeywordIndex import KeywordIndex +from Products.PluginIndexes.TextIndex.Lexicon import Lexicon +from Products.PluginIndexes.TextIndex.TextIndex import TextIndex + + +def createDatabase(): + # XXX We have to import and init products in order for PluginIndexes to + # be registered. + OFS.Application.import_products() + + # Create a DemoStorage and put an Application in it + db = DB(DemoStorage()) + conn = db.open() + root = conn.root() + app = OFS.Application.Application() + root['Application'] = app + transaction.commit() + + # Init products + #OFS.Application.initialize(app) + OFS.Application.install_products(app) # XXX: this is still icky + + return app + +app = createDatabase() + +def sort(iterable): + L = list(iterable) + L.sort() + return L + +from OFS.Folder import Folder as OFS_Folder +class Folder(OFS_Folder): + def __init__(self, id): + self._setId(id) + OFS_Folder.__init__(self) + + +class CatalogBase: + def setUp(self): + self._vocabulary = Vocabulary.Vocabulary('Vocabulary', 'Vocabulary', + globbing=1) + self._catalog = Catalog() + + def tearDown(self): + self._vocabulary = self._catalog = None + +class TestAddDelColumn(CatalogBase,unittest.TestCase): + def testAdd(self): + self._catalog.addColumn('id') + self.assertEqual(self._catalog.schema.has_key('id'), 1, + 'add column failed') + + def testAddBad(self): + from Products.ZCatalog.Catalog import CatalogError + self.assertRaises(CatalogError, self._catalog.addColumn, '_id') + + def testDel(self): + self._catalog.addColumn('id') + self._catalog.delColumn('id') + self.assert_(self._catalog.schema.has_key('id') != 1, + 'del column failed') + +class TestAddDelIndexes(CatalogBase, unittest.TestCase): + def testAddFieldIndex(self): + idx = FieldIndex('id') + self._catalog.addIndex('id', idx) + self.assert_(isinstance(self._catalog.indexes['id'], + type(FieldIndex('id'))), + 'add field index failed') + + def testAddTextIndex(self): + idx = TextIndex('id') + self._catalog.addIndex('id', idx) + i = self._catalog.indexes['id'] + te = TextIndex('id', None, None, Lexicon()) + self.assert_(isinstance(i, type(te)), 'add text index failed') + + def testAddKeywordIndex(self): + idx = KeywordIndex('id') + self._catalog.addIndex('id', idx) + i = self._catalog.indexes['id'] + self.assert_(isinstance(i, type(KeywordIndex('id'))), + 'add kw index failed') + + def testDelFieldIndex(self): + idx = FieldIndex('id') + self._catalog.addIndex('id', idx) + self._catalog.delIndex('id') + self.assert_(self._catalog.indexes.has_key('id') != 1, + 'del index failed') + + def testDelTextIndex(self): + idx = TextIndex('id') + self._catalog.addIndex('id', idx) + self._catalog.delIndex('id') + self.assert_(self._catalog.indexes.has_key('id') != 1, + 'del index failed') + + def testDelKeywordIndex(self): + idx = KeywordIndex('id') + self._catalog.addIndex('id', idx) + self._catalog.delIndex('id') + self.assert_(self._catalog.indexes.has_key('id') != 1, + 'del index failed') + +# Removed unittests dealing with catalog instantiation and vocabularies +# since catalog no longer creates/manages vocabularies automatically (Casey) + +# Test of the ZCatalog object, as opposed to Catalog + +class zdummy(ExtensionClass.Base): + def __init__(self, num): + self.num = num + + def title(self): + return '%d' % self.num + +class zdummyFalse(zdummy): + + def __nonzero__(self): + return False + + +class TestZCatalog(unittest.TestCase): + + def setUp(self): + from Products.ZCatalog.ZCatalog import ZCatalog + self._catalog = ZCatalog('Catalog') + self._catalog.resolve_path = self._resolve_num + self._catalog.addIndex('title', 'KeywordIndex') + self._catalog.addColumn('title') + + self.upper = 10 + + self.d = {} + for x in range(0, self.upper): + # make uid a string of the number + ob = zdummy(x) + self.d[str(x)] = ob + self._catalog.catalog_object(ob, str(x)) + + def _resolve_num(self, num): + return self.d[num] + + def test_z2interfaces(self): + from Interface.Verify import verifyClass + from Products.ZCatalog.IZCatalog import IZCatalog + from Products.ZCatalog.ZCatalog import ZCatalog + + verifyClass(IZCatalog, ZCatalog) + + def test_z3interfaces(self): + from Products.ZCatalog.interfaces import IZCatalog + from Products.ZCatalog.ZCatalog import ZCatalog + from zope.interface.verify import verifyClass + + verifyClass(IZCatalog, ZCatalog) + + def testGetMetadataForUID(self): + testNum = str(self.upper - 3) # as good as any.. + data = self._catalog.getMetadataForUID(testNum) + self.assertEqual(data['title'], testNum) + + def testGetIndexDataForUID(self): + testNum = str(self.upper - 3) + data = self._catalog.getIndexDataForUID(testNum) + self.assertEqual(data['title'][0], testNum) + + def testSearch(self): + query = {'title': ['5','6','7']} + sr = self._catalog.searchResults(query) + self.assertEqual(len(sr), 3) + sr = self._catalog.search(query) + self.assertEqual(len(sr), 3) + + def testUpdateMetadata(self): + self._catalog.catalog_object(zdummy(1), '1') + data = self._catalog.getMetadataForUID('1') + self.assertEqual(data['title'], '1') + self._catalog.catalog_object(zdummy(2), '1', update_metadata=0) + data = self._catalog.getMetadataForUID('1') + self.assertEqual(data['title'], '1') + self._catalog.catalog_object(zdummy(2), '1', update_metadata=1) + data = self._catalog.getMetadataForUID('1') + self.assertEqual(data['title'], '2') + # update_metadata defaults to true, test that here + self._catalog.catalog_object(zdummy(1), '1') + data = self._catalog.getMetadataForUID('1') + self.assertEqual(data['title'], '1') + + def testReindexIndexDoesntDoMetadata(self): + self.d['0'].num = 9999 + self._catalog.reindexIndex('title', {}) + data = self._catalog.getMetadataForUID('0') + self.assertEqual(data['title'], '0') + + def testReindexIndexesFalse(self): + # setup + false_id = self.upper + 1 + ob = zdummyFalse(false_id) + self.d[str(false_id)] = ob + self._catalog.catalog_object(ob, str(false_id)) + # test, object evaluates to false; there was bug which caused the + # object to be removed from index + ob.num = 9999 + self._catalog.reindexIndex('title', {}) + result = self._catalog(title='9999') + self.assertEquals(1, len(result)) + + +class dummy(ExtensionClass.Base): + att1 = 'att1' + att2 = 'att2' + att3 = ['att3'] + def __init__(self, num): + self.num = num + + def col1(self): + return 'col1' + + def col2(self): + return 'col2' + + def col3(self): + return ['col3'] + +class TestCatalogObject(unittest.TestCase): + + upper = 1000 + + nums = range(upper) + for i in range(upper): + j = random.randrange(0, upper) + tmp = nums[i] + nums[i] = nums[j] + nums[j] = tmp + + def setUp(self): + self._vocabulary = Vocabulary.Vocabulary('Vocabulary','Vocabulary', + globbing=1) + + col1 = FieldIndex('col1') + col2 = TextIndex('col2') + col3 = KeywordIndex('col3') + + self._catalog = Catalog() + self._catalog.addIndex('col1', col1) + self._catalog.addIndex('col2', col2) + self._catalog.addIndex('col3', col3) + self._catalog.addColumn('col1') + self._catalog.addColumn('col2') + self._catalog.addColumn('col3') + + att1 = FieldIndex('att1') + att2 = TextIndex('att2') + att3 = KeywordIndex('att3') + num = FieldIndex('num') + + self._catalog.addIndex('att1', att1) + self._catalog.addIndex('att2', att2) + self._catalog.addIndex('att3', att3) + self._catalog.addIndex('num', num) + self._catalog.addColumn('att1') + self._catalog.addColumn('att2') + self._catalog.addColumn('att3') + self._catalog.addColumn('num') + + for x in range(0, self.upper): + self._catalog.catalogObject(dummy(self.nums[x]), `x`) + self._catalog.aq_parent = dummy('foo') # fake out acquisition + + def tearDown(self): + self._vocabulary = self._catalog = None + + def testResultLength(self): + a = self._catalog() + self.assertEqual(len(a), self.upper, + 'length should be %s, its %s' % (self.upper, len(a))) + + def testEmptyMappingReturnsAll(self): + upper = self.upper + a = self._catalog({}) + self.assertEqual(len(a), upper, + 'length should be %s, its %s' % (upper, len(a))) + # Queries consisting of empty strings should do the same + a = self._catalog({'col1':'', 'col2':'', 'col3':''}) + self.assertEqual(len(a), upper, + 'length should be %s, its %s' % (upper, len(a))) + + def testFieldIndexLength(self): + a = self._catalog(att1='att1') + self.assertEqual(len(a), self.upper, + 'should be %s, but is %s' % (self.upper, len(a))) + + def testTextIndexLength(self): + a = self._catalog(att2='att2') + self.assertEqual(len(a), self.upper, + 'should be %s, but is %s' % (self.upper, len(a))) + + def testKeywordIndexLength(self): + a = self._catalog(att3='att3') + self.assertEqual(len(a), self.upper, + 'should be %s, but is %s' % (self.upper, len(a))) + + def testUncatalogFieldIndex(self): + self.uncatalog() + a = self._catalog(att1='att1') + self.assertEqual(len(a), 0, 'len: %s' % len(a)) + + def testUncatalogTextIndex(self): + self.uncatalog() + a = self._catalog(att2='att2') + self.assertEqual(len(a), 0, 'len: %s' % len(a)) + + def testUncatalogKeywordIndex(self): + self.uncatalog() + a = self._catalog(att3='att3') + self.assertEqual(len(a), 0, 'len: %s' % len(a)) + + def testBadUncatalog(self): + try: + self._catalog.uncatalogObject('asdasdasd') + except: + self.fail('uncatalogObject raised exception on bad uid') + + def testUniqueValuesForLength(self): + a = self._catalog.uniqueValuesFor('att1') + self.assertEqual(len(a), 1, 'bad number of unique values %s' % a) + + def testUniqueValuesForContent(self): + a = self._catalog.uniqueValuesFor('att1') + self.assertEqual(a[0], 'att1', 'bad content %s' % a[0]) + + def testUncatalogTwice(self): + self._catalog.uncatalogObject(`0`) + self.assertRaises(Exception, '_second') + + def testCatalogLength(self): + for x in range(0, self.upper): + self._catalog.uncatalogObject(`x`) + self.assertEqual(len(self._catalog), 0) + + def _second(self): + self._catalog.uncatalogObject(`0`) + + def uncatalog(self): + for x in range(0, self.upper): + self._catalog.uncatalogObject(`x`) + + def testGoodSortIndex(self): + upper = self.upper + a = self._catalog(sort_on='num') + self.assertEqual(len(a), upper, + 'length should be %s, its %s' % (upper, len(a))) + for x in range(self.upper): + self.assertEqual(a[x].num, x) + + def testBadSortIndex(self): + self.assertRaises(CatalogError, self.badsortindex) + + def badsortindex(self): + a = self._catalog(sort_on='foofaraw') + + def testWrongKindOfIndexForSort(self): + self.assertRaises(CatalogError, self.wrongsortindex) + + def wrongsortindex(self): + a = self._catalog(sort_on='att2') + + def testTextIndexQWithSortOn(self): + upper = self.upper + a = self._catalog(sort_on='num', att2='att2') + self.assertEqual(len(a), upper, + 'length should be %s, its %s' % (upper, len(a))) + for x in range(self.upper): + self.assertEqual(a[x].num, x) + + def testTextIndexQWithoutSortOn(self): + upper = self.upper + a = self._catalog(att2='att2') + self.assertEqual(len(a), upper, + 'length should be %s, its %s' % (upper, len(a))) + for x in range(self.upper): + self.assertEqual(a[x].data_record_score_, 1) + + def testKeywordIndexWithMinRange(self): + a = self._catalog(att3={'query': 'att', 'range': 'min'}) + self.assertEqual(len(a), self.upper) + + def testKeywordIndexWithMaxRange(self): + a = self._catalog(att3={'query': 'att35', 'range': ':max'}) + self.assertEqual(len(a), self.upper) + + def testKeywordIndexWithMinMaxRangeCorrectSyntax(self): + a = self._catalog(att3={'query': ['att', 'att35'], 'range': 'min:max'}) + self.assertEqual(len(a), self.upper) + + def testKeywordIndexWithMinMaxRangeWrongSyntax(self): + # checkKeywordIndex with min/max range wrong syntax. + a = self._catalog(att3={'query': ['att'], 'range': 'min:max'}) + self.assert_(len(a) != self.upper) + + def testCombinedTextandKeywordQuery(self): + a = self._catalog(att3='att3', att2='att2') + self.assertEqual(len(a), self.upper) + + def testLargeSortedResultSetWithSmallIndex(self): + # This exercises the optimization in the catalog that iterates + # over the sort index rather than the result set when the result + # set is much larger than the sort index. + a = self._catalog(sort_on='att1') + self.assertEqual(len(a), self.upper) + + def testBadSortLimits(self): + self.assertRaises( + AssertionError, self._catalog, sort_on='num', sort_limit=0) + self.assertRaises( + AssertionError, self._catalog, sort_on='num', sort_limit=-10) + + def testSortLimit(self): + full = self._catalog(sort_on='num') + a = self._catalog(sort_on='num', sort_limit=10) + self.assertEqual([r.num for r in a], [r.num for r in full[:10]]) + self.assertEqual(a.actual_result_count, self.upper) + a = self._catalog(sort_on='num', sort_limit=10, sort_order='reverse') + rev = [r.num for r in full[-10:]] + rev.reverse() + self.assertEqual([r.num for r in a], rev) + self.assertEqual(a.actual_result_count, self.upper) + + def testBigSortLimit(self): + a = self._catalog(sort_on='num', sort_limit=self.upper*3) + self.assertEqual(a.actual_result_count, self.upper) + self.assertEqual(a[0].num, 0) + a = self._catalog( + sort_on='num', sort_limit=self.upper*3, sort_order='reverse') + self.assertEqual(a.actual_result_count, self.upper) + self.assertEqual(a[0].num, self.upper - 1) + + def testUpdateMetadataFalse(self): + ob = dummy(9999) + self._catalog.catalogObject(ob, `9999`) + brain = self._catalog(num=9999)[0] + self.assertEqual(brain.att1, 'att1') + ob.att1 = 'foobar' + self._catalog.catalogObject(ob, `9999`, update_metadata=0) + brain = self._catalog(num=9999)[0] + self.assertEqual(brain.att1, 'att1') + self._catalog.catalogObject(ob, `9999`) + brain = self._catalog(num=9999)[0] + self.assertEqual(brain.att1, 'foobar') + + +class objRS(ExtensionClass.Base): + + def __init__(self,num): + self.number = num + +class TestRS(unittest.TestCase): + + def setUp(self): + self._vocabulary = Vocabulary.Vocabulary('Vocabulary','Vocabulary' + , globbing=1) + self._catalog = Catalog() + index = FieldIndex('number') + self._catalog.addIndex('number', index) + self._catalog.addColumn('number') + + for i in range(5000): + obj = objRS(random.randrange(0,20000)) + self._catalog.catalogObject(obj,i) + + self._catalog.aq_parent = objRS(200) + + def testRangeSearch(self): + for i in range(1000): + m = random.randrange(0,20000) + n = m + 1000 + + for r in self._catalog.searchResults( + number= {'query': (m,n) , 'range' : 'min:max' } ): + size = r.number + self.assert_(m<=size and size<=n, + "%d vs [%d,%d]" % (r.number,m,n)) + +class TestMerge(unittest.TestCase): + # Test merging results from multiple catalogs + + def setUp(self): + vocabulary = Vocabulary.Vocabulary( + 'Vocabulary','Vocabulary', globbing=1) + self.catalogs = [] + for i in range(3): + cat = Catalog() + cat.addIndex('num', FieldIndex('num')) + cat.addIndex('big', FieldIndex('big')) + cat.addIndex('title', TextIndex('title')) + cat.vocabulary = vocabulary + cat.aq_parent = zdummy(16336) + for i in range(10): + obj = zdummy(i) + obj.big = i > 5 + cat.catalogObject(obj, str(i)) + self.catalogs.append(cat) + + def testNoFilterOrSort(self): + from Products.ZCatalog.Catalog import mergeResults + results = [cat.searchResults(_merge=0) for cat in self.catalogs] + merged_rids = [r.getRID() for r in mergeResults( + results, has_sort_keys=False, reverse=False)] + expected = [r.getRID() for r in chain(*results)] + self.assertEqual(sort(merged_rids), sort(expected)) + + def testSortedOnly(self): + from Products.ZCatalog.Catalog import mergeResults + results = [cat.searchResults(sort_on='num', _merge=0) + for cat in self.catalogs] + merged_rids = [r.getRID() for r in mergeResults( + results, has_sort_keys=True, reverse=False)] + expected = sort(chain(*results)) + expected = [rid for sortkey, rid, getitem in expected] + self.assertEqual(merged_rids, expected) + + def testSortReverse(self): + from Products.ZCatalog.Catalog import mergeResults + results = [cat.searchResults(sort_on='num', _merge=0) + for cat in self.catalogs] + merged_rids = [r.getRID() for r in mergeResults( + results, has_sort_keys=True, reverse=True)] + expected = sort(chain(*results)) + expected.reverse() + expected = [rid for sortkey, rid, getitem in expected] + self.assertEqual(merged_rids, expected) + + def testLimitSort(self): + from Products.ZCatalog.Catalog import mergeResults + results = [cat.searchResults(sort_on='num', sort_limit=2, _merge=0) + for cat in self.catalogs] + merged_rids = [r.getRID() for r in mergeResults( + results, has_sort_keys=True, reverse=False)] + expected = sort(chain(*results)) + expected = [rid for sortkey, rid, getitem in expected] + self.assertEqual(merged_rids, expected) + + def testScored(self): + from Products.ZCatalog.Catalog import mergeResults + results = [cat.searchResults(title='4 or 5 or 6', _merge=0) + for cat in self.catalogs] + merged_rids = [r.getRID() for r in mergeResults( + results, has_sort_keys=True, reverse=False)] + expected = sort(chain(*results)) + expected = [rid for sortkey, (nscore, score, rid), getitem in expected] + self.assertEqual(merged_rids, expected) + + def testSmallIndexSort(self): + # Test that small index sort optimization is not used for merging + from Products.ZCatalog.Catalog import mergeResults + results = [cat.searchResults(sort_on='big', _merge=0) + for cat in self.catalogs] + merged_rids = [r.getRID() for r in mergeResults( + results, has_sort_keys=True, reverse=False)] + expected = sort(chain(*results)) + expected = [rid for sortkey, rid, getitem in expected] + self.assertEqual(merged_rids, expected) + + +class PickySecurityManager: + def __init__(self, badnames=[]): + self.badnames = badnames + def validateValue(self, value): + return 1 + def validate(self, accessed, container, name, value): + if name not in self.badnames: + return 1 + raise Unauthorized(name) + + +class TestZCatalogGetObject(unittest.TestCase): + # Test what objects are returned by brain.getObject() + + _old_flag = None + + def setUp(self): + from Products.ZCatalog.ZCatalog import ZCatalog + catalog = ZCatalog('catalog') + catalog.addIndex('id', 'FieldIndex') + root = Folder('') + root.getPhysicalRoot = lambda: root + self.root = root + self.root.catalog = catalog + + def tearDown(self): + noSecurityManager() + if self._old_flag is not None: + self._restore_getObject_flag() + + def _init_getObject_flag(self, flag): + from Products.ZCatalog import CatalogBrains + self._old_flag = CatalogBrains.GETOBJECT_RAISES + CatalogBrains.GETOBJECT_RAISES = flag + + def _restore_getObject_flag(self): + from Products.ZCatalog import CatalogBrains + CatalogBrains.GETOBJECT_RAISES = self._old_flag + + def test_getObject_found(self): + # Check normal traversal + root = self.root + catalog = root.catalog + root.ob = Folder('ob') + catalog.catalog_object(root.ob) + brain = catalog.searchResults()[0] + self.assertEqual(brain.getPath(), '/ob') + self.assertEqual(brain.getObject().getId(), 'ob') + + def test_getObject_missing_raises_NotFound(self): + # Check that if the object is missing we raise + from zExceptions import NotFound + self._init_getObject_flag(True) + root = self.root + catalog = root.catalog + root.ob = Folder('ob') + catalog.catalog_object(root.ob) + brain = catalog.searchResults()[0] + del root.ob + self.assertRaises((NotFound, AttributeError, KeyError), brain.getObject) + + def test_getObject_restricted_raises_Unauthorized(self): + # Check that if the object's security does not allow traversal, + # None is returned + from zExceptions import NotFound + self._init_getObject_flag(True) + root = self.root + catalog = root.catalog + root.fold = Folder('fold') + root.fold.ob = Folder('ob') + catalog.catalog_object(root.fold.ob) + brain = catalog.searchResults()[0] + # allow all accesses + pickySecurityManager = PickySecurityManager() + setSecurityManager(pickySecurityManager) + self.assertEqual(brain.getObject().getId(), 'ob') + # disallow just 'ob' access + pickySecurityManager = PickySecurityManager(['ob']) + setSecurityManager(pickySecurityManager) + self.assertRaises(Unauthorized, brain.getObject) + # disallow just 'fold' access + pickySecurityManager = PickySecurityManager(['fold']) + setSecurityManager(pickySecurityManager) + ob = brain.getObject() + self.failIf(ob is None) + self.assertEqual(ob.getId(), 'ob') + + def test_getObject_missing_returns_None(self): + # Check that if the object is missing None is returned + self._init_getObject_flag(False) + root = self.root + catalog = root.catalog + root.ob = Folder('ob') + catalog.catalog_object(root.ob) + brain = catalog.searchResults()[0] + del root.ob + self.assertEqual(brain.getObject(), None) + + def test_getObject_restricted_returns_None(self): + # Check that if the object's security does not allow traversal, + # None is returned + self._init_getObject_flag(False) + root = self.root + catalog = root.catalog + root.fold = Folder('fold') + root.fold.ob = Folder('ob') + catalog.catalog_object(root.fold.ob) + brain = catalog.searchResults()[0] + # allow all accesses + pickySecurityManager = PickySecurityManager() + setSecurityManager(pickySecurityManager) + self.assertEqual(brain.getObject().getId(), 'ob') + # disallow just 'ob' access + pickySecurityManager = PickySecurityManager(['ob']) + setSecurityManager(pickySecurityManager) + self.assertEqual(brain.getObject(), None) + # disallow just 'fold' access + pickySecurityManager = PickySecurityManager(['fold']) + setSecurityManager(pickySecurityManager) + ob = brain.getObject() + self.failIf(ob is None) + self.assertEqual(ob.getId(), 'ob') + + # Now test _unrestrictedGetObject + + def test_unrestrictedGetObject_found(self): + # Check normal traversal + root = self.root + catalog = root.catalog + root.ob = Folder('ob') + catalog.catalog_object(root.ob) + brain = catalog.searchResults()[0] + self.assertEqual(brain.getPath(), '/ob') + self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob') + + def test_unrestrictedGetObject_restricted(self): + # Check that if the object's security does not allow traversal, + # it's still is returned + root = self.root + catalog = root.catalog + root.fold = Folder('fold') + root.fold.ob = Folder('ob') + catalog.catalog_object(root.fold.ob) + brain = catalog.searchResults()[0] + # allow all accesses + pickySecurityManager = PickySecurityManager() + setSecurityManager(pickySecurityManager) + self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob') + # disallow just 'ob' access + pickySecurityManager = PickySecurityManager(['ob']) + setSecurityManager(pickySecurityManager) + self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob') + # disallow just 'fold' access + pickySecurityManager = PickySecurityManager(['fold']) + setSecurityManager(pickySecurityManager) + self.assertEqual(brain._unrestrictedGetObject().getId(), 'ob') + + def test_unrestrictedGetObject_missing_raises_NotFound(self): + # Check that if the object is missing we raise + from zExceptions import NotFound + self._init_getObject_flag(True) + root = self.root + catalog = root.catalog + root.ob = Folder('ob') + catalog.catalog_object(root.ob) + brain = catalog.searchResults()[0] + del root.ob + self.assertRaises((NotFound, AttributeError, KeyError), + brain._unrestrictedGetObject) + + def test_unrestrictedGetObject_missing_returns_None(self): + # Check that if the object is missing None is returned + self._init_getObject_flag(False) + root = self.root + catalog = root.catalog + root.ob = Folder('ob') + catalog.catalog_object(root.ob) + brain = catalog.searchResults()[0] + del root.ob + self.assertEqual(brain._unrestrictedGetObject(), None) + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest( unittest.makeSuite( TestAddDelColumn ) ) + suite.addTest( unittest.makeSuite( TestAddDelIndexes ) ) + suite.addTest( unittest.makeSuite( TestZCatalog ) ) + suite.addTest( unittest.makeSuite( TestCatalogObject ) ) + suite.addTest( unittest.makeSuite( TestRS ) ) + suite.addTest( unittest.makeSuite( TestMerge ) ) + suite.addTest( unittest.makeSuite( TestZCatalogGetObject ) ) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite')