Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions client/streamsql/feature.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import streamsql.operation as op
import math


Expand All @@ -6,39 +7,24 @@ def __init__(self,
name="",
table="",
column="",
operation=None,
transform=op.NoOp,
normalize=op.NoOp,
truncate=op.NoOp,
fill_missing=op.NoOp,
parent_entity=None):
self.name = name
self.table = table
self.column = column
self.operation = operation
self.transform = transform
self.normalize = normalize
self.truncate = truncate
self.fill_missing = fill_missing
self.parent_entity = parent_entity

def _instatiate(self, sources):
    """Wrap this definition and ``sources`` in a ``_NumericFeature``."""
    numeric_feature = _NumericFeature(self, sources)
    return numeric_feature


class Sqrt:
    """Square-root operation on a single value."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "sqrt"

    @classmethod
    def apply(cls, val):
        """Return ``math.sqrt(val)``."""
        root = math.sqrt(val)
        return root


class Pow:
    """Raise a value to a fixed exponent."""

    def __init__(self, factor):
        # Exponent used by every call to apply().
        self._factor = factor

    def name(self):
        """Return the identifier string of this operation."""
        return "pow"

    def apply(self, val):
        """Return ``val`` raised to the configured exponent."""
        exponent = self._factor
        return val**exponent


class _NumericFeature:
def __init__(self, definition, sources):
self._definition = definition
Expand All @@ -51,7 +37,14 @@ def lookup(self, entity):
table = self._sources.get_table(self._definition.table)
column = table.column(self._definition.column)
init_value = column[entity]
return self._apply_feature(init_value, column=column)

def _apply_feature(self, init_value, column=None):
return self._definition.operation.apply(init_value)
return self._apply_feature(column, init_value)

def _apply_feature(self, column, init_value):
    """Run the definition's operations on one raw value.

    A NaN ``init_value`` is first replaced by the result of ``fill_missing``.
    The value is then passed through ``truncate``, ``normalize`` and
    ``transform``, in that order. Every operation is invoked as
    ``apply(column, value)``.
    """
    definition = self._definition
    value = init_value
    if math.isnan(value):
        value = definition.fill_missing.apply(column, value)
    # The pipeline is fixed: truncate first, then normalize, then transform.
    pipeline = [definition.truncate, definition.normalize, definition.transform]
    for operation in pipeline:
        value = operation.apply(column, value)
    return value
18 changes: 18 additions & 0 deletions client/streamsql/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,21 @@ def __init__(self, name, series):

def __getitem__(self, key):
    """Return the entry of the wrapped series found under ``key`` via ``.loc``."""
    by_label = self._series.loc
    return by_label[key]

def min(self):
    """Return the result of ``min()`` on the wrapped series."""
    series = self._series
    return series.min()

def max(self):
    """Return the result of ``max()`` on the wrapped series."""
    series = self._series
    return series.max()

def mean(self):
    """Return the result of ``mean()`` on the wrapped series."""
    series = self._series
    return series.mean()

def median(self):
    """Return the result of ``median()`` on the wrapped series."""
    series = self._series
    return series.median()

def std(self):
    """Return the result of ``std()`` on the wrapped series."""
    series = self._series
    return series.std()

def quantile(self, *quantiles):
    """Look up several quantiles of the wrapped series in one call.

    The positional arguments are forwarded together, as a single tuple, to
    the series' ``quantile`` method; its result is returned unchanged.
    """
    requested = quantiles
    series = self._series
    return series.quantile(requested)
129 changes: 129 additions & 0 deletions client/streamsql/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import math


class ZScoreTrunc:
    """Clamp a value to a maximum number of standard deviations.

    The value is converted to a z-score with ``ZScore.apply``, limited to
    ``[-max_devs, max_devs]`` and converted back with ``ZScore.revert``.
    """

    def __init__(self, max_devs=3):
        # Largest absolute z-score a value may keep.
        self._max_devs = max_devs

    def apply(self, column, value):
        """Return ``value`` truncated relative to ``column``'s mean and std."""
        limit = self._max_devs
        clamped = _cap(ZScore.apply(column, value), -limit, limit)
        return ZScore.revert(column, clamped)


class QuantileTrunc:
    """Clamp a value between two quantiles of its column."""

    def __init__(self, bottom=0, top=1):
        # Quantiles that mark the lower and upper limits.
        self._bottom = bottom
        self._top = top

    def apply(self, column, value):
        """Return ``value`` limited to the column's bottom/top quantile values."""
        bounds = column.quantile(self._bottom, self._top)
        lower, upper = bounds
        return _cap(value, lower, upper)


class NoOp:
    """Operation that hands the value back untouched."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "no_op"

    @classmethod
    def apply(cls, column, value):
        """Return ``value`` as given; ``column`` is not consulted."""
        unchanged = value
        return unchanged


class Median:
    """Operation that yields the column's median."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "median"

    @classmethod
    def apply(cls, column, *ignore):
        """Return ``column.median()``; any further arguments are ignored."""
        middle = column.median()
        return middle


