/
callbacks.py
355 lines (264 loc) · 9.43 KB
/
callbacks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
"""Callbacks available from R's C-API.
The callbacks made available in R's C-API can be specified as
Python functions, with the module providing the adapter code
that makes it possible."""
from contextlib import contextmanager
import logging
import typing
import os
from rpy2.rinterface_lib import openrlib
from rpy2.rinterface_lib import ffi_proxy
from rpy2.rinterface_lib import conversion
# Module-level logger. The adapter callbacks in this module use it to
# report exceptions raised while running the Python-level callbacks.
logger = logging.getLogger(__name__)
# Encoding passed to the conversion helpers whenever a C string
# received in a callback is turned into a Python string.
_CCHAR_ENCODING = 'utf-8'
# TODO: rename to "replace_in_module"
@contextmanager
def obj_in_module(module, name: str, obj: typing.Any):
    """Temporarily bind an attribute of a module to another object.

    When the context is entered the attribute `name` of `module` is
    set to `obj`. The object previously bound to that attribute is put
    back when the context is left, whether or not an exception was
    raised in the body of the `with` statement.

    :param module: The module (or any object with attributes) to patch.
    :param name: Name of the attribute to replace. The attribute must
      already exist (it is read with :func:`getattr` first).
    :param obj: The object bound to the attribute inside the context.
    """
    previous = getattr(module, name)
    setattr(module, name, obj)
    try:
        yield
    finally:
        # Always restore the object that was there before.
        setattr(module, name, previous)
def consoleflush():
    """Flush the R console.

    This default implementation does nothing."""
    return None
_FLUSHCONSOLE_EXCEPTION_LOG = 'R[flush console]: %s'


@ffi_proxy.callback(ffi_proxy._consoleflush_def,
                    openrlib._rinterface_cffi)
def _consoleflush() -> None:
    """Adapter calling the Python-level :func:`consoleflush`.

    An exception raised by :func:`consoleflush` is not propagated:
    its message is logged as an error instead."""
    try:
        consoleflush()
    except Exception as exc:
        logger.error(_FLUSHCONSOLE_EXCEPTION_LOG, str(exc))
def consoleread(prompt: str) -> str:
    """Ask the user for input on behalf of the R console.

    :param prompt: The text displayed to prompt the user.
    :return: The string entered by the user.
    """
    reply = input(prompt)
    return reply
_READCONSOLE_EXCEPTION_LOG = 'R[read into console]: %s'
_READCONSOLE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error with '
                                       '_consoleread callback: %s')


@ffi_proxy.callback(ffi_proxy._consoleread_def,
                    openrlib._rinterface_cffi)
def _consoleread(prompt, buf, n: int, addtohistory) -> int:
    """Adapter calling :func:`consoleread` and copying the reply to `buf`.

    The prompt is converted to a Python string and passed to
    :func:`consoleread`. The reply is encoded, followed by a newline
    and a NUL byte, and the result is copied into `buf`. A reply too
    long for the buffer is truncated so the newline and the NUL byte
    always fit.

    :param prompt: C string with the prompt.
    :param buf: Buffer into which the reply is copied.
    :param n: Size of `buf`, in bytes.
    :param addtohistory: Not used by this adapter.
    :return: 1 if a non-empty reply was copied into `buf`, 0 if the
      reply was empty or if an exception was raised (and logged).
    """
    try:
        s = conversion._cchar_to_str(prompt, _CCHAR_ENCODING)
        reply = consoleread(s)
    except Exception as e:
        logger.error(_READCONSOLE_EXCEPTION_LOG, str(e))
        return 0

    try:
        # TODO: Should the coding be dynamically extracted from
        # elsewhere ?
        reply_b = reply.encode(_CCHAR_ENCODING)
        # Two bytes of the buffer are reserved for the newline and
        # the NUL byte written after the reply.
        max_n = n - 2
        if max_n < 0:
            raise ValueError(
                'The buffer (%i bytes) is too small for a reply.' % n
            )
        if len(reply_b) > max_n:
            # Truncate, and drop a multi-byte character that would be
            # left incomplete by the cut.
            reply_b = (reply_b[:max_n]
                       .decode(_CCHAR_ENCODING, errors='ignore')
                       .encode(_CCHAR_ENCODING))
        reply_n = len(reply_b)
        pybuf = bytearray(n)
        pybuf[:reply_n] = reply_b
        pybuf[reply_n] = ord('\n')
        pybuf[reply_n+1] = 0
        openrlib.ffi.memmove(buf,
                             pybuf,
                             n)
        # An empty reply is reported with 0, like a failure.
        success = 1 if reply_n else 0
    except Exception as e:
        success = 0
        logger.error(_READCONSOLE_INTERNAL_EXCEPTION_LOG,
                     str(e))
    return success
