Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ __pycache__/
venv/
.coverage
client/build/
*.pyc
17 changes: 17 additions & 0 deletions client/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
attrs==19.3.0
coverage==5.1
importlib-metadata==1.6.0
more-itertools==8.2.0
numpy==1.18.4
packaging==20.3
pandas==1.0.3
pandasql==0.7.3
pluggy==0.13.1
py==1.8.1
pyparsing==2.4.7
pytest==5.4.1
python-dateutil==2.8.1
pytz==2020.1
six==1.14.0
wcwidth==0.1.9
yapf==0.30.0
zipp==3.1.0
Empty file added client/streamsql/__init__.py
Empty file.
8 changes: 8 additions & 0 deletions client/streamsql/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class Error(Exception):
    """Base class for public StreamSQL client errors."""


class TableAlreadyExists(Error):
    """Raised when the requested table name already exists.

    Args:
        name: The table name that collided with an existing table.
    """
    def __init__(self, name):
        self.message = "Table with name \"{0}\" already exists".format(name)
        # Pass the message to Exception so str(exc) and exc.args are
        # populated; the original only set .message, leaving str(exc) empty.
        super().__init__(self.message)
83 changes: 83 additions & 0 deletions client/streamsql/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from streamsql.errors import TableAlreadyExists
import pandas
from pandasql import sqldf


class FeatureStore:
    """FeatureStore is a local, in-memory feature store.

    The local feature store provides a subset of the core functionality
    of the backend one. It should only be used for models that are
    trained and used locally.
    """
    def __init__(self):
        # Maps table name -> Table.
        self._tables = dict()

    def create_table_from_csv(self, csv_file, table_name="", primary_key=""):
        """Create a table from a local csv file.

        Raises:
            TableAlreadyExists: If table_name is already registered.
        """
        if table_name in self._tables:
            raise TableAlreadyExists(table_name)

        table = Table.from_csv(csv_file=csv_file, primary_key=primary_key)
        self._tables[table_name] = table
        return table

    def get_table(self, table_name):
        """Return the stored Table; raises KeyError if it does not exist."""
        return self._tables[table_name]

    def has_table(self, table_name):
        """Return True if a table with the given name is registered."""
        return table_name in self._tables

    def materialize_table(
        self,
        name="",
        query="",
        dependencies=None,
        output_columns=None,
        primary_key="",
    ):
        """Create a Table by applying a SQL query on other Tables.

        Args:
            name: Name to register the resulting table under.
            query: SQL to run; table names in it must appear in dependencies.
            dependencies: Names of previously-registered tables the query
                reads. None is treated as no dependencies.
            output_columns: New column names for the result. None keeps the
                columns produced by the query.
            primary_key: Column(s) of the result to use as the index.
        """
        # None defaults instead of mutable [] defaults, which are shared
        # across calls and could be mutated by one caller for all others.
        if dependencies is None:
            dependencies = []
        query_ctx = {
            dep: self.get_table(dep)._dataframe
            for dep in dependencies
        }
        dataframe = sqldf(query, query_ctx)
        if output_columns:
            dataframe.columns = output_columns
        dataframe.set_index(keys=primary_key,
                            inplace=True,
                            verify_integrity=True)
        table = Table(dataframe)
        self._tables[name] = table
        return table


class Table:
    """Table is an in-memory implementation of the StreamSQL table."""
    @classmethod
    def from_csv(cls, csv_file="", primary_key=""):
        """Create a Table from a CSV file, indexed by primary_key."""
        dataframe = Table._dataframe_from_csv(csv_file, primary_key)
        return cls(dataframe)

    def __init__(self, dataframe):
        """Create a Table from a pandas.DataFrame."""
        self._dataframe = dataframe

    def lookup(self, key):
        """Return a row's values as a list, looked up by primary key.

        Raises:
            KeyError: If the key is not in the index.
        """
        item_series = self._dataframe.loc[key]
        return item_series.to_list()

    def __eq__(self, other):
        """Tables are equal when their underlying DataFrames are equal."""
        if isinstance(other, Table):
            return self._dataframe.equals(other._dataframe)
        else:
            return NotImplemented

    # @staticmethod was missing on the two helpers below: they only worked
    # when called through the class (Table._x); an instance call would have
    # bound the instance as the first argument.
    @staticmethod
    def _dataframe_from_csv(file_name, index_col):
        dataframe = pandas.read_csv(file_name, index_col=index_col)
        Table._clean_dataframe(dataframe)
        return dataframe

    @staticmethod
    def _clean_dataframe(dataframe):
        # Force a string index so lookups are uniform regardless of the
        # dtype pandas inferred for the key column.
        dataframe.index = dataframe.index.astype(str, copy=False)
121 changes: 121 additions & 0 deletions client/tests/test_local_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import pytest
import streamsql.local
import streamsql.errors
import os, sys
import pandas as pd

