In [0]:
import json

def _widget_value(name, default=""):
    """Declare the text widget `name` with `default` and return its current string value."""
    dbutils.widgets.text(name, default)
    return dbutils.widgets.get(name)


# Plain-string parameters are used as-is; JSON-array parameters are decoded into Python lists.
src_table = _widget_value("src_table")
select_columns = json.loads(_widget_value("select_columns", "[]"))
filter_sql_expression = _widget_value("filter_sql_expression")
feature_store_table = _widget_value("feature_store_table")
# The two key parameters are kept as raw strings here (not JSON-decoded); a later cell turns them into lists.
feature_store_primary_keys = _widget_value("feature_store_primary_keys", "[]")
feature_store_timestamp_keys = _widget_value("feature_store_timestamp_keys", "[]")
feature_store_partition_columns = json.loads(_widget_value("feature_store_partition_columns", "[]"))
feature_store_description = _widget_value("feature_store_description")

# The source and the destination table are mandatory.
assert src_table
assert feature_store_table

# Echo the resolved parameters so they show up in the run output.
print(
    src_table,
    select_columns,
    filter_sql_expression,
    feature_store_table,
    feature_store_primary_keys,
    feature_store_timestamp_keys,
    feature_store_partition_columns,
    feature_store_description,
)

In [0]:
from databricks import feature_store

In [0]:
# Read the source Delta data, then apply the optional projection and row filter.
df = spark.read.format("delta").load(src_table)
# An empty column list / empty filter expression means "leave the DataFrame untouched".
df = df.select(select_columns) if select_columns else df
df = df.filter(filter_sql_expression) if filter_sql_expression else df

In [0]:
# To improve query speed, Delta Lake on Databricks can optimize the layout of data stored in cloud storage;
# one of the supported algorithms is Z-Ordering.
# For an ordinary Delta table these optimizations have to be applied explicitly, but for a feature store
# Delta table Z-Ordering is applied implicitly on the table's primary keys.
# Per the original author's note, this Z-Ordering is only applied on columns with an int datatype, so when a
# primary key is a string column we disable the check with
# `set spark.databricks.delta.optimize.zorder.checkStatsCollection.enabled = false`.


def _as_column_list(value):
    """Normalize a key parameter into a flat list of column names.

    Accepts:
      * a list/tuple (returned as a new list, so re-running this cell is harmless),
      * a JSON array string such as '["id", "ts"]',
      * a bare column name such as 'id' (surrounding whitespace is stripped).

    A blank string and the widget default '[]' both produce an empty list.
    """
    if isinstance(value, (list, tuple)):
        return list(value)
    text = value.strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except ValueError:
        # Not JSON at all: treat the text as a single column name.
        return [text]
    if isinstance(parsed, list):
        return [str(column) for column in parsed]
    # Valid JSON but not an array (e.g. a number-like column name): keep the raw text.
    return [text]


feature_store_primary_keys = _as_column_list(feature_store_primary_keys)
feature_store_timestamp_keys = _as_column_list(feature_store_timestamp_keys)

# `dtypes` yields (column_name, type_name) pairs; skip the lookup when no primary key was given.
primary_key_dtype = df.select(feature_store_primary_keys).dtypes if feature_store_primary_keys else []

# A single string-typed primary key is enough to require the setting.
if any(key_data_type == 'string' for _, key_data_type in primary_key_dtype):
    spark.sql('set spark.databricks.delta.optimize.zorder.checkStatsCollection.enabled = false')


In [0]:
# Feature Store client used below to merge into, or create, the target feature table.
fs = feature_store.FeatureStoreClient()

Merge the data into the existing feature store table, or create a brand new table if it does not exist yet.

In [0]:
# Merge into the feature table when it already exists; otherwise create it from `df`.
if spark.catalog.tableExists(feature_store_table):
    fs.write_table(
        name=feature_store_table,
        df=df,
        mode="merge"
    )
else:
    # Creating a table needs more metadata than merging into one, so fail early with a
    # message that names the missing parameter.
    assert feature_store_table, "feature_store_table must be set"
    # The primary keys are a list by this point, and a list such as [""] is truthy;
    # check the entries as well so a blank key cannot slip through.
    assert feature_store_primary_keys and all(feature_store_primary_keys), (
        "feature_store_primary_keys must name at least one column to create a new feature table"
    )
    assert feature_store_description, (
        "feature_store_description must be set to create a new feature table"
    )
    feature_table = fs.create_table(
        name=feature_store_table,
        primary_keys=feature_store_primary_keys,
        timestamp_keys=feature_store_timestamp_keys,
        partition_columns=feature_store_partition_columns,
        description=feature_store_description,
        df=df,
        schema=df.schema,
    )