forked from ome/openmicroscopy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fsMonitorServer.py
257 lines (184 loc) · 8.39 KB
/
fsMonitorServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
OMERO.fs MonitorServer module.
Copyright 2009 University of Dundee. All rights reserved.
Use is subject to license terms supplied in LICENSE.txt
"""
import logging
import sys, traceback
import socket
import omero_ext.uuid as uuid # see ticket:3774
try:
from hashlib import sha1 as sha
except:
from sha import sha
# Third party path package. It provides much of the
# functionality of os.path but without the complexity.
# Imported as pathModule to avoid potential clashes.
import path as pathModule
from fsMonitor import MonitorFactory
import omero.all
import omero.grid.monitors as monitors
class MonitorServerI(monitors.MonitorServer):
"""
Co-ordinates a number of Monitors
:group Constructor: __init__
:group Methods exposed in Slice: createMonitor, startMonitor, stopMonitor, destroyMonitor,
getMonitorState
:group Other methods: _getNextMonitorId, callback
"""
def __init__(self):
"""
Intialise the instance variables.
"""
self.log = logging.getLogger("fsserver."+__name__)
#self.fsPrefix = 'omero-fs://' + socket.gethostbyname(socket.gethostname())
#self.prefixLne = len(self.fsPrefix) ## Save calculating it each time?
#: Numerical component of a Monitor Id
self.monitorId = 0
#: Dictionary of Monitors by Id
self.monitors = {}
#: Dictionary of MonitorClientI proxies by Id
self.proxies = {}
"""
Methods published in the slice interface omerofs.ice
"""
def createMonitor(self, mType, eTypes, pMode, pathString, whitelist, blacklist, timeout, blockSize, ignoreSysFiles, ignoreDirEvents, proxy, current=None):
"""
Create a the Monitor for a given path.
:Parameters:
mType :
The type of monitor.
eTypes :
A list of the event types to be monitored.
pMode :
The mode of directory monitoring: flat, recursive or following.
pathString : string
A string representing a path to be monitored.
whitelist : list<string>
A list of extensions of interest.
blacklist : list<string>
A list of subdirectories to be excluded.
timeout : float
A timeout used by some types of monitor.
blockSize : intt
Number of events to pack into one notification.
ignoreSysFiles : boolean
Flag. If true platform-dependent system files are ignored
ignoreDirEvents : boolean
Flag. If true directory events are ignored
proxy :
A proxy to be informed of events
current
An ICE context, this parameter is required to be present
in an ICE interface method.
:return: Monitor Id.
:rtype: string
"""
monitorId = self._getNextMonitorId()
try:
# blockSize (0) and ignoreDirEvents (True) hardwired until slice is changed.
self.monitors[monitorId] = MonitorFactory.createMonitor(mType, eTypes, pMode, pathString,
whitelist, blacklist, timeout, blockSize,
ignoreSysFiles, ignoreDirEvents, self, monitorId)
except Exception, e:
self.log.exception('Failed to create monitor: ')
raise omero.OmeroFSError(reason='Failed to create monitor: ' + str(e))
self.proxies[monitorId] = proxy
self.log.info('Monitor id = ' + monitorId + ' created. Proxy: ' + str(proxy))
return monitorId
def startMonitor(self, id, current=None):
"""
Start the Monitor with the given Id.
:Parameters:
id : string
A string uniquely identifying a Monitor.
current
An ICE context, this parameter is required to be present
in an ICE interface method.
:return: Success status.
:rtype: boolean
"""
try:
self.monitors[id].start()
self.log.info('Monitor id = ' + id + ' started')
except Exception, e:
self.log.error('Monitor id = ' + id + ' failed to start: ' + str(e))
raise omero.OmeroFSError(reason='Monitor id = ' + id + ' failed to start: ' + str(e))
def stopMonitor(self, id, current=None):
"""
Stop the Monitor with the given Id.
:Parameters:
id : string
A string uniquely identifying a Monitor.
current
An ICE context, this parameter is required to be present
in an ICE interface method.
:return: Success status.
:rtype: boolean
"""
try:
self.monitors[id].stop()
self.log.info('Monitor id = ' + id + ' stopped')
except Exception, e:
self.log.error('Monitor id = ' + id + ' failed to stop: ' + str(e))
raise omero.OmeroFSError(reason='Monitor id = ' + id + ' failed to stop: ' + str(e))
def destroyMonitor(self, id, current=None):
"""
Destroy the Monitor with the given Id.
:Parameters:
id : string
A string uniquely identifying a Monitor.
current
An ICE context, this parameter is required to be present
in an ICE interface method.
:return: Success status.
:rtype: boolean
"""
try:
del self.monitors[id]
del self.proxies[id]
self.log.info('Monitor id = ' + id + ' destroyed')
except Exception, e:
self.log.error('Monitor id = ' + id + ' not destroyed: ' + str(e))
raise omero.OmeroFSError(reason='Monitor id = ' + id + ' not destroyed: ' + str(e))
def getMonitorState(self, id):
"""
Get the state of a monitor.
Return the state of an existing monitor.
Raise an exception if the monitor does no exist.
"""
self.log.info('Monitor id = ' + id + ' state requested')
# ***** TO BE IMPLEMENTED *****
# If monitor exists return state
# otherwise raise an exception (no subscription).
# (and ICE exception implies no server)
raise omero.OmeroFSError('Method not yet implemented.')
def _getNextMonitorId(self):
"""
Return next monitor ID and increment.
The monitorID is a unique key to identify a monitor on the
file system. In the present implementation this is a string
generated by uuid.uuid1()
:return: Next monitor Id
:rtype: string
"""
return str(uuid.uuid1())
def callback(self, monitorId, fileList):
"""
Callback required by FSEvents.FSEventStream.
:Parameters:
:return: No explicit return value.
"""
eventList = []
for fileEvent in fileList:
info = monitors.EventInfo(fileEvent[0],fileEvent[1])
eventList.append(info)
proxy = self.proxies[monitorId]
try:
self.log.info('Event notification on monitor id= %s', monitorId)
self.log.debug(' ...notifications are: %s', str(eventList))
proxy.fsEventHappened(monitorId, eventList)
except Exception, e:
self.log.info('Callback to monitor id=' + monitorId + ' failed. Reason: ' + str(e))