# Absolute paths to the on-disk CSV fixtures used by the tests below,
# resolved relative to this file so the suite works from any cwd.
test_dir = os.path.dirname(os.path.realpath(__file__))
testdata_dir = os.path.join(test_dir, 'testdata')
users_file = os.path.join(testdata_dir, 'users.csv')
purchases_file = os.path.join(testdata_dir, 'purchases.csv')


@pytest.fixture
def feature_store():
    """Provide a fresh, empty local FeatureStore for each test."""
    store = streamsql.local.FeatureStore()
    return store


@pytest.fixture
def user_table(feature_store):
    """Users table pre-loaded into the shared feature store."""
    table = create_user_table(feature_store)
    return table


@pytest.fixture
def purchase_table(feature_store):
    """Purchases table pre-loaded into the shared feature store."""
    table = create_purchase_table(feature_store)
    return table


def create_user_table(feature_store):
    """Register the users CSV fixture in the store and return the Table."""
    return feature_store.create_table_from_csv(
        users_file,
        primary_key="id",
        table_name="users",
    )


def create_purchase_table(feature_store):
    """Register the purchases CSV fixture in the store and return the Table."""
    return feature_store.create_table_from_csv(
        purchases_file,
        primary_key="id",
        table_name="purchases",
    )


def test_table_already_exists(feature_store):
    """Creating the same table twice must raise TableAlreadyExists."""
    create_user_table(feature_store)
    with pytest.raises(streamsql.errors.TableAlreadyExists):
        # Second registration under the same name is rejected.
        create_user_table(feature_store)


def test_has_table(feature_store):
    """has_table reports a table that was just created."""
    create_user_table(feature_store)
    exists = feature_store.has_table("users")
    assert exists


def test_not_has_table(feature_store):
    """has_table is False for a name that was never registered."""
    create_user_table(feature_store)
    exists = feature_store.has_table("users2")
    assert not exists


def test_get_table(feature_store):
    """get_table returns the same table that create produced."""
    created = create_user_table(feature_store)
    fetched = feature_store.get_table("users")
    assert fetched == created


def test_get_table_fail(feature_store):
    """get_table raises KeyError for an unknown table name."""
    with pytest.raises(KeyError):
        feature_store.get_table("users2")


def test_table_lookup(user_table):
    """Lookup by primary key returns the row's non-key values."""
    row = user_table.lookup("1")
    assert row == ["simba"]


def test_table_lookup_fail(user_table):
    """Lookup of a missing primary key raises KeyError."""
    with pytest.raises(KeyError):
        user_table.lookup("abc")


def test_table_simple_sql(feature_store):
    """A single-table SELECT materializes with renamed columns and index."""
    create_user_table(feature_store)
    nora_table = feature_store.materialize_table(
        name="nora",
        query="SELECT id, name FROM users WHERE name == 'nora'",
        dependencies=["users"],
        output_columns=["new_id", "new_name"],
        primary_key=["new_id"],
    )
    # Expected: the single matching row, re-keyed by new_id.
    expected_frame = pd.DataFrame({"new_name": ["nora"]},
                                  index=pd.Index(["2"], name="new_id"))
    expected = streamsql.local.Table(expected_frame)
    assert nora_table == expected


def test_table_join_sql(feature_store):
    """A join + aggregation across two tables materializes correctly."""
    create_user_table(feature_store)
    create_purchase_table(feature_store)
    dollars_spent_table = feature_store.materialize_table(
        name="dollars_spent",
        query="""SELECT user.id, SUM(price) FROM purchases purchase
            INNER JOIN users user ON purchase.user=user.id GROUP BY user.id
            ORDER BY user.id ASC
            """,
        dependencies=["users", "purchases"],
        output_columns=["user", "spent"],
        primary_key=["user"],
    )
    # Expected per-user totals: user 1 spent 1000, user 3 spent 10.
    expected_frame = pd.DataFrame({"spent": [1000, 10]},
                                  index=pd.Index(["1", "3"], name="user"))
    expected = streamsql.local.Table(expected_frame)
    assert dollars_spent_table == expected


def test_materialized_table_is_stored(feature_store):
    """materialize_table registers its result under the given name."""
    create_user_table(feature_store)
    materialized = feature_store.materialize_table(
        name="user_cpy",
        query="SELECT id, name FROM users",
        dependencies=["users"],
        output_columns=["id", "name"],
        primary_key=["id"],
    )
    assert feature_store.has_table("user_cpy")
    stored = feature_store.get_table("user_cpy")
    assert stored == materialized
4 changes: 4 additions & 0 deletions client/tests/testdata/purchases.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,user,item,price
0,1,laptop,1000
1,3,peanut butter,5
2,3,peanut butter,5
4 changes: 4 additions & 0 deletions client/tests/testdata/users.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,name
1,simba
2,nora
3,chups