In [1]:
import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()

###############################################################################
# create global Client._send_request wrapper
# NOTE(review): `hsfs` is already imported at the top of this cell; the second
# import below is redundant and only kept so the cell stays unchanged.
import hsfs
from hsfs.client import base
# Keep a reference to the unpatched method: wrap_send_request() delegates to it
# and unwrap_send_request() reinstalls it on base.Client.
# NOTE(review): re-running this cell while the wrapper is installed would store
# the wrapper itself here — run it only from a clean (unwrapped) state.
send_request_original = base.Client._send_request

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
11,application_1659447865675_0004,pyspark,idle,Link,Link


SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

In [23]:
# Global registry of captured fixtures: generator name -> dict produced by
# RequestResponseInstance.to_dict(). Filled by ResponseGenerator.run() and
# dumped to JSON by the final cell. Re-running this cell discards every
# fixture captured so far.
responses_dict = {}

In [28]:
def wrap_send_request(response_instance):
    """Monkey-patch ``hsfs.client.base.Client._send_request`` with a recording proxy.

    The proxy forwards every call to ``send_request_original`` and then copies
    the returned response together with all call arguments onto
    ``response_instance``. Each intercepted call overwrites the previously
    recorded values, so only the last call stays on ``response_instance``.

    Args:
        response_instance: object whose ``response``, ``method``,
            ``path_params``, ``query_params``, ``headers``, ``data``,
            ``stream`` and ``files`` attributes are set on every call.
    """

    def _send_request_wrap(
        self,
        method,
        path_params,
        query_params=None,
        headers=None,
        data=None,
        stream=False,
        files=None,
    ):
        # Delegate first; nothing is recorded if the real request raises.
        result = send_request_original(
            self, method, path_params, query_params, headers, data, stream, files
        )

        recorded = (
            ("response", result),
            ("method", method),
            ("path_params", path_params),
            ("query_params", query_params),
            ("headers", headers),
            ("data", data),
            ("stream", stream),
            ("files", files),
        )
        for attribute, value in recorded:
            setattr(response_instance, attribute, value)

        return result

    hsfs.client.base.Client._send_request = _send_request_wrap
    
def unwrap_send_request():
    """Reinstall the saved ``send_request_original`` as ``base.Client._send_request``."""
    setattr(base.Client, "_send_request", send_request_original)

In [32]:
class RequestResponseInstance:
    """Mutable record of one intercepted ``_send_request`` call.

    Every field starts out as ``None``; the proxy installed by
    ``wrap_send_request`` fills them in when a request is intercepted.
    """

    def __init__(self):
        # Nothing has been captured yet, so every field is unset.
        for field in (
            "response",
            "method",
            "path_params",
            "query_params",
            "headers",
            "data",
            "stream",
            "files",
        ):
            setattr(self, field, None)

    def to_dict(self):
        """Return the captured call as a plain dict.

        Only ``response``, ``method``, ``path_params``, ``query_params`` and
        ``headers`` are exported; ``data``, ``stream`` and ``files`` are
        recorded on the instance but not included here.
        """
        exported = ("response", "method", "path_params", "query_params", "headers")
        return {key: getattr(self, key) for key in exported}

class ResponseGenerator:
    """Template for capturing one backend response as a named test fixture.

    Subclasses override ``prepare`` (create backend state), ``initiate_call``
    (trigger the request to capture) and ``cleanup`` (remove backend state).
    ``run`` drives these hooks and stores the capture in ``responses_dict``.
    """

    def __init__(self, name):
        """Remember ``name``, the key under which the fixture is stored."""
        self.name = name

    def prepare(self):
        """Hook: set up whatever the captured call needs. No-op by default."""
        pass

    def initiate_call(self):
        """Hook: perform the call whose request/response is recorded. No-op by default."""
        pass

    def cleanup(self):
        """Hook: undo what ``prepare`` created. No-op by default."""
        pass

    def run(self):
        """Capture the request issued by ``initiate_call`` and register it.

        Returns:
            The ``RequestResponseInstance`` holding the last intercepted call.

        Raises:
            Exception: if ``self.name`` is already present in
                ``responses_dict``. This is checked before any hook runs, so
                no backend state is created for a fixture that cannot be
                stored.
        """
        # Fail fast: do not create and delete backend state only to discover
        # afterwards that the result cannot be registered.
        if self.name in responses_dict:
            raise Exception("fixture was already determined. remove instance from responses_dict or rename generator to continue.")

        response_instance = RequestResponseInstance()

        self.prepare()
        try:
            wrap_send_request(response_instance)
            try:
                self.initiate_call()
            finally:
                # Always restore the real client method, even when the call
                # fails, so the monkey-patch cannot leak into later cells.
                unwrap_send_request()
        finally:
            # Backend state created by prepare() is removed on failure too.
            self.cleanup()

        responses_dict[self.name] = response_instance.to_dict()

        return response_instance

In [33]:
# specify generators...

class FeatureGroupResponseGenerator(ResponseGenerator):
    """Captures the backend call made by ``fs.get_feature_group`` for a temporary feature group."""

    # Shared by prepare() and initiate_call() so the created and the fetched
    # feature group cannot drift apart.
    FEATURE_GROUP_NAME = "fg_test"

    def prepare(self):
        """Create a small two-column feature group and save a Spark DataFrame into it.

        The created feature group is kept on ``self.fg`` so ``cleanup`` can
        delete it. Relies on the notebook globals ``spark`` and ``fs``.
        """
        # Imports stay function-local, as in the original cell.
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType
        from hsfs.feature import Feature

        # NOTE(review): intt == 1 appears twice although intt is the primary
        # key below — confirm the duplicate key is intended.
        rows = [(1, "asd"), (2, "asssd"), (23, "adssd"), (1, "adsasd"), (7, "asds")]

        schema = StructType([
            StructField("intt", IntegerType(), True),
            StructField("stringt", StringType(), True),
        ])

        df = spark.createDataFrame(data=rows, schema=schema)

        features = [
            Feature(name="intt", type="int", online_type="int"),
            Feature(name="stringt", type="string", online_type="varchar(1000)"),
        ]
        self.fg = fs.create_feature_group(
            name=self.FEATURE_GROUP_NAME,
            features=features,
            primary_key=["intt"],  # key can not contain null values
            online_enabled=True,
            time_travel_format="HUDI",
        )

        self.fg.save(df)

    def initiate_call(self):
        """Fetch version 1 of the feature group; this is the call that gets recorded."""
        # NOTE(review): assumes the group created in prepare() received
        # version 1 — confirm no earlier version exists in the project.
        fs.get_feature_group(self.FEATURE_GROUP_NAME, version=1)

    def cleanup(self):
        """Delete the feature group created in ``prepare``."""
        self.fg.delete()

In [34]:
# run generators...
# Each run() stores its capture in responses_dict under the given name and
# raises if that name is already present, so re-running this cell requires
# removing the entry from responses_dict (or renaming the generator) first.
FeatureGroupResponseGenerator("get_feature_group").run()

Feature Group created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fg/1048

In [45]:
# write fixtures
import json
import pydoop.hdfs as hdfs

filename = f'hdfs:///Projects/{fs.project_name}/Resources/backend_fixtures.json'

# Serialise BEFORE opening the target: opening with 'wt' truncates the file,
# so a non-JSON-serialisable capture in responses_dict would otherwise leave
# an empty or half-written fixture file behind.
fixtures_json = json.dumps(responses_dict,
                           indent=4,
                           separators=(',',': '))

with hdfs.open(filename, 'wt') as json_file:
    json_file.write(fixtures_json)