-
Notifications
You must be signed in to change notification settings - Fork 45
/
valuecontainer.py
422 lines (360 loc) · 14.7 KB
/
valuecontainer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
from .ecore import EProxy, EObject, EDataType
from .notification import Notification, Kind
from .ordered_set_patch import ordered_set
from collections.abc import MutableSet, MutableSequence
from typing import Iterable
class BadValueError(TypeError):
def __init__(self, got=None, expected=None, feature=None):
if isinstance(expected, EProxy):
expected.force_resolve()
expected = expected._wrapped
self.got = got
self.expected = expected
self.feature = feature
msg = "Expected type {0}, but got type {1} with value {2} instead "
msg = msg.format(expected, type(got).__name__, got)
if feature:
msg += "for feature {} of {}".format(feature,
feature.eContainingClass)
super().__init__(msg)
class EcoreUtils(object):
@staticmethod
def isinstance(obj, _type):
if obj is None:
return True
elif obj.__class__ is _type:
return True
elif _type.__class__ is EDataType and obj.__class__ is _type.eType:
return True
elif isinstance(obj, EProxy) and not obj.resolved:
return not obj.resolved
elif isinstance(obj, _type):
return True
try:
return _type.__isinstance__(obj)
except AttributeError:
return False
@staticmethod
def get_root(obj):
if not obj:
return None
previous = obj
while previous.eContainer() is not None:
previous = previous.eContainer()
return previous
class PyEcoreValue(object):
def __init__(self, owner, efeature):
super().__init__()
self.owner = owner
self.feature = efeature
self.is_ref = efeature and efeature.is_reference
self.is_cont = self.is_ref and efeature.containment
self.generic_type = None
def check(self, value):
etype = self.generic_type or self.feature.eType
if not etype:
try:
etype = self.feature.eGenericType.eRawType
self.generic_type = etype
except Exception:
raise AttributeError('Feature {} has no type'
'nor generic'.format(self.feature))
if not EcoreUtils.isinstance(value, etype):
raise BadValueError(value, etype, self.feature)
def _update_container(self, value, previous_value=None):
if not self.is_cont:
return
if value:
resource = value.eResource
if resource and value in resource.contents:
resource.remove(value)
prev_container = value._container
prev_feature = value._containment_feature
if (prev_container != self.owner
or prev_feature != self.feature) \
and isinstance(prev_container, EObject):
prev_container.__dict__[prev_feature.name] \
.remove_or_unset(value)
value._container = self.owner
value._containment_feature = self.feature
if previous_value:
previous_value._container = None
previous_value._containment_feature = None
class EValue(PyEcoreValue):
def __init__(self, owner, efeature):
super().__init__(owner, efeature)
self._value = efeature.get_default_value()
def remove_or_unset(self, value, update_opposite=True):
self._set(None, update_opposite)
def _get(self):
return self._value
def _set(self, value, update_opposite=True):
self.check(value)
previous_value = self._value
self._value = value
owner = self.owner
efeature = self.feature
notif = Notification(old=previous_value,
new=value,
feature=efeature,
kind=Kind.UNSET if value is None else Kind.SET)
owner.notify(notif)
owner._isset.add(efeature)
if not self.is_ref:
return
self._update_container(value, previous_value)
if not update_opposite:
return
# if there is no opposite, we set inverse relation and return
if not efeature.eOpposite:
couple = (owner, efeature)
if hasattr(value, '_inverse_rels'):
if hasattr(previous_value, '_inverse_rels'):
previous_value._inverse_rels.remove(couple)
value._inverse_rels.add(couple)
elif value is None and hasattr(previous_value, '_inverse_rels'):
previous_value._inverse_rels.remove(couple)
return
eOpposite = efeature.eOpposite
# if we are in an 'unset' context
opposite_name = eOpposite.name
if value is None:
if previous_value is None:
return
if eOpposite.many:
object.__getattribute__(previous_value, opposite_name) \
.remove(owner, update_opposite=False)
else:
object.__setattr__(previous_value, opposite_name, None)
else:
previous_value = value.__getattribute__(opposite_name)
if eOpposite.many:
previous_value.append(owner, update_opposite=False)
else:
# We disable the eOpposite update
value.__dict__[opposite_name]. \
_set(owner, update_opposite=False)
class ECollection(PyEcoreValue):
@staticmethod
def create(owner, feature):
if feature.derived:
return EDerivedCollection(owner, feature)
elif feature.ordered and feature.unique:
return EOrderedSet(owner, feature)
elif feature.ordered and not feature.unique:
return EList(owner, feature)
elif feature.unique:
return ESet(owner, feature)
else:
return EBag(owner, feature) # see for better implem
def __init__(self, owner, efeature):
super().__init__(owner, efeature)
def remove_or_unset(self, value, update_opposite=True):
self.remove(value, update_opposite)
def _get(self):
return self
def _update_opposite(self, owner, new_value, remove=False):
eOpposite = self.feature.eOpposite
if not eOpposite:
couple = (new_value, self.feature)
if remove and couple in owner._inverse_rels:
owner._inverse_rels.remove(couple)
else:
owner._inverse_rels.add(couple)
return
opposite_name = eOpposite.name
if eOpposite.many and not remove:
owner.__getattribute__(opposite_name).append(new_value, False)
elif eOpposite.many and remove:
owner.__getattribute__(opposite_name).remove(new_value, False)
else:
new_value = None if remove else new_value
owner.__getattribute__(opposite_name) # Force load
owner.__dict__[opposite_name] \
._set(new_value, update_opposite=False)
def remove(self, value, update_opposite=True):
if self.is_ref:
self._update_container(None, previous_value=value)
if update_opposite:
self._update_opposite(value, self.owner, remove=True)
super().remove(value)
self.owner.notify(Notification(old=value,
feature=self.feature,
kind=Kind.REMOVE))
def insert(self, i, y):
self.check(y)
if self.is_ref:
self._update_container(y)
self._update_opposite(y, self.owner)
super().insert(i, y)
self.owner.notify(Notification(new=y,
feature=self.feature,
kind=Kind.ADD))
self.owner._isset.add(self.feature)
def pop(self, index=None):
if index is None:
value = super().pop()
else:
value = super().pop(index)
if self.is_ref:
self._update_container(None, previous_value=value)
self._update_opposite(value, self.owner, remove=True)
self.owner.notify(Notification(old=value,
feature=self.feature,
kind=Kind.REMOVE))
return value
def clear(self):
for x in set(self):
self.remove(x)
def select(self, f):
return [x for x in self if f(x)]
def reject(self, f):
return [x for x in self if not f(x)]
def __iadd__(self, items):
if isinstance(items, Iterable):
self.extend(items)
else:
self.append(items)
return self
class EList(ECollection, list):
def __init__(self, owner, efeature=None):
super().__init__(owner, efeature)
def append(self, value, update_opposite=True):
self.check(value)
if self.is_ref:
self._update_container(value)
if update_opposite:
self._update_opposite(value, self.owner)
super().append(value)
self.owner.notify(Notification(new=value,
feature=self.feature,
kind=Kind.ADD))
self.owner._isset.add(self.feature)
def extend(self, sublist):
check = self.check
if self.is_ref:
_update_container = self._update_container
_update_opposite = self._update_opposite
owner = self.owner
for value in sublist:
check(value)
_update_container(value)
_update_opposite(value, owner)
else:
for value in sublist:
check(value)
super().extend(sublist)
self.owner.notify(Notification(new=sublist,
feature=self.feature,
kind=Kind.ADD_MANY))
self.owner._isset.add(self.feature)
update = extend
def __setitem__(self, i, y):
is_collection = isinstance(y, Iterable)
if isinstance(i, slice) and is_collection:
sliced_elements = self.__getitem__(i)
if self.is_ref:
for element in y:
self.check(element)
self._update_container(element)
self._update_opposite(element, self.owner)
# We remove (not really) all element from the slice
for element in sliced_elements:
self._update_container(None, previous_value=element)
self._update_opposite(element, self.owner, remove=True)
else:
for element in y:
self.check(element)
if sliced_elements and len(sliced_elements) > 1:
self.owner.notify(Notification(old=sliced_elements,
feature=self.feature,
kind=Kind.REMOVE_MANY))
elif sliced_elements:
self.owner.notify(Notification(old=sliced_elements[0],
feature=self.feature,
kind=Kind.REMOVE))
else:
self.check(y)
if self.is_ref:
self._update_container(y)
self._update_opposite(y, self.owner)
super().__setitem__(i, y)
kind = Kind.ADD
if is_collection and len(y) > 1:
kind = Kind.ADD_MANY
elif is_collection:
y = y[0] if y else y
self.owner.notify(Notification(new=y,
feature=self.feature,
kind=kind))
self.owner._isset.add(self.feature)
class EBag(EList):
pass
class EAbstractSet(ECollection):
def __init__(self, owner, efeature=None):
super().__init__(owner, efeature)
self._orderedset_update = False
def add(self, value, update_opposite=True):
self.check(value)
if self.is_ref:
self._update_container(value)
if update_opposite:
self._update_opposite(value, self.owner)
super().add(value)
self.owner.notify(Notification(new=value,
feature=self.feature,
kind=Kind.ADD))
self.owner._isset.add(self.feature)
append = add
def update(self, others):
check = self.check
add = super().add
for value in others:
check(value)
if self.is_ref:
self._update_container(value)
self._update_opposite(value, self.owner)
add(value)
self.owner._isset.add(self.feature)
self.owner.notify(Notification(new=others,
feature=self.feature,
kind=Kind.ADD_MANY))
extend = update
class EOrderedSet(EAbstractSet, ordered_set.OrderedSet):
def __init__(self, owner, efeature=None):
super().__init__(owner, efeature)
ordered_set.OrderedSet.__init__(self)
def copy(self):
return ordered_set.OrderedSet(self)
@staticmethod
def subcopy(sublist):
return ordered_set.OrderedSet(sublist)
class ESet(EOrderedSet):
pass
class EDerivedCollection(MutableSet, MutableSequence, ECollection):
@classmethod
def create(cls, owner, feature=None):
return cls(owner, feature)
def __init__(self, owner, feature=None):
super().__init__(owner, feature)
def __delitem__(self, index):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))
def __getitem__(self, index):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))
def __len__(self):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))
def __setitem__(self, index, item):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))
def add(self, value):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))
def discard(self, value):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))
def insert(self, index, value):
raise AttributeError('Operation not permited for "{}" feature'
.format(self.feature.name))