-
Notifications
You must be signed in to change notification settings - Fork 18
/
_actions.py
154 lines (113 loc) · 6.25 KB
/
_actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from __future__ import annotations
# Names exported by ``from <this module> import *``.
# NOTE(review): NanoJansky and NanoJanskyErr are defined in this file but are
# not listed here -- confirm whether that omission is intentional.
__all__ = ("SingleColumnAction", "MultiColumnAction", "CoordColumn", "MagColumnDN", "SumColumns", "AddColumn",
"DivideColumns", "SubtractColumns", "MultiplyColumns", "FractionalResidualColumns",
"MagColumnNanoJansky",)
from typing import Iterable
import numpy as np
import pandas as pd
from astropy import units
from ..configurableActions import ConfigurableActionStructField, ConfigurableActionField
from ._baseDataFrameActions import DataFrameAction
from ._evalColumnExpression import makeColumnExpressionAction
from lsst.pex.config import Field
class SingleColumnAction(DataFrameAction):
    """Action that pulls one configured column out of a dataframe."""

    column = Field(doc="Column to load for this action", dtype=str, optional=False)

    @property
    def columns(self) -> Iterable[str]:
        """One-element tuple holding the configured column name."""
        name = self.column
        return (name,)

    def __call__(self, df, **kwargs):
        """Return ``df[self.column]``; extra keyword arguments are not used."""
        selected = df[self.column]
        return selected
class MultiColumnAction(DataFrameAction):
    """Action composed of several configurable sub-actions."""

    actions = ConfigurableActionStructField(doc="Configurable actions to use in a joint action")

    @property
    def columns(self) -> Iterable[str]:
        """Lazily yield the columns of every sub-action, in iteration order."""
        for subAction in self.actions:
            for name in subAction.columns:
                yield name
class CoordColumn(SingleColumnAction):
    """Load a single coordinate column, optionally rescaled by ``180 / pi``.

    The column is selected by the parent ``SingleColumnAction``. When
    ``inRadians`` is true the selected values are multiplied by ``180 / pi``;
    otherwise they are returned unchanged.
    """

    inRadians = Field(doc="Return the column in radians if true", default=True, dtype=bool)

    def __call__(self, df, **kwargs):
        """Return the configured column from ``df``.

        Parameters
        ----------
        df
            Dataframe-like object indexed by column name.
        **kwargs
            Accepted for signature compatibility with the parent class and
            forwarded to it unchanged.
        """
        # Accept and forward **kwargs so this override can be called exactly
        # like the parent ``__call__(df, **kwargs)``.
        col = super().__call__(df, **kwargs)
        # NOTE(review): the field doc says "return ... in radians if true", yet
        # a true value multiplies by 180/pi (a radians -> degrees scaling).
        # Behaviour is left as-is; confirm whether the doc or the code is the
        # intended contract.
        return col * 180 / np.pi if self.inRadians else col
class MagColumnDN(SingleColumnAction):
    """Compute ``-2.5 * log10(column / fluxMag0)`` for the configured column.

    ``fluxMag0`` is taken from the ``fluxMag0`` keyword argument when it is
    supplied and truthy; otherwise it is derived from ``coadd_zeropoint`` as
    ``1 / 10**(-0.4 * coadd_zeropoint)``.
    """

    coadd_zeropoint = Field(doc="Magnitude zero point", dtype=float, default=27)

    def __call__(self, df: pd.DataFrame, **kwargs):
        """Return the magnitude-like quantity for ``df[self.column]``.

        Parameters
        ----------
        df : `pandas.DataFrame`
            Table containing ``self.column``.
        **kwargs
            Only ``fluxMag0`` is read; a missing or falsy value falls back to
            the zero point configured on this action.
        """
        if not (fluxMag0 := kwargs.get('fluxMag0')):
            fluxMag0 = 1/np.power(10, -0.4*self.coadd_zeropoint)

        # ``np.warnings`` was an accidental alias of the stdlib module and no
        # longer exists in current NumPy. ``np.errstate`` silences the same
        # "divide by zero" / "invalid value" floating-point warnings that
        # non-positive inputs to log10 would otherwise trigger.
        with np.errstate(divide='ignore', invalid='ignore'):
            return -2.5 * np.log10(df[self.column] / fluxMag0)