class Mean:
    """Operation that yields the column's mean."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "mean"

    @classmethod
    def apply(cls, column, *ignore):
        """Return ``column.mean()``; any further arguments are ignored."""
        average = column.mean()
        return average


class Zero:
    """Operation that ignores its inputs and yields 0."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "zero"

    @classmethod
    def apply(cls, column, *ignore):
        """Return 0 regardless of ``column`` or any further arguments."""
        constant = 0
        return constant


class ZScore:
    """Convert values to and from z-scores using a column's mean and std."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "z_score"

    @classmethod
    def apply(cls, column, value):
        """Return how many standard deviations ``value`` lies from the mean."""
        deviation = value - column.mean()
        return deviation / column.std()

    @classmethod
    def revert(cls, column, value):
        """Map a z-score back onto the column's original scale."""
        rescaled = value * column.std()
        return rescaled + column.mean()


class MinMax:
    """MinMax linearly scales a feature to fit between a min and max.

    The column's own minimum maps to ``min`` and its maximum maps to ``max``;
    values in between are interpolated linearly.
    """

    def __init__(self, min=0, max=1):
        self._min = min
        self._max = max
        self._range = max - min

    def name(self, name=None):
        """Return the identifier string of this operation.

        ``name`` is unused. It is kept, now optional, so that existing calls
        passing an argument still work while ``name()`` matches the
        zero-argument signature of the other operations.
        """
        return "min_max"

    def apply(self, column, value):
        """Return ``value`` rescaled from the column's range to this range."""
        clm_min = column.min()
        clm_range = column.max() - clm_min
        if clm_range == 0:
            # A constant column has no spread; use a unit range instead of
            # dividing by zero, so the column's value maps to the minimum.
            clm_range = 1
        # Transform value to what it would be if the column was scaled from
        # zero to one.
        zero_to_one = (value - clm_min) / clm_range
        # Stretch to the width of this MinMax range, then shift so that it
        # starts at the minimum.
        return zero_to_one * self._range + self._min


class Sqrt:
    """Square-root operation; the column plays no part in the result."""

    @classmethod
    def name(cls):
        """Return the identifier string of this operation."""
        return "sqrt"

    @classmethod
    def apply(cls, column, val):
        """Return ``math.sqrt(val)``."""
        root = math.sqrt(val)
        return root


class Pow:
    """Raise a value to a fixed exponent; the column plays no part."""

    def __init__(self, factor):
        # Exponent used by every call to apply().
        self._factor = factor

    def name(self):
        """Return the identifier string of this operation."""
        return "pow"

    def apply(self, column, val):
        """Return ``val`` raised to the configured exponent."""
        exponent = self._factor
        return val**exponent


def _cap(value, lower, upper):
    """Limit ``value`` to the range from ``lower`` to ``upper``.

    The upper bound is tested first. A value that compares neither above
    ``upper`` nor below ``lower`` is returned as is.
    """
    if value > upper:
        return upper
    if value < lower:
        return lower
    return value
38 changes: 38 additions & 0 deletions client/tests/test_feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import pytest
import streamsql.local
import streamsql.feature as feature
import streamsql.operation as op

test_dir = os.path.dirname(os.path.realpath(__file__))
testdata_dir = os.path.join(test_dir, 'testdata')
numeric_tester_file = os.path.join(testdata_dir, 'numeric_tester.csv')


@pytest.fixture
def feature_store():
    """Provide a newly constructed local ``FeatureStore``."""
    store = streamsql.local.FeatureStore()
    return store


def create_numeric_tester_table(feature_store):
    """Load the numeric tester CSV into ``feature_store`` and return the result.

    The table is registered as ``numeric_tester`` with ``id`` as primary key.
    """
    table = feature_store.create_table_from_csv(
        numeric_tester_file,
        table_name="numeric_tester",
        primary_key="id",
    )
    return table


def test_numeric_feature(feature_store):
    """A feature using every kind of operation yields the expected value."""
    create_numeric_tester_table(feature_store)
    do_everything = feature.Numeric(
        name="do-everything",
        table="numeric_tester",
        column="value",
        transform=op.Pow(2),
        normalize=op.MinMax(min=0, max=4),
        fill_missing=op.Mean,
        truncate=op.QuantileTrunc(bottom=0.2),
        parent_entity="id",
    )
    feature_store.register_features(do_everything)
    # NOTE(review): id 3 has no value in the CSV, so this presumably exercises
    # fill_missing before the other operations -- confirm against the loader.
    inputs = feature_store.online_features(
        ["do-everything"],
        entities={"id": "3"},
    )
    assert inputs == [4]
22 changes: 18 additions & 4 deletions client/tests/test_local_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import streamsql.local
import streamsql.errors
import streamsql.feature
import os, sys
import streamsql.operation
import os
import pandas as pd