def consolereset() -> None:
    """Reset the R console.

    This default implementation does nothing."""
    return None
_RESETCONSOLE_EXCEPTION_LOG = 'R[reset console]: %s'


@ffi_proxy.callback(ffi_proxy._consolereset_def,
                    openrlib._rinterface_cffi)
def _consolereset() -> None:
    """Adapter calling the Python-level :func:`consolereset`.

    An exception raised by :func:`consolereset` is not propagated:
    its message is logged as an error instead."""
    try:
        consolereset()
    except Exception as exc:
        logger.error(_RESETCONSOLE_EXCEPTION_LOG, str(exc))
def consolewrite_print(s: str) -> None:
    """Print text written by R to the console/terminal.

    The text is printed as-is (no newline is appended) and the
    output stream is flushed right away.

    :param s: the data to write to the console/terminal.
    """
    # TODO: is the callback for flush working with Linux ?
    print(s, flush=True, end='')
def consolewrite_warnerror(s: str) -> None:
    """Report text written by R that is not regular output.

    The text is logged at the WARNING level with the module's logger.

    :param s: the data written by R.
    """
    # TODO: consider an rpy2/R-specific warning (the text is currently
    # only logged; no UserWarning is issued).
    logger.warning(_WRITECONSOLE_EXCEPTION_LOG, s)
_WRITECONSOLE_EXCEPTION_LOG = 'R[write to console]: %s'


@ffi_proxy.callback(ffi_proxy._consolewrite_ex_def,
                    openrlib._rinterface_cffi)
def _consolewrite_ex(buf, n: int, otype: int) -> None:
    """Adapter dispatching text written by R to the Python callbacks.

    The content of `buf` is converted to a Python string and passed to
    :func:`consolewrite_print` when `otype` is 0, or to
    :func:`consolewrite_warnerror` for any other value.

    An exception raised while converting the buffer or while running
    the Python callback is not propagated: its message is logged as an
    error instead.

    :param buf: C buffer with the text to write.
    :param n: Length passed to the conversion helper together with
      `buf`.
    :param otype: Output type, 0 selecting :func:`consolewrite_print`.
    """
    try:
        # The conversion is inside the `try` block so a failure to
        # decode the buffer is logged like any other failure, as done
        # in the other adapters converting C strings.
        s = conversion._cchar_to_str_with_maxlen(buf, n, _CCHAR_ENCODING)
        if otype == 0:
            consolewrite_print(s)
        else:
            consolewrite_warnerror(s)
    except Exception as e:
        logger.error(_WRITECONSOLE_EXCEPTION_LOG, str(e))
def showmessage(s: str) -> None:
    """Print a message R wants to show to the user.

    An announcement line is printed first, followed by the message.

    :param s: The message to show.
    """
    print('R wants to show a message', s, sep='\n')
_SHOWMESSAGE_EXCEPTION_LOG = 'R[show message]: %s'


@ffi_proxy.callback(ffi_proxy._showmessage_def,
                    openrlib._rinterface_cffi)
def _showmessage(buf):
    """Adapter calling the Python-level :func:`showmessage`.

    The C string `buf` is converted to a Python string and passed to
    :func:`showmessage`. An exception raised while converting the
    string or while running the Python callback is not propagated:
    its message is logged as an error instead.

    :param buf: C string with the message.
    """
    try:
        # The conversion is inside the `try` block so a failure to
        # decode the message is logged like any other failure, as done
        # in the other adapters converting C strings.
        s = conversion._cchar_to_str(buf, _CCHAR_ENCODING)
        showmessage(s)
    except Exception as e:
        logger.error(_SHOWMESSAGE_EXCEPTION_LOG, str(e))
def choosefile(new):
    """Prompt the user for a file name.

    :param new: Value received by the adapter callback and passed
      through; it is not used by this default implementation.
    :return: The string entered by the user.
    """
    filename = input('Enter file name:')
    return filename
_CHOOSEFILE_EXCEPTION_LOG = 'R[choose file]: %s'


@ffi_proxy.callback(ffi_proxy._choosefile_def,
                    openrlib._rinterface_cffi)