class MagColumnNanoJansky(SingleColumnAction):
    """Compute ``-2.5 * log10((column * 1e-9) / 3631.0)`` for the configured
    column.
    """

    def __call__(self, df: pd.DataFrame, **kwargs):
        """Return the magnitude-like quantity for ``df[self.column]``.

        Parameters
        ----------
        df : `pandas.DataFrame`
            Table containing ``self.column``.
        **kwargs
            Accepted for interface compatibility; not used here.
        """
        # ``np.warnings`` was an accidental alias of the stdlib module and no
        # longer exists in current NumPy. ``np.errstate`` silences the same
        # "divide by zero" / "invalid value" floating-point warnings that
        # non-positive inputs to log10 would otherwise trigger.
        with np.errstate(divide='ignore', invalid='ignore'):
            # presumably 1e-9 converts nJy to Jy and 3631.0 is the AB
            # zero-point flux in Jy -- TODO confirm against the data model.
            return -2.5 * np.log10((df[self.column] * 1e-9) / 3631.0)
class NanoJansky(SingleColumnAction):
    """Scale the configured column by ``ab_flux_scale / fluxMag0``."""

    ab_flux_scale = Field(
        doc="Scaling of ab flux",
        dtype=float,
        default=(0*units.ABmag).to_value(units.nJy),
    )
    coadd_zeropoint = Field(doc="Magnitude zero point", dtype=float, default=27)

    def setDefaults(self):
        """Apply the parent defaults, then turn on caching for this action."""
        super().setDefaults()
        # cache this action for future calls
        self.cache = True

    def __call__(self, df, **kwargs):
        """Return ``ab_flux_scale * column / fluxMag0``.

        ``fluxMag0`` comes from the keyword argument of that name when it is
        present and truthy, otherwise from ``coadd_zeropoint``.
        """
        counts = super().__call__(df, **kwargs)
        zeroPointFlux = kwargs.get('fluxMag0')
        if not zeroPointFlux:
            zeroPointFlux = 1/np.power(10, -0.4*self.coadd_zeropoint)
        return self.ab_flux_scale * counts / zeroPointFlux
class NanoJanskyErr(SingleColumnAction):
    """Action intended to work on the configured column together with a flux.

    The flux is either passed to ``__call__`` directly or produced by the
    configured ``flux_action``.
    """

    flux_mag_err = Field(doc="Error in the magnitude zeropoint", dtype=float, default=0)
    flux_action = ConfigurableActionField(doc="Action to use if flux is not provided to the call method",
                                          default=NanoJansky, dtype=DataFrameAction)

    @property
    def columns(self):
        """Yield this action's column followed by those of ``flux_action``."""
        # Yield plain column names. Zipping the two sources would instead
        # produce a single tuple pairing them, which is not a column name.
        yield self.column
        yield from self.flux_action.columns

    def __call__(self, df, flux_column=None, flux_mag_err=None, **kwargs):
        """Resolve the flux column and zero-point error to use.

        Parameters
        ----------
        df
            Dataframe passed on to ``flux_action`` when no flux is supplied.
        flux_column : optional
            Precomputed flux; when `None` it is obtained from
            ``self.flux_action(df, **kwargs)``.
        flux_mag_err : optional
            Zero-point error; when `None` the configured ``self.flux_mag_err``
            is used.
        **kwargs
            Forwarded to ``flux_action``.
        """
        if flux_column is None:
            flux_column = self.flux_action(df, **kwargs)
        if flux_mag_err is None:
            flux_mag_err = self.flux_mag_err
        # TODO(review): nothing is computed from the resolved inputs and the
        # method implicitly returns None -- the error calculation appears to be
        # unfinished; confirm the intended formula before relying on this.
