Skip to content

Commit

Permalink
[HOPSWORKS-2084] Remove default_storage option from feature groups (#118)
Browse files Browse the repository at this point in the history

Co-authored-by: Fabio Buso <fabio@logicalclocks.com>
  • Loading branch information
moritzmeister and SirOibaf committed Nov 3, 2020
1 parent c2877ec commit 7169a09
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 118 deletions.
27 changes: 17 additions & 10 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from hsfs import engine
from hsfs import feature_group as fg
from hsfs.core import feature_group_api, storage_connector_api, tags_api, hudi_engine
from hsfs.client import exceptions


class FeatureGroupEngine:
Expand All @@ -31,7 +32,7 @@ def __init__(self, feature_store_id):
)
self._tags_api = tags_api.TagsApi(feature_store_id, self.ENTITY_TYPE)

def save(self, feature_group, feature_dataframe, storage, write_options):
def save(self, feature_group, feature_dataframe, write_options):

if len(feature_group.features) == 0:
# User didn't provide a schema. extract it from the dataframe
Expand All @@ -54,7 +55,7 @@ def save(self, feature_group, feature_dataframe, storage, write_options):

table_name = self._get_table_name(feature_group)

if storage.lower() == "online" or storage.lower() == "all":
if feature_group.online_enabled:
# Add JDBC connection configuration in case of online feature group
online_conn = self._storage_connector_api.get_online_connector()

Expand All @@ -68,10 +69,9 @@ def save(self, feature_group, feature_dataframe, storage, write_options):
feature_group,
feature_dataframe,
self.APPEND,
hudi_engine.HudiEngine.HUDI_BULK_INSERT
if feature_group.time_travel_format == "HUDI"
else None,
storage,
hudi_engine.HudiEngine.HUDI_BULK_INSERT if feature_group.time_travel_format == "HUDI" else None,
feature_group.online_enabled,
None,
offline_write_options,
online_write_options,
)
Expand All @@ -88,7 +88,13 @@ def insert(
offline_write_options = write_options
online_write_options = write_options

if storage.lower() == "online" or storage.lower() == "all":
if not feature_group.online_enabled and storage == "online":
raise exceptions.FeatureStoreException(
"Online storage is not enabled for this feature group."
)
elif (
feature_group.online_enabled and storage != "offline"
) or storage == "online":
# Add JDBC connection configuration in case of online feature group
online_conn = self._storage_connector_api.get_online_connector()

Expand All @@ -97,7 +103,7 @@ def insert(

online_write_options = {**jdbc_options, **online_write_options}

if (storage.lower() == "offline" or storage.lower() == "all") and overwrite:
if overwrite:
self._feature_group_api.delete_content(feature_group)

engine.get_instance().save_dataframe(
Expand All @@ -106,6 +112,7 @@ def insert(
feature_dataframe,
self.APPEND,
operation,
feature_group.online_enabled,
storage,
offline_write_options,
online_write_options,
Expand Down Expand Up @@ -155,8 +162,8 @@ def get_tags(self, feature_group, name):
"""Get tag with a certain name or all tags for a feature group."""
return [tag.to_dict() for tag in self._tags_api.get(feature_group, name)]

def sql(self, query, feature_store_name, dataframe_type, storage):
if storage.lower() == "online":
def sql(self, query, feature_store_name, dataframe_type, online):
if online:
online_conn = self._storage_connector_api.get_online_connector()
else:
online_conn = None
Expand Down
17 changes: 6 additions & 11 deletions python/hsfs/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,10 @@ def __init__(
feature_store_id
)

def read(
self,
storage="offline",
dataframe_type="default",
read_options={},
):
def read(self, online=False, dataframe_type="default", read_options={}):
query = self._query_constructor_api.construct_query(self)

if storage.lower() == "online":
if online:
sql_query = query.query_online
online_conn = self._storage_connector_api.get_online_connector()
else:
Expand All @@ -69,10 +64,10 @@ def read(
sql_query, self._feature_store_name, online_conn, dataframe_type
)

def show(self, n, storage="offline"):
def show(self, n, online=False):
query = self._query_constructor_api.construct_query(self)

if storage.lower() == "online":
if online:
sql_query = query["queryOnline"]
online_conn = self._storage_connector_api.get_online_connector()
else:
Expand Down Expand Up @@ -112,9 +107,9 @@ def to_dict(self):
"joins": self._joins,
}

def to_string(self, storage="offline"):
def to_string(self, online=False):
return self._query_constructor_api.construct_query(self)[
"query" if storage == "offline" else "queryOnline"
"queryOnline" if online else "query"
]

def __str__(self):
Expand Down
7 changes: 7 additions & 0 deletions python/hsfs/core/statistics_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from hsfs import engine, statistics
from hsfs.core import statistics_api
from hsfs.client import exceptions


class StatisticsEngine:
Expand All @@ -29,6 +30,12 @@ def __init__(self, feature_store_id, entity_type):
def compute_statistics(self, metadata_instance, feature_dataframe):
"""Compute statistics for a dataframe and send the result json to Hopsworks."""
commit_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
if len(feature_dataframe.head(1)) == 0:
raise exceptions.FeatureStoreException(
"There is no data in the entity that you are trying to compute "
"statistics for. A possible cause might be that you inserted only data "
"to the online storage of a feature group."
)
content_str = engine.get_instance().profile(
feature_dataframe,
metadata_instance.statistics_config.columns,
Expand Down
4 changes: 2 additions & 2 deletions python/hsfs/core/training_dataset_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ def read(self, training_dataset, split, user_read_options):
path,
)

def query(self, training_dataset, online):
    """Return the query string backing `training_dataset`.

    Fetches the query metadata from the training dataset API and returns the
    online flavor when `online` is truthy, the offline flavor otherwise.
    """
    response = self._training_dataset_api.get_query(training_dataset)
    if online:
        return response["queryOnline"]
    return response["query"]

def _write(self, training_dataset, dataset, write_options, save_mode):
Expand Down
1 change: 1 addition & 0 deletions python/hsfs/engine/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def save_dataframe(
feature_group,
dataframe,
save_mode,
online_enabled,
storage,
offline_write_options,
online_write_options,
Expand Down
13 changes: 9 additions & 4 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,12 @@ def save_dataframe(
dataframe,
save_mode,
operation,
online_enabled,
storage,
offline_write_options,
online_write_options,
):
if storage.lower() == "offline":
if storage == "offline" or not online_enabled:
self._save_offline_dataframe(
table_name,
feature_group,
Expand All @@ -157,22 +158,26 @@ def save_dataframe(
operation,
offline_write_options,
)
elif storage.lower() == "online":
elif storage == "online":
self._save_online_dataframe(
table_name, dataframe, save_mode, online_write_options
)
elif storage.lower() == "all":
elif online_enabled and storage is None:
self._save_offline_dataframe(
table_name,
feature_group,
dataframe,
save_mode,
operation,
offline_write_options,
)
self._save_online_dataframe(
table_name, dataframe, save_mode, online_write_options
)
else:
raise FeatureStoreException("Storage not supported")
raise FeatureStoreException(
"Error writing to offline and online feature store."
)

def _save_offline_dataframe(
self,
Expand Down

0 comments on commit 7169a09

Please sign in to comment.