Merged
python/hsfs/core/external_feature_group_engine.py (+48, -0)

@@ -47,6 +47,54 @@ def save(self, feature_group):

self._feature_group_api.save(feature_group)

def insert(
self,
feature_group,
feature_dataframe,
write_options: dict,
validation_options: dict = {},
):
if not feature_group.online_enabled:
raise FeatureStoreException(
"Online storage is not enabled for this feature group. External feature groups can only store data in"
+ " online storage. To create an offline only external feature group, use the `save` method."
)

schema = engine.get_instance().parse_schema_feature_group(feature_dataframe)

if not feature_group._id:
# only save metadata if feature group does not exist
feature_group.features = schema
self.save(feature_group)
else:
# else, just verify that feature group schema matches user-provided dataframe
self._verify_schema_compatibility(feature_group.features, schema)

        # Great Expectations validation: on the Python engine, and on Spark for non-stream feature groups
ge_report = feature_group._great_expectation_engine.validate(
feature_group=feature_group,
dataframe=feature_dataframe,
validation_options=validation_options,
ingestion_result="INGESTED",
ge_type=False,
)

if ge_report is not None and ge_report.ingestion_result == "REJECTED":
return None, ge_report

return (
engine.get_instance().save_dataframe(
feature_group=feature_group,
dataframe=feature_dataframe,
operation=None,
online_enabled=feature_group.online_enabled,
storage="online",
offline_write_options=write_options,
online_write_options=write_options,
),
ge_report,
)

def _update_features_metadata(self, feature_group, features):
# perform changes on copy in case the update fails, so we don't leave
# the user object in corrupted state
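With this method in place, an online-enabled external feature group gets a write path of its own: metadata is saved on first insert, the dataframe schema is verified against the feature group schema, Great Expectations validation runs, and the rows are written to online storage only. A minimal usage sketch, assuming the usual hsfs connection flow and that the public ExternalFeatureGroup.insert wrapper delegates to this engine method (the feature group name and data are made up):

import hsfs
import pandas as pd

# Hypothetical setup; exact connection arguments depend on the deployment.
connection = hsfs.connection()
fs = connection.get_feature_store()

# An external feature group backed by e.g. a Snowflake or BigQuery table,
# created with online_enabled=True so it can also serve rows online.
fg = fs.get_external_feature_group("transactions", version=1)

df = pd.DataFrame({"id": [1, 2], "amount": [9.99, 42.0]})

# Routed through ExternalFeatureGroupEngine.insert: the schema is verified,
# GE validation runs, and the rows land in online storage only.
fg.insert(df)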
python/hsfs/core/feature_group_base_engine.py (+47, -0)

@@ -15,6 +15,7 @@
#

from hsfs.core import feature_group_api, storage_connector_api, tags_api, kafka_api
from hsfs.client.exceptions import FeatureStoreException


class FeatureGroupBaseEngine:
@@ -107,3 +108,49 @@ def new_feature_list(self, feature_group, updated_features):
):
new_features.append(feature)
return new_features + updated_features

def _verify_schema_compatibility(self, feature_group_features, dataframe_features):
err = []
feature_df_dict = {feat.name: feat.type for feat in dataframe_features}
for feature_fg in feature_group_features:
fg_type = feature_fg.type.lower().replace(" ", "")
            # check if feature exists in the dataframe
if feature_fg.name in feature_df_dict:
df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "")
# remove match from lookup table
del feature_df_dict[feature_fg.name]

# check if types match
if fg_type != df_type:
# don't check structs for exact match
if fg_type.startswith("struct") and df_type.startswith("struct"):
continue

err += [
f"{feature_fg.name} ("
f"expected type: '{fg_type}', "
f"derived from input: '{df_type}') has the wrong type."
]

else:
err += [
f"{feature_fg.name} (type: '{feature_fg.type}') is missing from "
f"input dataframe."
]

# any features that are left in lookup table are superfluous
for feature_df_name, feature_df_type in feature_df_dict.items():
err += [
f"{feature_df_name} (type: '{feature_df_type}') does not exist "
f"in feature group."
]

# raise exception if any errors were found.
if len(err) > 0:
raise FeatureStoreException(
"Features are not compatible with Feature Group schema: "
+ "".join(["\n - " + e for e in err])
)

def get_subject(self, feature_group):
return self._kafka_api.get_topic_subject(feature_group._online_topic_name)
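The check itself is a plain name-and-type reconciliation: every feature group feature must appear in the dataframe with a matching type (compared lowercase with whitespace stripped), struct types only have to agree on the struct prefix, and any dataframe column left over afterwards is flagged as superfluous. A self-contained sketch of the same logic, with a stand-in Feature tuple instead of the real hsfs.feature.Feature class:

from collections import namedtuple

Feature = namedtuple("Feature", ["name", "type"])  # stand-in for hsfs.feature.Feature

fg_features = [Feature("id", "bigint"), Feature("amount", "double")]
df_features = [Feature("id", "bigint"), Feature("amount", "string"), Feature("extra", "int")]

err = []
lookup = {f.name: f.type for f in df_features}
for fg_feat in fg_features:
    fg_type = fg_feat.type.lower().replace(" ", "")
    if fg_feat.name in lookup:
        df_type = lookup.pop(fg_feat.name).lower().replace(" ", "")
        # structs only need to agree on the prefix; everything else must match exactly
        if fg_type != df_type and not (
            fg_type.startswith("struct") and df_type.startswith("struct")
        ):
            err.append(f"{fg_feat.name} (expected '{fg_type}', got '{df_type}')")
    else:
        err.append(f"{fg_feat.name} is missing from the input dataframe")

# whatever is left in the lookup table has no counterpart in the feature group
err += [f"{name} does not exist in the feature group" for name in lookup]

print(err)
# ["amount (expected 'double', got 'string')", "extra does not exist in the feature group"]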
python/hsfs/core/feature_group_engine.py (+0, -47)

@@ -17,7 +17,6 @@
from hsfs import engine, client, util
from hsfs import feature_group as fg
from hsfs.client import exceptions
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import feature_group_base_engine, hudi_engine
from hsfs.core.deltastreamer_jobconf import DeltaStreamerJobConf

@@ -234,9 +233,6 @@ def update_description(self, feature_group, description):
feature_group, copy_feature_group, "updateMetadata"
)

def get_subject(self, feature_group):
return self._kafka_api.get_topic_subject(feature_group._online_topic_name)

def insert_stream(
self,
feature_group,
@@ -305,49 +301,6 @@ def insert_stream(

return streaming_query

def _verify_schema_compatibility(self, feature_group_features, dataframe_features):
err = []
feature_df_dict = {feat.name: feat.type for feat in dataframe_features}
for feature_fg in feature_group_features:
fg_type = feature_fg.type.lower().replace(" ", "")
# check if feature exists dataframe
if feature_fg.name in feature_df_dict:
df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "")
# remove match from lookup table
del feature_df_dict[feature_fg.name]

# check if types match
if fg_type != df_type:
# don't check structs for exact match
if fg_type.startswith("struct") and df_type.startswith("struct"):
continue

err += [
f"{feature_fg.name} ("
f"expected type: '{fg_type}', "
f"derived from input: '{df_type}') has the wrong type."
]

else:
err += [
f"{feature_fg.name} (type: '{feature_fg.type}') is missing from "
f"input dataframe."
]

# any features that are left in lookup table are superfluous
for feature_df_name, feature_df_type in feature_df_dict.items():
err += [
f"{feature_df_name} (type: '{feature_df_type}') does not exist "
f"in feature group."
]

# raise exception if any errors were found.
if len(err) > 0:
raise FeatureStoreException(
"Features are not compatible with Feature Group schema: "
+ "".join(["\n - " + e for e in err])
)

def _save_feature_group_metadata(
self, feature_group, dataframe_features, write_options
):
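The deletions in this file are the counterpart of the additions above: _verify_schema_compatibility and get_subject move from FeatureGroupEngine up into the shared FeatureGroupBaseEngine, so the regular and the external feature group engines call one implementation. In miniature, with bodies elided, the refactor looks like this:

class FeatureGroupBaseEngine:
    def _verify_schema_compatibility(self, fg_features, df_features):
        ...  # shared name/type reconciliation, as sketched above

class FeatureGroupEngine(FeatureGroupBaseEngine):
    pass  # insert_stream() etc. keep calling self._verify_schema_compatibility

class ExternalFeatureGroupEngine(FeatureGroupBaseEngine):
    pass  # insert() can now reuse the same check without duplicating it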
python/hsfs/engine/python.py (+11, -4)

@@ -44,6 +44,7 @@
from sqlalchemy import sql

from hsfs import client, feature, util
from hsfs.feature_group import ExternalFeatureGroup
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import (
feature_group_api,
@@ -436,7 +437,10 @@ def save_dataframe(
online_write_options: dict,
validation_id: int = None,
):
if feature_group.stream:
if (
isinstance(feature_group, ExternalFeatureGroup)
and feature_group.online_enabled
) or feature_group.stream:
return self._write_dataframe_kafka(
feature_group, dataframe, offline_write_options
)
@@ -896,13 +900,16 @@ def acked(err, msg):
progress_bar.close()

# start backfilling job
if offline_write_options is not None and offline_write_options.get(
"start_offline_backfill", True
if (
not isinstance(feature_group, ExternalFeatureGroup)
and offline_write_options is not None
and offline_write_options.get("start_offline_backfill", True)
):
feature_group.backfill_job.run(
await_termination=offline_write_options.get("wait_for_job", True)
)

if isinstance(feature_group, ExternalFeatureGroup):
return None
return feature_group.backfill_job

def _kafka_produce(
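The Python engine changes reduce to two predicates: write through Kafka when the group is a stream feature group or an online-enabled external one, and start (and return) the offline backfill job only for non-external groups. A distilled sketch of that routing, with plain stand-in classes rather than the hsfs ones:

class RegularFG:
    stream = False
    online_enabled = True

class StreamFG:
    stream = True
    online_enabled = True

class ExternalFG:
    stream = False
    online_enabled = True

def writes_via_kafka(fg) -> bool:
    # online-enabled external groups go to Kafka, as does any stream group
    return (isinstance(fg, ExternalFG) and fg.online_enabled) or fg.stream

def runs_backfill(fg, offline_write_options) -> bool:
    # external feature groups have no offline backfill job to kick off
    return (
        not isinstance(fg, ExternalFG)
        and offline_write_options is not None
        and offline_write_options.get("start_offline_backfill", True)
    )

assert writes_via_kafka(StreamFG()) and writes_via_kafka(ExternalFG())
assert not writes_via_kafka(RegularFG())
assert runs_backfill(StreamFG(), {}) and not runs_backfill(ExternalFG(), {})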
python/hsfs/engine/spark.py (+5, -1)

@@ -66,6 +66,7 @@
pass

from hsfs import feature, training_dataset_feature, client, util
from hsfs.feature_group import ExternalFeatureGroup
from hsfs.storage_connector import StorageConnector
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import hudi_engine, transformation_function_engine, kafka_api
@@ -261,7 +262,10 @@ def save_dataframe(
validation_id=None,
):
try:
if feature_group.stream:
if (
isinstance(feature_group, ExternalFeatureGroup)
and feature_group.online_enabled
) or feature_group.stream:
self._save_online_dataframe(
feature_group, dataframe, online_write_options
)
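The Spark engine applies the same test, so an online-enabled external feature group is written via _save_online_dataframe instead of the offline path. A usage sketch from the Spark side, under the same assumptions as the pandas example above (connection flow, names, and data are illustrative):

from pyspark.sql import SparkSession
import hsfs

spark = SparkSession.builder.getOrCreate()
connection = hsfs.connection()
fs = connection.get_feature_store()

fg = fs.get_external_feature_group("transactions", version=1)
df = spark.createDataFrame([(1, 9.99), (2, 42.0)], ["id", "amount"])

# With the new branch, this write reaches _save_online_dataframe (Kafka,
# then the online store) because fg is external and online-enabled.
fg.insert(df)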