def _choosefile(new, buf, n: int) -> int:
    """Adapter calling :func:`choosefile` and copying the result to `buf`.

    :param new: Value passed through to :func:`choosefile`.
    :param buf: Buffer into which the file name is copied.
    :param n: Size of `buf`, in bytes.
    :return: The number of bytes copied into `buf`, or 0 if
      :func:`choosefile` returned `None`, if an exception was raised
      (and logged), or if the file name does not fit in `buf`.
    """
    try:
        res = choosefile(new)
    except Exception as e:
        logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e))
        return 0
    if res is None:
        return 0

    try:
        res_cdata = conversion._str_to_cchar(res)
        # NOTE(review): len(res_cdata) is used as a size in bytes, as
        # the call to memmove requires - confirm it matches what
        # conversion._str_to_cchar returns.
        res_n = len(res_cdata)
        if res_n > n:
            # Copying would write past the end of the buffer.
            raise ValueError(
                'The file name (%i bytes) does not fit in the '
                'buffer (%i bytes).' % (res_n, n)
            )
        openrlib.ffi.memmove(buf, res_cdata, res_n)
    except Exception as e:
        logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e))
        return 0
    return res_n
def showfiles(filenames: typing.Tuple[str, ...],
              headers: typing.Tuple[str, ...],
              wtitle: typing.Optional[str],
              pager: typing.Optional[str]) -> None:
    """R showing files.

    For each file, print its name, its content line by line, and a
    separator line (`---`).

    :param filenames: A tuple of file names.
    :param headers: A tuple of strings (TODO: check what it is).
      Not used by this default implementation.
    :param wtitle: Title of the "window" showing the files.
      Not used by this default implementation.
    :param pager: Pager to use to show the list of files.
      Not used by this default implementation.
    """
    for fn in filenames:
        print('File: %s' % fn)
        with open(fn) as fh:
            for row in fh:
                # Rows read from the file keep their line ending and
                # print() adds one: strip it to avoid blank lines
                # between the rows.
                print(row.rstrip('\n'))
        print('---')
_SHOWFILE_EXCEPTION_LOG = 'R[show file]: %s'
_SHOWFILE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error while '
                                    'showing files for R: %s')


@ffi_proxy.callback(ffi_proxy._showfiles_def,
                    openrlib._rinterface_cffi)
def _showfiles(nfiles: int, files, headers, wtitle, delete, pager) -> int:
    """Adapter converting its C arguments and calling :func:`showfiles`.

    The window title, the pager, and then each file name / header pair
    are converted to Python strings. If a conversion fails the error
    is logged and :func:`showfiles` is still called with whatever was
    converted up to that point.

    :param nfiles: Number of entries in `files` and `headers`.
    :param files: Array of C strings with the file names.
    :param headers: Array of C strings with the headers.
    :param wtitle: C string with the window title.
    :param delete: Not used by this adapter.
    :param pager: C string with the pager.
    :return: 0 if at least one file name was converted and
      :func:`showfiles` did not raise an exception, 1 otherwise.
    """
    names = []
    titles = []
    window_title = None
    pager_name = None
    try:
        window_title = conversion._cchar_to_str(wtitle, _CCHAR_ENCODING)
        pager_name = conversion._cchar_to_str(pager, _CCHAR_ENCODING)
        for idx in range(nfiles):
            # Appended one at a time: entries converted before a
            # failure are kept.
            names.append(
                conversion._cchar_to_str(files[idx], _CCHAR_ENCODING)
            )
            titles.append(
                conversion._cchar_to_str(headers[idx], _CCHAR_ENCODING)
            )
    except Exception as exc:
        logger.error(_SHOWFILE_INTERNAL_EXCEPTION_LOG, str(exc))

    status = 0 if names else 1
    try:
        showfiles(tuple(names), tuple(titles), window_title, pager_name)
    except Exception as exc:
        status = 1
        logger.error(_SHOWFILE_EXCEPTION_LOG, str(exc))
    return status
def cleanup(saveact, status, runlast):
    """Callback for the cleanup performed by R.

    This default implementation does nothing; the three arguments
    are the ones received by the adapter callback and are ignored."""
    return None
_CLEANUP_EXCEPTION_LOG = 'R[cleanup]: %s'


@ffi_proxy.callback(ffi_proxy._cleanup_def,
                    openrlib._rinterface_cffi)
