forked from npshub/mantid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
funcinspect.py
353 lines (301 loc) · 14.2 KB
/
funcinspect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source,
# Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
"""
Defines functions that can be used to inspect the properties of a
function call. For example
lhs_info() can be used to get retrieve the names and number of
arguments that are being assigned to a function
return
"""
import opcode
import inspect
import sys
import dis
def replace_signature(func, signature):
"""
Replace the signature of the given function object with that given by
varnames
:param func: Function whose signature is to be replaced
:param signature: A tuple of names of arguments and local variables in Python 2
or a Signature object in Python 3
"""
if hasattr(signature, 'parameters'):
# A new Signature object
setattr(func, '__signature__', signature)
else:
# Drop this code when we dro Python 2 support
# Code object is different in Python 3
if hasattr(func, 'func_code'):
# Version 2
code_attr = 'func_code'
f = func.func_code
c = f.__new__(f.__class__, f.co_argcount, f.co_nlocals, f.co_stacksize,
f.co_flags, f.co_code, f.co_consts, f.co_names,
signature, f.co_filename, f.co_name, f.co_firstlineno,
f.co_lnotab, f.co_freevars)
else:
code_attr = '__code__'
f = func.__code__
new_args = [f.__class__, f.co_argcount, f.co_kwonlyargcount,
f.co_nlocals, f.co_stacksize, f.co_flags, f.co_code,
f.co_consts, f.co_names, signature,
f.co_filename, f.co_name, f.co_firstlineno,
f.co_lnotab, f.co_freevars]
# Python 3.8 supports positional-only arguments and has an extra
# keyword in the constructor
if hasattr(f, 'co_posonlyargcount'):
new_args.insert(2, f.co_posonlyargcount)
c = f.__new__(*new_args)
#endif
setattr(func, code_attr, c)
def customise_func(func, name, signature, docstring):
"""
Takes the definition of the algorithm function and replaces
the attributes of the instance to make it look like a handwritten
function definition
:param func: A function object holding the definition
:param name: The name of the algorithm
:param signature: A new signature for the function. Expecting a 2-tuple
of arguments and local variables. See _replace_signature
:param docstring: A string containing the function documentation
"""
func.__name__ = str(name)
func.__doc__ = docstring
func.__signature__ = signature
return func
#-------------------------------------------------------------------------------
class LazyFunctionSignature(inspect.Signature):
"""
Allows for lazy access to the signature of a function, only generating it when it is requested
to reduce the time spent initialising algorithms.
"""
__slots__ = ('_alg_name', '__sig')
def __init__(self, *args, **kwargs):
if "alg_name" not in kwargs:
super().__init__(*args, **kwargs)
self.__sig = self
else:
self._alg_name = kwargs.pop("alg_name")
self.__sig = None
@property
def _signature(self):
if self.__sig is None:
self.__sig = self._create_signature(self._alg_name)
return self.__sig
def __getattr__(self, item):
# Called for each attribute access.
if item in LazyFunctionSignature.__slots__:
return getattr(self, item)
else:
return getattr(self._signature, item)
def _create_signature(self, alg_name):
from inspect import Signature
return Signature(self._create_parameters(alg_name))
def _create_parameters(self, alg_name):
from mantid.api import AlgorithmManager
alg_object = AlgorithmManager.Instance().createUnmanaged(alg_name)
alg_object.initialize()
from inspect import Parameter
pos_or_keyword = Parameter.POSITIONAL_OR_KEYWORD
parameters = []
for name in alg_object.mandatoryProperties():
prop = alg_object.getProperty(name)
# Mandatory parameters are those for which the default value is not valid
if isinstance(prop.isValid, str):
valid_str = prop.isValid
else:
valid_str = prop.isValid()
if len(valid_str) > 0:
parameters.append(Parameter(name, pos_or_keyword))
else:
# None is not quite accurate here, but we are reproducing the
# behavior found in the C++ code for SimpleAPI.
parameters.append(Parameter(name, pos_or_keyword, default=None))
# Add a self parameter since these are called from a class.
parameters.insert(0, Parameter("self", Parameter.POSITIONAL_ONLY))
return parameters
class LazyMethodSignature(LazyFunctionSignature):
"""
Alternate LazyFunctionSignature intended for use in workspace methods. Replaces the input workspace
parameter with self.
"""
def _create_parameters(self, alg_name):
from inspect import Parameter
parameters = super()._create_parameters(alg_name)
try:
parameters.pop(0)
except IndexError:
pass
parameters.insert(0, Parameter("self", Parameter.POSITIONAL_ONLY))
return parameters
# -------------------------------------------------------------------------------
def decompile(code_object):
"""
Taken from
http://thermalnoise.wordpress.com/2007/12/30/exploring-python-bytecode/
Extracts dissasembly information from the byte code and stores it in
a list for further use.
Call signature(s):
instructions=decompile(f.f_code)
Required arguments:
========= =====================================================================
code_object A bytecode object extracted with inspect.currentframe()
or any other mechanism that returns byte code.
Optional keyword arguments: NONE
Outputs:
========= =====================================================================
instructions a list of offsets, op_codes, names, arguments,
argument_value which can be deconstructed to find out various things
about a function call.
Example:
# Two frames back so that we get the callers' caller
f = inspect.currentframe().f_back.f_back
i = f.f_lasti # index of the last attempted instruction in byte code
ins = decompile(f.f_code)
"""
instructions = []
for ins in dis.get_instructions(code_object):
instructions.append( (ins.offset, ins.opcode, ins.opname, ins.arg, ins.argval) )
return instructions
#-------------------------------------------------------------------------------
# A must list all of the operators that behave like a function calls in byte-code
# This is for the lhs functionality
__operator_names = set(['CALL_FUNCTION', 'CALL_FUNCTION_VAR', 'CALL_FUNCTION_KW',
'CALL_FUNCTION_VAR_KW','UNARY_POSITIVE',
'UNARY_NEGATIVE','UNARY_NOT', 'UNARY_CONVERT','UNARY_INVERT',
'GET_ITER', 'BINARY_POWER', 'BINARY_MULTIPLY','BINARY_DIVIDE',
'BINARY_FLOOR_DIVIDE', 'BINARY_TRUE_DIVIDE', 'BINARY_MODULO',
'BINARY_ADD','BINARY_SUBTRACT', 'BINARY_SUBSCR','BINARY_LSHIFT',
'BINARY_RSHIFT','BINARY_AND', 'BINARY_XOR','BINARY_OR',
'INPLACE_POWER', 'INPLACE_MULTIPLY', 'INPLACE_DIVIDE',
'INPLACE_TRUE_DIVIDE','INPLACE_FLOOR_DIVIDE',
'INPLACE_MODULO', 'INPLACE_ADD', 'INPLACE_SUBTRACT',
'INPLACE_LSHIFT','INPLACE_RSHIFT','INPLACE_AND', 'INPLACE_XOR',
'INPLACE_OR', 'COMPARE_OP',
'CALL_FUNCTION_EX', 'LOAD_METHOD', 'CALL_METHOD'])
#--------------------------------------------------------------------------------------
def process_frame(frame):
"""Returns the number of arguments on the left of assignment along
with the names of the variables for the given frame.
Call signature(s)::
Required arguments:
=========================== ==========
frame The code frame to analyse
Outputs:
=========
Returns the a tuple with the number of arguments and their names
"""
# Index of the last attempted instruction in byte code
last_i = frame.f_lasti
ins_stack = decompile(frame.f_code)
call_function_locs = {}
start_index = 0
start_offset = 0
for index, instruction in enumerate(ins_stack):
(offset, op, name, argument, argvalue) = instruction
if name in __operator_names:
call_function_locs[start_offset] = (start_index, index)
start_index = index
start_offset = offset
(offset, op, name, argument, argvalue) = ins_stack[-1]
# Append the index of the last entry to form the last boundary
call_function_locs[start_offset] = (start_index, len(ins_stack)-1)
# last_i should be the offset of a call_function_locs instruction.
# We use this to bracket the bit which we are interested in.
# Bug:
# Some types of call, eg
# def foo(callableObj, *args, **kwargs):
# x = callableObj(*args, **kwargs)
#
# foo(FuncUsingLHS, 'Args')
#
# have the incorrect index at last_i due to the call being passed through
# an intermediate reference. Currently this method does not provide the
# correct answer and throws a KeyError. Ticket #4186
output_var_names = []
max_returns = []
last_func_offset = call_function_locs[last_i][0]
(offset, op, name, argument, argvalue) = ins_stack[last_func_offset + 1]
if name == 'POP_TOP': # no return values
pass
if name == 'STORE_FAST' or name == 'STORE_NAME': # one return value
output_var_names.append(argvalue)
if name == 'UNPACK_SEQUENCE': # Many Return Values, One equal sign
for index in range(argvalue):
(offset_, op_, name_, argument_, argvalue_) = ins_stack[last_func_offset + 2 +index]
output_var_names.append(argvalue_)
max_returns = len(output_var_names)
if name == 'DUP_TOP': # Many Return Values, Many equal signs
# The output here should be a multi-dim list which mimics the variable unpacking sequence.
# For instance a,b=c,d=f() => [ ['a','b'] , ['c','d'] ]
# a,b=c=d=f() => [ ['a','b'] , 'c','d' ] So on and so forth.
# put this in a loop and stack the results in an array.
count = 0
max_returns = 0 # Must count the max_returns ourselves in this case
while count < len(ins_stack[call_function_locs[last_i][0]:call_function_locs[last_i][1]]):
(offset_, op_, name_, argument_, argvalue_) = ins_stack[call_function_locs[last_i][0]+count]
if name_ == 'UNPACK_SEQUENCE': # Many Return Values, One equal sign
hold = []
if argvalue_ > max_returns:
max_returns = argvalue_
for index in range(argvalue_):
(_offset_, _op_, _name_, _argument_, _argvalue_) = ins_stack[call_function_locs[last_i][0] + count+1+index]
hold.append(_argvalue_)
count = count + argvalue_
output_var_names.append(hold)
# Need to now skip the entries we just appended with the for loop.
if name_ == 'STORE_FAST' or name_ == 'STORE_NAME': # One Return Value
if 1 > max_returns:
max_returns = 1
output_var_names.append(argvalue_)
count = count + 1
return (max_returns, tuple(output_var_names))
#-------------------------------------------------------------------------------
def lhs_info(output_type='both', frame=None):
"""Returns the number of arguments on the left of assignment along
with the names of the variables.
Acknowledgements:
Thanks to Tim Charlton and Jon Taylor of the ISIS facility for
figuring this out.
Call signature(s)::
Required arguments: NONE
Optional keyword arguments Meaning:
=========================== ==========
output_type A string enumerating the type of output, one of
output_type = 'nreturns' : The number of return values
expected from the call
output_type = 'names' : Just return a list of
variable names
output_type = 'both' : A tuple containing both of
the above
frame A frame object that points to the frame containing a variable assignment.
Default = inspect.currentframe().f_back.f_back
Outputs:
=========
Depends on the value of the argument. See above.
"""
if not frame:
try:
# Two frames back so that we get the callers' caller, i.e. this should only
# be called from within a function
frame = inspect.currentframe().f_back.f_back
except AttributeError:
raise RuntimeError("lhs_info cannot be used on the command line, only within a function")
# Process the frame noting the advice here:
# http://docs.python.org/library/inspect.html#the-interpreter-stack
try:
ret_vals = process_frame(frame)
finally:
del frame
if output_type == 'nreturns':
ret_vals = ret_vals[0]
elif output_type == 'names':
ret_vals = ret_vals[1]
else:
pass
return ret_vals
#-------------------------------------------------------------------------------