test_dir = os.path.dirname(os.path.realpath(__file__))
Expand Down Expand Up @@ -127,13 +128,26 @@ def test_materialized_table_is_stored(feature_store):
def test_numeric_feature(feature_store):
    """A ``Pow(2)`` transform squares the balance looked up for user 1."""
    create_users_table(feature_store)
    # Only the post-change arguments are kept: the stale ``sq_price`` name and
    # ``operation=`` keyword duplicated ``name=`` and no longer match the
    # ``Numeric`` signature, which takes ``transform=``.
    feature = streamsql.feature.Numeric(
        name="sq_balance",
        table="users",
        column="balance",
        transform=streamsql.operation.Pow(2),
        parent_entity="user",
    )
    feature_store.register_features(feature)
    inputs = feature_store.online_features(["sq_balance"],
                                           entities={"user": "1"})
    assert inputs == [123**2]


def test_noop_feature(feature_store):
    """A feature declared without any operations returns the stored value."""
    create_users_table(feature_store)
    balance = streamsql.feature.Numeric(
        name="balance",
        table="users",
        column="balance",
        parent_entity="user",
    )
    feature_store.register_features(balance)
    user = {"user": "1"}
    inputs = feature_store.online_features(["balance"], entities=user)
    assert inputs == [123]
64 changes: 64 additions & 0 deletions client/tests/test_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pandas
import pytest
import streamsql.local
import streamsql.operation as op


@pytest.fixture
def int_column():
    """Provide a ``Column`` named ``int_column`` over six small integers."""
    values = pandas.Series([1, 4, 3, 2, 5, 5])
    return streamsql.local.Column("int_column", values)


@pytest.mark.parametrize("initial, expected",
                         [(3, 3), (100, 4.9663264951887856), (2, 2),
                          (-1, 1.7003401714778814)])
def test_zscore_cap(int_column, initial, expected):
    """Values beyond one standard deviation are pulled back to that bound."""
    actual = op.ZScoreTrunc(1).apply(int_column, initial)
    # The result goes through floating-point mean/std arithmetic, so compare
    # with a tolerance instead of demanding bit-identical floats.
    assert actual == pytest.approx(expected)


@pytest.mark.parametrize("initial, expected", [(3, 3), (100, 3.5), (2, 2),
                                               (-1, 1)])
def test_quantile_cap(int_column, initial, expected):
    """Values are limited to the column's 0 and 0.5 quantiles."""
    truncate = op.QuantileTrunc(top=0.5)
    actual = truncate.apply(int_column, initial)
    assert actual == expected


@pytest.mark.parametrize("min, max, transform", [(0, 1, (3, 0.5)),
                                                 (-1, 1, (3, 0)),
                                                 (-100, 100, (5, 100))])
def test_min_max(int_column, min, max, transform):
    """``MinMax`` rescales a value from the column's range to the target."""
    # Each ``transform`` pairs an input value with its expected scaled value.
    initial, expected = transform
    assert op.MinMax(min, max).apply(int_column, initial) == expected


@pytest.mark.parametrize("initial, expected", [(3 + 1 / 3, 0),
                                               (100, 59.19600211726014)])
def test_z_score(int_column, initial, expected):
    """``ZScore`` reports the distance from the mean in standard deviations."""
    actual = op.ZScore.apply(int_column, initial)
    # Mean and std are floating-point results; compare with a tolerance so
    # the test does not depend on the last bit of the computation.
    assert actual == pytest.approx(expected)


def test_sqrt(int_column):
    """``Sqrt`` returns the square root of the value it is given."""
    result = op.Sqrt.apply(int_column, 4)
    assert result == 2


@pytest.mark.parametrize("factor, transform", [(2, (2, 4)), (-1, (2, 0.5)),
                                               (3, (3, 27)), (0, (100, 1)),
                                               (0, (0, 1))])
def test_pow(int_column, factor, transform):
    """``Pow`` raises the value to the configured exponent."""
    # Each ``transform`` pairs an input value with its expected power.
    initial, expected = transform
    power = op.Pow(factor)
    actual = power.apply(int_column, initial)
    assert actual == expected


def test_median(int_column):
    """``Median`` yields the median of the fixture column."""
    result = op.Median.apply(int_column)
    assert result == 3.5


def test_mean(int_column):
    """``Mean`` yields the mean of the fixture column (20 / 6)."""
    result = op.Mean.apply(int_column)
    # 3 + 1/3 is not exactly representable; compare with a tolerance rather
    # than relying on both sides rounding identically.
    assert result == pytest.approx(3 + 1 / 3)


def test_zero(int_column):
    """``Zero`` yields 0 whatever column it is given."""
    result = op.Zero.apply(int_column)
    assert result == 0
6 changes: 6 additions & 0 deletions client/tests/testdata/numeric_tester.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,value
0,-100
1,100
2,200
3,
4,400