From 46d2ff688d1e003d16b3b5bad7188c252bbc99f6 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 14 Mar 2023 15:03:00 +0100 Subject: [PATCH 01/23] add online_enabled to external feature group --- python/hsfs/feature_group.py | 12 ++++++++++++ python/hsfs/feature_store.py | 27 +++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 8d5f9bcf32..34ee4cee0b 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2481,6 +2481,7 @@ def __init__( statistics_config=None, event_time=None, expectation_suite=None, + online_enabled=False, href=None, ): super().__init__(featurestore_id, location, event_time=event_time) @@ -2495,6 +2496,7 @@ def __init__( self._path = path self._id = id self._expectation_suite = expectation_suite + self._online_enabled = online_enabled self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -2680,6 +2682,7 @@ def to_dict(self): "statisticsConfig": self._statistics_config, "eventTime": self._event_time, "expectationSuite": self._expectation_suite, + "onlineEnabled": self._online_enabled, } @property @@ -2730,6 +2733,15 @@ def creator(self): def created(self): return self._created + @property + def online_enabled(self): + """Setting if the feature group is available in online storage.""" + return self._online_enabled + + @online_enabled.setter + def online_enabled(self, online_enabled): + self._online_enabled = online_enabled + @version.setter def version(self, version): self._version = version diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index cf96e74007..fbda5b37e0 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -784,6 +784,7 @@ def create_external_feature_group( expectation_suite: Optional[ Union[expectation_suite.ExpectationSuite, ge.core.ExpectationSuite] ] = None, + online_enabled: Optional[bool] = False, ): """Create a external feature group metadata object. @@ -808,6 +809,29 @@ def create_external_feature_group( feature store on its own. To persist the feature group metadata in the feature store, call the `save()` method. + You can enable online storage for external feature groups, however, the sync from the + external storage to Hopsworks online storage needs to be done manually: + + ```python + external_fg = fs.create_external_feature_group( + name="sales", + version=1, + description="Physical shop sales features", + query=query, + storage_connector=connector, + primary_key=['ss_store_sk'], + event_time='sale_date', + online_enabled=True + ) + external_fg.save() + + # read from external storage and filter data to sync to online + df = external_fg.read().filter(external_fg.customer_status == "active") + + # insert to online storage + external_fg.insert(df) + ``` + # Arguments name: Name of the external feature group to create. storage_connector: the storage connector to use to establish connectivity @@ -854,6 +878,8 @@ def create_external_feature_group( expectation_suite: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to `None`. + online_enabled: Define whether it should be possible to sync the feature group to + the online feature store for low latency access, defaults to `False`. # Returns `ExternalFeatureGroup`. The external feature group metadata object. @@ -874,6 +900,7 @@ def create_external_feature_group( statistics_config=statistics_config, event_time=event_time, expectation_suite=expectation_suite, + online_enabled=online_enabled, ) def create_training_dataset( From 92576cc3855f9227f07daaac81aecdcd4822f9b6 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 28 Mar 2023 16:18:31 +0200 Subject: [PATCH 02/23] move online_enabled to base fg --- python/hsfs/feature_group.py | 48 +++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 34ee4cee0b..08ac92693f 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -57,8 +57,11 @@ class FeatureGroupBase: - def __init__(self, featurestore_id, location, event_time=None): + def __init__( + self, featurestore_id, location, event_time=None, online_enabled=False + ): self.event_time = event_time + self._online_enabled = online_enabled self._location = location self._statistics_engine = statistics_engine.StatisticsEngine( featurestore_id, self.ENTITY_TYPE @@ -1175,6 +1178,15 @@ def expectation_suite( ) ) + @property + def online_enabled(self): + """Setting if the feature group is available in online storage.""" + return self._online_enabled + + @online_enabled.setter + def online_enabled(self, online_enabled): + self._online_enabled = online_enabled + class FeatureGroup(FeatureGroupBase): CACHED_FEATURE_GROUP = "CACHED_FEATURE_GROUP" @@ -1206,7 +1218,12 @@ def __init__( parents=None, href=None, ): - super().__init__(featurestore_id, location, event_time=event_time) + super().__init__( + featurestore_id, + location, + event_time=event_time, + online_enabled=online_enabled, + ) self._feature_store_name = featurestore_name self._description = description @@ -1220,7 +1237,6 @@ def __init__( for feat in (features or []) ] - self._online_enabled = online_enabled self._time_travel_format = ( time_travel_format.upper() if time_travel_format is not None else None ) @@ -2345,11 +2361,6 @@ def features(self): """Schema information.""" return self._features - @property - def online_enabled(self): - """Setting if the feature group is available in online storage.""" - return self._online_enabled - @property def time_travel_format(self): """Setting of the feature group time travel format.""" @@ -2443,10 +2454,6 @@ def partition_key(self, new_partition_key): def hudi_precombine_key(self, hudi_precombine_key): self._hudi_precombine_key = hudi_precombine_key.lower() - @online_enabled.setter - def online_enabled(self, new_online_enabled): - self._online_enabled = new_online_enabled - @stream.setter def stream(self, stream): self._stream = stream @@ -2484,7 +2491,12 @@ def __init__( online_enabled=False, href=None, ): - super().__init__(featurestore_id, location, event_time=event_time) + super().__init__( + featurestore_id, + location, + event_time=event_time, + online_enabled=online_enabled, + ) self._feature_store_name = featurestore_name self._description = description self._created = created @@ -2496,7 +2508,6 @@ def __init__( self._path = path self._id = id self._expectation_suite = expectation_suite - self._online_enabled = online_enabled self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -2733,15 +2744,6 @@ def creator(self): def created(self): return self._created - @property - def online_enabled(self): - """Setting if the feature group is available in online storage.""" - return self._online_enabled - - @online_enabled.setter - def online_enabled(self, online_enabled): - self._online_enabled = online_enabled - @version.setter def version(self, version): self._version = version From 6eafe9939469b0ec911c8089801b693e4f945c7d Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Wed, 29 Mar 2023 16:32:56 +0200 Subject: [PATCH 03/23] new attempt --- .../core/external_feature_group_engine.py | 48 +++++++++++++++++++ python/hsfs/core/feature_group_base_engine.py | 44 +++++++++++++++++ python/hsfs/core/feature_group_engine.py | 44 ----------------- python/hsfs/engine/python.py | 6 ++- python/hsfs/feature_group.py | 41 ++++++++++++++++ 5 files changed, 138 insertions(+), 45 deletions(-) diff --git a/python/hsfs/core/external_feature_group_engine.py b/python/hsfs/core/external_feature_group_engine.py index 6df5a959a6..389033be35 100644 --- a/python/hsfs/core/external_feature_group_engine.py +++ b/python/hsfs/core/external_feature_group_engine.py @@ -47,6 +47,54 @@ def save(self, feature_group): self._feature_group_api.save(feature_group) + def insert( + self, + feature_group, + feature_dataframe, + write_options: dict, + validation_options: dict = {}, + ): + if not feature_group.online_enabled: + raise FeatureStoreException( + "Online storage is not enabled for this feature group. External feature groups can only store data in" + + " online storage. To create an offline only external feature group, use the `save` method." + ) + + schema = engine.get_instance().parse_schema_feature_group(feature_dataframe) + + if not feature_group._id: + # only save metadata if feature group does not exist + feature_group.features = schema + self.save(feature_group) + else: + # else, just verify that feature group schema matches user-provided dataframe + self._verify_schema_compatibility(feature_group.features, schema) + + # ge validation on python and non stream feature groups on spark + ge_report = feature_group._great_expectation_engine.validate( + feature_group=feature_group, + dataframe=feature_dataframe, + validation_options=validation_options, + ingestion_result="INGESTED", + ge_type=False, + ) + + if ge_report is not None and ge_report.ingestion_result == "REJECTED": + return None, ge_report + + return ( + engine.get_instance().save_dataframe( + feature_group=feature_group, + dataframe=feature_dataframe, + operation=None, + online_enabled=feature_group.online_enabled, + storage="online", + offline_write_options=write_options, + online_write_options=write_options, + ), + ge_report, + ) + def _update_features_metadata(self, feature_group, features): # perform changes on copy in case the update fails, so we don't leave # the user object in corrupted state diff --git a/python/hsfs/core/feature_group_base_engine.py b/python/hsfs/core/feature_group_base_engine.py index 4fe38a56e9..93ec1145d3 100644 --- a/python/hsfs/core/feature_group_base_engine.py +++ b/python/hsfs/core/feature_group_base_engine.py @@ -15,6 +15,7 @@ # from hsfs.core import feature_group_api, storage_connector_api, tags_api, kafka_api +from hsfs.client.exceptions import FeatureStoreException class FeatureGroupBaseEngine: @@ -107,3 +108,46 @@ def new_feature_list(self, feature_group, updated_features): ): new_features.append(feature) return new_features + updated_features + + def _verify_schema_compatibility(self, feature_group_features, dataframe_features): + err = [] + feature_df_dict = {feat.name: feat.type for feat in dataframe_features} + for feature_fg in feature_group_features: + fg_type = feature_fg.type.lower().replace(" ", "") + # check if feature exists dataframe + if feature_fg.name in feature_df_dict: + df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "") + # remove match from lookup table + del feature_df_dict[feature_fg.name] + + # check if types match + if fg_type != df_type: + # don't check structs for exact match + if fg_type.startswith("struct") and df_type.startswith("struct"): + continue + + err += [ + f"{feature_fg.name} (" + f"expected type: '{fg_type}', " + f"derived from input: '{df_type}') has the wrong type." + ] + + else: + err += [ + f"{feature_fg.name} (type: '{feature_fg.type}') is missing from " + f"input dataframe." + ] + + # any features that are left in lookup table are superfluous + for feature_df_name, feature_df_type in feature_df_dict.items(): + err += [ + f"{feature_df_name} (type: '{feature_df_type}') does not exist " + f"in feature group." + ] + + # raise exception if any errors were found. + if len(err) > 0: + raise FeatureStoreException( + "Features are not compatible with Feature Group schema: " + + "".join(["\n - " + e for e in err]) + ) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 6021933507..cd1b21265b 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -17,7 +17,6 @@ from hsfs import engine, client, util from hsfs import feature_group as fg from hsfs.client import exceptions -from hsfs.client.exceptions import FeatureStoreException from hsfs.core import feature_group_base_engine, hudi_engine from hsfs.core.deltastreamer_jobconf import DeltaStreamerJobConf @@ -305,49 +304,6 @@ def insert_stream( return streaming_query - def _verify_schema_compatibility(self, feature_group_features, dataframe_features): - err = [] - feature_df_dict = {feat.name: feat.type for feat in dataframe_features} - for feature_fg in feature_group_features: - fg_type = feature_fg.type.lower().replace(" ", "") - # check if feature exists dataframe - if feature_fg.name in feature_df_dict: - df_type = feature_df_dict[feature_fg.name].lower().replace(" ", "") - # remove match from lookup table - del feature_df_dict[feature_fg.name] - - # check if types match - if fg_type != df_type: - # don't check structs for exact match - if fg_type.startswith("struct") and df_type.startswith("struct"): - continue - - err += [ - f"{feature_fg.name} (" - f"expected type: '{fg_type}', " - f"derived from input: '{df_type}') has the wrong type." - ] - - else: - err += [ - f"{feature_fg.name} (type: '{feature_fg.type}') is missing from " - f"input dataframe." - ] - - # any features that are left in lookup table are superfluous - for feature_df_name, feature_df_type in feature_df_dict.items(): - err += [ - f"{feature_df_name} (type: '{feature_df_type}') does not exist " - f"in feature group." - ] - - # raise exception if any errors were found. - if len(err) > 0: - raise FeatureStoreException( - "Features are not compatible with Feature Group schema: " - + "".join(["\n - " + e for e in err]) - ) - def _save_feature_group_metadata( self, feature_group, dataframe_features, write_options ): diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 6c25c7a7f1..7d15dc9436 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -44,6 +44,7 @@ from sqlalchemy import sql from hsfs import client, feature, util +from hsfs.feature_group import ExternalFeatureGroup from hsfs.client.exceptions import FeatureStoreException from hsfs.core import ( feature_group_api, @@ -436,7 +437,10 @@ def save_dataframe( online_write_options: dict, validation_id: int = None, ): - if feature_group.stream: + if feature_group.stream or ( + isinstance(feature_group, ExternalFeatureGroup) + and feature_group.online_enabled + ): return self._write_dataframe_kafka( feature_group, dataframe, offline_write_options ) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 08ac92693f..3ef4c197d1 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2586,6 +2586,47 @@ def save(self): if self.statistics_config.enabled: self._statistics_engine.compute_statistics(self) + def insert( + self, + features: Union[ + pd.DataFrame, + TypeVar("pyspark.sql.DataFrame"), # noqa: F821 + TypeVar("pyspark.RDD"), # noqa: F821 + np.ndarray, + List[list], + ], + write_options: Optional[Dict[str, Any]] = {}, + validation_options: Optional[Dict[str, Any]] = {}, + save_code: Optional[bool] = True, + ) -> Tuple[Optional[Job], Optional[ValidationReport]]: + feature_dataframe = engine.get_instance().convert_to_default_dataframe(features) + + job, ge_report = self._feature_group_engine.insert( + self, + feature_dataframe=feature_dataframe, + write_options=write_options, + validation_options=validation_options, + ) + + if save_code and ( + ge_report is None or ge_report.ingestion_result == "INGESTED" + ): + self._code_engine.save_code(self) + + if self.statistics_config.enabled: + warnings.warn( + ( + "Statistics are not computed for insertion to online enabled external feature group `{}`, with version" + " `{}`. Call `compute_statistics` explicitly to compute statistics over the data in the external storage system." + ).format(self._name, self._version), + util.StorageWarning, + ) + + return ( + job, + ge_report.to_ge_type() if ge_report is not None else None, + ) + def read(self, dataframe_type="default"): """Get the feature group as a DataFrame. From 578876c411db59726bbb3fda6d039c86b07ca569 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 10:57:10 +0200 Subject: [PATCH 04/23] move expectation suite engine to base class --- python/hsfs/feature_group.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 3ef4c197d1..72a6be0548 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -58,11 +58,17 @@ class FeatureGroupBase: def __init__( - self, featurestore_id, location, event_time=None, online_enabled=False + self, + featurestore_id, + location, + event_time=None, + online_enabled=False, + id=None, ): self.event_time = event_time self._online_enabled = online_enabled self._location = location + self._id = id self._statistics_engine = statistics_engine.StatisticsEngine( featurestore_id, self.ENTITY_TYPE ) @@ -70,6 +76,22 @@ def __init__( self._great_expectation_engine = ( great_expectation_engine.GreatExpectationEngine(featurestore_id) ) + if self._id is not None: + self._expectation_suite_engine = ( + expectation_suite_engine.ExpectationSuiteEngine( + feature_store_id=self._feature_store_id, feature_group_id=self._id + ) + ) + self._validation_report_engine = ( + validation_report_engine.ValidationReportEngine( + self._feature_store_id, self._id + ) + ) + self._validation_result_engine = ( + validation_result_engine.ValidationResultEngine( + self._feature_store_id, self._id + ) + ) self._feature_store_id = featurestore_id self._variable_api = VariableApi() @@ -2496,6 +2518,7 @@ def __init__( location, event_time=event_time, online_enabled=online_enabled, + id=id, ) self._feature_store_name = featurestore_name self._description = description @@ -2506,7 +2529,6 @@ def __init__( self._query = query self._data_format = data_format.upper() if data_format else None self._path = path - self._id = id self._expectation_suite = expectation_suite self._features = [ From b078cd858cda67787e6321ec45fa482e27de9968 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 11:20:44 +0200 Subject: [PATCH 05/23] fix base fg init --- python/hsfs/feature_group.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 72a6be0548..47f7a39fc3 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -79,17 +79,17 @@ def __init__( if self._id is not None: self._expectation_suite_engine = ( expectation_suite_engine.ExpectationSuiteEngine( - feature_store_id=self._feature_store_id, feature_group_id=self._id + feature_store_id=featurestore_id, feature_group_id=self._id ) ) self._validation_report_engine = ( validation_report_engine.ValidationReportEngine( - self._feature_store_id, self._id + featurestore_id, self._id ) ) self._validation_result_engine = ( validation_result_engine.ValidationResultEngine( - self._feature_store_id, self._id + featurestore_id, self._id ) ) self._feature_store_id = featurestore_id @@ -1245,6 +1245,7 @@ def __init__( location, event_time=event_time, online_enabled=online_enabled, + id=id, ) self._feature_store_name = featurestore_name @@ -1253,7 +1254,6 @@ def __init__( self._creator = user.User.from_response_json(creator) self._version = version self._name = name - self._id = id self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat for feat in (features or []) From b7d1b351f7b421641e23eac8ed71adba78245762 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 11:26:50 +0200 Subject: [PATCH 06/23] ext feature group has no stream flag --- python/hsfs/engine/python.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 7d15dc9436..50b2450437 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -437,10 +437,10 @@ def save_dataframe( online_write_options: dict, validation_id: int = None, ): - if feature_group.stream or ( + if ( isinstance(feature_group, ExternalFeatureGroup) and feature_group.online_enabled - ): + ) or feature_group.stream: return self._write_dataframe_kafka( feature_group, dataframe, offline_write_options ) From 020d95ebd50d2b17c698274f1cc6c2d9f1a3b654 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 11:37:50 +0200 Subject: [PATCH 07/23] fix spark engine for stream flag --- python/hsfs/engine/spark.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 25d3c7d280..bdd1a33704 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -66,6 +66,7 @@ pass from hsfs import feature, training_dataset_feature, client, util +from hsfs.feature_group import ExternalFeatureGroup from hsfs.storage_connector import StorageConnector from hsfs.client.exceptions import FeatureStoreException from hsfs.core import hudi_engine, transformation_function_engine, kafka_api @@ -261,7 +262,10 @@ def save_dataframe( validation_id=None, ): try: - if feature_group.stream: + if ( + isinstance(feature_group, ExternalFeatureGroup) + and feature_group.online_enabled + ) or feature_group.stream: self._save_online_dataframe( feature_group, dataframe, online_write_options ) From adb483e0a5cb1c149b920c9015b3eea0cf2c2cc5 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 11:56:52 +0200 Subject: [PATCH 08/23] fix expectation suite init --- python/hsfs/feature_group.py | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 47f7a39fc3..b1efafe4ad 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -64,11 +64,14 @@ def __init__( event_time=None, online_enabled=False, id=None, + expectation_suite=None, ): self.event_time = event_time self._online_enabled = online_enabled self._location = location self._id = id + # use setter for correct conversion + self.expectation_suite = expectation_suite self._statistics_engine = statistics_engine.StatisticsEngine( featurestore_id, self.ENTITY_TYPE ) @@ -77,6 +80,10 @@ def __init__( great_expectation_engine.GreatExpectationEngine(featurestore_id) ) if self._id is not None: + if expectation_suite: + self._expectation_suite._init_expectation_engine( + feature_store_id=featurestore_id, feature_group_id=self._id + ) self._expectation_suite_engine = ( expectation_suite_engine.ExpectationSuiteEngine( feature_store_id=featurestore_id, feature_group_id=self._id @@ -92,6 +99,7 @@ def __init__( featurestore_id, self._id ) ) + self._feature_store_id = featurestore_id self._variable_api = VariableApi() @@ -1246,6 +1254,7 @@ def __init__( event_time=event_time, online_enabled=online_enabled, id=id, + expectation_suite=expectation_suite, ) self._feature_store_name = featurestore_name @@ -1294,26 +1303,6 @@ def __init__( self._hudi_precombine_key = None self.statistics_config = statistics_config - self.expectation_suite = expectation_suite - if expectation_suite: - self._expectation_suite._init_expectation_engine( - feature_store_id=featurestore_id, feature_group_id=self._id - ) - self._expectation_suite_engine = ( - expectation_suite_engine.ExpectationSuiteEngine( - feature_store_id=self._feature_store_id, feature_group_id=self._id - ) - ) - self._validation_report_engine = ( - validation_report_engine.ValidationReportEngine( - self._feature_store_id, self._id - ) - ) - self._validation_result_engine = ( - validation_result_engine.ValidationResultEngine( - self._feature_store_id, self._id - ) - ) else: # initialized by user @@ -1343,7 +1332,6 @@ def __init__( else None ) self.statistics_config = statistics_config - self.expectation_suite = expectation_suite self._feature_group_engine = feature_group_engine.FeatureGroupEngine( featurestore_id @@ -2529,7 +2517,6 @@ def __init__( self._query = query self._data_format = data_format.upper() if data_format else None self._path = path - self._expectation_suite = expectation_suite self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -2568,7 +2555,6 @@ def __init__( else: self.primary_key = primary_key self.statistics_config = statistics_config - self.expectation_suite = expectation_suite self._features = features self._options = options @@ -2579,7 +2565,6 @@ def __init__( else: self._storage_connector = storage_connector - self.expectation_suite = expectation_suite self._href = href def save(self): From 6ab9dc8deb9985f41eb6f772d2bb23ca8d877108 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 12:49:15 +0200 Subject: [PATCH 09/23] move avro logic to base class --- python/hsfs/feature_group.py | 72 ++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index b1efafe4ad..0c0a59b947 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1065,6 +1065,42 @@ def primary_key(self): def primary_key(self, new_primary_key): self._primary_key = [pk.lower() for pk in new_primary_key] + @property + def avro_schema(self): + """Avro schema representation of the feature group.""" + return self.subject["schema"] + + def get_complex_features(self): + """Returns the names of all features with a complex data type in this + feature group. + + !!! example + ```python + complex_dtype_features = fg.get_complex_features() + ``` + """ + return [f.name for f in self.features if f.is_complex()] + + def _get_encoded_avro_schema(self): + complex_features = self.get_complex_features() + schema = json.loads(self.avro_schema) + + for field in schema["fields"]: + if field["name"] in complex_features: + field["type"] = ["null", "bytes"] + + schema_s = json.dumps(schema) + try: + avro.schema.parse(schema_s) + except avro.schema.SchemaParseException as e: + raise FeatureStoreException("Failed to construct Avro Schema: {}".format(e)) + return schema_s + + def _get_feature_avro_schema(self, feature_name): + for field in json.loads(self.avro_schema)["fields"]: + if field["name"] == feature_name: + return json.dumps(field["type"]) + def get_statistics( self, commit_time: Optional[Union[str, int, datetime, date]] = None ): @@ -2315,37 +2351,6 @@ def _get_table_name(self): def _get_online_table_name(self): return self.name + "_" + str(self.version) - def get_complex_features(self): - """Returns the names of all features with a complex data type in this - feature group. - - !!! example - ```python - complex_dtype_features = fg.get_complex_features() - ``` - """ - return [f.name for f in self.features if f.is_complex()] - - def _get_encoded_avro_schema(self): - complex_features = self.get_complex_features() - schema = json.loads(self.avro_schema) - - for field in schema["fields"]: - if field["name"] in complex_features: - field["type"] = ["null", "bytes"] - - schema_s = json.dumps(schema) - try: - avro.schema.parse(schema_s) - except avro.schema.SchemaParseException as e: - raise FeatureStoreException("Failed to construct Avro Schema: {}".format(e)) - return schema_s - - def _get_feature_avro_schema(self, feature_name): - for field in json.loads(self.avro_schema)["fields"]: - if field["name"] == feature_name: - return json.dumps(field["type"]) - @property def id(self): """Feature group id.""" @@ -2413,11 +2418,6 @@ def subject(self): self._subject = self._feature_group_engine.get_subject(self) return self._subject - @property - def avro_schema(self): - """Avro schema representation of the feature group.""" - return self.subject["schema"] - @property def stream(self): """Whether to enable real time stream writing capabilities.""" From 7eaad904b1e27d91770787e7cd4239f4eaba43ce Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 13:03:57 +0200 Subject: [PATCH 10/23] move subject --- python/hsfs/feature_group.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 0c0a59b947..1348b10989 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -70,6 +70,7 @@ def __init__( self._online_enabled = online_enabled self._location = location self._id = id + self._subject = None # use setter for correct conversion self.expectation_suite = expectation_suite self._statistics_engine = statistics_engine.StatisticsEngine( @@ -1065,6 +1066,14 @@ def primary_key(self): def primary_key(self, new_primary_key): self._primary_key = [pk.lower() for pk in new_primary_key] + @property + def subject(self): + """Subject of the feature group.""" + if self._subject is None: + # cache the schema + self._subject = self._feature_group_engine.get_subject(self) + return self._subject + @property def avro_schema(self): """Avro schema representation of the feature group.""" @@ -1308,7 +1317,6 @@ def __init__( time_travel_format.upper() if time_travel_format is not None else None ) - self._subject = None self._online_topic_name = online_topic_name self._stream = stream self._parents = parents @@ -2410,14 +2418,6 @@ def created(self): """Timestamp when the feature group was created.""" return self._created - @property - def subject(self): - """Subject of the feature group.""" - if self._subject is None: - # cache the schema - self._subject = self._feature_group_engine.get_subject(self) - return self._subject - @property def stream(self): """Whether to enable real time stream writing capabilities.""" From 8fd0ccf58b06ade48fd8a2dc48851e8787ba5f15 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 13:16:28 +0200 Subject: [PATCH 11/23] try something --- python/hsfs/feature_group.py | 88 ++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 1348b10989..45aac5d76d 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1066,50 +1066,6 @@ def primary_key(self): def primary_key(self, new_primary_key): self._primary_key = [pk.lower() for pk in new_primary_key] - @property - def subject(self): - """Subject of the feature group.""" - if self._subject is None: - # cache the schema - self._subject = self._feature_group_engine.get_subject(self) - return self._subject - - @property - def avro_schema(self): - """Avro schema representation of the feature group.""" - return self.subject["schema"] - - def get_complex_features(self): - """Returns the names of all features with a complex data type in this - feature group. - - !!! example - ```python - complex_dtype_features = fg.get_complex_features() - ``` - """ - return [f.name for f in self.features if f.is_complex()] - - def _get_encoded_avro_schema(self): - complex_features = self.get_complex_features() - schema = json.loads(self.avro_schema) - - for field in schema["fields"]: - if field["name"] in complex_features: - field["type"] = ["null", "bytes"] - - schema_s = json.dumps(schema) - try: - avro.schema.parse(schema_s) - except avro.schema.SchemaParseException as e: - raise FeatureStoreException("Failed to construct Avro Schema: {}".format(e)) - return schema_s - - def _get_feature_avro_schema(self, feature_name): - for field in json.loads(self.avro_schema)["fields"]: - if field["name"] == feature_name: - return json.dumps(field["type"]) - def get_statistics( self, commit_time: Optional[Union[str, int, datetime, date]] = None ): @@ -1262,6 +1218,50 @@ def online_enabled(self): def online_enabled(self, online_enabled): self._online_enabled = online_enabled + @property + def subject(self): + """Subject of the feature group.""" + if self._subject is None: + # cache the schema + self._subject = self._feature_group_engine.get_subject(self) + return self._subject + + @property + def avro_schema(self): + """Avro schema representation of the feature group.""" + return self.subject["schema"] + + def get_complex_features(self): + """Returns the names of all features with a complex data type in this + feature group. + + !!! example + ```python + complex_dtype_features = fg.get_complex_features() + ``` + """ + return [f.name for f in self.features if f.is_complex()] + + def _get_encoded_avro_schema(self): + complex_features = self.get_complex_features() + schema = json.loads(self.avro_schema) + + for field in schema["fields"]: + if field["name"] in complex_features: + field["type"] = ["null", "bytes"] + + schema_s = json.dumps(schema) + try: + avro.schema.parse(schema_s) + except avro.schema.SchemaParseException as e: + raise FeatureStoreException("Failed to construct Avro Schema: {}".format(e)) + return schema_s + + def _get_feature_avro_schema(self, feature_name): + for field in json.loads(self.avro_schema)["fields"]: + if field["name"] == feature_name: + return json.dumps(field["type"]) + class FeatureGroup(FeatureGroupBase): CACHED_FEATURE_GROUP = "CACHED_FEATURE_GROUP" From 1961019c9c52f1bba5d978a16b911e69558224b3 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 13:22:35 +0200 Subject: [PATCH 12/23] move more stuff to base engine --- python/hsfs/core/feature_group_base_engine.py | 3 +++ python/hsfs/core/feature_group_engine.py | 3 --- python/hsfs/feature_group.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/hsfs/core/feature_group_base_engine.py b/python/hsfs/core/feature_group_base_engine.py index 93ec1145d3..86703d6d40 100644 --- a/python/hsfs/core/feature_group_base_engine.py +++ b/python/hsfs/core/feature_group_base_engine.py @@ -151,3 +151,6 @@ def _verify_schema_compatibility(self, feature_group_features, dataframe_feature "Features are not compatible with Feature Group schema: " + "".join(["\n - " + e for e in err]) ) + + def get_subject(self, feature_group): + return self._kafka_api.get_topic_subject(feature_group._online_topic_name) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index cd1b21265b..78773facb7 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -233,9 +233,6 @@ def update_description(self, feature_group, description): feature_group, copy_feature_group, "updateMetadata" ) - def get_subject(self, feature_group): - return self._kafka_api.get_topic_subject(feature_group._online_topic_name) - def insert_stream( self, feature_group, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 45aac5d76d..a42e59b2a1 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2507,6 +2507,7 @@ def __init__( event_time=event_time, online_enabled=online_enabled, id=id, + expectation_suite=expectation_suite, ) self._feature_store_name = featurestore_name self._description = description @@ -2545,7 +2546,6 @@ def __init__( else [] ) self.statistics_config = statistics_config - self.expectation_suite = expectation_suite self._options = ( {option["name"]: option["value"] for option in options} From 453e594152e1171fcf39ff3f97a745942c425c24 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 14:09:28 +0200 Subject: [PATCH 13/23] try --- python/hsfs/feature_group.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index a42e59b2a1..8f3a0cf7bc 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -103,6 +103,7 @@ def __init__( self._feature_store_id = featurestore_id self._variable_api = VariableApi() + self._feature_group_engine = None def delete(self): """Drop the entire feature group along with its feature data. @@ -2509,6 +2510,7 @@ def __init__( id=id, expectation_suite=expectation_suite, ) + self._feature_store_name = featurestore_name self._description = description self._created = created From 9add9cac3e2b33d6c1bd12402fdadc140d7f9673 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 14:59:14 +0200 Subject: [PATCH 14/23] online topic name has to be set --- python/hsfs/feature_group.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 8f3a0cf7bc..9e1f02c0b4 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -71,6 +71,7 @@ def __init__( self._location = location self._id = id self._subject = None + self._online_topic_name = None # use setter for correct conversion self.expectation_suite = expectation_suite self._statistics_engine = statistics_engine.StatisticsEngine( From 40c8c07559fccb49fd53b51d0a8257cf3666abf4 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 15:40:49 +0200 Subject: [PATCH 15/23] fix online topic name --- python/hsfs/feature_group.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 9e1f02c0b4..9ee30072e9 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -65,13 +65,14 @@ def __init__( online_enabled=False, id=None, expectation_suite=None, + online_topic_name=None, ): self.event_time = event_time self._online_enabled = online_enabled self._location = location self._id = id self._subject = None - self._online_topic_name = None + self._online_topic_name = online_topic_name # use setter for correct conversion self.expectation_suite = expectation_suite self._statistics_engine = statistics_engine.StatisticsEngine( @@ -1302,6 +1303,7 @@ def __init__( online_enabled=online_enabled, id=id, expectation_suite=expectation_suite, + online_topic_name=online_topic_name, ) self._feature_store_name = featurestore_name @@ -1319,7 +1321,6 @@ def __init__( time_travel_format.upper() if time_travel_format is not None else None ) - self._online_topic_name = online_topic_name self._stream = stream self._parents = parents self._deltastreamer_jobconf = None @@ -2502,6 +2503,7 @@ def __init__( expectation_suite=None, online_enabled=False, href=None, + online_topic_name=None, ): super().__init__( featurestore_id, @@ -2510,6 +2512,7 @@ def __init__( online_enabled=online_enabled, id=id, expectation_suite=expectation_suite, + online_topic_name=online_topic_name, ) self._feature_store_name = featurestore_name @@ -2707,11 +2710,9 @@ def show(self, n): def from_response_json(cls, json_dict): json_decamelized = humps.decamelize(json_dict) if isinstance(json_decamelized, dict): - _ = json_decamelized.pop("online_topic_name", None) _ = json_decamelized.pop("type", None) return cls(**json_decamelized) for fg in json_decamelized: - _ = fg.pop("online_topic_name", None) _ = fg.pop("type", None) return [cls(**fg) for fg in json_decamelized] From d6fa127b7cae57b911e92e0ffb99b1f9b261a970 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 30 Mar 2023 16:44:41 +0200 Subject: [PATCH 16/23] fix UT --- python/hsfs/feature_group.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 9ee30072e9..d55885d050 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -73,6 +73,7 @@ def __init__( self._id = id self._subject = None self._online_topic_name = online_topic_name + self._feature_store_id = featurestore_id # use setter for correct conversion self.expectation_suite = expectation_suite self._statistics_engine = statistics_engine.StatisticsEngine( From b15bb8481dd739569803ed593422c64b76cd726b Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 3 Apr 2023 10:46:31 +0200 Subject: [PATCH 17/23] allow reading from online --- python/hsfs/feature_group.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index d55885d050..3a33ebbd2b 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2641,7 +2641,9 @@ def insert( ge_report.to_ge_type() if ge_report is not None else None, ) - def read(self, dataframe_type="default"): + def read( + self, dataframe_type: Optional[str] = "default", online: Optional[bool] = False + ): """Get the feature group as a DataFrame. !!! example @@ -2666,6 +2668,8 @@ def read(self, dataframe_type="default"): # Arguments dataframe_type: str, optional. Possible values are `"default"`, `"spark"`, `"pandas"`, `"numpy"` or `"python"`, defaults to `"default"`. + online: bool, optional. If `True` read from online feature store, defaults + to `False`. # Returns `DataFrame`: The spark dataframe containing the feature data. @@ -2683,7 +2687,7 @@ def read(self, dataframe_type="default"): self._name, self._feature_store_name ), ) - return self.select_all().read(dataframe_type=dataframe_type) + return self.select_all().read(dataframe_type=dataframe_type, online=online) def show(self, n): """Show the first n rows of the feature group. From 34366f46e6b5a6271403e77469bb6d92ffae031e Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 3 Apr 2023 11:24:14 +0200 Subject: [PATCH 18/23] small fix --- python/hsfs/feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 3a33ebbd2b..cdac6ada84 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -107,6 +107,7 @@ def __init__( self._feature_store_id = featurestore_id self._variable_api = VariableApi() self._feature_group_engine = None + self._multi_part_insert = False def delete(self): """Drop the entire feature group along with its feature data. @@ -1387,7 +1388,6 @@ def __init__( self._href = href # cache for optimized writes - self._multi_part_insert = False self._kafka_producer = None self._feature_writers = None self._writer = None From 422d20f74161c85522a8095afd5139068a680533 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 3 Apr 2023 11:56:47 +0200 Subject: [PATCH 19/23] fix backfill job not available for ext fg --- python/hsfs/engine/python.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 50b2450437..397cf4bef7 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -900,14 +900,16 @@ def acked(err, msg): progress_bar.close() # start backfilling job - if offline_write_options is not None and offline_write_options.get( - "start_offline_backfill", True + if ( + not isinstance(feature_group, ExternalFeatureGroup) + and offline_write_options is not None + and offline_write_options.get("start_offline_backfill", True) ): feature_group.backfill_job.run( await_termination=offline_write_options.get("wait_for_job", True) ) - - return feature_group.backfill_job + return feature_group.backfill_job + return None def _kafka_produce( self, producer, feature_group, key, encoded_row, acked, offline_write_options From 6ee59eddd49b2e4f4bf48660574aba25dbeeaa2e Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 4 Apr 2023 14:01:48 +0200 Subject: [PATCH 20/23] small fix --- python/hsfs/engine/python.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 397cf4bef7..8c3fd1254d 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -908,8 +908,9 @@ def acked(err, msg): feature_group.backfill_job.run( await_termination=offline_write_options.get("wait_for_job", True) ) - return feature_group.backfill_job - return None + if isinstance(feature_group, ExternalFeatureGroup): + return None + return feature_group.backfill_job def _kafka_produce( self, producer, feature_group, key, encoded_row, acked, offline_write_options From 8252af09b6a2dbcac59afd9778fc38aa94f77cdc Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 4 Apr 2023 14:51:45 +0200 Subject: [PATCH 21/23] fix --- python/hsfs/feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index cdac6ada84..5018c6c97e 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2619,7 +2619,7 @@ def insert( self, feature_dataframe=feature_dataframe, write_options=write_options, - validation_options=validation_options, + validation_options={"save_report": True, **validation_options}, ) if save_code and ( From 2b6cc367dac7c4d18e96b9b173638cf7fc1057a5 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 4 Apr 2023 16:50:13 +0200 Subject: [PATCH 22/23] move validate method --- python/hsfs/feature_group.py | 130 +++++++++++++++++------------------ 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 5018c6c97e..043dbb1673 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -1012,6 +1012,71 @@ def get_validation_history( "Only Feature Group registered with Hopsworks can fetch validation history." ) + def validate( + self, + dataframe: Optional[ + Union[pd.DataFrame, TypeVar("pyspark.sql.DataFrame")] # noqa: F821 + ] = None, + expectation_suite: Optional[ExpectationSuite] = None, + save_report: Optional[bool] = False, + validation_options: Optional[Dict[Any, Any]] = {}, + ingestion_result: str = "UNKNOWN", + ge_type: bool = True, + ) -> Union[ge.core.ExpectationSuiteValidationResult, ValidationReport, None]: + """Run validation based on the attached expectations. + + Runs any expectation attached with Deequ. But also runs attached Great Expectation + Suites. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + # get feature group instance + fg = fs.get_or_create_feature_group(...) + + ge_report = fg.validate(df, save_report=False) + ``` + + # Arguments + dataframe: The dataframe to run the data validation expectations against. + expectation_suite: Optionally provide an Expectation Suite to override the + one that is possibly attached to the feature group. This is useful for + testing new Expectation suites. When an extra suite is provided, the results + will never be persisted. Defaults to `None`. + validation_options: Additional validation options as key-value pairs, defaults to `{}`. + * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. + * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations. + ingestion_result: Specify the fate of the associated data, defaults + to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", + "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation + of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" + for testing and development and "FG_DATA" when validating data + already in the Feature Group. + save_report: Whether to save the report to the backend. This is only possible if the Expectation suite + is initialised and attached to the Feature Group. Defaults to False. + ge_type: Whether to return a Great Expectations object or Hopsworks own abstraction. Defaults to True. + + # Returns + A Validation Report produced by Great Expectations. + """ + # Activity is logged only if a the validation concerns the feature group and not a specific dataframe + if dataframe is None: + dataframe = self.read() + if ingestion_result == "UNKNOWN": + ingestion_result = "FG_DATA" + + return self._great_expectation_engine.validate( + self, + dataframe=engine.get_instance().convert_to_default_dataframe(dataframe), + expectation_suite=expectation_suite, + save_report=save_report, + validation_options=validation_options, + ingestion_result=ingestion_result, + ge_type=ge_type, + ) + def __getattr__(self, name): try: return self.__getitem__(name) @@ -2169,71 +2234,6 @@ def as_of( """ return self.select_all().as_of(wallclock_time, exclude_until) - def validate( - self, - dataframe: Optional[ - Union[pd.DataFrame, TypeVar("pyspark.sql.DataFrame")] # noqa: F821 - ] = None, - expectation_suite: Optional[ExpectationSuite] = None, - save_report: Optional[bool] = False, - validation_options: Optional[Dict[Any, Any]] = {}, - ingestion_result: str = "UNKNOWN", - ge_type: bool = True, - ) -> Union[ge.core.ExpectationSuiteValidationResult, ValidationReport, None]: - """Run validation based on the attached expectations. - - Runs any expectation attached with Deequ. But also runs attached Great Expectation - Suites. - - !!! example - ```python - # connect to the Feature Store - fs = ... - - # get feature group instance - fg = fs.get_or_create_feature_group(...) - - ge_report = fg.validate(df, save_report=False) - ``` - - # Arguments - dataframe: The dataframe to run the data validation expectations against. - expectation_suite: Optionally provide an Expectation Suite to override the - one that is possibly attached to the feature group. This is useful for - testing new Expectation suites. When an extra suite is provided, the results - will never be persisted. Defaults to `None`. - validation_options: Additional validation options as key-value pairs, defaults to `{}`. - * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion. - * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations. - ingestion_result: Specify the fate of the associated data, defaults - to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", - "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation - of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" - for testing and development and "FG_DATA" when validating data - already in the Feature Group. - save_report: Whether to save the report to the backend. This is only possible if the Expectation suite - is initialised and attached to the Feature Group. Defaults to False. - ge_type: Whether to return a Great Expectations object or Hopsworks own abstraction. Defaults to True. - - # Returns - A Validation Report produced by Great Expectations. - """ - # Activity is logged only if a the validation concerns the feature group and not a specific dataframe - if dataframe is None: - dataframe = self.read() - if ingestion_result == "UNKNOWN": - ingestion_result = "FG_DATA" - - return self._great_expectation_engine.validate( - self, - dataframe=engine.get_instance().convert_to_default_dataframe(dataframe), - expectation_suite=expectation_suite, - save_report=save_report, - validation_options=validation_options, - ingestion_result=ingestion_result, - ge_type=ge_type, - ) - def compute_statistics( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None ): From 0f9de67a4be8339b9b66b147b9d8be89527b79b5 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 4 Apr 2023 17:14:29 +0200 Subject: [PATCH 23/23] throw exception when reading from external feature group from python engine --- python/hsfs/feature_group.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 043dbb1673..729a74c6ef 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2681,6 +2681,13 @@ def read( # Raises `hsfs.client.exceptions.RestAPIError`. """ + if engine.get_type() == "python": + raise FeatureStoreException( + "Reading an External Feature Group directly into a Pandas Dataframe using " + + "Python/Pandas as Engine is not supported, however, you can use the " + + "Query API to create Feature Views/Training Data containing External " + + "Feature Groups." + ) engine.get_instance().set_job_group( "Fetching Feature group", "Getting feature group: {} from the featurestore {}".format(