## Product Categorization

This notebook shows how to use LabelKit to categorize a list of products into a product taxonomy.

We are given a list of product names from an e-commerce marketplace, like

`Kitsch Velvet Scrunchies for Hair, Hair Scrunchies for Women, Scrunchy Hair Bands, 5 Pack (Blush/Mauve)`

And some product categories in the form of a taxonomy, for example the category for the above product might be

`Beauty > Hair Care > Hair Styling`

The goal is to accurately map each product into the best category, given a taxonomy containing 1000+ categories.

### Approach

We'll implement the following multi-step approach:

1. Do a Google search on the product name

2. Feed the name and the search results from Step 1 into an LLM to get a short product description

3. Create embeddings of the product categories and store them in a vector store. Then do a nearest-neighbor search with the product description created in Step 2.

4. Feed the top N nearest neighbor categories along with the product description into an LLM and ask it to pick the best one

<p align="center"><img src="image.png" width="700"></p>

Install the dependencies, import the libraries, and load the data and the taxonomy.

In [1]:
# %pip install cohere

import json
import pandas as pd
from labelkit import *
from pydantic import BaseModel, Field
import cohere
import os
import numpy as np
from typing import List


In [2]:
taxonomy = pd.read_csv('./taxonomy.csv').fillna("")
taxonomy = list(taxonomy['sector'] + " > " + taxonomy['department'] + " > " + taxonomy['major_category'])
df = pd.read_csv('./data.csv')
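
Because missing levels are filled with `""`, a taxonomy row without a `major_category` ends up with a trailing ` > ` after the concatenation above. The following is a minimal sketch of one way to avoid that by joining only the non-empty levels. It uses plain dictionaries in place of the DataFrame; `join_levels` and the sample rows are hypothetical and not part of the original notebook.

```python
# Hypothetical helper: build a taxonomy path from only the non-empty levels.
LEVELS = ["sector", "department", "major_category"]

def join_levels(row, levels=LEVELS, sep=" > "):
    """Join the non-empty taxonomy levels of one row into a path string."""
    parts = [str(row.get(level, "")).strip() for level in levels]
    return sep.join(part for part in parts if part)

# Illustrative rows, not taken from taxonomy.csv.
sample_rows = [
    {"sector": "Beauty", "department": "Hair Care", "major_category": "Hair Styling"},
    {"sector": "Beauty", "department": "Hair Care", "major_category": ""},
]

paths = [join_levels(row) for row in sample_rows]
print(paths)
```

The same function should also work row-wise on the DataFrame, for example via `apply(join_levels, axis=1)`, since a pandas row supports `.get` with a default.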

### Defining the Pipeline using LabelKit

Now, let's define the first step of the pipeline, which uses a Google SERP library to run a search on the `description` field of each input row.

We do this by using LabelKit's built-in `SERPEnrichmentStep`. You could also easily build your own using a `CustomStep` instead.

In [3]:
def shorten(serp_result):
  top_n = 5
  short = []
  y = json.loads(serp_result).get('organic')
  if y is None:
    return None
  for o in y[:top_n]:
    short.append({
        'title': o['title'],
        'snippet': o['snippet'],
        'link': o['link']
    })
  return short

serp_step = steps.SERPEnrichmentStep(
  params={
  "prompt": lambda row: row['description'], 
  "postprocess": shorten
  }, 
  name="serp")
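
To make the post-processing concrete, here is a small standalone sketch of `shorten` applied to a sample response. It assumes, as the function above implies, that the search step hands over a raw JSON string with an `organic` list; the payload itself is hypothetical, and `top_n` is turned into a parameter purely for illustration.

```python
import json

def shorten(serp_result, top_n=5):
    """Keep only title, snippet and link for the first top_n organic results."""
    organic = json.loads(serp_result).get("organic")
    if organic is None:
        return None
    return [
        {"title": o["title"], "snippet": o["snippet"], "link": o["link"]}
        for o in organic[:top_n]
    ]

# Hypothetical payload; real responses are assumed to carry more fields per result.
sample = json.dumps({
    "organic": [
        {"title": "Velvet Scrunchies", "snippet": "5 pack",
         "link": "https://example.com/a", "position": 1},
        {"title": "Hair Bands", "snippet": "Soft",
         "link": "https://example.com/b", "position": 2},
    ]
})

trimmed = shorten(sample, top_n=1)                      # extra fields are dropped
empty = shorten(json.dumps({"error": "no results"}))    # no "organic" key
print(trimmed, empty)
```

Trimming the results this way keeps the prompt in the next step short, which matters because every search result is pasted into the LLM input.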

The second step of the pipeline takes the product description and the search results and feeds them into an LLM to get a better description. We create this step using LabelKit's `LLMStep`.

An `LLMStep` instance takes a Pydantic model and a prompt generator function as arguments. The Pydantic model specifies the output structure (every `LLMStep` produces structured output). The prompt generator function defines how to build a prompt from each input row.

In [4]:
short_description_prompt = lambda row: f"""
You are given a product description and a list of Google search results about a product.
Return a single sentence describing the product.
Product description: {row['description']}
Search results:
{row['serp']}
"""

class ShortDescription(BaseModel):
  short_description: str = Field(description="A single sentence describing the product")
  
short_description_step = steps.LLMStep(
  params={
    "prompt": short_description_prompt,
    "model": models.gpt35
  },
  out_model=ShortDescription,
  name="short_description"
)
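
To see what the model actually receives, the prompt generator can be called on a single row outside the pipeline. The sketch below rewrites the lambda as a named function with the same template; the sample row is hypothetical, and it assumes the `serp` field holds the trimmed list produced by the search step.

```python
def short_description_prompt(row):
    """Render the LLM prompt for one row (same template as the step above)."""
    return f"""
You are given a product description and a list of Google search results about a product.
Return a single sentence describing the product.
Product description: {row['description']}
Search results:
{row['serp']}
"""

# Hypothetical row: `serp` holds the trimmed results from the search step.
row = {
    "description": "Kitsch Velvet Scrunchies for Hair, 5 Pack (Blush/Mauve)",
    "serp": [
        {
            "title": "Kitsch Velvet Scrunchies",
            "snippet": "Soft velvet hair ties",
            "link": "https://example.com/scrunchies",
        }
    ],
}

prompt = short_description_prompt(row)
print(prompt)
```

Note that the list of results is interpolated with Python's default string conversion, so the model sees it as a Python-style list of dictionaries rather than formatted text.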

Next, we define the embedding classification step, which embeds every category in the taxonomy and then finds the top 5 nearest neighbors for each input row, based on the short description generated in the previous step.

We can use the built-in `EmbeddingClassificationStep` and provide it an `embed` function. You can use any embedding provider here. We use Cohere in this example.

In [5]:
# Set your Cohere API key as an environment variable, or assign it directly here
COHERE_API_KEY = os.environ.get('COHERE_API_KEY')
co = cohere.Client(COHERE_API_KEY)

def embed(texts: List[str]):
  embeddings = co.embed(
    model="embed-english-v3.0",
    texts=texts,
    input_type='classification'
  ).embeddings
  return np.array(embeddings).astype('float32')

embedding_search_prompt = lambda row: row["short_description"]

embedding_search_step = steps.EmbeddingClassificationStep(
  params={
    "search_prompt": embedding_search_prompt,
    "embed": embed,
    "k": 5,    
  },
  categories=taxonomy,
  name="embedding_search"
)
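
The idea behind the nearest-neighbor search can be sketched without any embedding provider. The example below is not LabelKit's implementation: `toy_embed` is a hypothetical stand-in that uses word counts instead of a real embedding model, and `top_k` ranks categories by cosine similarity to the query.

```python
import math
from collections import Counter

def toy_embed(text):
    """Hypothetical stand-in for an embedding model: lowercase word counts."""
    return Counter(text.lower().replace(">", " ").split())

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def top_k(query, categories, k=5):
    """Return the k categories most similar to the query, best first."""
    query_vec = toy_embed(query)
    scored = [(cosine(query_vec, toy_embed(c)), c) for c in categories]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [category for _, category in scored[:k]]

# Illustrative categories, not taken from taxonomy.csv.
categories = [
    "Beauty > Hair Care > Hair Styling",
    "Beauty > Skin Care > Moisturizers",
    "Electronics > Audio > Headphones",
]

nearest = top_k("velvet hair scrunchies for styling hair", categories, k=2)
print(nearest)
```

With a real embedding model the vectors are dense and capture meaning rather than word overlap, but the ranking logic is the same: embed the query, score it against every category, and keep the best `k`.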

Next, we take the top 5 categories selected by the embedding step, feed them into an LLM query, and ask it to pick the index of the best one. We use an `LLMStep` for this.

In [6]:

def categorize_prompt(row):
    categories = ""
    i = 1
    while f"category{i}" in row:
        categories += f'{i}. {row[f"category{i}"]}\n'
        i += 1

    return f"""
    You are given a product description and {i-1} options for the product's category.
    Pick the index of the most accurate category.
    The index must be between 1 and {i-1}.
    Product description: {row['short_description']}
    Categories:
    {categories}
    """
    
class CategoryIndex(BaseModel):
    category_index: int = Field(description="The index of the most accurate category")
    
categorize_step = steps.LLMStep(
  params={
    "prompt": categorize_prompt,
    "model": models.gpt35
  },
  out_model=CategoryIndex,  
  name="categorize"
)

The previous step outputs a category index, but we want the actual category, so we need to map the index back to the category. We'll create a simple `CustomStep` that grabs the `category{i}` field created in the embedding search step.

In [7]:
class Category(BaseModel):
    category: str = Field(description="The most accurate category")

select_category_step = steps.CustomStep(
  params={
    "transform": lambda row: {"category": row[f'category{row["category_index"]}']}
  },
  out_model=Category,
  name="select_category"
)
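
The lambda above raises a `KeyError` if the LLM returns an index outside the available options. As a hedged sketch, a more defensive transform could fall back to the top embedding match in that case. The function name `select_category` and the fallback policy are assumptions for illustration, not part of the original notebook.

```python
def select_category(row, fallback_index=1):
    """Map category_index to its category, falling back when it is out of range."""
    key = f"category{row.get('category_index')}"
    if key not in row:
        # Assumed policy: use the nearest embedding match instead of failing.
        key = f"category{fallback_index}"
    return {"category": row[key]}

# Hypothetical rows shaped like the output of the embedding and categorize steps.
base = {
    "category1": "Beauty > Hair Care > Hair Styling",
    "category2": "Beauty > Hair Care > Hair Accessories",
}

in_range = select_category({**base, "category_index": 2})
out_of_range = select_category({**base, "category_index": 7})
print(in_range, out_of_range)
```

Whether a silent fallback is preferable to a hard failure depends on the use case; failing loudly makes bad model outputs easier to spot during evaluation.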

We're done defining the steps. Finally, we define an evaluation function: a simple case-insensitive string comparison against the ground-truth column in the dataset (`gpt4_category`). Then we define a LabelKit `Pipeline` and run it.

In [None]:
evaluate = lambda row: row['category'].lower() == row['gpt4_category'].lower()

categorizer = pipeline.Pipeline([
  serp_step, 
  short_description_step, 
  embedding_search_step, 
  categorize_step,
  select_category_step
], evaluate)

categorizer.apply(df)

### Evaluating accuracy, token usage and latency

LabelKit makes it easy to:

- Evaluate the accuracy of your pipeline if your dataset has a ground-truth column. If you passed in an `evaluate` function, you can call `Pipeline.score` to get the accuracy score.
- Track the token usage and latency for each row, or in aggregate over the entire dataset. To get the aggregate statistics, call `Pipeline.statistics`.

In [12]:
print(f"Accuracy: {categorizer.score}")
print(f"Statistics: {categorizer.statistics.model_dump_json()}")

Accuracy: 0.7
Statistics: {"input_tokens":{"gpt-3.5-turbo-0125":8114},"output_tokens":{"gpt-3.5-turbo-0125":557},"num_success":10,"num_failure":0,"total_latency":18.613963379291818}
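
The aggregate figures are easier to interpret per row. The sketch below parses the statistics JSON printed above and divides by the number of processed rows. It assumes `total_latency` is reported in seconds, which the output does not state explicitly.

```python
import json

# Statistics JSON copied from the output above.
raw = (
    '{"input_tokens":{"gpt-3.5-turbo-0125":8114},'
    '"output_tokens":{"gpt-3.5-turbo-0125":557},'
    '"num_success":10,"num_failure":0,"total_latency":18.613963379291818}'
)
stats = json.loads(raw)

rows = stats["num_success"] + stats["num_failure"]

# Sum over models in case more than one model was used in the pipeline.
input_per_row = sum(stats["input_tokens"].values()) / rows
output_per_row = sum(stats["output_tokens"].values()) / rows
latency_per_row = stats["total_latency"] / rows  # assumed to be seconds

print(f"Input tokens per row:  {input_per_row:.1f}")
print(f"Output tokens per row: {output_per_row:.1f}")
print(f"Latency per row:       {latency_per_row:.2f}")
```

For this run that works out to roughly 811 input tokens and 56 output tokens per product across the two LLM steps, with most of the input coming from the search results pasted into the first prompt.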


### Tuning the pipeline

In [10]:
# TODO