In [None]:
!pip cache purge  # Clears cached packages
!pip install --no-cache-dir faiss-cpu


[0mFiles removed: 0
Collecting faiss-cpu
  Downloading faiss_cpu-1.11.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.8 kB)
Downloading faiss_cpu-1.11.0-cp311-cp311-manylinux_2_28_x86_64.whl (31.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.3/31.3 MB[0m [31m43.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.11.0


In [None]:
import json
import os
import re

import requests
import pandas as pd
from sentence_transformers import SentenceTransformer
import faiss

# Hugging Face API Setup
HUGGINGFACE_API_URL = "https://api-inference.huggingface.co/models/mistralai/Mixtral-8x7B-Instruct-v0.1"
# Prefer a token supplied through the HF_TOKEN environment variable so that a
# real key never has to be pasted into (and saved with) the notebook. The
# placeholder is kept only as a fallback for backward compatibility.
HF_API_TOKEN = os.environ.get("HF_TOKEN", "YOUR API KEY HERE")
HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"}

# Load dataset
data = pd.read_excel("/content/drive/MyDrive/pre_boston(4).xlsx")

# Updated Category mappings: canonical category -> keywords that imply it
category_mapping = {
    "laptops": ["laptop", "macbook"],
    "smartphone": ["smartphone", "phone", "iphone", "galaxy", "mobile", "mobiles", "phones"],
    "basic cases": ["case", "cover"],
    "headphones": ["headphone", "earphone", "earbuds"],
    "laptop bags": ["laptop bag", "backpack"],
    "laptop charger": ["laptop charger", "macbook charger"],
    "phone charger": ["phone charger", "mobile charger"],
    "screen protector": ["screen protector", "tempered glass"],
    "mouse": ["mouse", "wireless mouse"]
}

# Load embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Generate embeddings
# Missing cells are replaced by empty strings first: string + NaN yields NaN,
# which would hand a float to the encoder instead of text.
text_parts = data[["product_title", "model", "features"]].fillna("").astype(str)
data['text'] = text_parts["product_title"] + " " + text_parts["model"] + " " + text_parts["features"]
embeddings = model.encode(data['text'].tolist(), convert_to_numpy=True)

# Create FAISS index (exact L2 search over all product embeddings)
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)

# Stores previous query context for follow-ups
previous_results = None
# Conversation state read and written by the query-handling functions through
# `global query_memory`; it must exist before the first follow-up is handled.
query_memory = None

# Stopwords to remove from queries
stopwords = {"i", "am", "looking", "for", "want", "to", "buy", "need", "a", "an", "the", "of", "with"}


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [None]:
def remove_stopwords(query):
    """Lower-case ``query`` and drop every token listed in ``stopwords``.

    Tokens are obtained by splitting on whitespace; the surviving tokens are
    returned joined by single spaces, in their original order.
    """
    kept_tokens = []
    for token in query.lower().split():
        if token in stopwords:
            continue
        kept_tokens.append(token)
    return " ".join(kept_tokens)

def query_huggingface(payload, timeout=60):
    """POST ``payload`` to the Hugging Face inference endpoint.

    Args:
        payload: JSON-serialisable request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on a stalled connection.

    Returns:
        The decoded JSON body, or ``None`` when the request fails (connection
        error, timeout, ...) or the body is not valid JSON.
    """
    try:
        response = requests.post(
            HUGGINGFACE_API_URL, headers=HEADERS, json=payload, timeout=timeout
        )
        return response.json()
    except (requests.exceptions.RequestException, ValueError):
        # RequestException covers network failures and timeouts. ValueError
        # covers every JSON decode error flavour ``Response.json()`` may raise
        # (json, simplejson, or requests' own JSONDecodeError).
        return None

def _parse_first_json_object(text):
    """Return the first JSON object embedded in ``text``, or ``None`` if there is none."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode tolerates trailing text after the object.
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)
    return None


def _normalize_category(raw_category):
    """Map a category name or keyword onto a ``category_mapping`` key, or ``None``."""
    if not isinstance(raw_category, str):
        return None
    wanted = raw_category.strip().lower()
    for key, keywords in category_mapping.items():
        # The prompt asks the LLM to answer with one of the keys, so the key
        # itself must be accepted in addition to its keywords.
        if wanted == key or wanted in keywords:
            return key
    return None


def extract_query_info(user_query):
    """Ask the LLM to extract brand, model, category and price bounds from a query.

    The query is stripped of stopwords, embedded in an instruction prompt and
    sent through ``query_huggingface``. The first JSON object found in the
    generated text is returned, with ``category`` normalised to a key of
    ``category_mapping`` (or removed, with a warning, if it cannot be mapped).

    Returns:
        dict: the extracted fields, or ``{}`` when the LLM response is missing,
        malformed, or contains no JSON object.
    """
    # Apply stopword removal to the user query before passing to the LLM
    processed_query = remove_stopwords(user_query)
    prompt = f"""
    You are an AI assistant that extracts key details from user queries for product recommendations.
    Identify the product category (from: {list(category_mapping.keys())}), brand (if mentioned),
    model (if mentioned), minimum price (if specified), and maximum price (if specified) from the following query.

    Return the output in **valid JSON format** with these keys:
    {{"brand": "...", "model": "...", "category": "...", "min_price": ..., "max_price": ...}}
    If a specific category is not explicitly mentioned, try to infer it based on keywords.
    If price is not mentioned, the value should be null.

    User Query: "{processed_query}"
    JSON Output:
    """
    response = query_huggingface({"inputs": prompt})
    if response is None or not isinstance(response, list) or len(response) == 0:
        print("Error: LLM response is empty or invalid!")
        return {}

    first_item = response[0]
    generated = first_item.get("generated_text", "") if isinstance(first_item, dict) else ""
    if not isinstance(generated, str):
        return {}
    # Text-generation endpoints echo the prompt in front of the completion by
    # default; drop it so only the model's answer is parsed.
    if generated.startswith(prompt):
        generated = generated[len(prompt):]

    extracted_info = _parse_first_json_object(generated.strip())
    if extracted_info is None:
        return {}

    # Post-process category to match the keys in category_mapping
    raw_category = extracted_info.get("category")
    if raw_category:
        normalized = _normalize_category(raw_category)
        if normalized is None:
            print(f"Warning: Extracted category '{raw_category}' not found in mapping.")
            extracted_info.pop("category", None)  # Remove if not found
        else:
            extracted_info["category"] = normalized
    return extracted_info

def retrieve_products(query, top_k=50):
    """Return the products nearest to ``query`` in embedding space.

    Args:
        query: Free-text search string.
        top_k: Maximum number of neighbours requested from the FAISS index.

    Returns:
        tuple: ``(products, distances)`` where ``products`` is the matching
        slice of ``data`` and ``distances`` holds the corresponding search
        distances, in the same order.
    """
    query_embedding = model.encode([query], convert_to_numpy=True)
    distances, indices = index.search(query_embedding, top_k)
    # FAISS pads the result with index -1 when fewer than top_k vectors are
    # available; iloc[-1] would silently return the last row of the dataset.
    found = indices[0] >= 0
    return data.iloc[indices[0][found]], distances[0][found]  # Return both the products and the distances

def filter_and_rank_products(user_query, retrieved_products_df, distances, extracted_info):
    """Filter retrieved products by category/price and rank them in three tiers.

    Tiers, in output order:
      1. rows whose brand AND model equal the extracted brand/model
         (case-insensitive), ranked by review count then rating;
      2. remaining rows of the extracted brand, ranked the same way;
      3. every other row, ranked by search distance (ascending), then review
         count and rating. If ``distances`` does not line up with the rows,
         this tier falls back to review count and rating only.

    Args:
        user_query: Original query text (unused; kept for interface stability).
        retrieved_products_df: Candidate products; expected to provide the
            columns ``brand``, ``model``, ``category``, ``price``,
            ``review_count`` and ``rating``.
        distances: Search distances, positionally aligned with the rows of
            ``retrieved_products_df``.
        extracted_info: Dict with optional keys ``brand``, ``model``,
            ``category``, ``min_price``, ``max_price``; values may be ``None``.

    Returns:
        pandas.DataFrame: the filtered rows in ranked order, with the same
        columns as ``retrieved_products_df``. The input frame is not modified.
    """
    def _clean_text(value):
        # LLM output may contain null or non-string values for any field.
        if value is None:
            return ""
        return str(value).strip().lower()

    def _clean_price(value):
        # Accept numbers or numeric strings; anything else means "no bound".
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return None if number != number else number  # NaN -> no bound

    if retrieved_products_df.empty:
        return retrieved_products_df.copy()

    brand = _clean_text(extracted_info.get("brand"))
    model_name = _clean_text(extracted_info.get("model"))
    category = _clean_text(extracted_info.get("category"))
    min_price = _clean_price(extracted_info.get("min_price"))
    max_price = _clean_price(extracted_info.get("max_price"))

    # Work on a copy and carry the distance as a temporary column so it stays
    # attached to its row through every filter below.
    distance_col = "_faiss_distance"
    candidates = retrieved_products_df.copy()
    has_distances = len(distances) == len(candidates)
    if has_distances:
        candidates[distance_col] = list(distances)

    # Filter by category (plain substring match, not a regular expression)
    if category:
        in_category = candidates["category"].str.lower().str.contains(category, regex=False, na=False)
        candidates = candidates[in_category]

    # Price filtering; rows with a missing or non-numeric price never match
    if min_price is not None or max_price is not None:
        prices = pd.to_numeric(candidates["price"], errors="coerce")
        in_range = prices.notna()
        if min_price is not None:
            in_range = in_range & (prices >= min_price)
        if max_price is not None:
            in_range = in_range & (prices <= max_price)
        candidates = candidates[in_range]

    # Split into tiers. All tiers are slices of `candidates`, so they always
    # share its columns even when empty.
    no_rows = pd.Series(False, index=candidates.index)
    is_brand = (candidates["brand"].str.lower() == brand) if brand else no_rows
    if brand and model_name:
        is_model = candidates["model"].str.lower() == model_name
    else:
        is_model = no_rows
    brand_model_tier = candidates[is_brand & is_model]
    brand_tier = candidates[is_brand & ~is_model]
    remaining = candidates[~is_brand]

    popularity = ["review_count", "rating"]
    ranked_exact_brand_model = brand_model_tier.sort_values(by=popularity, ascending=[False, False])
    ranked_exact_brand = brand_tier.sort_values(by=popularity, ascending=[False, False])
    if has_distances:
        ranked_remaining = remaining.sort_values(
            by=[distance_col] + popularity, ascending=[True, False, False]
        )
    else:
        ranked_remaining = remaining.sort_values(by=popularity, ascending=[False, False])

    # Combine and return the ranked results, prioritizing exact brand and model
    final_results = pd.concat([ranked_exact_brand_model, ranked_exact_brand, ranked_remaining])
    if has_distances:
        final_results = final_results.drop(columns=[distance_col])
    return final_results

def generate_response(user_query):
    """Answer a fresh product query and remember its context for follow-ups.

    The stopword-stripped query is used for retrieval, while the original
    query text is handed to ``extract_query_info``. The retrieved products are
    filtered and ranked, the top ten are stored in the global ``query_memory``
    together with the extracted fields, and a JSON string describing the
    result (or a "nothing found" message) is returned.
    """
    global query_memory

    def _as_json(body):
        return json.dumps(body, indent=4)

    # Retrieval works on the stopword-free query
    search_text = remove_stopwords(user_query)
    candidates, candidate_distances = retrieve_products(search_text, top_k=50)
    if candidates.empty:
        return _as_json({"response": "No relevant products found."})

    # The LLM extraction step receives the original query
    details = extract_query_info(user_query)
    ranked = filter_and_rank_products(user_query, candidates, candidate_distances, details)
    if ranked.empty:
        return _as_json({"response": "No products found after filtering."})

    # Only the top 10 are kept, both for the reply and for follow-ups
    top_ten = ranked[:10]
    remembered = {
        field: details.get(field)
        for field in ("category", "brand", "model", "min_price", "max_price")
    }
    remembered["retrieved_products"] = top_ten
    query_memory = remembered

    reply = {
        "query": user_query,
        "recommended_products": top_ten.to_dict(orient="records"),
        "response": "Top results based on your query."
    }
    return _as_json(reply)

def handle_follow_up(follow_up_query):
    """Re-order the products of the previous query for a follow-up request.

    A follow-up containing "cheaper" sorts the remembered products by
    ascending price; one containing "expensive" sorts them by descending
    price. Any other follow-up returns them in their stored order.

    Returns:
        str: JSON with up to ten products, or a message when there is no
        previous query or nothing to show.
    """
    processed_follow_up = remove_stopwords(follow_up_query)
    # `query_memory` only exists once a primary query has been answered;
    # looking it up defensively avoids a NameError when a follow-up comes first.
    memory = globals().get("query_memory")
    if not memory:
        return json.dumps({"response": "No previous query found."}, indent=4)

    products = memory["retrieved_products"]
    if "cheaper" in processed_follow_up:
        products = products.sort_values(by="price", ascending=True)
    elif "expensive" in processed_follow_up:
        products = products.sort_values(by="price", ascending=False)

    if products.empty:
        return json.dumps({"response": "No matching follow-up products."}, indent=4)

    return json.dumps({
        "query": follow_up_query,
        "recommended_products": products[:10].to_dict(orient="records"), # Return only the top 10
        "response": "Follow-up recommendations."
    }, indent=4)

# Interactive loop: route each query either to the follow-up handler (when it
# mentions "cheaper" / "expensive") or to the full retrieval pipeline.
while True:
    try:
        user_query = input("Enter your query (or type 'exit' to stop): ").strip()
    except (EOFError, KeyboardInterrupt):
        # Interrupting the cell or closing stdin ends the session cleanly
        # instead of leaving a traceback in the notebook.
        print("Goodbye!")
        break
    if not user_query:
        # Nothing to search for; avoid an embedding + LLM round trip.
        continue
    if user_query.lower() == "exit":
        print("Goodbye!")
        break
    if any(keyword in remove_stopwords(user_query) for keyword in ["cheaper", "expensive"]):
        print(handle_follow_up(user_query))
    else:
        print(generate_response(user_query))

{
    "query": "iphone 14 pro max",
    "recommended_products": [
        {
            "product_id": "B0B82D4RWY",
            "category": "Basic Cases",
            "product_title": "SUPCASE for iPhone 14 Pro Max Case (Unicorn Beetle Mag), [Compatible with MagSafe] [Military-Grade Protection] Protective Slim Clear Magnetic Shockproof Phone Case for iPhone 14 Pro Max, Mauve",
            "brand": "SUPCASE",
            "model": "iPhone 14 Pro Max",
            "features": "['Hard PC back + shock-absorbent TPU bumper provides your iPhone 14 Pro Max effective protection.', 'Back cover is transparent and is extremely scratch-resistant.', 'Elevated bezels help ensure touchscreen and camera lens avoid scratches when placed facedown.', 'Easily access all of your smartphone features, functions and ports thanks to the precise design of case cutouts.', 'Compatible with iPhone 14 Pro Max 6.7 Inch (2022 Release) ONLY. Compatible with MagSafe and Most Other Wireless Chargers.']",
            "pri