-
Notifications
You must be signed in to change notification settings - Fork 647
/
rpyc_proxy.py
709 lines (605 loc) · 27.2 KB
/
rpyc_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
import types
import collections
import rpyc
import cloudpickle as pickle
from rpyc.lib import get_methods
from rpyc.core import netref, AsyncResult, consts
from . import get_connection
from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS
from modin.config import DoTraceRpyc
from .rpyc_patches import apply_pathes
apply_pathes()
def _batch_loads(items):
return tuple(pickle.loads(item) for item in items)
def _tuplize(arg):
"""turns any sequence or iterator into a flat tuple"""
return tuple(arg)
def _pickled_array(obj):
return pickle.dumps(obj.__array__())
class WrappingConnection(rpyc.Connection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._remote_batch_loads = None
self._remote_cls_cache = {}
self._static_cache = collections.defaultdict(dict)
self._remote_dumps = None
self._remote_tuplize = None
self._remote_pickled_array = None
def __wrap(self, local_obj):
while True:
# unwrap magic wrappers first; keep unwrapping in case it's a wrapper-in-a-wrapper
# this shouldn't usually happen, so this is mostly a safety net
try:
local_obj = object.__getattribute__(local_obj, "__remote_end__")
except AttributeError:
break
# do not pickle netrefs of our current connection, but do pickle those of other;
# example of this: an object made in _other_ remote context being pased to ours
if isinstance(local_obj, netref.BaseNetref) and local_obj.____conn__ is self:
return None
return bytes(pickle.dumps(local_obj))
def deliver(self, args, kw):
"""
More efficient, batched version of rpyc.classic.deliver()
"""
pickled_args = [self.__wrap(arg) for arg in args]
pickled_kw = [(k, self.__wrap(v)) for (k, v) in kw.items()]
pickled = [i for i in pickled_args if i is not None] + [
v for (k, v) in pickled_kw if v is not None
]
remote = iter(self._remote_batch_loads(tuple(pickled)))
delivered_args = []
for local_arg, pickled_arg in zip(args, pickled_args):
delivered_args.append(
next(remote) if pickled_arg is not None else local_arg
)
delivered_kw = {}
for k, pickled_v in pickled_kw:
delivered_kw[k] = next(remote) if pickled_v is not None else kw[k]
return tuple(delivered_args), delivered_kw
def obtain(self, remote):
while True:
try:
remote = object.__getattribute__(remote, "__remote_end__")
except AttributeError:
break
if isinstance(remote, netref.BaseNetref) and remote.____conn__ is self:
return pickle.loads(self._remote_dumps(remote))
return remote
def obtain_tuple(self, remote):
while True:
try:
remote = object.__getattribute__(remote, "__remote_end__")
except AttributeError:
break
return self._remote_tuplize(remote)
def sync_request(self, handler, *args):
"""
Intercept outgoing synchronous requests from RPyC to add caching or
fulfilling them locally if possible to improve performance.
We should try to make as few remote calls as possible, because each
call adds up to latency.
"""
if handler == consts.HANDLE_INSPECT:
# always inspect classes from modin, pandas and numpy locally,
# do not go to network for those
id_name = str(args[0][0])
if id_name.split(".", 1)[0] in ("modin", "pandas", "numpy"):
try:
modobj = __import__(id_name)
for subname in id_name.split(".")[1:]:
modobj = getattr(modobj, subname)
except (ImportError, AttributeError):
pass
else:
return get_methods(netref.LOCAL_ATTRS, modobj)
modname, clsname = id_name.rsplit(".", 1)
try:
modobj = __import__(modname)
for subname in modname.split(".")[1:]:
modobj = getattr(modobj, subname)
clsobj = getattr(modobj, clsname)
except (ImportError, AttributeError):
pass
else:
return get_methods(netref.LOCAL_ATTRS, clsobj)
elif handler in (consts.HANDLE_GETATTR, consts.HANDLE_STR, consts.HANDLE_HASH):
if handler == consts.HANDLE_GETATTR:
obj, attr = args
key = (attr, handler)
else:
obj = args[0]
key = handler
if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}:
# always assume numpy attributes and numpy.dtype attributes are always the same;
# note that we're using RPyC id_pack as cache key, and it includes the name,
# class id and instance id, so this cache is unique to each instance of, say,
# numpy.dtype(), hence numpy.int16 and numpy.float64 got different caches.
cache = self._static_cache[obj.____id_pack__]
try:
result = cache[key]
except KeyError:
result = cache[key] = super().sync_request(handler, *args)
if handler == consts.HANDLE_GETATTR:
# save an entry in our cache telling that we get this attribute cached
self._static_cache[result.____id_pack__]["__getattr__"] = True
return result
return super().sync_request(handler, *args)
def async_request(self, handler, *args, **kw):
"""
Override async request handling to intercept outgoing deletion requests because we cache
certain things, and if we allow deletion of cached things our cache becomes stale.
We can clean the cache upon deletion, but it would increase cache misses a lot.
Also note that memory is not leaked forever, RPyC frees all of it upon disconnect.
"""
if handler == consts.HANDLE_DEL:
obj, _ = args
if obj.____id_pack__ in self._static_cache:
# object is cached by us, so ignore the request or remote end dies and cache is suddenly stale;
# we shouldn't remove item from cache as it would reduce performance
res = AsyncResult(self)
res._is_ready = True # simulate finished async request
return res
return super().async_request(handler, *args, **kw)
def __patched_netref(self, id_pack):
"""
Default RPyC behavior is to defer almost everything to be always obtained
from remote side. This is almost always correct except when Python behaves
strangely. For example, when checking for isinstance() or issubclass() it
gets obj.__bases__ tuple and uses its elements *after* calling a decref
on the __bases__, because Python assumes that the class type holds
a reference to __bases__, which isn't true for RPyC proxy classes, so in
RPyC case the element gets destroyed and undefined behavior happens.
So we're patching RPyC netref __getattribute__ to keep a reference
for certain read-only properties to better emulate local objects.
Also __array__() implementation works only for numpy arrays, but not other types,
like scalars (which should become arrays)
"""
result = super()._netref_factory(id_pack)
cls = type(result)
if not hasattr(cls, "__readonly_cache__"):
orig_getattribute = cls.__getattribute__
type.__setattr__(cls, "__readonly_cache__", {})
def __getattribute__(this, name):
if name in {"__bases__", "__base__", "__mro__"}:
cache = object.__getattribute__(this, "__readonly_cache__")
try:
return cache[name]
except KeyError:
res = cache[name] = orig_getattribute(this, name)
return res
return orig_getattribute(this, name)
cls.__getattribute__ = __getattribute__
return result
def _netref_factory(self, id_pack):
id_name, cls_id, inst_id = id_pack
id_name = str(id_name)
first = id_name.split(".", 1)[0]
if first in ("modin", "numpy", "pandas") and inst_id:
try:
cached_cls = self._remote_cls_cache[(id_name, cls_id)]
except KeyError:
result = self.__patched_netref(id_pack)
self._remote_cls_cache[(id_name, cls_id)] = type(result)
else:
result = cached_cls(self, id_pack)
else:
result = self.__patched_netref(id_pack)
# try getting __real_cls__ from result.__class__ BUT make sure to
# NOT get it from some parent class for result.__class__, otherwise
# multiple wrappings happen
# we cannot use 'result.__class__' as this could cause a lookup of
# '__class__' on remote end
try:
local_cls = object.__getattribute__(result, "__class__")
except AttributeError:
return result
try:
# first of all, check if remote object has a known "wrapping" class
# example: _DataFrame has DataFrame dual-nature wrapper
local_cls = _KNOWN_DUALS[local_cls]
except KeyError:
pass
try:
# Try to get local_cls.__real_cls__ but look it up within
# local_cls.__dict__ to not grab it from any parent class.
# Also get the __dict__ by using low-level __getattribute__
# to override any potential __getattr__ callbacks on the class.
wrapping_cls = object.__getattribute__(local_cls, "__dict__")[
"__real_cls__"
]
except (AttributeError, KeyError):
return result
return wrapping_cls.from_remote_end(result)
def _box(self, obj):
while True:
try:
obj = object.__getattribute__(obj, "__remote_end__")
except AttributeError:
break
return super()._box(obj)
def _init_deliver(self):
remote_proxy = self.modules["modin.experimental.cloud.rpyc_proxy"]
self._remote_batch_loads = remote_proxy._batch_loads
self._remote_dumps = self.modules["rpyc.lib.compat"].pickle.dumps
self._remote_tuplize = remote_proxy._tuplize
self._remote_pickled_array = remote_proxy._pickled_array
class WrappingService(rpyc.ClassicService):
if DoTraceRpyc.get():
from .tracing.tracing_connection import TracingWrappingConnection as _protocol
else:
_protocol = WrappingConnection
def on_connect(self, conn):
super().on_connect(conn)
conn._init_deliver()
def _in_empty_class():
class Empty:
pass
return frozenset(Empty.__dict__.keys())
_EMPTY_CLASS_ATTRS = _in_empty_class()
_PROXY_LOCAL_ATTRS = frozenset(["__name__", "__remote_end__"])
_NO_OVERRIDE = (
_LOCAL_ATTRS
| _PROXY_LOCAL_ATTRS
| rpyc.core.netref.DELETED_ATTRS
| frozenset(["__getattribute__"])
| _EMPTY_CLASS_ATTRS
)
def make_proxy_cls(
remote_cls: netref.BaseNetref,
origin_cls: type,
override: type,
cls_name: str = None,
):
"""
Makes a new class type which inherits from <origin_cls> (for isinstance() and issubtype()),
takes methods from <override> as-is and proxy all requests for other members to <remote_cls>.
Note that origin_cls and remote_cls are assumed to be the same class types, but one is local
and other is obtained from RPyC.
Effectively implements subclassing, but without subclassing. This is needed because it is
impossible to subclass a remote-obtained class, something in the very internals of RPyC bugs out.
Parameters
----------
remote_cls: netref.BaseNetref
Type obtained from RPyC connection, expected to mirror origin_cls
origin_cls: type
The class to prepare a proxying wrapping for
override: type
The mixin providing methods and attributes to overlay on top of remote values and methods.
cls_name: str, optional
The name to give to the resulting class.
Returns
-------
type
New wrapper that takes attributes from override and relays requests to all other
attributes to remote_cls
"""
class ProxyMeta(RemoteMeta):
"""
This metaclass deals with printing a telling repr() to assist in debugging,
and to actually implement the "subclass without subclassing" thing by
directly adding references to attributes of "override" and by making proxy methods
for other functions of origin_cls. Class-level attributes being proxied is managed
by RemoteMeta parent.
Do note that we cannot do the same for certain special members like __getitem__
because CPython for optimization doesn't do a lookup of "type(obj).__getitem__(foo)" when
"obj[foo]" is called, but it effectively does "type(obj).__dict__['__getitem__'](foo)"
(but even without checking for __dict__), so all present methods must be declared
beforehand.
"""
def __repr__(self):
return f"<proxy for {origin_cls.__module__}.{origin_cls.__name__}:{cls_name or origin_cls.__name__}>"
def __prepare__(*args, **kw):
"""
Cooks the __dict__ of the type being constructed. Takes attributes from <override> as is
and adds proxying wrappers for other attributes of <origin_cls>.
This "manual inheritance" is needed for RemoteMeta.__getattribute__ which first looks into
type(obj).__dict__ (EXCLUDING parent classes) and then goes to proxy type.
"""
namespace = type.__prepare__(*args, **kw)
namespace["__remote_methods__"] = {}
# try computing overridden differently to allow subclassing one override from another
no_override = set(_NO_OVERRIDE)
for base in override.__mro__:
if base == object:
continue
for attr_name, attr_value in base.__dict__.items():
if (
attr_name not in namespace
and attr_name not in no_override
and getattr(object, attr_name, None) != attr_value
):
namespace[
attr_name
] = attr_value # force-inherit an attribute manually
no_override.add(attr_name)
for base in origin_cls.__mro__:
if base == object:
continue
# try unwrapping a dual-nature class first
while True:
try:
sub_base = object.__getattribute__(base, "__real_cls__")
except AttributeError:
break
if sub_base is base:
break
base = sub_base
for name, entry in base.__dict__.items():
if (
name not in namespace
and name not in no_override
and isinstance(entry, types.FunctionType)
):
def method(_self, *_args, __method_name__=name, **_kw):
try:
remote = _self.__remote_methods__[__method_name__]
except KeyError:
# use remote_cls.__getattr__ to force RPyC return us
# a proxy for remote method call instead of its local wrapper
_self.__remote_methods__[
__method_name__
] = remote = remote_cls.__getattr__(__method_name__)
return remote(_self.__remote_end__, *_args, **_kw)
method.__name__ = name
namespace[name] = method
return namespace
class Wrapper(override, origin_cls, metaclass=ProxyMeta):
"""
Subclass origin_cls replacing attributes with what is defined in override while
relaying requests for all other attributes to remote_cls.
"""
__name__ = cls_name or origin_cls.__name__
__wrapper_remote__ = remote_cls
def __init__(self, *a, __remote_end__=None, **kw):
if __remote_end__ is None:
try:
preprocess = object.__getattribute__(self, "_preprocess_init_args")
except AttributeError:
pass
else:
a, kw = preprocess(*a, **kw)
__remote_end__ = remote_cls(*a, **kw)
while True:
# unwrap the object if it's a wrapper
try:
__remote_end__ = object.__getattribute__(
__remote_end__, "__remote_end__"
)
except AttributeError:
break
object.__setattr__(self, "__remote_end__", __remote_end__)
@classmethod
def from_remote_end(cls, remote_inst):
return cls(__remote_end__=remote_inst)
def __getattribute__(self, name):
"""
Implement "default" resolution order to override whatever __getattribute__
a parent being wrapped may have defined, but only look up on own __dict__
without looking into ancestors' ones, because we copy them in __prepare__.
Effectively, any attributes not currently known to Wrapper (i.e. not defined here
or in override class) will be retrieved from the remote end.
Algorithm (mimicking default Python behavior):
1) check if type(self).__dict__[name] exists and is a get/set data descriptor
2) check if self.__dict__[name] exists
3) check if type(self).__dict__[name] is a non-data descriptor
4) check if type(self).__dict__[name] exists
5) pass through to remote end
"""
if name == "__class__":
return object.__getattribute__(self, "__class__")
dct = object.__getattribute__(self, "__dict__")
if name == "__dict__":
return dct
cls_dct = object.__getattribute__(type(self), "__dict__")
try:
cls_attr, has_cls_attr = cls_dct[name], True
except KeyError:
has_cls_attr = False
else:
oget = None
try:
oget = object.__getattribute__(cls_attr, "__get__")
object.__getattribute__(cls_attr, "__set__")
except AttributeError:
pass # not a get/set data descriptor, go next
else:
return oget(self, type(self))
# type(self).name is not a get/set data descriptor
try:
return dct[name]
except KeyError:
# instance doesn't have an attribute
if has_cls_attr:
# type(self) has this attribute, but it's not a get/set descriptor
if oget:
# this attribute is a get data descriptor
return oget(self, type(self))
return cls_attr # not a data descriptor whatsoever
# this instance/class does not have this attribute, pass it through to remote end
return getattr(dct["__remote_end__"], name)
if override.__setattr__ == object.__setattr__:
# no custom attribute setting, define our own relaying to remote end
def __setattr__(self, name, value):
if name not in _PROXY_LOCAL_ATTRS:
setattr(self.__remote_end__, name, value)
else:
object.__setattr__(self, name, value)
if override.__delattr__ == object.__delattr__:
# no custom __delattr__, define our own
def __delattr__(self, name):
if name not in _PROXY_LOCAL_ATTRS:
delattr(self.__remote_end__, name)
return Wrapper
def _deliveringWrapper(
origin_cls: type, methods=(), mixin: type = None, target_name: str = None
):
"""
Prepare a proxying wrapper for origin_cls which overrides methods specified in
"methods" with "delivering" versions of methods.
A "delivering" method is a method which delivers its arguments to a remote end
before calling the remote method, effectively calling it with arguments passed
by value, not by reference.
This is mostly a workaround for RPyC bug when it translates a non-callable
type to a remote type which has __call__() method (which would raise TypeError
when called because local class is not callable).
Note: this could lead to some weird side-effects if any arguments passed
in are very funny, but this should never happen in a real data science life.
Parameters
----------
origin_cls: type
Local class to make a "delivering wrapper" for.
methods: sequence of method names, optional
List of methods to override making "delivering wrappers" for.
mixin: type, optional
Parent mixin class to subclass (to inherit already prepared wrappers).
If not specified, a new mixin is created.
target_name: str, optional
Name to give to prepared wrapper class.
If not specified, take the name of local class being wrapped.
Returns
-------
type
The "delivering wrapper" mixin, to be used in conjunction with make_proxy_cls()
"""
conn = get_connection()
remote_cls = getattr(conn.modules[origin_cls.__module__], origin_cls.__name__)
if mixin is None:
class DeliveringMixin:
pass
mixin = DeliveringMixin
for method in methods:
def wrapper(self, *args, __remote_conn__=conn, __method_name__=method, **kw):
args, kw = __remote_conn__.deliver(args, kw)
cache = object.__getattribute__(self, "__remote_methods__")
try:
remote = cache[__method_name__]
except KeyError:
# see comments in ProxyMeta.__prepare__ on using remote_cls.__getattr__
cache[__method_name__] = remote = remote_cls.__getattr__(
__method_name__
)
return remote(self.__remote_end__, *args, **kw)
wrapper.__name__ = method
setattr(mixin, method, wrapper)
return make_proxy_cls(
remote_cls, origin_cls, mixin, target_name or origin_cls.__name__
)
def _prepare_loc_mixin():
"""
Prepare a mixin that overrides .loc and .iloc properties with versions
which return a special "delivering" instances of indexers.
"""
from modin.pandas.indexing import _LocIndexer, _iLocIndexer
DeliveringLocIndexer = _deliveringWrapper(
_LocIndexer, ["__getitem__", "__setitem__"]
)
DeliveringILocIndexer = _deliveringWrapper(
_iLocIndexer, ["__getitem__", "__setitem__"]
)
class DeliveringMixin:
@property
def loc(self):
return DeliveringLocIndexer(self.__remote_end__)
@property
def iloc(self):
return DeliveringILocIndexer(self.__remote_end__)
return DeliveringMixin
def make_dataframe_wrapper(DataFrame):
"""
Prepares a "delivering wrapper" proxy class for DataFrame.
It makes DF.loc, DF.groupby() and other methods listed below deliver their
arguments to remote end by value.
"""
from modin.pandas.series import Series
conn = get_connection()
class ObtainingItems:
def items(self):
return conn.obtain_tuple(self.__remote_end__.items())
def iteritems(self):
return conn.obtain_tuple(self.__remote_end__.iteritems())
ObtainingItems = _deliveringWrapper(Series, mixin=ObtainingItems)
class DataFrameOverrides(_prepare_loc_mixin()):
@classmethod
def _preprocess_init_args(
cls,
data=None,
index=None,
columns=None,
dtype=None,
copy=None,
query_compiler=None,
):
(data,) = conn.deliver((data,), {})[0]
return (), dict(
data=data,
index=index,
columns=columns,
dtype=dtype,
copy=copy,
query_compiler=query_compiler,
)
@property
def dtypes(self):
remote_dtypes = self.__remote_end__.dtypes
return ObtainingItems(__remote_end__=remote_dtypes)
DeliveringDataFrame = _deliveringWrapper(
DataFrame,
[
"groupby",
"agg",
"aggregate",
"__getitem__",
"astype",
"drop",
"merge",
"apply",
"applymap",
],
DataFrameOverrides,
"DataFrame",
)
return DeliveringDataFrame
def make_base_dataset_wrapper(BasePandasDataset):
"""
Prepares a "delivering wrapper" proxy class for BasePandasDataset.
Look for deatils in make_dataframe_wrapper() and _deliveringWrapper().
"""
DeliveringBasePandasDataset = _deliveringWrapper(
BasePandasDataset,
["agg", "aggregate"],
_prepare_loc_mixin(),
"BasePandasDataset",
)
return DeliveringBasePandasDataset
def make_dataframe_groupby_wrapper(DataFrameGroupBy):
"""
Prepares a "delivering wrapper" proxy class for DataFrameGroupBy.
Look for deatils in make_dataframe_wrapper() and _deliveringWrapper().
"""
DeliveringDataFrameGroupBy = _deliveringWrapper(
DataFrameGroupBy,
["agg", "aggregate", "apply"],
target_name="DataFrameGroupBy",
)
return DeliveringDataFrameGroupBy
def make_series_wrapper(Series):
"""
Prepares a "delivering wrapper" proxy class for Series.
Note that for now _no_ methods that really deliver their arguments by value
are overridded here, so what it mostly does is it produces a wrapper class
inherited from normal Series but wrapping all access to remote end transparently.
"""
return _deliveringWrapper(Series, ["apply"], target_name="Series")