# 0 Background
- Notebook showcases semantic search for Wayfair's WAND product data set
- Leverages
    1. *SetenceTransformer* to get embedding
    2. *Langchain* integration to vector ChromaDB
    3. *PySpark* data processing
    4. *MLFlow* experimentation and model deployment workflow
    
- [source github](https://github.com/thomaschangsf/db-product-search)

# 1 Setup
- Installed pyspark dependencies: [spark ref](https://spark.apache.org/docs/latest/api/python/getting_started/install.html)

- start notebook via terminal cmd: pyspark 

In [2]:
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('Scalling_ml_with_spark') \
                    .getOrCreate()

from pyspark.sql.types import *
import pyspark.sql.functions as fn

import os

spark

23/05/03 09:05:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
spark.sql("SHOW TABLES;")

23/05/03 09:05:24 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/05/03 09:05:24 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/05/03 09:05:25 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/05/03 09:05:25 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore chang@127.0.0.1
23/05/03 09:05:26 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


DataFrame[namespace: string, tableName: string, isTemporary: boolean]

In [4]:
spark.sql('create database if not exists {0}'.format(config['database']))

NameError: name 'config' is not defined

# 2 Intro and Config.py
- Based on 00_Introl_and_Config.py

In [10]:
os.getcwd()

'/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search'

In [12]:

WORKDIR

'/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir'

In [14]:
!ls /Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir

[34mmlflow[m[m [34mwands[m[m


In [36]:

if 'config' not in locals().keys():
  config = {}

config['database'] = 'wands'

# create database if not exists
_ = spark.sql('create database if not exists {0}'.format(config['database']))

# set current datebase context
_ = spark.catalog.setCurrentDatabase(config['database'])


# use this if we started notebook via venv/bin/pyspark
WORKDIR=f"{os.getcwd()}/work-dir"
print(f"WORKDIR={WORKDIR}")

# below is only true if we run from docker
# os.getenv('HOME') is /home/jovyan bc jupyter started with this command
# docker run -it --memory="28g" --memory-swap="30g"  -p 8888:8888 --mount type=bind,source=$(pwd),target=/home/jovyan adipolak/ml-with-apache-spark
# WORKDIR=f"{os.getenv('HOME')}/db-product-search/work-dir"


# DB
config['dbfs_path'] = f'{WORKDIR}/wands'
config['WANDS_DOWNLOADS_PATH'] = config['dbfs_path'] + '/downloads'


# Models
config['WANDS_MODEL_PATH'] = WORKDIR + '/models'
config['basic_model_name'] = 'wands_basic_search'
config['tuned_model_name'] = 'wands_tuned_search'

# MFLOW
import mlflow
config['mlflow_path'] = f"{WORKDIR}/mlflow/experiments/"
mlflow.set_experiment(config['mlflow_path'])


config



WORKDIR=/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir


{'database': 'wands',
 'dbfs_path': '/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/wands',
 'WANDS_DOWNLOADS_PATH': '/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/wands/downloads',
 'basic_model_name': 'wands_basic_search',
 'tuned_model_name': 'wands_tuned_search',
 'mlflow_path': '/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/mlflow/experiments/',
 'WANDS_MODEL_PATH': '/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/models'}

# 3 Data Prep
- Based on 01_data_Prep.py

### 3.1 Download raw data 
- to config[WANDS_DOWNLOADS_PATH]

In [77]:
!rm -rf {config['WANDS_DOWNLOADS_PATH']}
!mkdir -p {config['WANDS_DOWNLOADS_PATH']}
!pushd {config['WANDS_DOWNLOADS_PATH']}

config

ERROR! Session/line number was not unique in database. History logging moved to new session 8
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
pushd: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory


{'database': 'wands',
 'dbfs_path': '/home/jovyan/db-product-search/work-dir/wands',
 'basic_model_name': 'wands_basic_search',
 'tuned_model_name': 'wands_tuned_search',
 'mlflow_path': '/home/jovyan/db-product-search/work-dir/mlflow/experiments/',
 'WANDS_DOWNLOADS_PATH': '/home/jovyan/db-product-search/work-dir/wands/downloads'}

In [78]:
%cd {config['WANDS_DOWNLOADS_PATH']}

!echo "Download label.csv"
!wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/label.csv

!echo "Download product"
!wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/product.csv

!echo "Download query"
!wget -q https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/query.csv


/home/jovyan/db-product-search/work-dir/wands/downloads
Download label.csv
Download product
Download query


### 3.2 Read into spark

In [7]:
from pyspark.sql.types import *
import pyspark.sql.functions as fn

import os

#### Clean all previous tables

In [21]:
!ls {os.getcwd()}/spark-warehouse/wands.db/*

/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/spark-warehouse/wands.db/labels:
_SUCCESS
part-00000-a90eb9ca-e01b-4e7a-aad3-ae089bd33e79-c000.snappy.parquet

/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/spark-warehouse/wands.db/products:
_SUCCESS
part-00000-77043ccb-3515-47eb-b14d-bf09ae9469fa-c000.snappy.parquet

/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/spark-warehouse/wands.db/queries:
_SUCCESS
part-00000-ca8f2bbc-7934-43ec-b5b8-6152b5abd361-c000.snappy.parquet


In [22]:
!rm -rf {os.getcwd()}/spark-warehouse/wands.db/*

#### Process Product
- saves to spark-warehouse/wands.db/products/*.parquet

In [23]:
products_schema = StructType([
  StructField('product_id', IntegerType()),
  StructField('product_name', StringType()),
  StructField('product_class', StringType()),
  StructField('category_hierarchy', StringType()),
  StructField('product_description', StringType()),
  StructField('product_features', StringType()),
  StructField('rating_count', FloatType()),
  StructField('average_rating', FloatType()),
  StructField('review_count', FloatType())
  ])

_ = (
  spark
    .read
      .csv(
        path=f"{config['WANDS_DOWNLOADS_PATH']}/product.csv",
        sep='\t',
        header=True,
        schema=products_schema
        )
    .write
      .format('parquet')
      #.format('delta')
      .mode('overwrite')
      .option('overwriteSchema','true')
      .saveAsTable('products')
  )

display(spark.table('products'))


23/05/03 09:13:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/05/03 09:13:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: product_id, product_name, product_class, category hierarchy, product_description, product_features, rating_count, average_rating, review_count
 Schema: product_id, product_name, product_class, category_hierarchy, product_description, product_features, rating_count, average_rating, review_count
Expected: category_hierarchy but found: category hierarchy
CSV file: file:///Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/wands/downloads/product.csv


                                                                                

23/05/03 09:13:48 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/05/03 09:13:48 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/05/03 09:13:48 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/05/03 09:13:48 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


DataFrame[product_id: int, product_name: string, product_class: string, category_hierarchy: string, product_description: string, product_features: string, rating_count: float, average_rating: float, review_count: float]

#### Process Query

In [24]:
queries_schema = StructType([
  StructField('query_id', IntegerType()),
  StructField('query', StringType()),
  StructField('query_class', StringType())
  ])

_ = (
  spark
    .read
    .csv(
      path=f"{config['WANDS_DOWNLOADS_PATH']}/query.csv",
      sep='\t',
      header=True,
      schema=queries_schema
      )
    .write
      .format('parquet')
      .mode('overwrite')
      .option('overwriteSchema','true')
      .saveAsTable('queries')
  )

display(
  spark.table('queries')
  )

DataFrame[query_id: int, query: string, query_class: string]

#### Process Labels

In [25]:
labels_schema = StructType([
  StructField('id', IntegerType()),
  StructField('query_id', IntegerType()),
  StructField('product_id', IntegerType()),
  StructField('label', StringType())
  ])

_ = (
  spark
    .read
    .csv(
      path=f"{config['WANDS_DOWNLOADS_PATH']}/label.csv",
      sep='\t',
      header=True,
      schema=labels_schema
      )
    .write
      .format('parquet')
      .mode('overwrite')
      .option('overwriteSchema','true')
      .saveAsTable('labels')
  )

display(spark.table('labels'))

DataFrame[id: int, query_id: int, product_id: int, label: string]

In [26]:
spark.table('labels').show(2)


+---+--------+----------+----------+
| id|query_id|product_id|     label|
+---+--------+----------+----------+
|  0|       0|     25434|     Exact|
|  1|       0|     12088|Irrelevant|
+---+--------+----------+----------+
only showing top 2 rows



In [27]:
if 'label_score' not in spark.table('labels').columns:
  _ = spark.sql('ALTER TABLE labels ADD COLUMN label_score FLOAT')

df_label = spark.table('labels')
df_label.show(2)

+---+--------+----------+----------+-----------+
| id|query_id|product_id|     label|label_score|
+---+--------+----------+----------+-----------+
|  0|       0|     25434|     Exact|       null|
|  1|       0|     12088|Irrelevant|       null|
+---+--------+----------+----------+-----------+
only showing top 2 rows



In [28]:
from pyspark.sql.functions import when,col


### Sql update does not work because it needs delta lake
#spark.sql("""
#UPDATE labels
#SET label_score = 
#  CASE lower(label)
#        WHEN 'Exact' THEN 1.0
#        WHEN 'Partial' THEN 0.75
#        WHEN 'Irrelevant' THEN 0.0
#        ELSE NULL
#        END;
#""")

df_label = df_label.withColumn("label_score", when(df_label.label == "Exact", 1.0)\
                                               .when(df_label.label == "Partial", 0.75)\
                                               .when(df_label.label == "Irrelevant", 0.0)\
                                               .when(df_label.label == "Exact", 1.0)\
                                               .otherwise(col("label_score")))

df_label.show(2)

+---+--------+----------+----------+-----------+
| id|query_id|product_id|     label|label_score|
+---+--------+----------+----------+-----------+
|  0|       0|     25434|     Exact|        1.0|
|  1|       0|     12088|Irrelevant|        0.0|
+---+--------+----------+----------+-----------+
only showing top 2 rows



# 4 Define Basic Search
- based on 02_Define_Basic_Searc.py


1. Create DF that joins product columns
2. Use sentence transformer to create embedding
3. Use langChain vector db connector to load
4. Analysis: correlation(cosine similarity, label score)

## 4.0 Load libraries

In [29]:
%pip install langchain chromadb






Note: you may need to restart the kernel to use updated packages.


In [30]:
from sentence_transformers import SentenceTransformer

from langchain.document_loaders import DataFrameLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

import mlflow

import pandas as pd
print('DONE')

DONE


## 4.1 Aggregate Data

In [31]:
product_text_pd = (
  spark
    .table('products')
    .selectExpr(
      'product_id',
      'product_name',
      'COALESCE(product_description, product_name) as product_text' # use product description if available, otherwise name
      )
  ).toPandas()

display(product_text_pd)

Unnamed: 0,product_id,product_name,product_text
0,0,solid wood platform bed,"good , deep sleep can be quite difficult to ha..."
1,1,all-clad 7 qt . slow cooker,"create delicious slow-cooked meals , from tend..."
2,2,all-clad electrics 6.5 qt . slow cooker,prepare home-cooked meals on any schedule with...
3,3,all-clad all professional tools pizza cutter,this original stainless tool was designed to c...
4,4,baldwin prestige alcott passage knob with roun...,the hardware has a rich heritage of delivering...
...,...,...,...
42989,42989,malibu pressure balanced diverter fixed shower...,the malibu pressure balanced diverter fixed sh...
42990,42990,emmeline 5 piece breakfast dining set,emmeline 5 piece breakfast dining set
42991,42991,maloney 3 piece pub table set,this pub table set includes 1 counter height t...
42992,42992,fletcher 27.5 '' wide polyester armchair,"bring iconic , modern style to your space in a..."


## 4.2 Convert product_text into embeddings
- We will now convert our product text into embeddings.  The instructions for converting text into an embedding is captured in a language model.  The [*all-MiniLM-L12-v2* model](https://huggingface.co/sentence-transformers/all-MiniLM-L12-v2) is a *mini language model* (in contrast to a large language model) which has been trained on a large, well-rounded corpus of input text for good, balanced performance in a variety of document search scenarios.  The benefit of the *mini* language model as compared to a *large* language is that the *mini* model generates a more succinct embedding structure that facilitates faster search and lower overall resource utilization.  Given the limited breadth of the content in a product catalog, this is the best option of our needs:

In [33]:
original_model = SentenceTransformer('all-MiniLM-L12-v2')


Downloading (…)5dded/.gitattributes:   0%|          | 0.00/1.18k [00:00<?, ?B/s]

Downloading (…)_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Downloading (…)4d81d5dded/README.md:   0%|          | 0.00/10.6k [00:00<?, ?B/s]

Downloading (…)81d5dded/config.json:   0%|          | 0.00/573 [00:00<?, ?B/s]

Downloading (…)ce_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

Downloading (…)ded/data_config.json:   0%|          | 0.00/39.3k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/134M [00:00<?, ?B/s]

Downloading (…)nce_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading (…)5dded/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/352 [00:00<?, ?B/s]

Downloading (…)dded/train_script.py:   0%|          | 0.00/13.2k [00:00<?, ?B/s]

Downloading (…)4d81d5dded/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)1d5dded/modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

In [54]:
embedding_model_path = f"{config['WANDS_MODEL_PATH']}/embedding_model"
print(f'embedding_model_path={embedding_model_path}')

#!rm -rf {embedding_model_path}/*
#!mkdir -p {embedding_model_path}

original_model.save(embedding_model_path)


embedding_model_path=/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/models/embedding_model


## 4.3 Ingest into Vector DB

#### Reload the orignal sentence transformer using langchain wrapper

In [55]:
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_path)


In [60]:
chromadb_path = f"{config['dbfs_path']}/chromadb"
!mkdir -p {chromadb_path}
chromadb_path

'/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/wands/chromadb'

In [65]:
# Creat langcahin DataFrameLoader object
documents = (
  DataFrameLoader( #langchain.document_loaders.dataframe.DataFrameLoader
    product_text_pd,
    page_content_column='product_text'
    )
    .load()
  )

In [66]:
# Generate Embeddings from Product Info
# define logic for embeddings storage
vectordb = Chroma.from_documents(
  documents=documents, 
  embedding_model=embedding_model, 
  persist_directory=chromadb_path
  )




Downloading (…)e9125/.gitattributes:   0%|          | 0.00/1.18k [00:00<?, ?B/s]

Downloading (…)_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Downloading (…)7e55de9125/README.md:   0%|          | 0.00/10.6k [00:00<?, ?B/s]

Downloading (…)55de9125/config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Downloading (…)ce_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

Downloading (…)125/data_config.json:   0%|          | 0.00/39.3k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

Downloading (…)nce_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading (…)e9125/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

Downloading (…)9125/train_script.py:   0%|          | 0.00/13.2k [00:00<?, ?B/s]

Downloading (…)7e55de9125/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)5de9125/modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

In [67]:
# persist vector db to storage
# creates 2 parquet file; before we just had an index folder
vectordb.persist()

# DBTITLE 1,Count Items in Vector DB
vectordb._collection.count()

FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

42994

In [68]:
# Examine a Vector DB record
rec= vectordb._collection.peek(1)

print('Metadatas:  ', rec['metadatas'])
print('Documents:  ', rec['documents'])
print('ids:        ', rec['ids'])
print('embeddings: ', rec['embeddings'])

Metadatas:   [{'product_id': 41976, 'product_name': 'bradleigh 2 - drawer end table'}]
Documents:   ['large countertops , clear storage : spacious countertops for everyday ornaments , cups , books , etc . thick and stable load plate : made of durable and durable plates , the load is more secure . firmly edge-sealing , carefully built : edge-sealing fit , not easy to lift , creating a rich home atmosphere . lifting table : creative design , change if you want , you can also use it as a desk , read books , play games , extraordinary experience . bring four wheels : move freely , save effort , and go wherever you want .']
ids:         ['c2d0edc4-e9d1-11ed-a6bb-1e00d20ee484']
embeddings:  [[-0.0447920486330986, 0.008601327426731586, -0.018501171842217445, 0.02558579109609127, -0.047929875552654266, -0.022731784731149673, 0.00840948149561882, 0.07353478670120239, 0.012087791226804256, 0.052916303277015686, -0.0822484940290451, -0.027594074606895447, -0.02226649969816208, 0.02781116403639316

In [69]:
# Perform Simple Search
vectordb.similarity_search_with_score("kid-proof rug")

[(Document(page_content="children 's nylon educational and play area rug .", metadata={'product_id': 42543, 'product_name': 'hartland abc barnyard power loomed green/red area rug'}),
  0.4518544673919678),
 (Document(page_content='this modern and convenient rug combines contemporary colors , an elegant medallion pattern , a wood floor safe backing , and durable construction . the kid and pet safe materials easy to spot clean , making these rugs ideal for a busy family .', metadata={'product_id': 41810, 'product_name': 'nile tarifa bohemian medallion red area rug'}),
  0.5379310846328735),
 (Document(page_content='this modern and convenient rug combines contemporary colors , an elegant medallion pattern , a wood floor safe backing , and durable construction . the kid and pet safe materials easy to spot clean , making these rugs ideal for a busy family .', metadata={'product_id': 41811, 'product_name': 'nile tarifa bohemian medallion yellow area rug'}),
  0.5379310846328735),
 (Document(

In [70]:
vectordb.similarity_search_with_score("bicyclist")

[(Document(page_content='decorative motorcycle', metadata={'product_id': 17819, 'product_name': 'decorative motorcycle'}),
  1.1318777799606323),
 (Document(page_content="minimalist design that 's almost invisible features : smart , simple cycle storage for all bike styles . ideal for all bikes : children 's , ladies ' , gents ' and tandems . versatile - store bikes horizontally in left or right orientations . facility to lock the bike in place . perfect for the home , office or retail display . make efficient use of space with a tiered installation . suitable for clipless or platform pedals . discreet wheel rests protect walls from dirt and damage .", metadata={'product_id': 8767, 'product_name': 'hero bicycle wall mount bike rack'}),
  1.1497584581375122),
 (Document(page_content="minimalist design that 's almost invisible features : smart , simple cycle storage for all bike styles versatile - store bikes horizontally in left or right orientations . perfect for the home , office or r

## 4.4 Deploy Model Via MLFlow

#### Define Environment Requirements


In [71]:
import pandas
import langchain
import chromadb

# get base environment configuration
conda_env = mlflow.pyfunc.get_default_conda_env()

# define packages required by model
packages = [
  f'pandas=={pandas.__version__}',
  f'langchain=={langchain.__version__}',
  f'chromadb=={chromadb.__version__}'
  ]

# add required packages to environment configuration
conda_env['dependencies'][-1]['pip'] += packages

print(
  conda_env
  )

{'name': 'mlflow-env', 'channels': ['conda-forge'], 'dependencies': ['python=3.8.3', 'pip<=23.1.2', {'pip': ['mlflow', 'cloudpickle==2.2.1', 'pandas==2.0.1', 'langchain==0.0.157', 'chromadb==0.3.21']}]}


#### Define artifacts
1. Artifacts are assets stored with the model as it is logged with MLflow.  Using keys assigned to these artifacts, those assets can be retrieved for utilization at various points in the model's logic. 

2. The two artifacts needed for our model are the path to the saved model and the Chroma database, both of which were persisted to storage in previous steps.  Please note that these objects were saved to the *Databricks Filesystem* which MLflow understands how to reference.  As a result, we need to alter the paths to these items by replacing the local */dbfs* to *dbfs:*

In [72]:
#Identify Model Artifacts
artifacts = {
  'embedding_model': embedding_model_path,#.replace('/dbfs','dbfs:'), 
  'chromadb': chromadb_path#.replace('/dbfs','dbfs:')
  }

print(
  artifacts
  )

{'embedding_model': '/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/models/embedding_model', 'chromadb': '/Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/work-dir/wands/chromadb'}


#### Define Model Wrapper class
- In the Databricks environment, deployment typically takes place using [MLflow](https://www.databricks.com/product/managed-mlflow), which has the ability to build a containerized service from our model as one of its deployment patterns.  Generic Python models deployed with MLflow typically support a standard API with a *predict* method that's called for inference.  We will need to write a custom wrapper to map a standard interface to our model as follows:

In [73]:
class ProductSearchWrapper(mlflow.pyfunc.PythonModel):

  # define steps to initialize model
  # addresses the steps that need to take place at model initialization. Two of those steps make reference to artifacts within the model's context.
  def load_context(self, context):

    # import required libraries
    import pandas as pd
    from langchain.embeddings import HuggingFaceEmbeddings
    from langchain.vectorstores import Chroma

    # retrieve embedding model
    embedding_model = HuggingFaceEmbeddings(model_name=context.artifacts['embedding_model'])

    # retrieve vectordb contents
    self._vectordb = Chroma(
      persist_directory=context.artifacts['chromadb'],
      embedding_function=embedding_model
      )

    # set number of results to return
    self._max_results = 5


  # define steps to generate results
  # note: query_df expects only one query
  def predict(self, context, query_df):

    # import required libraries
    import pandas as pd 

    # perform search on embeddings
    raw_results = self._vectordb.similarity_search_with_score(
      query_df['query'].values[0], # only expecting one value at a time 
      k=self._max_results
      )

    # get lists of of scores, descriptions and ids from raw results
    scores, descriptions, names, ids = zip(
      *[(r[1], r[0].page_content, r[0].metadata['product_name'], r[0].metadata['product_id']) for r in raw_results]
      )

    # reorganized results as a pandas df, sorted on score
    results_pd = pd.DataFrame({
      'product_id':ids,
      'product_name':names,
      'product_description':descriptions,
      'score':scores
      }).sort_values(axis=0, by='score', ascending=True)
    
    # set return value
    return results_pd


#### Persist Model to MLFlow
1. Notice that in this scenario, our embedding model and Chroma database are being loaded as artifacts and that our *python_model* is just the class definition that provides the logic for hydrating a model from those artifacts:

2. If we use the experiments UI (accessible by clicking the flask icon in the right-hand navigation of your workspace), we can access the details surrounding the model we just logged.  By expanding the folder structure behind the model, we can see the model and vector store assets loaded into MLflow:



In [78]:
# serialize the model to mlruns/models/version_*
#.   each time I run the cmd below, it will create a new wands_basic_search versopm
with mlflow.start_run() as run:

    mlflow.pyfunc.log_model(
        artifact_path='model',
        python_model=ProductSearchWrapper(),
        conda_env=conda_env,
        artifacts=artifacts, # items at artifact path will be loaded into mlflow repository
        registered_model_name=config['basic_model_name']
    )

Registered model 'wands_basic_search' already exists. Creating a new version of this model...
2023/05/03 10:34:58 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: wands_basic_search, version 3
Created version '3' of model 'wands_basic_search'.


#### Elevate to production
- In mlruns/models/wands_basic_search/version-*/meta.yaml. 
```
    current_stage: None | Production
```
    * the models directory appears to be the "model" repository
- The next command will update this meta.yaml file's stage to production

- Loading our model, we can perform a simple test to see results from a sample search. 



In [94]:
client = mlflow.MlflowClient()

latest_version = client.get_latest_versions(config['basic_model_name'], stages=['None'])[0].version

client.transition_model_version_stage(
    name=config['basic_model_name'],
    version=latest_version,
    stage='Production',
    archive_existing_versions=True
)


<ModelVersion: aliases=[], creation_timestamp=1683135298551, current_stage='Production', description=None, last_updated_timestamp=1683136187082, name='wands_basic_search', run_id='4602daae68fc4f7fa4a86a0397f40dcf', run_link=None, source='file:///Users/chang/Documents/dev/git/gratia/03_system/datapipeilne/scaling-machine-learning-course/db-product-search/mlruns/625849527336453425/4602daae68fc4f7fa4a86a0397f40dcf/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

#### Retrieve model from registry


In [95]:
# appears load_model has a context directory of your current experiment, probably set by mlflow.pyfunc.log_model
model = mlflow.pyfunc.load_model(f"models:/{config['basic_model_name']}/Production")




In [96]:
# Test Persisted Model with Sample Search
search = pd.DataFrame({'query':['farmhouse dining room table']})

# call model
display(model.predict(search))

Unnamed: 0,product_id,product_name,product_description,score
0,14562,rustic dining table,rustic dining table,1.022571
1,14783,industrial solid wood dining table,gather family and friends for good food and go...,1.030153
2,22297,norman dining table,norman dining table,1.064149
3,23646,lockard extendable dining table,anchor your dining room in modern farmhouse st...,1.092422
4,13935,marceline 40 '' console table,farmhouse inspired design will add a charming ...,1.094104


# 5 Fine Tune
- Based on 03_Fine_Tune_Model.py
- Having demonstrated the basics of assembling a model and supporting data to enable a semantic search, we will now focus on fine-tuning the model.  During fine-tuning, the model is fit against a set of data specific to a particular domain, such as our product catalog.  The original knowledge accumulated by our model from its pre-training remains intact but is supplemented with information gleaned from the additional data provided.  Once the model has been tuned to our satisfaction, it is packaged and persisted just like as before.

In [97]:
from sentence_transformers import SentenceTransformer, util, InputExample, losses, evaluation
import torch
from torch.utils.data import DataLoader

from langchain.document_loaders import DataFrameLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

import numpy as np
import pandas as pd

import mlflow

## 5.1 Aggregate product, labels, and query

In [100]:
search_pd = (
  spark   
    .table('products')
    .selectExpr(
      'product_id',
      'product_name',
      'COALESCE(product_description, product_name) as product_text' # use product description if available, otherwise name
      )
    .join(
        df_label, #spark.table('labels'),
        on='product_id'
      )
    .join(
      spark
        .table('queries'),
        on='query_id'
      )
      .selectExpr('query','product_text','label_score as score')
  ).toPandas()

display(search_pd)

Unnamed: 0,query,product_text,score
0,hardwood beds,"good , deep sleep can be quite difficult to ha...",1.00
1,jennie tufted upholstered low profile platform...,"good , deep sleep can be quite difficult to ha...",0.75
2,platform bed side table,"good , deep sleep can be quite difficult to ha...",0.75
3,floating bed,"good , deep sleep can be quite difficult to ha...",0.75
4,upholstered bed,"good , deep sleep can be quite difficult to ha...",0.75
...,...,...,...
233443,wood bar stools,this set of two barstools features a minimalis...,0.75
233444,wine bar,this set of two barstools features a minimalis...,0.00
233445,bar stool 24 inches height,this set of two barstools features a minimalis...,0.75
233446,bar stool with backrest,this set of two barstools features a minimalis...,0.75


## 5.2 Embed product, query, and label

In [None]:
original_model = SentenceTransformer('all-MiniLM-L12-v2')




In [None]:
query_embeddings = (
  original_model
    .encode(
      search_pd['query'].tolist()
      )
  )


In [None]:
product_embeddings = (
  original_model
    .encode(
      search_pd['product_text'].tolist()
      )
  )


#### Calculate cosine similarity between Queries and Products

In [None]:
original_cos_sim_scores = (
  util.pairwise_cos_sim(
    query_embeddings, 
    product_embeddings
    )
  )
print(original_cos_sim_score)


#### Analyze correlation between cosine similarity and human relevancy score

In [None]:
original_corr_coef_score = (
  np.corrcoef(
    original_cos_sim_scores,
    search_pd['score'].values
  )[0][1]
) 
# print results
print(original_corr_coef_score)

## 5.3 Fine Tune Model
- With a baseline measurement of the original model's performance in-hand, we can now fine-tune it using our annotated search result data.  We will start by restructuring our query results into a list of inputs as required by the model:

#### Restructure Data for Model Input

In [None]:
# define function to assemble an input
def create_input(doc1, doc2, score):
  return InputExample(texts=[doc1, doc2], label=score)

# convert each search result into an input
inputs = search_pd.apply(
  lambda s: create_input(s['query'], s['product_text'], s['score']), axis=1
  ).to_list()

inputs

#### Train and tune a new model
- During model fitting, you will notice we are setting the model to perform just one pass (epoch) over the data.  We will actually see pretty sizeable improvements from this process, but we may wish to increase that value to get multiple passes if we want to explore getting more.  The setting for *warmup_steps* is just a common one used in this space.  Feel free to experiment with other values or take the default.

In [None]:
tuned_model = SentenceTransformer('all-MiniLM-L12-v2')

# define instructions for feeding inputs to model
input_dataloader = DataLoader(inputs, shuffle=True, batch_size=16) # feed 16 records at a time to the model

# define loss metric to optimize for
loss = losses.CosineSimilarityLoss(tuned_model)

# tune the model on the input data
tuned_model.fit(
  train_objectives=[(input_dataloader, loss)],
  epochs=1, # just make 1 pass over data
  warmup_steps=100 # controls how many steps over which learning rate increases to max before descending back to zero
  )


#### Estimate Tune Model Performance

In [None]:
query_embeddings = (
  tuned_model
    .encode(
      search_pd['query'].tolist()
      )
  )

product_embeddings = (
  tuned_model
    .encode(
      search_pd['product_text'].tolist()
      )
  )

# determine cosine similarity for each query-product pair
tuned_cos_sim_scores = (
  util.pairwise_cos_sim(
    query_embeddings, 
    product_embeddings
    )
  )

tuned_cos_sim_score = torch.mean(tuned_cos_sim_scores).item()

# display result
print(f"With tuning, avg cosine similarity went from {original_cos_sim_score} to {tuned_cos_sim_score}")


## 5.4 Deploy Fine Tuned Model