# SumColumns: action class generated by makeColumnExpressionAction from the
# expression string "colA+colB"; both operands default to SingleColumnAction.
# NOTE(review): this docstring says `DataFrameAction` while the sibling
# docstrings below say `MultiColumnAction` -- confirm which base class the
# factory actually produces.
_docs = """This is a `DataFrameAction` that is designed to add two columns
together and return the result.
"""
SumColumns = makeColumnExpressionAction("SumColumns", "colA+colB",
exprDefaults={"colA": SingleColumnAction,
"colB": SingleColumnAction},
docstring=_docs)
# SubtractColumns: action class generated by makeColumnExpressionAction from
# the expression string "colA-colB" (colB is subtracted from colA); both
# operands default to SingleColumnAction.
_docs = """This is a `MultiColumnAction` that is designed to subtract two columns
together and return the result.
"""
SubtractColumns = makeColumnExpressionAction("SubtractColumns", "colA-colB",
exprDefaults={"colA": SingleColumnAction,
"colB": SingleColumnAction},
docstring=_docs)
# MultiplyColumns: action class generated by makeColumnExpressionAction from
# the expression string "colA*colB"; both operands default to
# SingleColumnAction.
_docs = """This is a `MultiColumnAction` that is designed to multiply two columns
together and return the result.
"""
MultiplyColumns = makeColumnExpressionAction("MultiplyColumns", "colA*colB",
exprDefaults={"colA": SingleColumnAction,
"colB": SingleColumnAction},
docstring=_docs)
# DivideColumns: action class generated by makeColumnExpressionAction from the
# expression string "colA/colB" (colA is the numerator); both operands default
# to SingleColumnAction.
_docs = """This is a `MultiColumnAction` that is designed to divide two columns
together and return the result.
"""
DivideColumns = makeColumnExpressionAction("DivideColumns", "colA/colB",
exprDefaults={"colA": SingleColumnAction,
"colB": SingleColumnAction},
docstring=_docs)
# FractionalResidualColumns: action class generated by
# makeColumnExpressionAction from the expression string "(colA-colB)/colB",
# which is algebraically colA/colB - 1; both operands default to
# SingleColumnAction.
_docs = """This is a `MultiColumnAction` that is designed to divide two columns
together, subtract one and return the result.
"""
FractionalResidualColumns = makeColumnExpressionAction("FractionalResidualColumns", "(colA-colB)/colB",
exprDefaults={"colA": SingleColumnAction,
"colB": SingleColumnAction},
docstring=_docs)
class AddColumn(DataFrameAction):
    """Run another action and store its result as a new dataframe column.

    The configured ``aggregator`` action is evaluated on the dataframe and its
    result is assigned to the column named by ``newColumn``.
    """

    aggregator = ConfigurableActionField(doc="This is an instance of a Dataframe action that will be used "
                                         "to create a new column", dtype=DataFrameAction)
    newColumn = Field(doc="Name of the new column to add", dtype=str)

    @property
    def columns(self) -> Iterable[str]:
        """Yield the columns required by the ``aggregator`` action."""
        yield from self.aggregator.columns

    def __call__(self, df, **kwargs) -> pd.DataFrame:
        """Add ``newColumn`` to ``df`` in place and return ``df``.

        Parameters
        ----------
        df : `pandas.DataFrame`
            Table to read from and to extend; it is modified in place.
        **kwargs
            Forwarded as keyword arguments to the ``aggregator`` action.

        Returns
        -------
        df : `pandas.DataFrame`
            The same object that was passed in, now containing ``newColumn``.
        """
        # Unpack kwargs when delegating: handing the dict over positionally
        # would not match the ``__call__(df, **kwargs)`` action signature.
        df[self.newColumn] = self.aggregator(df, **kwargs)
        return df