# --- client/streamsql/errors.py ---


class Error(Exception):
    """Base class for public StreamSQL client errors."""


class TableAlreadyExists(Error):
    """Raised when the requested table name already exists."""

    def __init__(self, name):
        self.message = "Table with name \"{0}\" already exists".format(name)
        # Pass the message to Exception so str(exc) / tracebacks show it;
        # the original only set self.message, which left str(exc) empty.
        super().__init__(self.message)


# --- client/streamsql/local.py ---
# (original top-level imports: `from streamsql.errors import TableAlreadyExists`,
#  `import pandas`; pandasql is now imported lazily in materialize_table.)
import pandas


class FeatureStore:
    """Local in-memory feature store.

    Provides a subset of the core functionality of the backend store.
    It should only be used for models that are trained and used locally.
    """

    def __init__(self):
        # Maps table name -> Table instance.
        self._tables = dict()

    def create_table_from_csv(self, csv_file, table_name="", primary_key=""):
        """Create and register a Table from a local CSV file.

        Raises:
            TableAlreadyExists: if `table_name` is already registered.
        """
        if table_name in self._tables:
            raise TableAlreadyExists(table_name)

        table = Table.from_csv(csv_file=csv_file, primary_key=primary_key)
        self._tables[table_name] = table
        return table

    def get_table(self, table_name):
        """Return the named table; raises KeyError if it does not exist."""
        return self._tables[table_name]

    def has_table(self, table_name):
        """Return True if a table with this name has been registered."""
        return table_name in self._tables

    def materialize_table(
        self,
        name="",
        query="",
        dependencies=None,
        output_columns=None,
        primary_key="",
    ):
        """Create a Table by applying a SQL query on other Tables.

        Args:
            name: name under which the result is registered.
            query: SQL text; may reference each dependency by its table name.
            dependencies: table names whose dataframes are visible to `query`.
            output_columns: names assigned to the result's columns, in order.
            primary_key: column(s) promoted to the result's index.

        NOTE(review): unlike create_table_from_csv, this silently overwrites
        an existing table of the same name — confirm that is intended.
        """
        # pandasql is an optional third-party dependency needed only here,
        # so import lazily rather than at module load.
        from pandasql import sqldf

        # Avoid the mutable-default-argument pitfall of the original.
        dependencies = [] if dependencies is None else dependencies
        output_columns = [] if output_columns is None else output_columns

        query_ctx = {
            dep: self.get_table(dep)._dataframe
            for dep in dependencies
        }
        dataframe = sqldf(query, query_ctx)
        dataframe.columns = output_columns
        dataframe.set_index(keys=primary_key,
                            inplace=True,
                            verify_integrity=True)
        table = Table(dataframe)
        self._tables[name] = table
        return table


class Table:
    """Table is an in-memory implementation of the StreamSQL table."""

    @classmethod
    def from_csv(cls, csv_file="", primary_key=""):
        """Create a Table from a CSV file, using `primary_key` as the index."""
        dataframe = Table._dataframe_from_csv(csv_file, primary_key)
        return cls(dataframe)

    def __init__(self, dataframe):
        """Create a Table from a pandas.DataFrame."""
        self._dataframe = dataframe

    def lookup(self, key):
        """Return a row's values as a list, looked up by its primary key.

        Raises KeyError if the key is not present.
        """
        item_series = self._dataframe.loc[key]
        return item_series.to_list()

    def __eq__(self, other):
        # Tables compare by underlying dataframe contents.
        if isinstance(other, Table):
            return self._dataframe.equals(other._dataframe)
        else:
            return NotImplemented

    @staticmethod
    def _dataframe_from_csv(file_name, index_col):
        # Decorated @staticmethod (missing in the original) since neither
        # helper uses instance or class state.
        dataframe = pandas.read_csv(file_name, index_col=index_col)
        Table._clean_dataframe(dataframe)
        return dataframe

    @staticmethod
    def _clean_dataframe(dataframe):
        # Normalize the index to strings so lookups use string keys
        # regardless of the CSV's inferred dtype.
        dataframe.index = dataframe.index.astype(str, copy=False)