def _cleanup(saveact, status, runlast):
    """Adapter calling the Python-level :func:`cleanup`.

    The three arguments are passed through unchanged. An exception
    raised by :func:`cleanup` is not propagated: its message is
    logged as an error instead."""
    try:
        cleanup(saveact, status, runlast)
    except Exception as exc:
        logger.error(_CLEANUP_EXCEPTION_LOG, str(exc))
def processevents() -> None:
    """Process R events.

    R can call this function periodically to have events handled,
    for example the resizing of a window in an interactive graphical
    device. This default implementation does nothing."""
    return None
_PROCESSEVENTS_EXCEPTION_LOG = 'R[processevents]: %s'


@ffi_proxy.callback(ffi_proxy._processevents_def,
                    openrlib._rinterface_cffi)
def _processevents() -> None:
    """Adapter calling the Python-level :func:`processevents`.

    A :class:`KeyboardInterrupt` raised by the Python callback is
    forwarded to R by setting a C-level R variable. Any other
    exception is logged as an error and not propagated."""
    try:
        processevents()
    except KeyboardInterrupt:
        # This function is a Python callback. A keyboard interruption
        # is captured in Python, but R must be notified that an
        # interruption occurred so it can handle it.
        on_windows = os.name == 'nt'
        r_library = openrlib.rlib
        if not on_windows:
            # On UNIX-like systems, R can be notified that a SIGINT
            # has been sent through the C-level R variable
            # `R_interrupts_pending`
            # (see the R source - `src/main/main.c:handleInterrupt()`).
            r_library.R_interrupts_pending = 1
        else:
            # On Windows, the global C-level R variable `UserBreak` is
            # set to one to notify R that a `SIGBREAK` has been sent
            # (see the R source -
            # `src/gnuwin32/embeddedR.c:my_onintr()`).
            r_library.UserBreak = 1
    except Exception as exc:
        logger.error(_PROCESSEVENTS_EXCEPTION_LOG, str(exc))
def busy(x: int) -> None:
    """R is busy.

    This default implementation does nothing.

    :param x: An integer received from R; its meaning is not
      documented here (TODO: presumably a flag telling whether R
      becomes busy or stops being busy - to be confirmed).
    """
    return None
_BUSY_EXCEPTION_LOG = 'R[busy]: %s'


@ffi_proxy.callback(ffi_proxy._busy_def,
                    openrlib._rinterface_cffi)
def _busy(which: int) -> None:
    """Adapter calling the Python-level :func:`busy`.

    An exception raised by :func:`busy` is not propagated: its
    message is logged as an error instead.

    :param which: Integer passed through to :func:`busy`.
    """
    try:
        busy(which)
    except Exception as exc:
        logger.error(_BUSY_EXCEPTION_LOG, str(exc))
def callback() -> None:
    """Generic callback without arguments.

    This default implementation does nothing."""
    return None
_CALLBACK_EXCEPTION_LOG = 'R[callback]: %s'


@ffi_proxy.callback(ffi_proxy._callback_def,
                    openrlib._rinterface_cffi)
def _callback() -> None:
    """Adapter calling the Python-level :func:`callback`.

    An exception raised by :func:`callback` is not propagated: its
    message is logged as an error instead."""
    try:
        callback()
    except Exception as exc:
        logger.error(_CALLBACK_EXCEPTION_LOG, str(exc))
def yesnocancel(question: str) -> int:
    """Ask the user a question to answer with yes, no, or cancel.

    The answer typed by the user is converted to an integer.

    :param question: The question asked to the user
    :return: An integer with the answer.
    """
    answer = input(question)
    return int(answer)
_YESNOCANCEL_EXCEPTION_LOG = 'R[yesnocancel]: %s'


@ffi_proxy.callback(ffi_proxy._yesnocancel_def,
                    openrlib._rinterface_cffi)
def _yesnocancel(question):
    """Adapter calling the Python-level :func:`yesnocancel`.

    The C string `question` is converted to a Python string and passed
    to :func:`yesnocancel`.

    :param question: C string with the question.
    :return: The value returned by :func:`yesnocancel`, or 0 if an
      exception was raised (the exception is logged as an error).
    """
    try:
        q = conversion._cchar_to_str(question, _CCHAR_ENCODING)
        res = yesnocancel(q)
    except Exception as e:
        logger.error(_YESNOCANCEL_EXCEPTION_LOG, str(e))
        # A value must be returned even on failure (the name `res`
        # would otherwise be unbound at the `return` below).
        # NOTE(review): 0 is presumably understood by R as "cancel" -
        # confirm against the R documentation for this callback.
        res = 0
    return res