-
Notifications
You must be signed in to change notification settings - Fork 3
/
appsetup.py
255 lines (194 loc) · 7.09 KB
/
appsetup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Code to initialize the application server."""
import ZODB.ActivityMonitor
import ZODB.interfaces
import zope.interface
import zope.component
import zope.component.hooks
from zope.security.interfaces import IParticipation
from zope.security.management import system_user
import zope.processlifetime
@zope.interface.implementer(IParticipation)
class SystemConfigurationParticipation(object):
    """Participation that acts on behalf of the system user.

    ``config()`` starts a security interaction with an instance of this
    class before it loads the configuration file.
    """

    # The acting principal is zope.security's ``system_user``.
    principal = system_user
    # No interaction is associated by default.
    # NOTE(review): presumably assigned by the interaction this participation
    # joins -- confirm against the IParticipation interface.
    interaction = None
# Set to the ``execute`` flag at the end of a ``config()`` call; while it is
# true, later ``config()`` calls return early without loading anything.
_configured = False
def config(file, features=(), execute=True):
    r"""Execute the ZCML configuration file.

    This procedure defines the global site setup. Optionally you can also
    provide a list of features that are inserted in the configuration context
    before the execution is started.

    ``file`` is the name of the ZCML file to load, ``features`` is an
    iterable of feature names to provide up front, and ``execute`` is passed
    on to ``xmlconfig.file()``.

    The configuration context is returned. The file name is always recorded
    (see `getConfigSource()`), but if a previous call already executed a
    configuration, nothing is loaded and ``None`` is returned.

    The configuration is loaded inside a security interaction for the system
    user. That interaction is ended even when loading fails, so a failed
    call does not leave an active interaction behind.

    Let's create a trivial sample ZCML file.

      >>> import tempfile
      >>> fn = tempfile.mktemp('.zcml')
      >>> zcml = open(fn, 'w')
      >>> written = zcml.write('''
      ... <configure xmlns:meta="http://namespaces.zope.org/meta"
      ... xmlns:zcml="http://namespaces.zope.org/zcml">
      ... <meta:provides feature="myFeature" />
      ... <configure zcml:condition="have myFeature2">
      ... <meta:provides feature="myFeature4" />
      ... </configure>
      ... </configure>
      ... ''')
      >>> zcml.close()

    We can now pass the file into the `config()` function:

      # End an old interaction first
      >>> from zope.security.management import endInteraction
      >>> endInteraction()

      >>> context = config(fn, features=('myFeature2', 'myFeature3'))
      >>> context.hasFeature('myFeature')
      True
      >>> context.hasFeature('myFeature2')
      True
      >>> context.hasFeature('myFeature3')
      True
      >>> context.hasFeature('myFeature4')
      True

    Further, we should have access to the configuration file name and context
    now:

      >>> getConfigSource() is fn
      True
      >>> getConfigContext() is context
      True

    Let's now clean up by removing the temporary file:

      >>> import os
      >>> os.remove(fn)

    """
    global _configured
    global __config_source
    global __config_context

    # The source is recorded even when the call turns out to be a no-op.
    __config_source = file

    if _configured:
        return

    # Imported locally; note that ``config`` below is the
    # zope.configuration.config module, which shadows this function's own
    # name inside the body.
    from zope.configuration import xmlconfig, config
    from zope.security.management import endInteraction, newInteraction

    # Set user to system_user, so we can do anything we want
    newInteraction(SystemConfigurationParticipation())
    try:
        # Hook up custom component architecture calls
        zope.component.hooks.setHooks()

        # Load server-independent site config
        context = config.ConfigurationMachine()
        xmlconfig.registerCommonDirectives(context)
        for feature in features:
            context.provideFeature(feature)
        context = xmlconfig.file(file, context=context, execute=execute)
    finally:
        # Reset user, also when the configuration could not be loaded;
        # otherwise the next newInteraction() would find one still active.
        endInteraction()

    # Only an executed configuration blocks later calls.
    _configured = execute
    __config_context = context

    return context
def database(db):
    """Load ZODB database from Python module or FileStorage file.

    ``db`` is interpreted as follows:

    - a string ending in ``.py``: the file is executed as Python source. It
      must define either ``DB`` (used as the database) or ``Storage`` (a
      storage that is wrapped in a ``ZODB.DB.DB``); if it defines neither,
      ``KeyError`` is raised;
    - a string ending in ``.fs``: the file is opened as a ``FileStorage``
      and wrapped in a ``ZODB.DB.DB``;
    - anything else (including strings with another suffix) is used
      unchanged.

    A ``zope.processlifetime.DatabaseOpened`` event is then sent for the
    result, which is also returned.
    """
    if isinstance(db, str):
        # Database name
        if db.endswith('.py'):
            # Python source, exec it.
            # NOTE(security): this runs arbitrary code from the named file;
            # only pass paths to trusted configuration modules.
            namespace = {}
            # Close the source file deterministically instead of leaving the
            # handle to the garbage collector.
            with open(db) as source_file:
                source = source_file.read()
            exec(compile(source, db, 'exec'), namespace)
            if 'DB' in namespace:
                db = namespace['DB']
            else:
                storage = namespace['Storage']
                from ZODB.DB import DB
                db = DB(storage, cache_size=4000)
        elif db.endswith(".fs"):
            from ZODB.FileStorage import FileStorage
            from ZODB.DB import DB
            storage = FileStorage(db)
            db = DB(storage, cache_size=4000)

    # The following will fail unless the application has been configured.
    from zope.event import notify
    notify(zope.processlifetime.DatabaseOpened(db))

    return db
def multi_database(database_factories):
    """Open each database factory and join the results into a multi-database.

    Every factory supplies a ``name`` attribute (an empty or missing name is
    stored as ``''``) and an ``open()`` method. The result is a pair: the
    list of opened databases in factory order, and a mapping from database
    name to database. A name that occurs twice raises ``ValueError``.

    >>> class DB:
    ...     def __init__(self, number):
    ...         self.number = number
    ...     def __repr__(self):
    ...         return "DB(%s)" % self.number
    ...     def getActivityMonitor(self):
    ...         return self._activity_monitor
    ...     def setActivityMonitor(self, am):
    ...         self._activity_monitor = am

    >>> class Factory:
    ...     def __init__(self, name, number):
    ...         self.name = name
    ...         self.number = number
    ...     def open(self):
    ...         return DB(self.number)

    >>> s, m = multi_database(
    ...     [Factory(None, 3), Factory('y', 2), Factory('x', 1)])

    >>> list(s)
    [DB(3), DB(2), DB(1)]

    >>> [d.database_name for d in s]
    ['', 'y', 'x']

    >>> [d.databases is m for d in s]
    [True, True, True]

    >>> items = m.items()
    >>> sorted(list(items))
    [('', DB(3)), ('x', DB(1)), ('y', DB(2))]

    Each of the databases is registered as an IDatabase utility:

    >>> from zope import component
    >>> [(component.getUtility(ZODB.interfaces.IDatabase, name) is m[name])
    ...  for name in m]
    [True, True, True]

    And has an activity monitor:

    >>> [isinstance(db.getActivityMonitor(),
    ...             ZODB.ActivityMonitor.ActivityMonitor)
    ...  for db in m.values()]
    [True, True, True]

    """
    by_name = {}
    opened = []

    for factory in database_factories:
        # A falsy name (e.g. None) denotes the unnamed database.
        db_name = factory.name
        if not db_name:
            db_name = ''
        if db_name in by_name:
            raise ValueError("Duplicate database name: %r" % db_name)

        current = factory.open()
        # All databases share the one mapping, which is what links them.
        current.databases = by_name
        current.database_name = db_name
        by_name[db_name] = current

        # Work around ZODB: the database object may not declare IDatabase
        # itself, so declare it here when it is missing.
        declares_idatabase = ZODB.interfaces.IDatabase.providedBy(current)
        if not declares_idatabase:
            zope.interface.directlyProvides(
                current, ZODB.interfaces.IDatabase)

        zope.component.provideUtility(
            current, ZODB.interfaces.IDatabase, db_name)
        current.setActivityMonitor(ZODB.ActivityMonitor.ActivityMonitor())
        opened.append(current)

    return opened, by_name
# Configuration context stored by ``config()``; ``None`` until then and
# again after ``reset()``.
__config_context = None


def getConfigContext():
    """Return the configuration context recorded by ``config()``, or None."""
    return __config_context
# File passed to the most recent ``config()`` call; ``None`` until then and
# again after ``reset()``.
__config_source = None


def getConfigSource():
    """Return the file given to the last ``config()`` call, or None."""
    return __config_source
def reset():
    """Return the module to its unconfigured state.

    Clears the recorded configuration context and source and drops the
    "already configured" flag, so that ``config()`` loads a file again.
    """
    global _configured, __config_source, __config_context

    __config_context = None
    __config_source = None
    _configured = False
# Register reset() as a cleanup callback with zope.testing when that
# optional package can be imported.
# NOTE(review): presumably so that zope.testing's cleanup run between tests
# also clears this module's configuration state -- confirm.
try:
    import zope.testing.cleanup
except ImportError:
    # zope.testing is not installed; there is nothing to register with.
    pass
else:
    zope.testing.cleanup.addCleanUp(reset)