ENH: support chunking micro computation to reduce memory usage
jeffgortmaker committed May 25, 2022
1 parent 66d8f57 commit 1015603
Showing 4 changed files with 168 additions and 72 deletions.
157 changes: 103 additions & 54 deletions pyblp/markets/market.py
@@ -391,8 +391,8 @@ def compute_probabilities(
return probabilities, conditionals

def compute_eliminated_probabilities(
self, probabilities: Array, delta: Optional[Array] = None, eliminate_outside: bool = False,
eliminate_product: Optional[int] = None) -> Array:
self, probabilities: Array, delta: Optional[Array] = None, mu: Optional[Array] = None,
eliminate_outside: bool = False, eliminate_product: Optional[int] = None) -> Array:
"""Convert choice probabilities into probabilities with the outside option, an inside product, or both removed
from the choice set. If there are any errors, revert to the full derivation of these eliminated probabilities,
using the delta with which this market was initialized.
@@ -420,7 +420,7 @@
# if there were any errors, compute the eliminated probabilities directly
if not np.isfinite(eliminated).all():
eliminated, _ = self.compute_probabilities(
delta, eliminate_outside=eliminate_outside, eliminate_product=eliminate_product
delta, mu, eliminate_outside=eliminate_outside, eliminate_product=eliminate_product
)

return eliminated
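The new ``mu`` argument lets the caller pass agent-specific utilities for just a chunk of agents. Since ``mu`` has one column per agent, a chunk corresponds to a column slice; a minimal standalone sketch of the idea, with illustrative shapes rather than pyblp's internals:

```python
import numpy as np

# mu holds agent-specific utilities with one column per agent, so a chunk of
# agents corresponds to a column slice of mu
mu = np.arange(12, dtype=np.float64).reshape(3, 4)  # J x I utilities
agent_indices = np.array([1, 3])                    # one chunk of agents
mu_chunk = mu[:, agent_indices]                     # J x 2 slice for the chunk
assert mu_chunk.shape == (3, 2)
```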
@@ -1328,18 +1328,21 @@ def compute_omega_by_theta_jacobian(

return omega_jacobian

def compute_micro_weights(self, dataset: MicroDataset) -> Array:
def compute_micro_weights(self, dataset: MicroDataset, agent_indices: Optional[Array] = None) -> Array:
"""Compute and validate micro dataset weights."""
agents = self.agents
if agent_indices is not None:
agents = agents[agent_indices]

try:
weights = np.asarray(dataset.compute_weights(self.t, self.products, self.agents), options.dtype)
weights = np.asarray(dataset.compute_weights(self.t, self.products, agents), options.dtype)
except Exception as exception:
message = f"Failed to compute weights for micro dataset '{dataset}' because of the above exception."
raise RuntimeError(message) from exception

shapes = [
(self.I, self.J), (self.I, 1 + self.J), (self.I, self.J, self.J), (self.I, 1 + self.J, self.J),
(self.I, self.J, 1 + self.J), (self.I, 1 + self.J, 1 + self.J)
]
I = agents.size
J = self.J
shapes = [(I, J), (I, 1 + J), (I, J, J), (I, 1 + J, J), (I, J, 1 + J), (I, 1 + J, 1 + J)]
if weights.shape not in shapes:
raise ValueError(
f"In market {self.t}, micro dataset '{dataset}' returned an array of shape {weights.shape}, which is "
@@ -1353,10 +1356,14 @@ def compute_micro_weights(self, dataset: MicroDataset) -> Array:

return weights

def compute_micro_values(self, moment: MicroMoment, weights: Array) -> Array:
def compute_micro_values(self, moment: MicroMoment, weights: Array, agent_indices: Optional[Array] = None) -> Array:
"""Compute and validate micro moment values."""
agents = self.agents
if agent_indices is not None:
agents = agents[agent_indices]

try:
values = np.asarray(moment.compute_values(self.t, self.products, self.agents), options.dtype)
values = np.asarray(moment.compute_values(self.t, self.products, agents), options.dtype)
except Exception as exception:
message = f"Failed to compute values for micro moment '{moment}' because of the above exception."
raise RuntimeError(message) from exception
@@ -1372,7 +1379,8 @@

def compute_micro_dataset_contributions(
self, datasets: List[MicroDataset], delta: Optional[Array] = None, probabilities: Optional[Array] = None,
probabilities_tangent_mapping: Optional[Dict[int, Array]] = None, compute_jacobians: bool = False) -> (
probabilities_tangent_mapping: Optional[Dict[int, Array]] = None, compute_jacobians: bool = False,
agent_indices: Optional[Array] = None) -> (
Tuple[
Dict[MicroDataset, Array], Dict[MicroDataset, Array], Dict[Tuple[MicroDataset, int], Array],
Dict[Tuple[MicroDataset, int], Array]
@@ -1382,6 +1390,10 @@
assert self.delta is not None
delta = self.delta

mu = None
if agent_indices is not None:
mu = self.mu[:, agent_indices]

# pre-compute and validate micro dataset weights, multiplying these with probabilities and using these to
# compute micro value denominators
weights_mapping: Dict[MicroDataset, Array] = {}
@@ -1399,11 +1411,11 @@
continue

# compute and validate weights
weights = self.compute_micro_weights(dataset)
weights = self.compute_micro_weights(dataset, agent_indices)

# pre-compute probabilities
if probabilities is None:
probabilities, _ = self.compute_probabilities(delta)
probabilities, _ = self.compute_probabilities(delta, mu)

# pre-compute outside probabilities
if outside_probabilities is None and weights.shape[1] == 1 + self.J:
@@ -1419,7 +1431,7 @@
eliminated_probabilities_list = []
for j in range(self.J):
eliminated_probabilities_list.append(self.compute_eliminated_probabilities(
probabilities, delta, eliminate_product=j
probabilities, delta, mu, eliminate_product=j
))

eliminated_probabilities = np.stack(eliminated_probabilities_list)
@@ -1438,7 +1450,7 @@
# pre-compute probabilities after the outside option has been removed
if len(weights.shape) == 3 and outside_eliminated_probabilities is None and weights.shape[1] == 1 + self.J:
outside_eliminated_probabilities = self.compute_eliminated_probabilities(
probabilities, delta, eliminate_outside=True
probabilities, delta, mu, eliminate_outside=True
)

if compute_jacobians:
Expand All @@ -1465,11 +1477,12 @@ def compute_micro_dataset_contributions(
)

# both weights and their Jacobians will be multiplied by agent integration weights
agent_weights = self.agents.weights if agent_indices is None else self.agents.weights[agent_indices]
if len(weights.shape) == 2:
agent_weights = weights * self.agents.weights
agent_weights = weights * agent_weights
else:
assert len(weights.shape) == 3
agent_weights = weights * self.agents.weights[..., None]
agent_weights = weights * agent_weights[..., None]

# multiply weights by choice probabilities
dataset_weights = agent_weights.copy()
@@ -1556,16 +1569,7 @@ def compute_micro_contributions(
"""Compute contributions to micro moment values, Jacobians, and covariances. By default, use the mean utilities
with which this market was initialized and do not compute Jacobian and covariance contributions.
"""

# compute dataset contributions
datasets = [m.dataset for m in moments.micro_moments]
weights_mapping, denominator_mapping, weights_tangent_mapping, denominator_tangent_mapping = (
self.compute_micro_dataset_contributions(
datasets, delta, probabilities, probabilities_tangent_mapping, compute_jacobians
)
)

# compute this market's contribution to micro moment values' and covariances' numerators and denominators
weights_mapping: Dict[MicroDataset, Array] = {}
values_mapping: Dict[int, Array] = {}
micro_numerator = np.zeros((moments.MM, 1), options.dtype)
micro_denominator = np.zeros((moments.MM, 1), options.dtype)
@@ -1578,36 +1582,81 @@
micro_covariances_numerator = np.full(
(moments.MM, moments.MM), 0 if compute_covariances else np.nan, options.dtype
)
for m, moment in enumerate(moments.micro_moments):
dataset = moment.dataset
if dataset not in weights_mapping:
continue

# compute and validate moment values
weights = weights_mapping[dataset]
values = self.compute_micro_values(moment, weights)
# optionally split up computation by groups of agents to reduce memory usage
agent_indices_chunks = [None]
probabilities_chunk = probabilities
probabilities_tangent_mapping_chunk = probabilities_tangent_mapping
if options.micro_computation_chunks > 1:
agent_indices_chunks = np.array_split(np.arange(self.I), options.micro_computation_chunks)
for agent_indices in agent_indices_chunks:
if agent_indices is not None:
if probabilities is not None:
probabilities_chunk = probabilities[:, agent_indices]
if probabilities_tangent_mapping is not None:
probabilities_tangent_mapping_chunk = {}
for p, probabilities_tangent in probabilities_tangent_mapping.items():
probabilities_tangent_mapping_chunk[p] = probabilities_tangent[:, agent_indices]

# compute dataset contributions
datasets = [m.dataset for m in moments.micro_moments]
(
weights_mapping_chunk, denominator_mapping_chunk, weights_tangent_mapping_chunk,
denominator_tangent_mapping_chunk
) = (
self.compute_micro_dataset_contributions(
datasets, delta, probabilities_chunk, probabilities_tangent_mapping_chunk, compute_jacobians,
agent_indices
)
)

# cache values if necessary
# cache weights if necessary
if keep_mappings:
values_mapping[m] = values
for dataset, weights_chunk in weights_mapping_chunk.items():
if dataset not in weights_mapping:
weights_mapping[dataset] = weights_chunk
else:
weights_mapping[dataset] = np.row_stack([weights_mapping[dataset], weights_chunk])

# compute this market's contribution to micro moment values' and covariances' numerators and denominators
values_mapping_chunk: Dict[int, Array] = {}
for m, moment in enumerate(moments.micro_moments):
dataset = moment.dataset
if dataset not in weights_mapping_chunk:
continue

# compute and validate moment values
weights_chunk = weights_mapping_chunk[dataset]
values_chunk = self.compute_micro_values(moment, weights_chunk, agent_indices)

# cache values if necessary
if keep_mappings:
values_mapping_chunk[m] = values_chunk
if m not in values_mapping:
values_mapping[m] = values_chunk
else:
values_mapping[m] = np.row_stack([values_mapping[m], values_chunk])

# compute the contribution to the numerator and denominator
weighted_values = weights * values
micro_numerator[m] = weighted_values.sum()
micro_denominator[m] = denominator_mapping[dataset]
if compute_jacobians:
for p in range(self.parameters.P):
micro_numerator_jacobian[m, p] = (weights_tangent_mapping[(dataset, p)] * values).sum()
micro_denominator_jacobian[m, p] = denominator_tangent_mapping[(dataset, p)]

# compute the contribution to the covariance numerator
if compute_covariances:
for m2, moment2 in enumerate(moments.micro_moments):
if m2 <= m and moment2.dataset == moment.dataset:
values2 = values_mapping.get(m2)
if values2 is None:
values2 = self.compute_micro_values(moment2, weights)
micro_covariances_numerator[m2, m] = (weighted_values * values2).sum()
# compute the contribution to the numerator and denominator
weighted_values_chunk = weights_chunk * values_chunk
micro_numerator[m] += np.sum(weighted_values_chunk)
micro_denominator[m] += denominator_mapping_chunk[dataset]
if compute_jacobians:
for p in range(self.parameters.P):
micro_numerator_jacobian[m, p] += np.sum(
weights_tangent_mapping_chunk[(dataset, p)] * values_chunk
)
micro_denominator_jacobian[m, p] += denominator_tangent_mapping_chunk[(dataset, p)]

# compute the contribution to the covariance numerator
if compute_covariances:
for m2, moment2 in enumerate(moments.micro_moments):
if m2 <= m and moment2.dataset == moment.dataset:
values2_chunk = values_mapping_chunk.get(m2)
if values2_chunk is None:
values2_chunk = self.compute_micro_values(moment2, weights_chunk, agent_indices)

micro_covariances_numerator[m2, m] += np.sum(weighted_values_chunk * values2_chunk)

return (
micro_numerator, micro_denominator, micro_numerator_jacobian, micro_denominator_jacobian,
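The loop introduced above splits agents into groups with ``np.array_split`` and accumulates each chunk's contribution into running numerators and denominators, so results match the unchunked computation exactly. A minimal standalone sketch of the same accumulation pattern, assuming illustrative ``weights`` and ``values`` arrays rather than pyblp's internal structures:

```python
import numpy as np

def chunked_numerator(weights, values, chunks=10):
    """Accumulate sum(weights * values) over agent chunks instead of all at once."""
    total = 0.0
    for agent_indices in np.array_split(np.arange(weights.shape[0]), chunks):
        # only this chunk's rows need to be materialized at any one time
        total += np.sum(weights[agent_indices] * values[agent_indices])
    return total

# with weights and values of shape (I, J), chunking leaves the sum unchanged
rng = np.random.default_rng(0)
w, v = rng.random((1000, 50)), rng.random((1000, 50))
assert np.isclose(chunked_numerator(w, v), np.sum(w * v))
```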
38 changes: 24 additions & 14 deletions pyblp/micro.py
@@ -37,33 +37,41 @@ class MicroDataset(StringRepresentation):
compute_weights(t, products, agents) --> weights
where ``t`` is the market in which to compute weights, ``products`` is the market's :class:`Products`, and
``agents`` is the market's :class:`Agents`. The returned ``weights`` should be an array of one of the following
shapes:
- :math:`I_t \times J_t`: Conditions on inside purchases by assuming :math:`w_{di0t} = 0`. Rows correspond
to agents :math:`i \in I_t` in the same order as ``agent_data`` in :class:`Problem` or :class:`Simulation`
and columns correspond to inside products :math:`j \in J_t` in the same order as ``product_data`` in
where ``t`` is the market in which to compute weights, ``products`` is the market's :class:`Products` (with
:math:`J_t` rows), and ``agents`` is the market's :class:`Agents` (with :math:`I_t` rows), unless
``pyblp.options.micro_computation_chunks`` is larger than its default of ``1``, in which case ``agents`` is a
chunk of the market's :class:`Agents`. Denoting the number of rows in ``agents`` by :math:`I`, the returned
``weights`` should be an array of one of the following shapes:
- :math:`I \times J_t`: Conditions on inside purchases by assuming :math:`w_{di0t} = 0`. Rows correspond to
agents :math:`i \in I` in the same order as ``agent_data`` in :class:`Problem` or :class:`Simulation` and
columns correspond to inside products :math:`j \in J_t` in the same order as ``product_data`` in
:class:`Problem` or :class:`Simulation`.
- :math:`I_t \times (1 + J_t)`: The first column indexes the outside option, which can have nonzero survey
- :math:`I \times (1 + J_t)`: The first column indexes the outside option, which can have nonzero survey
weights :math:`w_{di0t}`.
If the micro dataset contains second choice data, ``weights`` can have a third axis corresponding to second
choices :math:`k` in :math:`w_{dijkt}`:
- :math:`I_t \times J_t \times J_t`: Conditions on inside purchases by assuming
- :math:`I \times J_t \times J_t`: Conditions on inside purchases by assuming
:math:`w_{di0kt} = w_{dij0t} = 0`.
- :math:`I_t \times (1 + J_t) \times J_t`: The first column indexes the outside option, but the second
- :math:`I \times (1 + J_t) \times J_t`: The first column indexes the outside option, but the second
choice is assumed to be an inside option, :math:`w_{dij0t} = 0`.
- :math:`I_t \times J_t \times (1 + J_t)`: The first index in the third axis indexes the outside option, but
- :math:`I \times J_t \times (1 + J_t)`: The first index in the third axis indexes the outside option, but
the first choice is assumed to be an inside option, :math:`w_{di0kt} = 0`.
- :math:`I_t \times (1 + J_t) \times (1 + J_t)`: The first column and the first index in the third axis
- :math:`I \times (1 + J_t) \times (1 + J_t)`: The first column and the first index in the third axis
index the outside option as the first and second choice.
.. warning::
Second choice moments can use a lot of memory, especially when :math:`J_t` is large. If this becomes an
issue, consider setting ``pyblp.options.micro_computation_chunks`` to a value higher than its default of
``1``, such as the highest :math:`J_t`. This will cut down on memory usage without much affecting speed.
market_ids : `array-like, optional`
Distinct market IDs with nonzero survey weights :math:`w_{dijt}`. For other markets, :math:`w_{dijt} = 0`, and
``compute_weights`` will not be called.
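For reference, a minimal sketch of a ``compute_weights`` callable satisfying the contract above. It returns an :math:`I \times J_t` array of uniform weights; ``agents.size`` and ``products.size`` give the number of rows in each structured array, and nothing else about the dataset is assumed:

```python
import numpy as np

# a minimal compute_weights sketch: condition on inside purchases by returning
# an I x J_t array, where I is the number of rows in the (possibly chunked)
# agents array; uniform weights mean every inside choice is surveyed equally
def compute_weights(t, products, agents):
    return np.ones((agents.size, products.size))
```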
@@ -154,8 +162,10 @@ class MicroMoment(StringRepresentation):
compute_values(t, products, agents) --> values
where ``t`` is the market in which to compute weights, ``products`` is the market's :class:`Products`, and
``agents`` is the market's :class:`Agents`. The returned ``values`` should be an array of the same shape as the
where ``t`` is the market in which to compute values, ``products`` is the market's :class:`Products` (with
:math:`J_t` rows), and ``agents`` is the market's :class:`Agents` (with :math:`I_t` rows), unless
``pyblp.options.micro_computation_chunks`` is larger than its default of ``1``, in which case ``agents`` is a
chunk of the market's :class:`Agents`. The returned ``values`` should be an array of the same shape as the
``weights`` returned by ``compute_weights`` of ``dataset``.
Examples
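Similarly, a minimal sketch of a ``compute_values`` callable: it must return an array of the same shape as the ``weights`` returned by the dataset's ``compute_weights``, here :math:`I \times J_t`. The ``prices`` field is used purely for illustration:

```python
import numpy as np

# a minimal compute_values sketch: broadcast one product characteristic
# (prices here, purely for illustration) across all agents so that values
# match the I x J_t shape of the dataset's weights
def compute_values(t, products, agents):
    prices = np.asarray(products.prices).reshape(1, -1)  # 1 x J_t
    return np.repeat(prices, agents.size, axis=0)        # I x J_t
```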
10 changes: 10 additions & 0 deletions pyblp/options.py
@@ -109,6 +109,15 @@
of memory, one option is to temporarily reduce the number of markets, observations, or agents to cut down on memory
while debugging one's code to see which micro moments are collinear with one another.
micro_computation_chunks : `int`
How finely to break up micro moment computation within each market. Computation is broken up by groups of agents within a market. This can help reduce the memory used by micro moments when there are many agents and products, especially when second choice micro moments are being used.
By default, micro moment computation is done in one chunk per market. To reduce memory usage without changing any estimation results, split computation into more chunks, for example 10, with ``pyblp.options.micro_computation_chunks = 10``.
"""

import numpy as _np
@@ -127,3 +136,4 @@
collinear_atol = collinear_rtol = 1e-10
psd_atol = psd_rtol = 1e-8
detect_micro_collinearity = False
micro_computation_chunks = 1
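A minimal usage sketch of the new option, set before estimation:

```python
import pyblp

# split micro moment computation within each market into 10 agent chunks;
# results are identical to the unchunked computation, only memory usage drops
pyblp.options.micro_computation_chunks = 10
```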
