From 3c8a3acf9d01b6692eb2c70984047f71d0d4bb1b Mon Sep 17 00:00:00 2001 From: Simba Date: Mon, 4 May 2020 14:22:45 -0700 Subject: [PATCH 1/4] Adds .pyc file in git --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 25a68ea..7229501 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ __pycache__/ venv/ .coverage client/build/ +*.pyc From 64d859a72974ab3fae31c2297d3fc28c23e05b41 Mon Sep 17 00:00:00 2001 From: Simba Date: Mon, 4 May 2020 14:34:11 -0700 Subject: [PATCH 2/4] Add base of local.FeatureStore The python local.FeatureStore can read a CSV and turn it into a table. The local table can performed indexed look ups via a primary key. The local.FeatureStore will grow into a version of StreamSQL that can be used locally, though it will likely have a subset of the core functionality since it doesn't make much sense to have Kafka and other streaming primitives locally. The local.Table is a thin wrapper over a pandas dataframe to allow us to use the pandasql package to mimic core feature store functionality locally. --- client/requirements.txt | 17 ++++++++++++ client/streamsql/__init__.py | 0 client/streamsql/errors.py | 8 ++++++ client/streamsql/local.py | 42 ++++++++++++++++++++++++++++++ client/tests/test_local_backend.py | 38 +++++++++++++++++++++++++++ client/tests/testdata/users.csv | 4 +++ 6 files changed, 109 insertions(+) create mode 100644 client/requirements.txt create mode 100644 client/streamsql/__init__.py create mode 100644 client/streamsql/errors.py create mode 100644 client/streamsql/local.py create mode 100644 client/tests/test_local_backend.py create mode 100644 client/tests/testdata/users.csv diff --git a/client/requirements.txt b/client/requirements.txt new file mode 100644 index 0000000..d6e9c5d --- /dev/null +++ b/client/requirements.txt @@ -0,0 +1,17 @@ +attrs==19.3.0 +coverage==5.1 +importlib-metadata==1.6.0 +more-itertools==8.2.0 +numpy==1.18.4 +packaging==20.3 +pandas==1.0.3 +pluggy==0.13.1 +py==1.8.1 +pyparsing==2.4.7 +pytest==5.4.1 +python-dateutil==2.8.1 +pytz==2020.1 +six==1.14.0 +wcwidth==0.1.9 +yapf==0.30.0 +zipp==3.1.0 diff --git a/client/streamsql/__init__.py b/client/streamsql/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/client/streamsql/errors.py b/client/streamsql/errors.py new file mode 100644 index 0000000..eefe394 --- /dev/null +++ b/client/streamsql/errors.py @@ -0,0 +1,8 @@ +class Error(Exception): + """Base class for public StreamSQL client errors.""" + + +class TableAlreadyExists(Error): + """Raised when the requested table name already exists.""" + def __init__(self, name): + self.message = "Table with name \"{0}\" already exists".format(name) diff --git a/client/streamsql/local.py b/client/streamsql/local.py new file mode 100644 index 0000000..d98c0b1 --- /dev/null +++ b/client/streamsql/local.py @@ -0,0 +1,42 @@ +from streamsql.errors import TableAlreadyExists +import pandas + + +class FeatureStore: + """Feature is a local in-memory feature store + + The local feature store provides a subset of the core functionality + of the backend one. It should only be used for models that are + trained and used locally. + """ + def __init__(self): + self._tables = dict() + + def create_table_from_csv(self, csv_file, table_name="", primary_key=""): + """Create a table from a local csv file""" + if table_name in self._tables: + raise TableAlreadyExists(table_name) + + table = Table(csv_file=csv_file, primary_key=primary_key) + self._tables[table_name] = table + return table + + +class Table: + """Table is an in-memory implementation of the StreamSQL table""" + def __init__(self, csv_file="", primary_key=""): + """Table can be instatiated from a CSV file.""" + self._dataframe = self._dataframe_from_csv(csv_file, primary_key) + + def lookup(self, key): + """Lookup returns an array from a table by its primary key""" + item_series = self._dataframe.loc[key] + return item_series.to_list() + + def _dataframe_from_csv(self, file_name, index_col): + dataframe = pandas.read_csv(file_name, index_col=index_col) + self._clean_dataframe(dataframe) + return dataframe + + def _clean_dataframe(self, dataframe): + dataframe.index = dataframe.index.astype(str, copy=False) diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py new file mode 100644 index 0000000..477c26e --- /dev/null +++ b/client/tests/test_local_backend.py @@ -0,0 +1,38 @@ +import pytest +import streamsql.local +import streamsql.errors +import os, sys + +test_dir = os.path.dirname(os.path.realpath(__file__)) +users_file = os.path.join(test_dir, 'testdata', 'users.csv') + + +@pytest.fixture +def feature_store(): + return streamsql.local.FeatureStore() + + +@pytest.fixture +def user_table(feature_store): + return create_user_table(feature_store) + + +def create_user_table(feature_store): + return feature_store.create_table_from_csv(users_file, + table_name="users", + primary_key="id") + + +def test_table_lookup(user_table): + assert user_table.lookup("1") == ["simba"] + + +def test_table_lookup_fail(user_table): + with pytest.raises(KeyError): + user_table.lookup("abc") + + +def test_table_exists(feature_store): + create_user_table(feature_store) + with pytest.raises(streamsql.errors.TableAlreadyExists): + create_user_table(feature_store) diff --git a/client/tests/testdata/users.csv b/client/tests/testdata/users.csv new file mode 100644 index 0000000..9a31522 --- /dev/null +++ b/client/tests/testdata/users.csv @@ -0,0 +1,4 @@ +id,name +1,simba +2,nora +3,chups From 87807cba3df9f9d9e93d450a8e0cfd6e77c937f3 Mon Sep 17 00:00:00 2001 From: Simba Date: Mon, 4 May 2020 20:12:53 -0700 Subject: [PATCH 3/4] Adds get_table and has_table to local.FeatureStore --- client/streamsql/local.py | 12 ++++++++++++ client/tests/test_local_backend.py | 31 ++++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/client/streamsql/local.py b/client/streamsql/local.py index d98c0b1..2f0d44f 100644 --- a/client/streamsql/local.py +++ b/client/streamsql/local.py @@ -21,6 +21,12 @@ def create_table_from_csv(self, csv_file, table_name="", primary_key=""): self._tables[table_name] = table return table + def get_table(self, table_name): + return self._tables[table_name] + + def has_table(self, table_name): + return table_name in self._tables + class Table: """Table is an in-memory implementation of the StreamSQL table""" @@ -33,6 +39,12 @@ def lookup(self, key): item_series = self._dataframe.loc[key] return item_series.to_list() + def __eq__(self, other): + if isinstance(other, Table): + return self._dataframe.equals(other._dataframe) + else: + return NotImplemented + def _dataframe_from_csv(self, file_name, index_col): dataframe = pandas.read_csv(file_name, index_col=index_col) self._clean_dataframe(dataframe) diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index 477c26e..28db50e 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -23,6 +23,33 @@ def create_user_table(feature_store): primary_key="id") +def test_table_already_exists(feature_store): + create_user_table(feature_store) + with pytest.raises(streamsql.errors.TableAlreadyExists): + create_user_table(feature_store) + + +def test_has_table(feature_store): + create_user_table(feature_store) + assert feature_store.has_table("users") + + +def test_not_has_table(feature_store): + create_user_table(feature_store) + assert not feature_store.has_table("users2") + + +def test_get_table(feature_store): + created_table = create_user_table(feature_store) + got_table = feature_store.get_table("users") + assert created_table == got_table + + +def test_get_table_fail(feature_store): + with pytest.raises(KeyError): + feature_store.get_table("users2") + + def test_table_lookup(user_table): assert user_table.lookup("1") == ["simba"] @@ -32,7 +59,3 @@ def test_table_lookup_fail(user_table): user_table.lookup("abc") -def test_table_exists(feature_store): - create_user_table(feature_store) - with pytest.raises(streamsql.errors.TableAlreadyExists): - create_user_table(feature_store) From 8578548e57b957e15c9e8ba12c648d7f210ce707 Mon Sep 17 00:00:00 2001 From: Simba Date: Mon, 4 May 2020 21:27:41 -0700 Subject: [PATCH 4/4] Add materialize_table to local.FeatureStore materialize_table allows new tables to be created by applying a SQL query on existing ones. --- client/streamsql/local.py | 43 ++++++++++++++++---- client/tests/test_local_backend.py | 62 ++++++++++++++++++++++++++++- client/tests/testdata/purchases.csv | 4 ++ 3 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 client/tests/testdata/purchases.csv diff --git a/client/streamsql/local.py b/client/streamsql/local.py index 2f0d44f..82209fa 100644 --- a/client/streamsql/local.py +++ b/client/streamsql/local.py @@ -1,5 +1,6 @@ from streamsql.errors import TableAlreadyExists import pandas +from pandasql import sqldf class FeatureStore: @@ -17,7 +18,7 @@ def create_table_from_csv(self, csv_file, table_name="", primary_key=""): if table_name in self._tables: raise TableAlreadyExists(table_name) - table = Table(csv_file=csv_file, primary_key=primary_key) + table = Table.from_csv(csv_file=csv_file, primary_key=primary_key) self._tables[table_name] = table return table @@ -27,12 +28,40 @@ def get_table(self, table_name): def has_table(self, table_name): return table_name in self._tables + def materialize_table( + self, + name="", + query="", + dependencies=[], + output_columns=[], + primary_key="", + ): + """Create a Table by applying a SQL query on other Tables""" + query_ctx = { + dep: self.get_table(dep)._dataframe + for dep in dependencies + } + dataframe = sqldf(query, query_ctx) + dataframe.columns = output_columns + dataframe.set_index(keys=primary_key, + inplace=True, + verify_integrity=True) + table = Table(dataframe) + self._tables[name] = table + return table + class Table: """Table is an in-memory implementation of the StreamSQL table""" - def __init__(self, csv_file="", primary_key=""): - """Table can be instatiated from a CSV file.""" - self._dataframe = self._dataframe_from_csv(csv_file, primary_key) + @classmethod + def from_csv(cls, csv_file="", primary_key=""): + """Create a Table from a CSV file.""" + dataframe = Table._dataframe_from_csv(csv_file, primary_key) + return cls(dataframe) + + def __init__(self, dataframe): + """Create a Table from a pandas.DataFrame""" + self._dataframe = dataframe def lookup(self, key): """Lookup returns an array from a table by its primary key""" @@ -45,10 +74,10 @@ def __eq__(self, other): else: return NotImplemented - def _dataframe_from_csv(self, file_name, index_col): + def _dataframe_from_csv(file_name, index_col): dataframe = pandas.read_csv(file_name, index_col=index_col) - self._clean_dataframe(dataframe) + Table._clean_dataframe(dataframe) return dataframe - def _clean_dataframe(self, dataframe): + def _clean_dataframe(dataframe): dataframe.index = dataframe.index.astype(str, copy=False) diff --git a/client/tests/test_local_backend.py b/client/tests/test_local_backend.py index 28db50e..babb6c1 100644 --- a/client/tests/test_local_backend.py +++ b/client/tests/test_local_backend.py @@ -2,9 +2,12 @@ import streamsql.local import streamsql.errors import os, sys +import pandas as pd test_dir = os.path.dirname(os.path.realpath(__file__)) -users_file = os.path.join(test_dir, 'testdata', 'users.csv') +testdata_dir = os.path.join(test_dir, 'testdata') +users_file = os.path.join(testdata_dir, 'users.csv') +purchases_file = os.path.join(testdata_dir, 'purchases.csv') @pytest.fixture @@ -17,12 +20,23 @@ def user_table(feature_store): return create_user_table(feature_store) +@pytest.fixture +def purchase_table(feature_store): + return create_purchase_table(feature_store) + + def create_user_table(feature_store): return feature_store.create_table_from_csv(users_file, table_name="users", primary_key="id") +def create_purchase_table(feature_store): + return feature_store.create_table_from_csv(purchases_file, + table_name="purchases", + primary_key="id") + + def test_table_already_exists(feature_store): create_user_table(feature_store) with pytest.raises(streamsql.errors.TableAlreadyExists): @@ -59,3 +73,49 @@ def test_table_lookup_fail(user_table): user_table.lookup("abc") +def test_table_simple_sql(feature_store): + create_user_table(feature_store) + nora_table = feature_store.materialize_table( + name="nora", + query="SELECT id, name FROM users WHERE name == 'nora'", + dependencies=["users"], + output_columns=["new_id", "new_name"], + primary_key=["new_id"], + ) + df = pd.DataFrame(["nora"], index=["2"], columns=["new_name"]) + df.index.name = "new_id" + expected = streamsql.local.Table(df) + assert nora_table == expected + + +def test_table_join_sql(feature_store): + create_user_table(feature_store) + create_purchase_table(feature_store) + dollars_spent_table = feature_store.materialize_table( + name="dollars_spent", + query="""SELECT user.id, SUM(price) FROM purchases purchase + INNER JOIN users user ON purchase.user=user.id GROUP BY user.id + ORDER BY user.id ASC + """, + dependencies=["users", "purchases"], + output_columns=["user", "spent"], + primary_key=["user"], + ) + df = pd.DataFrame([1000, 10], index=["1", "3"], columns=["spent"]) + df.index.name = "user" + expected = streamsql.local.Table(df) + assert dollars_spent_table == expected + + +def test_materialized_table_is_stored(feature_store): + create_user_table(feature_store) + created = feature_store.materialize_table( + name="user_cpy", + query="SELECT id, name FROM users", + dependencies=["users"], + output_columns=["id", "name"], + primary_key=["id"], + ) + assert feature_store.has_table("user_cpy") + got = feature_store.get_table("user_cpy") + assert created == got diff --git a/client/tests/testdata/purchases.csv b/client/tests/testdata/purchases.csv new file mode 100644 index 0000000..3a6452e --- /dev/null +++ b/client/tests/testdata/purchases.csv @@ -0,0 +1,4 @@ +id,user,item,price +0,1,laptop,1000 +1,3,peanut butter,5 +2,3,peanut butter,5