-
Notifications
You must be signed in to change notification settings - Fork 336
/
pyro_wrapper.py
272 lines (218 loc) · 9.12 KB
/
pyro_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2009-2012:
# Gabes Jean, naparuba@gmail.com
# Gerhard Lausser, Gerhard.Lausser@consol.de
# Gregory Starck, g.starck@gmail.com
# Hartmut Goebel, h.goebel@goebel-consult.de
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
import select
import errno
import time
import socket
from log import logger
# Try to import Pyro (3 or 4.1) and if not, Pyro4 (4.2 and 4.3)
try:
import Pyro
import Pyro.core
except ImportError: # ok, no Pyro3, maybe 4
import Pyro4 as Pyro
""" This class is a wrapper for managing Pyro 3 and 4 version """
class InvalidWorkDir(Exception):
    """Raised when the Pyro daemon cannot start because of a working-directory problem."""
class PortNotFree(Exception):
    """Raised when the port requested for the Pyro daemon is not free."""
# Placeholder value; the detection code below replaces it with Pyro.constants.VERSION
PYRO_VERSION = 'UNKNOWN'
# Try to see if we are running Pyro 3 or Pyro 4
try:
Pyro.core.ObjBase
# Some one already go here, so we are in 4 if None
if Pyro.core.ObjBase is None:
raise AttributeError
PYRO_VERSION = Pyro.constants.VERSION
Pyro.errors.CommunicationError = Pyro.errors.ProtocolError
Pyro.errors.TimeoutError = Pyro.errors.ProtocolError
class Pyro3Daemon(Pyro.core.Daemon):
    """Adapter over the Pyro 3 daemon.

    Adds register/unregister/get_sockets/handleRequests wrappers on top of
    Pyro.core.Daemon. A port of 0 means "no Pyro": the underlying daemon is
    then never initialized and get_sockets() returns an empty list.
    """

    pyro_version = 3
    protocol = 'PYROLOC'

    def __init__(self, host, port, use_ssl=False):
        """Start the Pyro 3 daemon on host:port.

        use_ssl selects the 'PYROSSL' protocol instead of 'PYRO'.

        Raises InvalidWorkDir when Pyro raises OSError/IOError while
        initializing, and PortNotFree when Pyro raises DaemonError.
        """
        self.port = port
        # Port = 0 means "I don't want pyro"
        if self.port == 0:
            return

        try:
            Pyro.core.initServer()
        except (OSError, IOError) as e:  # must be a problem with the workdir
            raise InvalidWorkDir(e)

        # Set the protocol as asked (ssl or not)
        if use_ssl:
            prtcol = 'PYROSSL'
        else:
            prtcol = 'PYRO'

        logger.info("Initializing Pyro connection with host:%s port:%s ssl:%s" % (host, port, str(use_ssl)))

        # Now the real start
        try:
            Pyro.core.Daemon.__init__(self, host=host, port=port, prtcol=prtcol, norange=True)
        except OSError as e:
            # must be a problem with the workdir
            raise InvalidWorkDir(e)
        except Pyro.errors.DaemonError as e:
            msg = "Error: Sorry, the port %d is not free: %s" % (port, e)
            raise PortNotFree(msg)

    def register(self, obj, name):
        """Connect obj to the daemon under name and return the result of connect()."""
        return self.connect(obj, name)

    def unregister(self, obj):
        """Disconnect obj from the daemon; any error doing so is deliberately ignored."""
        try:
            self.disconnect(obj)
        except Exception:
            # Best effort: a failing disconnect must not break the caller
            pass

    def get_sockets(self):
        """Return the daemon server sockets, or [] when Pyro is disabled (port 0)."""
        if self.port != 0:
            return self.getServerSockets()
        return []

    def handleRequests(self, s):
        """Let the Pyro 3 daemon handle its pending requests.

        s is accepted so the signature matches the Pyro 4 wrapper, but it is
        not forwarded to Pyro.core.Daemon.handleRequests.
        """
        try:
            Pyro.core.Daemon.handleRequests(self)
        # Sometimes Pyro tells us the xml pickling implementation (gnosis)
        # is not available, and I don't know why... :(
        except NotImplementedError:
            pass
