From 0267040b261ba43744605a6379ebff8e69ac59d7 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 10:00:28 -0700 Subject: [PATCH 01/16] Add column parameter to feature operations The column parameter is useful for normalization, binning, and other operations. Adding it to the simple operations keeps the APIs consistent. --- client/streamsql/feature.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/streamsql/feature.py b/client/streamsql/feature.py index cb77e5d..ad2f5a3 100644 --- a/client/streamsql/feature.py +++ b/client/streamsql/feature.py @@ -24,7 +24,7 @@ def name(cls): return "sqrt" @classmethod - def apply(cls, val): + def apply(cls, column, val): return math.sqrt(val) @@ -35,7 +35,7 @@ def __init__(self, factor): def name(self): return "pow" - def apply(self, val): + def apply(self, column, val): return val**self._factor @@ -51,7 +51,7 @@ def lookup(self, entity): table = self._sources.get_table(self._definition.table) column = table.column(self._definition.column) init_value = column[entity] - return self._apply_feature(init_value, column=column) + return self._apply_feature(column, init_value) - def _apply_feature(self, init_value, column=None): - return self._definition.operation.apply(init_value) + def _apply_feature(self, column, init_value): + return self._definition.operation.apply(column, init_value) From 415cb3ef183357af648fc6f8d4a1c6b1394469ab Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 10:57:24 -0700 Subject: [PATCH 02/16] Move operations from feature.py to operation.py Operations will be used as default parameters in features. Making it an import allows them to be used without changing file ordering. 
--- client/streamsql/feature.py | 23 +---------------------- client/streamsql/operation.py | 22 ++++++++++++++++++++++ client/tests/test_local_backend.py | 3 ++- 3 files changed, 25 insertions(+), 23 deletions(-) create mode 100644 client/streamsql/operation.py diff --git a/client/streamsql/feature.py b/client/streamsql/feature.py index ad2f5a3..b1f2b10 100644 --- a/client/streamsql/feature.py +++ b/client/streamsql/feature.py @@ -1,4 +1,4 @@ -import math +import streamsql.operation as op class Numeric: @@ -18,27 +18,6 @@ def _instatiate(self, sources): return _NumericFeature(self, sources) -class Sqrt: - @classmethod - def name(cls): - return "sqrt" - - @classmethod - def apply(cls, column, val): - return math.sqrt(val) - - -class Pow: - def __init__(self, factor): - self._factor = factor - - def name(self): - return "pow" - - def apply(self, column, val): - return val**self._factor - - class _NumericFeature: def __init__(self, definition, sources): self._definition = definition diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py new file mode 100644 index 0000000..ae4f3a9 --- /dev/null +++ b/client/streamsql/operation.py @@ -0,0 +1,22 @@ +import math + + +class Sqrt: + @classmethod + def name(cls): + return "sqrt" + + @classmethod + def apply(cls, column, val): + return math.sqrt(val) + + +class Pow: + def __init__(self, factor): + self._factor = factor + + def name(self): + return "pow" + + def apply(self, column, val): + return val**self._factor diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index 0ff2f88..d2f2a45 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -2,6 +2,7 @@ import streamsql.local import streamsql.errors import streamsql.feature +import streamsql.operation import os, sys import pandas as pd @@ -130,7 +131,7 @@ def test_numeric_feature(feature_store): name="sq_price", table="users", column="balance", - operation=streamsql.feature.Pow(2), + 
operation=streamsql.operation.Pow(2), parent_entity="user", ) feature_store.register_features(feature) From b64fdbfc6043189eb22b1d31d57e3b1a4ed49a69 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 10:59:33 -0700 Subject: [PATCH 03/16] Add NoOp operation NoOp operation is cleaner than checking for None. It's also a nice utility to match the interface with an operation that does nothing. --- client/streamsql/feature.py | 2 +- client/streamsql/operation.py | 10 ++++++++++ client/tests/test_local_backend.py | 14 ++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/client/streamsql/feature.py b/client/streamsql/feature.py index b1f2b10..ff1d869 100644 --- a/client/streamsql/feature.py +++ b/client/streamsql/feature.py @@ -6,7 +6,7 @@ def __init__(self, name="", table="", column="", - operation=None, + operation=op.NoOp, parent_entity=None): self.name = name self.table = table diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py index ae4f3a9..307a74e 100644 --- a/client/streamsql/operation.py +++ b/client/streamsql/operation.py @@ -1,6 +1,16 @@ import math +class NoOp: + @classmethod + def name(cls): + return "no_op" + + @classmethod + def apply(cls, column, value): + return value + + class Sqrt: @classmethod def name(cls): diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index d2f2a45..77ab5d8 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -138,3 +138,17 @@ def test_numeric_feature(feature_store): inputs = feature_store.online_features(["sq_price"], entities={"user": "1"}) assert inputs == [123**2] + + +def test_noop_feature(feature_store): + create_users_table(feature_store) + feature = streamsql.feature.Numeric( + name="price", + table="users", + column="balance", + parent_entity="user", + ) + feature_store.register_features(feature) + inputs = feature_store.online_features(["price"], + entities={"user": "1"}) + assert inputs == 
[123] From 9818054c44e6b6ffb261bcf5cebe09a4ecb13089 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 11:13:24 -0700 Subject: [PATCH 04/16] Remove unused sys dependency from local.py tests --- client/tests/test_local_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index 77ab5d8..eee9b57 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -3,7 +3,7 @@ import streamsql.errors import streamsql.feature import streamsql.operation -import os, sys +import os import pandas as pd test_dir = os.path.dirname(os.path.realpath(__file__)) From 3493562e09d4c2cd8bb7cb200042d29883702d88 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 11:14:20 -0700 Subject: [PATCH 05/16] Rename price feature to match balance column In the local tests, the feature name was inconsistent with the source column name. --- client/tests/test_local_backend.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index eee9b57..e07cc06 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -128,14 +128,14 @@ def test_materialized_table_is_stored(feature_store): def test_numeric_feature(feature_store): create_users_table(feature_store) feature = streamsql.feature.Numeric( - name="sq_price", + name="sq_balance", table="users", column="balance", operation=streamsql.operation.Pow(2), parent_entity="user", ) feature_store.register_features(feature) - inputs = feature_store.online_features(["sq_price"], + inputs = feature_store.online_features(["sq_balance"], entities={"user": "1"}) assert inputs == [123**2] @@ -143,12 +143,12 @@ def test_numeric_feature(feature_store): def test_noop_feature(feature_store): create_users_table(feature_store) feature = streamsql.feature.Numeric( - name="price", + name="balance", table="users", 
column="balance", parent_entity="user", ) feature_store.register_features(feature) - inputs = feature_store.online_features(["price"], + inputs = feature_store.online_features(["balance"], entities={"user": "1"}) assert inputs == [123] From 1207428077698bf71e65bbac925a7144faa55793 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 12:31:24 -0700 Subject: [PATCH 06/16] Add MinMax Operation MinMax will be used as a scaling operation for Feature. --- client/streamsql/local.py | 6 ++++++ client/streamsql/operation.py | 24 ++++++++++++++++++++++++ client/tests/test_operations.py | 20 ++++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 client/tests/test_operations.py diff --git a/client/streamsql/local.py b/client/streamsql/local.py index 1e1fe2a..5740446 100644 --- a/client/streamsql/local.py +++ b/client/streamsql/local.py @@ -117,3 +117,9 @@ def __init__(self, name, series): def __getitem__(self, key): return self._series.loc[key] + + def min(self): + return self._series.min() + + def max(self): + return self._series.max() diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py index 307a74e..dd72a4b 100644 --- a/client/streamsql/operation.py +++ b/client/streamsql/operation.py @@ -11,6 +11,30 @@ def apply(cls, column, value): return value +class MinMax: + """MinMax linearly scales a feature to fit between a min and max""" + def __init__(self, min=0, max=1): + self._min = min + self._max = max + self._range = max - min + + def name(self, name): + return "min_max" + + def apply(self, column, value): + clm_max = column.max() + clm_min = column.min() + clm_range = clm_max - clm_min + # Transform value to what it would be if the column was scaled from zero + # to one. + zero_to_one = (value - clm_min) / clm_range + # Transform value to what it would be if the column was scaled from zero + # to this MinMax range. + scaled = zero_to_one * self._range + # Shift value so that it starts at the minimum. 
+ return scaled + self._min + + class Sqrt: @classmethod def name(cls): diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py new file mode 100644 index 0000000..b64dba4 --- /dev/null +++ b/client/tests/test_operations.py @@ -0,0 +1,20 @@ +import pandas +import pytest +import streamsql.local +import streamsql.operation as op + +@pytest.fixture +def int_column(): + data = [1,4,3,2,5] + series = pandas.Series(data) + return streamsql.local.Column("int_column", series) + +@pytest.mark.parametrize( + "min, max, transform", + [(0, 1, (3, 0.5)), (-1, 1, (3, 0)), (-100, 100, (5, 100))] +) +def test_min_max(int_column, min, max, transform): + scale = op.MinMax(min, max) + initial, expected = transform + actual = scale.apply(int_column, initial) + assert actual == expected From 84291492fee828eb57043f457dec42d5ee1f6896 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 12:38:17 -0700 Subject: [PATCH 07/16] Add Sqrt and Pow operation tests --- client/tests/test_operations.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index b64dba4..07fbc2f 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -18,3 +18,15 @@ def test_min_max(int_column, min, max, transform): initial, expected = transform actual = scale.apply(int_column, initial) assert actual == expected + + +def test_sqrt(int_column): + assert op.Sqrt().apply(int_column, 4) == 2 + + +@pytest.mark.parametrize("factor, transform", [(2, (2, 4)), (-1, (2, 0.5)), + (3, (3, 27)), (0, (100, 1)), + (0, (0, 1))]) +def test_pow(int_column, factor, transform): + initial, expected = transform + assert op.Pow(factor).apply(int_column, initial) == expected From fc7de90ad8674f63cae29a8b99c2a6356f1d0a79 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 12:40:00 -0700 Subject: [PATCH 08/16] Format Python test code Forgot to do it before last commit. 
--- client/tests/test_local_backend.py | 3 +-- client/tests/test_operations.py | 11 ++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index e07cc06..a33ec5f 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -149,6 +149,5 @@ def test_noop_feature(feature_store): parent_entity="user", ) feature_store.register_features(feature) - inputs = feature_store.online_features(["balance"], - entities={"user": "1"}) + inputs = feature_store.online_features(["balance"], entities={"user": "1"}) assert inputs == [123] diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index 07fbc2f..0e3ac15 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -3,16 +3,17 @@ import streamsql.local import streamsql.operation as op + @pytest.fixture def int_column(): - data = [1,4,3,2,5] + data = [1, 4, 3, 2, 5] series = pandas.Series(data) return streamsql.local.Column("int_column", series) -@pytest.mark.parametrize( - "min, max, transform", - [(0, 1, (3, 0.5)), (-1, 1, (3, 0)), (-100, 100, (5, 100))] -) + +@pytest.mark.parametrize("min, max, transform", [(0, 1, (3, 0.5)), + (-1, 1, (3, 0)), + (-100, 100, (5, 100))]) def test_min_max(int_column, min, max, transform): scale = op.MinMax(min, max) initial, expected = transform From 7849408ac30f47396aa135d9690498359acd0bb7 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 12:43:03 -0700 Subject: [PATCH 09/16] Change int_column fixture for operation tests This makes the column have a different mean and median value to help testing. 
--- client/tests/test_operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index 0e3ac15..67a5b41 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -6,7 +6,7 @@ @pytest.fixture def int_column(): - data = [1, 4, 3, 2, 5] + data = [1, 4, 3, 2, 5, 5] series = pandas.Series(data) return streamsql.local.Column("int_column", series) From 290126df934dcf16d1de910758edddb0c1f02e5e Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 12:47:24 -0700 Subject: [PATCH 10/16] Change operation test to use Sqrt class directly The Sqrt object shouldn't be used. --- client/tests/test_operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index 67a5b41..fc34110 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -22,7 +22,7 @@ def test_min_max(int_column, min, max, transform): def test_sqrt(int_column): - assert op.Sqrt().apply(int_column, 4) == 2 + assert op.Sqrt.apply(int_column, 4) == 2 @pytest.mark.parametrize("factor, transform", [(2, (2, 4)), (-1, (2, 0.5)), From fff9821e3384a862f7ba60e7f215e1c9196f87b3 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 12:57:36 -0700 Subject: [PATCH 11/16] Add Mean, Median, and Zero operations These operations will be used to fill missing values. 
--- client/streamsql/local.py | 6 ++++++ client/streamsql/operation.py | 30 ++++++++++++++++++++++++++++++ client/tests/test_operations.py | 12 ++++++++++++ 3 files changed, 48 insertions(+) diff --git a/client/streamsql/local.py b/client/streamsql/local.py index 5740446..39cb713 100644 --- a/client/streamsql/local.py +++ b/client/streamsql/local.py @@ -123,3 +123,9 @@ def min(self): def max(self): return self._series.max() + + def mean(self): + return self._series.mean() + + def median(self): + return self._series.median() diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py index dd72a4b..2064a4b 100644 --- a/client/streamsql/operation.py +++ b/client/streamsql/operation.py @@ -11,6 +11,36 @@ def apply(cls, column, value): return value +class Median: + @classmethod + def name(cls): + return "median" + + @classmethod + def apply(cls, column, *ignore): + return column.median() + + +class Mean: + @classmethod + def name(cls): + return "mean" + + @classmethod + def apply(cls, column, *ignore): + return column.mean() + + +class Zero: + @classmethod + def name(cls): + return "zero" + + @classmethod + def apply(cls, column, *ignore): + return 0 + + class MinMax: """MinMax linearly scales a feature to fit between a min and max""" def __init__(self, min=0, max=1): diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index fc34110..41fb18b 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -31,3 +31,15 @@ def test_sqrt(int_column): def test_pow(int_column, factor, transform): initial, expected = transform assert op.Pow(factor).apply(int_column, initial) == expected + + +def test_median(int_column): + assert op.Median.apply(int_column) == 3.5 + + +def test_mean(int_column): + assert op.Mean.apply(int_column) == 3 + 1 / 3 + + +def test_zero(int_column): + assert op.Zero.apply(int_column) == 0 From ca13b75ee30db67d8ebd2abe1aba61ae74f2f357 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 
2020 14:13:41 -0700 Subject: [PATCH 12/16] Add Z-Score scaling --- client/streamsql/local.py | 3 +++ client/streamsql/operation.py | 10 ++++++++++ client/tests/test_operations.py | 6 ++++++ 3 files changed, 19 insertions(+) diff --git a/client/streamsql/local.py b/client/streamsql/local.py index 39cb713..1eb87a2 100644 --- a/client/streamsql/local.py +++ b/client/streamsql/local.py @@ -129,3 +129,6 @@ def mean(self): def median(self): return self._series.median() + + def std(self): + return self._series.std() diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py index 2064a4b..2527a15 100644 --- a/client/streamsql/operation.py +++ b/client/streamsql/operation.py @@ -41,6 +41,16 @@ def apply(cls, column, *ignore): return 0 +class ZScore: + @classmethod + def name(cls): + return "z_score" + + @classmethod + def apply(cls, column, value): + return (value - column.mean()) / column.std() + + class MinMax: """MinMax linearly scales a feature to fit between a min and max""" def __init__(self, min=0, max=1): diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index 41fb18b..0f56a94 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -21,6 +21,12 @@ def test_min_max(int_column, min, max, transform): assert actual == expected +@pytest.mark.parametrize("initial, expected", [(3 + 1 / 3, 0), + (100, 59.19600211726014)]) +def test_z_score(int_column, initial, expected): + assert op.ZScore.apply(int_column, initial) == expected + + def test_sqrt(int_column): assert op.Sqrt.apply(int_column, 4) == 2 From 13e5193c61bc88b225b7d7fb5d7ac94d420c8fa1 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 20:04:02 -0700 Subject: [PATCH 13/16] Add ZScoreCap and QuantileCap These two operations provide a basis for outlier handling. 
--- client/streamsql/local.py | 3 +++ client/streamsql/operation.py | 33 +++++++++++++++++++++++++++++++++ client/tests/test_operations.py | 13 +++++++++++++ 3 files changed, 49 insertions(+) diff --git a/client/streamsql/local.py b/client/streamsql/local.py index 1eb87a2..46eaa64 100644 --- a/client/streamsql/local.py +++ b/client/streamsql/local.py @@ -132,3 +132,6 @@ def median(self): def std(self): return self._series.std() + + def quantile(self, *quantiles): + return self._series.quantile(quantiles) diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py index 2527a15..60f13e8 100644 --- a/client/streamsql/operation.py +++ b/client/streamsql/operation.py @@ -1,6 +1,26 @@ import math +class ZScoreCap: + def __init__(self, max_devs=3): + self._max_devs = max_devs + + def apply(self, column, value): + zscore = ZScore.apply(column, value) + capped_zscore = _cap(zscore, -self._max_devs, self._max_devs) + return ZScore.revert(column, capped_zscore) + + +class QuantileCap: + def __init__(self, bottom=0, top=1): + self._bottom = bottom + self._top = top + + def apply(self, column, value): + bottom_val, top_val = column.quantile(self._bottom, self._top) + return _cap(value, bottom_val, top_val) + + class NoOp: @classmethod def name(cls): @@ -50,6 +70,10 @@ def name(cls): def apply(cls, column, value): return (value - column.mean()) / column.std() + @classmethod + def revert(cls, column, value): + return value * column.std() + column.mean() + class MinMax: """MinMax linearly scales a feature to fit between a min and max""" @@ -94,3 +118,12 @@ def name(self): def apply(self, column, val): return val**self._factor + + +def _cap(value, lower, upper): + if value > upper: + return upper + elif value < lower: + return lower + else: + return value diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index 0f56a94..eb108cb 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -11,6 +11,19 @@ def 
int_column(): return streamsql.local.Column("int_column", series) +@pytest.mark.parametrize("initial, expected", + [(3, 3), (100, 4.9663264951887856), (2, 2), + (-1, 1.7003401714778814)]) +def test_zscore_cap(int_column, initial, expected): + assert op.ZScoreCap(1).apply(int_column, initial) == expected + + +@pytest.mark.parametrize("initial, expected", [(3, 3), (100, 3.5), (2, 2), + (-1, 1)]) +def test_quantile_cap(int_column, initial, expected): + assert op.QuantileCap(top=0.5).apply(int_column, initial) == expected + + @pytest.mark.parametrize("min, max, transform", [(0, 1, (3, 0.5)), (-1, 1, (3, 0)), (-100, 100, (5, 100))]) From 06ca41d46f5676f380168e6d3dc30a5f918cf5ac Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 21:57:06 -0700 Subject: [PATCH 14/16] Rename Cap to Trunc for operations Truncate is slightly clearer and more consistent in the numeric feature definition --- client/streamsql/operation.py | 4 ++-- client/tests/test_operations.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/streamsql/operation.py b/client/streamsql/operation.py index 60f13e8..de449a0 100644 --- a/client/streamsql/operation.py +++ b/client/streamsql/operation.py @@ -1,7 +1,7 @@ import math -class ZScoreCap: +class ZScoreTrunc: def __init__(self, max_devs=3): self._max_devs = max_devs @@ -11,7 +11,7 @@ def apply(self, column, value): return ZScore.revert(column, capped_zscore) -class QuantileCap: +class QuantileTrunc: def __init__(self, bottom=0, top=1): self._bottom = bottom self._top = top diff --git a/client/tests/test_operations.py b/client/tests/test_operations.py index eb108cb..25e4525 100644 --- a/client/tests/test_operations.py +++ b/client/tests/test_operations.py @@ -15,13 +15,13 @@ def int_column(): [(3, 3), (100, 4.9663264951887856), (2, 2), (-1, 1.7003401714778814)]) def test_zscore_cap(int_column, initial, expected): - assert op.ZScoreCap(1).apply(int_column, initial) == expected + assert op.ZScoreTrunc(1).apply(int_column, 
initial) == expected @pytest.mark.parametrize("initial, expected", [(3, 3), (100, 3.5), (2, 2), (-1, 1)]) def test_quantile_cap(int_column, initial, expected): - assert op.QuantileCap(top=0.5).apply(int_column, initial) == expected + assert op.QuantileTrunc(top=0.5).apply(int_column, initial) == expected @pytest.mark.parametrize("min, max, transform", [(0, 1, (3, 0.5)), From 95d9dd8accc800d978ca30ae04df2c3e439f6310 Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 21:59:23 -0700 Subject: [PATCH 15/16] Rename operation to transform in feature Normalizing, truncating, etc. are all operations. This can lead to confusion, whereas transform is clearer. --- client/streamsql/feature.py | 4 ++-- client/tests/test_local_backend.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/streamsql/feature.py b/client/streamsql/feature.py index ff1d869..a9c5cd2 100644 --- a/client/streamsql/feature.py +++ b/client/streamsql/feature.py @@ -6,12 +6,12 @@ def __init__(self, name="", table="", column="", - operation=op.NoOp, + transform=op.NoOp, parent_entity=None): self.name = name self.table = table self.column = column - self.operation = operation + self.transform = transform self.parent_entity = parent_entity def _instatiate(self, sources): diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index a33ec5f..d6dfe59 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -131,7 +131,7 @@ def test_numeric_feature(feature_store): name="sq_balance", table="users", column="balance", - operation=streamsql.operation.Pow(2), + transform=streamsql.operation.Pow(2), parent_entity="user", ) feature_store.register_features(feature) From 3c4a32fefb3cf3b40ee736f85e53d9423fdd5cfe Mon Sep 17 00:00:00 2001 From: Simba Date: Wed, 6 May 2020 22:01:16 -0700 Subject: [PATCH 16/16] Add normalize, truncate, and fill_missing This allows a variety of typical feature engineering techniques to be applied 
using StreamSQL. --- client/streamsql/feature.py | 16 +++++++++- client/tests/test_feature.py | 38 ++++++++++++++++++++++++ client/tests/testdata/numeric_tester.csv | 6 ++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 client/tests/test_feature.py create mode 100644 client/tests/testdata/numeric_tester.csv diff --git a/client/streamsql/feature.py b/client/streamsql/feature.py index a9c5cd2..fce498b 100644 --- a/client/streamsql/feature.py +++ b/client/streamsql/feature.py @@ -1,4 +1,5 @@ import streamsql.operation as op +import math class Numeric: @@ -7,11 +8,17 @@ def __init__(self, table="", column="", transform=op.NoOp, + normalize=op.NoOp, + truncate=op.NoOp, + fill_missing=op.NoOp, parent_entity=None): self.name = name self.table = table self.column = column self.transform = transform + self.normalize = normalize + self.truncate = truncate + self.fill_missing = fill_missing self.parent_entity = parent_entity def _instatiate(self, sources): @@ -33,4 +40,11 @@ def lookup(self, entity): return self._apply_feature(column, init_value) def _apply_feature(self, column, init_value): - return self._definition.operation.apply(column, init_value) + d = self._definition + if math.isnan(init_value): + init_value = d.fill_missing.apply(column, init_value) + fn_order = [d.truncate, d.normalize, d.transform] + cur_value = init_value + for fn in fn_order: + cur_value = fn.apply(column, cur_value) + return cur_value diff --git a/client/tests/test_feature.py b/client/tests/test_feature.py new file mode 100644 index 0000000..79366ac --- /dev/null +++ b/client/tests/test_feature.py @@ -0,0 +1,38 @@ +import os +import pytest +import streamsql.local +import streamsql.feature as feature +import streamsql.operation as op + +test_dir = os.path.dirname(os.path.realpath(__file__)) +testdata_dir = os.path.join(test_dir, 'testdata') +numeric_tester_file = os.path.join(testdata_dir, 'numeric_tester.csv') + + +@pytest.fixture +def feature_store(): + return 
streamsql.local.FeatureStore() + + +def create_numeric_tester_table(feature_store): + return feature_store.create_table_from_csv(numeric_tester_file, + table_name="numeric_tester", + primary_key="id") + + +def test_numeric_feature(feature_store): + create_numeric_tester_table(feature_store) + feature_store.register_features( + feature.Numeric( + name="do-everything", + table="numeric_tester", + column="value", + transform=op.Pow(2), + normalize=op.MinMax(min=0, max=4), + fill_missing=op.Mean, + truncate=op.QuantileTrunc(bottom=0.2), + parent_entity="id", + )) + inputs = feature_store.online_features(["do-everything"], + entities={"id": "3"}) + assert inputs == [4] diff --git a/client/tests/testdata/numeric_tester.csv b/client/tests/testdata/numeric_tester.csv new file mode 100644 index 0000000..1761e4a --- /dev/null +++ b/client/tests/testdata/numeric_tester.csv @@ -0,0 +1,6 @@ +id,value +0,-100 +1,100 +2,200 +3, +4,400