Skip to content

Commit

Permalink
[Feature store] Support EmitPolicy in aggregations (#903)
Browse files Browse the repository at this point in the history
  • Loading branch information
katyakats committed May 2, 2021
1 parent 65744d2 commit 9a23e62
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 3 deletions.
12 changes: 11 additions & 1 deletion mlrun/feature_store/feature_set.py
Expand Up @@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from typing import List, Optional

import pandas as pd
from storey import EmitPolicy

import mlrun

Expand Down Expand Up @@ -351,6 +352,7 @@ def add_aggregation(
state_name=None,
after=None,
before=None,
emit_policy: Optional[EmitPolicy] = None,
):
"""add feature aggregation rule
Expand All @@ -366,6 +368,8 @@ def add_aggregation(
:param state_name: optional, graph state name
:param after: optional, after which graph state it runs
:param before: optional, comes before graph state
:param emit_policy: optional, defines the emit policy of the aggregations. For example, EmitAfterMaxEvent (will emit
the Nth event). The default behaviour is emitting every event
"""
aggregation = FeatureAggregation(
name, column, operations, windows, period
Expand All @@ -384,14 +388,20 @@ def upsert_feature(name):
aggregations = state.class_args.get("aggregates", [])
aggregations.append(aggregation)
state.class_args["aggregates"] = aggregations
if emit_policy:
state.class_args["emit_policy"] = emit_policy
else:
class_args = {}
if emit_policy:
class_args["emit_policy"] = emit_policy
state = graph.add_step(
name=state_name,
after=after or previous_step,
before=before,
class_name="storey.AggregateByKey",
aggregates=[aggregation],
table=".",
**class_args,
)

for operation in operations:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -51,4 +51,4 @@ fsspec~=0.9.0
v3iofs~=0.1.5
# 3.4 and above failed building in some images - see https://github.com/pyca/cryptography/issues/5771
cryptography~=3.3.2
storey~=0.4.6; python_version >= '3.7'
storey~=0.4.8; python_version >= '3.7'
3 changes: 2 additions & 1 deletion tests/system/feature_store/test_feature_store.py
Expand Up @@ -5,7 +5,7 @@

import pandas as pd
import pytest
from storey import MapClass
from storey import EmitAfterMaxEvent, MapClass

import mlrun
import mlrun.feature_store as fs
Expand Down Expand Up @@ -396,6 +396,7 @@ def test_multiple_entities(self):
operations=["sum", "max"],
windows=["1h"],
period="10m",
emit_policy=EmitAfterMaxEvent(1),
)
fs.infer_metadata(
data_set,
Expand Down

0 comments on commit 9a23e62

Please sign in to comment.