def create_uri(address, port, obj_name, use_ssl=False):
    """Build the Pyro 3 URI of obj_name served at address:port.

    Returns a 'PYROLOCSSL://' URI when use_ssl is true, 'PYROLOC://'
    otherwise. use_ssl defaults to False so the signature matches the
    Pyro 4 variant of this function.
    """
    scheme = "PYROLOC"
    if use_ssl:
        scheme = "PYROLOCSSL"
    return "%s://%s:%d/%s" % (scheme, address, port, obj_name)
# The way to set a timeout also changed between Pyro 3 and 4:
# it is a method in 3, a property in 4
def set_timeout(con, timeout):
    """Apply timeout to the Pyro 3 connection con through its _setTimeout method."""
    con._setTimeout(timeout)
def getProxy(uri):
    """Return the Pyro 3 proxy built by Pyro.core.getProxyForURI for uri."""
    proxy = Pyro.core.getProxyForURI(uri)
    return proxy
# Shutdown in Pyro 3 takes True as argument
def shutdown(con):
    """Shut down con by calling its shutdown method with True."""
    con.shutdown(True)
PyroClass = Pyro3Daemon
except AttributeError, exp:
PYRO_VERSION = Pyro.constants.VERSION
# Ok, in Pyro 4, interfaces do not need to
# inherit from ObjBase; a dummy class is good enough
Pyro.core.ObjBase = dict
Pyro.errors.URIError = Pyro.errors.ProtocolError
Pyro.core.getProxyForURI = Pyro.core.Proxy
Pyro.config.HMAC_KEY = "NOTSET"
old_versions = ["4.1", "4.2", "4.3", "4.4"]
# Version not supported for now, we have to work on it
bad_versions = []
last_known_working_version = "4.14"
msg_waitall_issue_versions = ["4.1", "4.2", "4.3", "4.4", "4.5", "4.6", "4.7", '4.8',
'4.9', '4.10', '4.11', '4.12', '4.13']
class Pyro4Daemon(Pyro.core.Daemon):
    """Adapter over the Pyro 4 daemon.

    Configures Pyro according to PYRO_VERSION before starting the daemon and
    hides the API differences between Pyro 4 releases in get_sockets() and
    handleRequests(). A port of 0 means "no Pyro": the underlying daemon is
    then never initialized and get_sockets() returns an empty list.
    """

    pyro_version = 4
    protocol = 'PYRO'

    def __init__(self, host, port, use_ssl=False):
        """Start the Pyro 4 daemon on host:port, retrying while the port is busy.

        use_ssl is only reported in the log message here.

        Raises PortNotFree when the port is still busy after the last
        attempt, InvalidWorkDir on any other error while starting the daemon,
        and SystemExit(1) when PYRO_VERSION is listed in bad_versions.
        """
        self.port = port
        # Port = 0 means "I don't want pyro"
        if self.port == 0:
            return

        # Some Pyro versions have problems with socket.MSG_WAITALL.
        # It was "solved" in 4.14; before that, just delete the flag
        # (this removes it from the socket module itself).
        if PYRO_VERSION in msg_waitall_issue_versions:
            if hasattr(socket, 'MSG_WAITALL'):
                del socket.MSG_WAITALL

        # Pyro 4 is threaded by default, but we want a select-based server
        # (I hate threads!)
        # And of course the name changed since 4.5...
        # Since then, we got a better sock reuse, so
        # before 4.5 we must wait 35 s for the port to be released
        # and in >=4.5 we can use REUSE socket :)
        max_try = 35
        if PYRO_VERSION in old_versions:
            Pyro.config.SERVERTYPE = "select"
        elif PYRO_VERSION in bad_versions:
            logger.error("Your pyro version (%s) is not supported. Please install version (%s) " % (PYRO_VERSION, last_known_working_version))
            # Raise SystemExit directly: the exit() builtin is only provided by the site module
            raise SystemExit(1)
        else:
            Pyro.config.SERVERTYPE = "multiplex"
            # When this Pyro exposes SOCK_REUSE, enable it and do a single attempt
            if hasattr(Pyro.config, 'SOCK_REUSE'):
                Pyro.config.SOCK_REUSE = True
                max_try = 1

        nb_try = 0
        # Ok, Pyro4 does not close sockets like it should,
        # so we get TIME_WAIT sockets :(
        # so we allow retries during 35 sec (30 sec is the default
        # timewait for closed sockets)
        while nb_try < max_try:
            nb_try += 1
            logger.info("Initializing Pyro connection with host:%s port:%s ssl:%s" % (host, port, str(use_ssl)))
            # A port already in use now raises an exception
            try:
                Pyro.core.Daemon.__init__(self, host=host, port=port)
                # Ok, we got our daemon, we can exit
                break
            except socket.error as exp:
                msg = "Error: Sorry, the port %d is not free: %s" % (port, str(exp))
                # On the last allowed attempt, we are very not happy
                if nb_try >= max_try:
                    raise PortNotFree(msg)
                logger.error(msg + " but we try another time in 1 sec")
                time.sleep(1)
            except Exception as e:
                # must be a problem with pyro workdir:
                raise InvalidWorkDir(e)

    # Get the server sockets, but not if disabled
    def get_sockets(self):
        """Return the daemon sockets, or [] when Pyro is disabled (port 0).

        sockets is called as a method for the versions in old_versions and
        read as an attribute otherwise.
        """
        if self.port == 0:
            return []
        if PYRO_VERSION in old_versions:
            return self.sockets()
        return self.sockets

    def handleRequests(self, s):
        """Process the activity on socket s.

        Uses Daemon.handleRequests for the versions in old_versions and
        Daemon.events otherwise; both receive [s].
        """
        if PYRO_VERSION in old_versions:
            Pyro.core.Daemon.handleRequests(self, [s])
        else:
            Pyro.core.Daemon.events(self, [s])