a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py new file mode 100644 index 0000000..babb6c1 --- /dev/null +++ b/client/tests/test_local_backend.py @@ -0,0 +1,121 @@ +import pytest +import streamsql.local +import streamsql.errors +import os, sys +import pandas as pd + +test_dir = os.path.dirname(os.path.realpath(__file__)) +testdata_dir = os.path.join(test_dir, 'testdata') +users_file = os.path.join(testdata_dir, 'users.csv') +purchases_file = os.path.join(testdata_dir, 'purchases.csv') + + +@pytest.fixture +def feature_store(): + return streamsql.local.FeatureStore() + + +@pytest.fixture +def user_table(feature_store): + return create_user_table(feature_store) + + +@pytest.fixture +def purchase_table(feature_store): + return create_purchase_table(feature_store) + + +def create_user_table(feature_store): + return feature_store.create_table_from_csv(users_file, + table_name="users", + primary_key="id") + + +def create_purchase_table(feature_store): + return feature_store.create_table_from_csv(purchases_file, + table_name="purchases", + primary_key="id") + + +def test_table_already_exists(feature_store): + create_user_table(feature_store) + with pytest.raises(streamsql.errors.TableAlreadyExists): + create_user_table(feature_store) + + +def test_has_table(feature_store): + create_user_table(feature_store) + assert feature_store.has_table("users") + + +def test_not_has_table(feature_store): + create_user_table(feature_store) + assert not feature_store.has_table("users2") + + +def test_get_table(feature_store): + created_table = create_user_table(feature_store) + got_table = feature_store.get_table("users") + assert created_table == got_table + + +def test_get_table_fail(feature_store): + with pytest.raises(KeyError): + feature_store.get_table("users2") + + +def test_table_lookup(user_table): + assert user_table.lookup("1") == ["simba"] + + +def test_table_lookup_fail(user_table): + with pytest.raises(KeyError): + user_table.lookup("abc") + + 
+def test_table_simple_sql(feature_store): + create_user_table(feature_store) + nora_table = feature_store.materialize_table( + name="nora", + query="SELECT id, name FROM users WHERE name == 'nora'", + dependencies=["users"], + output_columns=["new_id", "new_name"], + primary_key=["new_id"], + ) + df = pd.DataFrame(["nora"], index=["2"], columns=["new_name"]) + df.index.name = "new_id" + expected = streamsql.local.Table(df) + assert nora_table == expected + + +def test_table_join_sql(feature_store): + create_user_table(feature_store) + create_purchase_table(feature_store) + dollars_spent_table = feature_store.materialize_table( + name="dollars_spent", + query="""SELECT user.id, SUM(price) FROM purchases purchase + INNER JOIN users user ON purchase.user=user.id GROUP BY user.id + ORDER BY user.id ASC + """, + dependencies=["users", "purchases"], + output_columns=["user", "spent"], + primary_key=["user"], + ) + df = pd.DataFrame([1000, 10], index=["1", "3"], columns=["spent"]) + df.index.name = "user" + expected = streamsql.local.Table(df) + assert dollars_spent_table == expected + + +def test_materialized_table_is_stored(feature_store): + create_user_table(feature_store) + created = feature_store.materialize_table( + name="user_cpy", + query="SELECT id, name FROM users", + dependencies=["users"], + output_columns=["id", "name"], + primary_key=["id"], + ) + assert feature_store.has_table("user_cpy") + got = feature_store.get_table("user_cpy") + assert created == got diff --git a/client/tests/testdata/purchases.csv b/client/tests/testdata/purchases.csv new file mode 100644 index 0000000..3a6452e --- /dev/null +++ b/client/tests/testdata/purchases.csv @@ -0,0 +1,4 @@ +id,user,item,price +0,1,laptop,1000 +1,3,peanut butter,5 +2,3,peanut butter,5 diff --git a/client/tests/testdata/users.csv b/client/tests/testdata/users.csv new file mode 100644 index 0000000..9a31522 --- /dev/null +++ b/client/tests/testdata/users.csv @@ -0,0 +1,4 @@ +id,name +1,simba +2,nora +3,chups