### Feature Engineering

Our goal is to build a simple recommender system using matrix factorization and predictive methods (collaborative and content-based filtering). To do this, we first need to do some feature engineering.
We need to create both customer-based and product-based features.

In [0]:
sales_transactions = spark.read.table("samples.bakehouse.sales_transactions")

We will create a feature store table with the following customer-based features:

- total number of transactions
- average transaction value
- recency
- frequency
- monetary value
- did they buy during the week?
- did they buy during the weekend?
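
To pin down what each of these features means before writing the Spark aggregations, here is a minimal plain-Python sketch on a few hand-made rows. The definitions are assumptions, not something the dataset dictates: recency is taken as whole days since the last purchase relative to a chosen `as_of` date, frequency as the number of distinct purchase days, and monetary value as the sum of transaction totals. The function name `customer_features` and the `(customer_id, timestamp, total_price)` tuple layout are hypothetical and only serve the illustration.

```python
from collections import defaultdict
from datetime import datetime


def customer_features(transactions, as_of):
    """Compute per-customer features from (customer_id, timestamp, total_price) tuples.

    All definitions here are illustrative assumptions (see the text above).
    """
    by_customer = defaultdict(list)
    for customer_id, ts, total_price in transactions:
        by_customer[customer_id].append((ts, total_price))

    features = {}
    for customer_id, rows in by_customer.items():
        timestamps = [ts for ts, _ in rows]
        monetary = sum(price for _, price in rows)
        features[customer_id] = {
            "total_transactions": len(rows),
            "avg_transaction_value": monetary / len(rows),
            # Whole days between the reference date and the latest purchase
            "recency_days": (as_of - max(timestamps)).days,
            # Number of distinct calendar days with at least one purchase
            "frequency": len({ts.date() for ts in timestamps}),
            "monetary_value": monetary,
            # datetime.weekday(): Monday is 0, Sunday is 6
            "bought_on_weekday": any(ts.weekday() < 5 for ts in timestamps),
            "bought_on_weekend": any(ts.weekday() >= 5 for ts in timestamps),
        }
    return features


# Hand-made sample rows, not taken from the bakehouse dataset
sample = [
    (1, datetime(2024, 5, 6, 9, 30), 10.0),   # a Monday
    (1, datetime(2024, 5, 11, 14, 0), 20.0),  # a Saturday
    (2, datetime(2024, 5, 8, 12, 0), 6.0),    # a Wednesday
]
features = customer_features(sample, as_of=datetime(2024, 5, 13))
```

A small reference implementation like this can also be used to spot-check the Spark version on a handful of customers once the real aggregations are in place.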


In [0]:
# Create the project database in the Hive metastore using PySpark.
# The database name matches the one used for the feature table below.
spark.sql("""
    CREATE DATABASE IF NOT EXISTS recommender_system_toy
    COMMENT 'This is my database for storing sales transactions features for my toy project'
    LOCATION 'dbfs:/user/hive/warehouse/recommender_system_toy.db'
""")

DataFrame[]

In [0]:
from pyspark.sql import functions as F
from databricks.feature_store import FeatureStoreClient

# Step 1: Compute the customer features
customer_features_df = sales_transactions.groupBy('customerID').agg(
    F.count('transactionID').alias('total_transactions')
)

# Create the database if it does not exist
spark.sql("CREATE DATABASE IF NOT EXISTS recommender_system_toy")

# Step 2: Create a feature table
fs = FeatureStoreClient()
customer_feature_table = fs.create_table(
    name='recommender_system_toy.customer_features',
    primary_keys='customerID',
    schema=customer_features_df.schema,
    description='Customer features including total number of transactions'
)

# Step 3: Write the features to the feature table
fs.write_table(
    name='recommender_system_toy.customer_features',
    df=customer_features_df,
    mode='overwrite'
)

# Display the computed features (the DataFrame that was written to the feature table)
display(customer_features_df)

2025/01/18 21:45:20 INFO databricks.ml_features._compute_client._compute_client: Created feature table 'hive_metastore.recommender_system_toy.customer_features'.
{"ts": "2025-01-18 21:45:27,196", "level": "ERROR", "logger": "SQLQueryContextLogger", "msg": "Illegal table name abfss:REDACTED_LOCAL_PART@ucstprdwesteu.dfs.core.windows.net/17a8f892-3592-4cda-a60f-4dd7892dc6fe/tables/74fd7647-46d3-4d0c-b1e8-0dfbe6635248.", "context": {"errorClass": "_LEGACY_ERROR_TEMP_DBR_0016"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o393.sql.\n: org.apache.spark.sql.catalyst.parser.ParseException: \nIllegal table name abfss:REDACTED_LOCAL_PART@ucstprdwesteu.dfs.core.windows.

customerID,total_transactions
1000061,16
1000179,11
1000070,12
1000012,9
1000073,9
1000258,7
1000272,8
1000178,12
1000175,8
1000119,7