def create_uri(address, port, obj_name, use_ssl=False):
    """Build the Pyro 4 URI of obj_name served at address:port.

    use_ssl is accepted but does not change the resulting URI.
    """
    uri_fields = (obj_name, address, port)
    return "PYRO:%s@%s:%d" % uri_fields
def set_timeout(con, timeout):
    """Apply timeout to the Pyro 4 connection con by setting its _pyroTimeout attribute."""
    con._pyroTimeout = timeout
def getProxy(uri):
    """Return a Pyro 4 proxy (Pyro.core.Proxy) for uri."""
    proxy = Pyro.core.Proxy(uri)
    return proxy
# Shutdown in Pyro 4 does not take an argument
def shutdown(con):
    """Shut down con, then close it."""
    con.shutdown()
    con.close()
PyroClass = Pyro4Daemon
class ShinkenPyroDaemon(PyroClass):
    """Class for wrapping select calls for Pyro"""

    # To bypass a bug in Pyro: this should be set in __init__, but
    # if we try to print an uninitialized object, it's not happy
    locationStr = '__NOTSET__'
    objectsById = []  # Same here...

    def get_socks_activity(self, timeout):
        """Return the daemon sockets that become readable within timeout.

        timeout is passed unchanged to select.select. Returns [] when the
        select call is interrupted (EINTR); any other select error is
        re-raised.
        """
        try:
            ins, _, _ = select.select(self.get_sockets(), [], [], timeout)
        except select.error as e:
            # Read the errno from args instead of unpacking the exception
            # object itself, which only works on Python 2
            if e.args and e.args[0] == errno.EINTR:
                return []
            raise
        return ins
# Common Pyro exceptions to be caught, grouped in a single tuple
Pyro_exp_pack = (
    Pyro.errors.ProtocolError,
    Pyro.errors.URIError,
    Pyro.errors.CommunicationError,
    Pyro.errors.DaemonError,
    Pyro.errors.ConnectionClosedError,
    Pyro.errors.TimeoutError,
    Pyro.errors.NamingError,
)