-
Notifications
You must be signed in to change notification settings - Fork 2
/
keyedDataActions.py
158 lines (128 loc) · 5.44 KB
/
keyedDataActions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# This file is part of analysis_tools.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
__all__ = (
"ChainedKeyedDataActions",
"AddComputedVector",
"KeyedDataSelectorAction",
"KeyedScalars",
)
from typing import Optional, cast
import numpy as np
from lsst.pex.config import Field
from lsst.pex.config.configurableActions import ConfigurableActionField, ConfigurableActionStructField
from lsst.pex.config.listField import ListField
from ...interfaces import (
KeyedData,
KeyedDataAction,
KeyedDataSchema,
Scalar,
ScalarAction,
Vector,
VectorAction,
)
class ChainedKeyedDataActions(KeyedDataAction):
r"""Run a series of `KeyedDataAction`\ s and accumulated their output into
one KeyedData result.
"""
keyedDataActions = ConfigurableActionStructField[KeyedDataAction](
doc="Set of KeyedData actions to run, results will be concatenated into a final output KeyedData"
"object"
)
def getInputSchema(self) -> KeyedDataSchema:
for action in self.keyedDataActions:
yield from action.getInputSchema()
def getOutputSchema(self) -> KeyedDataSchema:
for action in self.keyedDataActions:
output = action.getOutputSchema()
if output is not None:
yield from output
def __call__(self, data: KeyedData, **kwargs) -> KeyedData:
result: KeyedData = {} # type:ignore
for action in self.keyedDataActions:
for column, values in action(data, **kwargs).items():
result[column] = values
return result
class AddComputedVector(KeyedDataAction):
"""Compute a `Vector` from the specified `VectorAction` and add it to a
copy of the KeyedData, returning the result.
"""
action = ConfigurableActionField[VectorAction](doc="Action to use to compute Vector")
keyName = Field[str](doc="Key name to add to KeyedData")
def getInputSchema(self) -> KeyedDataSchema:
yield from self.action.getInputSchema()
def getOutputSchema(self) -> KeyedDataSchema:
return ((self.keyName, Vector),)
def __call__(self, data: KeyedData, **kwargs) -> KeyedData:
result = dict(data)
result[self.keyName.format(**kwargs)] = self.action(data, **kwargs)
return result
class KeyedDataSelectorAction(KeyedDataAction):
"""Extract Vector specified by ``vectorKeys`` from input KeyedData and
optionally apply selectors to down select extracted vectors.
Note this action will not work with keyed scalars, see `getInputSchema` for
expected schema.
"""
vectorKeys = ListField[str](doc="Keys to extract from KeyedData and return", default=[])
selectors = ConfigurableActionStructField[VectorAction](
doc="Selectors for selecting rows, will be AND together",
)
def getInputSchema(self) -> KeyedDataSchema:
yield from ((column, Vector | Scalar) for column in self.vectorKeys) # type: ignore
for action in self.selectors:
yield from action.getInputSchema()
def getOutputSchema(self) -> KeyedDataSchema:
return ((column, Vector | Scalar) for column in self.vectorKeys) # type: ignore
def __call__(self, data: KeyedData, **kwargs) -> KeyedData:
mask: Optional[np.ndarray] = None
for selector in self.selectors:
subMask = selector(data, **kwargs)
if mask is None:
mask = subMask
else:
mask *= subMask # type: ignore
result = {}
for key in self.vectorKeys:
key_arg = key.format_map(kwargs)
result[key_arg] = data[key_arg]
if mask is not None:
return {key: cast(Vector, col)[mask] for key, col in result.items()}
else:
return result
class KeyedScalars(KeyedDataAction):
"""Creates an output of type KeyedData, where the keys are given by the
identifiers in `scalarActions` and the values are the results of the
corresponding `ScalarAction`.
"""
scalarActions = ConfigurableActionStructField[ScalarAction](
doc="Create a KeyedData of individual ScalarActions"
)
def getInputSchema(self) -> KeyedDataSchema:
for action in self.scalarActions:
yield from action.getInputSchema()
def getOutputSchema(self) -> KeyedDataSchema:
for name in self.scalarActions.fieldNames:
yield (name, Scalar)
def __call__(self, data: KeyedData, **kwargs) -> KeyedData:
result: KeyedData = {} # type: ignore
for name, action in self.scalarActions.items():
result[name] = action(data, **kwargs)
return result