-
Notifications
You must be signed in to change notification settings - Fork 74
/
group_population.py
308 lines (244 loc) · 11.3 KB
/
group_population.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import typing
import numpy
from openfisca_core import projectors
from openfisca_core.entities import Role
from openfisca_core.indexed_enums import EnumArray
from .population import Population
class GroupPopulation(Population):
    """Population of a group entity whose instances are made up of members.

    ``members`` is the population the group is built from; it must provide
    ``has_role`` and ``check_array_compatible_with_entity``. The mapping
    between members and group entities is described by three parallel
    vectors, each with one item per member: ``members_entity_id``,
    ``members_role`` and ``members_position``.
    """

    def __init__(self, entity, members):
        super().__init__(entity)
        self.members = members
        # Per-member vectors; filled in by the setters or computed lazily.
        self._members_entity_id = None
        self._members_role = None
        self._members_position = None
        # Cache for ``ordered_members_map``.
        self._ordered_members_map = None

    def clone(self, simulation):
        """Return a copy of this population attached to ``simulation``.

        The per-member vectors and the cached map are shared with the
        original (they are not copied); holders are cloned one by one.
        """
        result = GroupPopulation(self.entity, self.members)
        result.simulation = simulation
        # NOTE(review): holders are cloned against ``self`` rather than
        # ``result`` — confirm this is intended (kept as in the original).
        result._holders = {
            variable: holder.clone(self)
            for (variable, holder) in self._holders.items()
        }
        result.count = self.count
        result.ids = self.ids
        result._members_entity_id = self._members_entity_id
        result._members_role = self._members_role
        result._members_position = self._members_position
        result._ordered_members_map = self._ordered_members_map
        return result

    @property
    def members_position(self):
        """Position of each member inside its own entity (0, 1, 2, ...).

        Computed lazily from ``members_entity_id`` the first time it is
        needed: members of a same entity are numbered in the order in which
        they appear in ``members_entity_id``. Returns ``None`` while
        ``members_entity_id`` is not set.
        """
        if self._members_position is None and self.members_entity_id is not None:
            # We could use self.count and self.members.count, but with the
            # current initialization we are not sure count will be set
            # before members_position is called.
            nb_entities = numpy.max(self.members_entity_id) + 1
            positions = numpy.empty_like(self.members_entity_id)
            # Integer counters: they are stored into an index-like vector.
            counter_by_entity = numpy.zeros(nb_entities, dtype=int)
            for k, entity_index in enumerate(self.members_entity_id):
                positions[k] = counter_by_entity[entity_index]
                counter_by_entity[entity_index] += 1
            self._members_position = positions
        return self._members_position

    @members_position.setter
    def members_position(self, members_position):
        self._members_position = members_position

    @property
    def members_entity_id(self):
        """Index of the entity each member belongs to (one item per member)."""
        return self._members_entity_id

    @members_entity_id.setter
    def members_entity_id(self, members_entity_id):
        self._members_entity_id = members_entity_id

    @property
    def members_role(self):
        """Role of each member in its entity (one item per member).

        When no role was provided, every member gets the first of the
        entity's flattened roles.
        """
        if self._members_role is None:
            default_role = self.entity.flattened_roles[0]
            self._members_role = numpy.repeat(
                default_role, len(self.members_entity_id)
            )
        return self._members_role

    @members_role.setter
    def members_role(self, members_role: typing.Iterable[Role]):
        # ``None`` is ignored so that the lazy default of the getter applies.
        if members_role is not None:
            self._members_role = numpy.array(list(members_role))

    @property
    def ordered_members_map(self):
        """
        Mask to group the persons by entity

        This is the ``numpy.argsort`` of ``members_entity_id``. This property
        only caches the map value; to see what the map is used for, see the
        ``value_nth_person`` method.
        """
        if self._ordered_members_map is None:
            self._ordered_members_map = numpy.argsort(self.members_entity_id)
        return self._ordered_members_map

    def get_role(self, role_name):
        """Return the entity role whose key is ``role_name``, or ``None``."""
        return next(
            (role for role in self.entity.flattened_roles if role.key == role_name),
            None,
        )

    # Aggregation persons -> entity

    @projectors.projectable
    def sum(self, array, role=None):
        """
        Return the sum of ``array`` for the members of the entity.

        ``array`` must have the dimension of the number of persons in the simulation

        If ``role`` is provided, only the entity member with the given role are taken into account.

        The result always has at least ``self.count`` items, so entities
        without any (matching) member get a sum of 0.

        Example:

        >>> salaries = household.members('salary', '2018-01')  # e.g. [2000, 1500, 0, 0, 0]
        >>> household.sum(salaries)
        array([3500])
        """
        self.entity.check_role_validity(role)
        self.members.check_array_compatible_with_entity(array)
        # ``or 0`` guards against a count that has not been set yet.
        nb_entities = self.count or 0
        if role is None:
            return numpy.bincount(
                self.members_entity_id,
                weights=array,
                minlength=nb_entities,
            )
        role_filter = self.members.has_role(role)
        return numpy.bincount(
            self.members_entity_id[role_filter],
            weights=array[role_filter],
            minlength=nb_entities,
        )

    @projectors.projectable
    def any(self, array, role=None):
        """
        Return ``True`` if ``array`` is ``True`` for any members of the entity.

        ``array`` must have the dimension of the number of persons in the simulation

        If ``role`` is provided, only the entity member with the given role are taken into account.

        Example:

        >>> salaries = household.members('salary', '2018-01')  # e.g. [2000, 1500, 0, 0, 0]
        >>> household.any(salaries >= 1800)
        array([True])
        """
        sum_in_entity = self.sum(array, role=role)
        return sum_in_entity > 0

    @projectors.projectable
    def reduce(self, array, reducer, neutral_element, role=None):
        """
        Fold ``array`` over the members of each entity with ``reducer``.

        ``reducer`` is called as ``reducer(accumulated, values)`` on
        entity-sized vectors, once per position in the entity.
        ``neutral_element`` is the starting value; it also replaces the
        values of members that do not have ``role`` and is what is returned
        for entities where no one with the given role exists.
        """
        self.members.check_array_compatible_with_entity(array)
        self.entity.check_role_validity(role)
        position_in_entity = self.members_position
        role_filter = self.members.has_role(role) if role is not None else True
        filtered_array = numpy.where(role_filter, array, neutral_element)
        # Neutral value that will be returned if no one with the given role exists.
        result = self.filled_array(neutral_element)

        # We loop over the positions in the entity.
        # Looping over the entities is tempting, but potentially slow if
        # there are a lot of entities.
        biggest_entity_size = numpy.max(position_in_entity) + 1
        for p in range(biggest_entity_size):
            values = self.value_nth_person(p, filtered_array, default=neutral_element)
            result = reducer(result, values)
        return result

    @projectors.projectable
    def all(self, array, role=None):
        """
        Return ``True`` if ``array`` is ``True`` for all members of the entity.

        ``array`` must have the dimension of the number of persons in the simulation

        If ``role`` is provided, only the entity member with the given role are taken into account.

        Example:

        >>> salaries = household.members('salary', '2018-01')  # e.g. [2000, 1500, 0, 0, 0]
        >>> household.all(salaries >= 1800)
        array([False])
        """
        return self.reduce(
            array, reducer=numpy.logical_and, neutral_element=True, role=role
        )

    @projectors.projectable
    def max(self, array, role=None):
        """
        Return the maximum value of ``array`` for the entity members.

        ``array`` must have the dimension of the number of persons in the simulation

        If ``role`` is provided, only the entity member with the given role are taken into account.

        Example:

        >>> salaries = household.members('salary', '2018-01')  # e.g. [2000, 1500, 0, 0, 0]
        >>> household.max(salaries)
        array([2000])
        """
        # ``numpy.inf`` rather than the ``numpy.infty`` alias, which was
        # removed in NumPy 2.0.
        return self.reduce(
            array, reducer=numpy.maximum, neutral_element=-numpy.inf, role=role
        )

    @projectors.projectable
    def min(self, array, role=None):
        """
        Return the minimum value of ``array`` for the entity members.

        ``array`` must have the dimension of the number of persons in the simulation

        If ``role`` is provided, only the entity member with the given role are taken into account.

        Example:

        >>> salaries = household.members('salary', '2018-01')  # e.g. [2000, 1500, 0, 0, 0]
        >>> household.min(salaries)
        array([0])
        >>> household.min(salaries, role = Household.PARENT)  # Assuming the 1st two persons are parents
        array([1500])
        """
        # ``numpy.inf`` rather than the ``numpy.infty`` alias, which was
        # removed in NumPy 2.0.
        return self.reduce(
            array, reducer=numpy.minimum, neutral_element=numpy.inf, role=role
        )

    @projectors.projectable
    def nb_persons(self, role=None):
        """
        Returns the number of persons contained in the entity.

        If ``role`` is provided, only the entity member with the given role are taken into account.

        The result always has at least ``self.count`` items, so entities
        without any member are counted as 0.
        """
        if role:
            if role.subroles:
                # A member matches a role if it holds any of its subroles.
                role_condition = numpy.logical_or.reduce(
                    [self.members_role == subrole for subrole in role.subroles]
                )
            else:
                role_condition = self.members_role == role
            return self.sum(role_condition)
        # ``or 0`` guards against a count that has not been set yet.
        return numpy.bincount(self.members_entity_id, minlength=self.count or 0)

    # Projection person -> entity

    @projectors.projectable
    def value_from_person(self, array, role, default=0):
        """
        Get the value of ``array`` for the person with the unique role ``role``.

        ``array`` must have the dimension of the number of persons in the simulation

        If such a person does not exist, return ``default`` instead

        The result is a vector which dimension is the number of entities

        Raises ``Exception`` if ``role`` is not unique (``role.max != 1``).
        """
        self.entity.check_role_validity(role)
        if role.max != 1:
            # NOTE(review): ``self.key`` is not defined in this class;
            # presumably resolved by ``Population`` — confirm.
            raise Exception(
                "You can only use value_from_person with a role that is unique in {}. Role {} is not unique.".format(
                    self.key, role.key
                )
            )
        self.members.check_array_compatible_with_entity(array)
        members_map = self.ordered_members_map
        result = self.filled_array(default, dtype=array.dtype)
        if isinstance(array, EnumArray):
            result = EnumArray(result, array.possible_values)
        role_filter = self.members.has_role(role)
        entity_filter = self.any(role_filter)

        # Members are visited sorted by entity so that the selected values
        # line up with the entities flagged in ``entity_filter``.
        result[entity_filter] = array[members_map][role_filter[members_map]]
        return result

    @projectors.projectable
    def value_nth_person(self, n, array, default=0):
        """
        Get the value of array for the person whose position in the entity is n.

        Note that this position is arbitrary, and that members are not sorted.

        If the nth person does not exist, return ``default`` instead.

        The result is a vector which dimension is the number of entities.
        """
        self.members.check_array_compatible_with_entity(array)
        positions = self.members_position
        nb_persons_per_entity = self.nb_persons()
        members_map = self.ordered_members_map
        result = self.filled_array(default, dtype=array.dtype)

        # For households that have at least n persons, set the result as the
        # value of criteria for the person for which the position is n.
        # The map is needed because the order of the nth persons of each
        # household in the persons vector is not necessarily the same as the
        # household order.
        result[nb_persons_per_entity > n] = array[members_map][
            positions[members_map] == n
        ]

        if isinstance(array, EnumArray):
            result = EnumArray(result, array.possible_values)
        return result

    @projectors.projectable
    def value_from_first_person(self, array):
        """Get the value of ``array`` for the person at position 0 of each entity."""
        return self.value_nth_person(0, array)

    # Projection entity -> person(s)

    def project(self, array, role=None):
        """
        Project an entity-sized ``array`` onto the members.

        Each member receives the value of its entity. If ``role`` is
        provided, members that do not have this role receive 0 instead.
        """
        self.check_array_compatible_with_entity(array)
        self.entity.check_role_validity(role)
        if role is None:
            return array[self.members_entity_id]
        role_condition = self.members.has_role(role)
        return numpy.where(role_condition, array[self.members_entity_id], 0)