In [None]:
!pip install sentence-transformers faiss-cpu google-generativeai pandas numpy scikit-learn matplotlib seaborn --quiet

In [None]:
import pandas as pd
import numpy as np
import os
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
import faiss
from sklearn.metrics.pairwise import cosine_similarity
import json
import time
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple, Optional
import warnings
import kagglehub
from kagglehub import KaggleDatasetAdapter
import google.generativeai as genai

# Suppress every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

# Download the NLTK resources one after another, in a fixed order.
for _nltk_resource in ('punkt', 'punkt_tab', 'wordnet', 'omw-1.4', 'stopwords'):
    nltk.download(_nltk_resource)

In [None]:
# Central configuration for the FAQ system.
CONFIG = {
    # Minimum cosine similarity for the search across all products.
    'similarity_threshold': 0.8,
    # Minimum cosine similarity for the search restricted to the queried product.
    'fallback_threshold': 0.6,
    'embedding_model': 'all-MiniLM-L6-v2',
    'gemini_model': 'gemini-2.0-flash',
    # Read the key from the environment so it is never saved inside the notebook.
    'gemini_api_key': os.environ.get('GEMINI_API_KEY') or os.environ.get('GOOGLE_API_KEY', ''),
}

if not CONFIG['gemini_api_key']:
    # Answer generation catches API errors and falls back to canned answers,
    # so the notebook still runs, but say clearly why that is happening.
    print("Warning: no Gemini API key found in GEMINI_API_KEY / GOOGLE_API_KEY; "
          "Gemini requests are expected to fail.")

embedding_model = SentenceTransformer(CONFIG['embedding_model'])

genai.configure(api_key=CONFIG['gemini_api_key'])
gemini_model = genai.GenerativeModel(CONFIG['gemini_model'])

In [None]:
class FAQDatabase:
    """In-memory store of FAQ entries, grouped by product ASIN."""

    def __init__(self):
        # Maps an ASIN to the list of FAQ entries added for it, in insertion order.
        self.faq_data = {}

    def add_faq(self, asin: str, faq_entry):
        """Append ``faq_entry`` to the list kept for ``asin``, creating the list on first use."""
        self.faq_data.setdefault(asin, []).append(faq_entry)

    def get_faqs(self, asin: str):
        """Return the entries stored for ``asin``, or a fresh empty list if there are none."""
        if asin in self.faq_data:
            return self.faq_data[asin]
        return []

    def update_usage(self, asin: str, question):
        """Increment ``usage_count`` on the first entry of ``asin`` whose question equals ``question``.

        Does nothing when the ASIN is unknown or no entry matches.
        """
        for entry in self.faq_data.get(asin, ()):
            if entry['question'] != question:
                continue
            entry['usage_count'] += 1
            return

    def get_total_faqs(self):
        """Return the number of entries stored across all ASINs."""
        return sum(map(len, self.faq_data.values()))

    def get_products_count(self):
        """Return the number of distinct ASINs that have an entry list."""
        return len(self.faq_data.keys())

In [None]:
# Both CSV files are loaded from the same Kaggle dataset handle.
file_path_products = "amazon_products.csv"
file_path_categories = "amazon_categories.csv"
_products_dataset_handle = "asaniczka/amazon-products-dataset-2023-1-4m-products"

products_df = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS, _products_dataset_handle, file_path_products
)
categories_df = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS, _products_dataset_handle, file_path_categories
)

print(f"Products shape: {products_df.shape}")
print(f"Categories shape: {categories_df.shape}")

In [None]:
# Summarise products_df: columns, dtypes, non-null counts and memory usage.
products_df.info()

In [None]:
# Summarise categories_df: columns, dtypes, non-null counts and memory usage.
categories_df.info()

In [None]:
# Keep only product rows with no missing value in any column, then renumber the index from 0.
products_df = products_df.dropna().reset_index(drop=True)

In [None]:
# Cast the ASIN column to str; it is the join key for the later merge with qna_df.
products_df['asin'] = products_df['asin'].astype(str)

In [None]:
# Load the single-answer question/answer file from the Kaggle QnA dataset.
file_path_single = "single_qna.csv"
_qna_dataset_handle = "praneshmukhopadhyay/amazon-questionanswer-dataset"

qna_df = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS, _qna_dataset_handle, file_path_single
)

print(f"Single QnA shape: {qna_df.shape}")

In [None]:
# Summarise qna_df: columns, dtypes, non-null counts and memory usage.
qna_df.info()

In [None]:
# Rename first and select by the new name, so this cell can be re-run safely:
# rename() ignores a missing 'Asin' column, whereas selecting 'Asin' after a
# previous run would raise KeyError.
qna_df = (
    qna_df
    .rename(columns={'Asin': 'asin'})
    .loc[:, ['asin', 'Question', 'Answer']]
    .copy()
)

In [None]:
# Drop incomplete rows BEFORE casting the ASIN to str: astype(str) turns a missing
# ASIN into the literal string 'nan', which dropna() would no longer remove.
qna_df = (
    qna_df
    .dropna()
    .assign(asin=lambda frame: frame['asin'].astype(str))
    .drop_duplicates()
    .reset_index(drop=True)
)

In [None]:
# Inner join on ASIN: keep only Q&A rows whose ASIN also appears in products_df.
merged_df = qna_df.merge(products_df, on='asin', how='inner')

In [None]:
# Inspect the joined Q&A + product frame before categories are attached.
merged_df.info()

In [None]:
# Attach the category columns via a left join on category_id (categories_df.id is
# renamed to match), then drop the two columns that are no longer needed.
merged_df = (
    merged_df
    .merge(
        categories_df.rename(columns={'id': 'category_id'}),
        on='category_id',
        how='left',
    )
    .drop(columns=['category_id', 'boughtInLastMonth'])
)
merged_df.head()

In [None]:
# Inspect the final merged frame after the category join and column drop.
merged_df.info()

In [None]:
class StandardizedFAQEntry:
    """A question/answer pair for one product, kept with its normalised text.

    Attributes set by the constructor:
        question, answer: the texts as given.
        processed_question, processed_answer: the texts after ``preprocess_text``.
        asin: product identifier the entry belongs to.
        confidence: caller-supplied score (defaults to 1.0).
        created_at: ``datetime.now().isoformat()`` taken at construction.
        usage_count: starts at 0.
    """

    def __init__(self, question, answer, asin: str, confidence=1.0):
        self.question = question
        self.answer = answer
        self.processed_question = self.preprocess_text(question)
        self.processed_answer = self.preprocess_text(answer)
        self.asin = asin
        self.confidence = confidence
        self.created_at = datetime.now().isoformat()
        self.usage_count = 0

    @staticmethod
    def preprocess_text(text):
        """Normalise ``text`` for embedding.

        Lower-cases, strips ``<...>`` tags, replaces every character other than
        a-z, 0-9, ``$ % . , ! ? '`` and whitespace with a space, collapses
        whitespace, tokenises with NLTK, drops a small fixed stop-word list and
        lemmatises the remaining tokens.

        Args:
            text: the text to normalise. ``None`` and float NaN yield ``""``;
                any other non-string value is converted with ``str()``.

        Returns:
            The remaining tokens joined by single spaces.
        """
        # Missing values would otherwise fail on .lower().
        if text is None or (isinstance(text, float) and text != text):
            return ""

        stopwords_set = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
        lemmatizer = WordNetLemmatizer()

        text = str(text).lower()
        text = re.sub(r"<.*?>", " ", text)
        # In a raw string the whitespace class is written with ONE backslash. The
        # previous r"\\s" meant "a literal backslash, then 's'", so backslashes were
        # kept and whitespace was never collapsed, unlike the corpus preprocessing.
        text = re.sub(r"[^a-z0-9$%.,!?'\s]", " ", text)
        text = re.sub(r"\s+", " ", text)
        tokens = nltk.word_tokenize(text)
        tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stopwords_set]
        return " ".join(tokens)

    def to_dict(self):
        """Return all attributes of the entry as a plain dictionary."""
        return {
            'question': self.question,
            'answer': self.answer,
            'processed_question': self.processed_question,
            'processed_answer': self.processed_answer,
            'asin': self.asin,
            'confidence': self.confidence,
            'created_at': self.created_at,
            'usage_count': self.usage_count
        }

In [None]:
# Small hand-picked stop-word list used by preprocess_text.
stopwords_set = set("the a an and or but in on at to for of with by".split())

# One lemmatizer instance shared by every preprocess_text call.
lemmatizer = WordNetLemmatizer()

def preprocess_text(text):
    """Normalise ``text`` for embedding.

    Lower-cases, strips ``<...>`` tags, replaces every character other than
    a-z, 0-9, ``$ % . , ! ? '`` and whitespace with a space, collapses whitespace,
    tokenises with NLTK, drops the words in ``stopwords_set`` and lemmatises the
    remaining tokens.

    Args:
        text: the value to normalise. ``None`` and float NaN yield ``""`` (the left
            join with the categories table can leave NaN in ``category_name``);
            any other non-string value is converted with ``str()``.

    Returns:
        The remaining tokens joined by single spaces.
    """
    # Missing values would otherwise fail on .lower() and abort the whole .apply().
    if text is None or (isinstance(text, float) and text != text):
        return ""
    text = str(text).lower()
    text = re.sub(r"<.*?>", " ", text)
    text = re.sub(r"[^a-z0-9$%.,!?'\s]", " ", text)
    text = re.sub(r"\s+", " ", text)
    tokens = nltk.word_tokenize(text)
    tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stopwords_set]
    return " ".join(tokens)

# Add a normalised companion column for each text column, in this order.
for _source_column, _processed_column in (
    ("Question", "Question_processed"),
    ("Answer", "Answer_processed"),
    ("title", "title_processed"),
    ("category_name", "category_processed"),
):
    merged_df[_processed_column] = merged_df[_source_column].apply(preprocess_text)
merged_df.head()

In [None]:
# Encode every preprocessed question; row i of the result corresponds to row i of merged_df.
_question_vectors = embedding_model.encode(
    merged_df['Question_processed'].tolist(),
    show_progress_bar=True,
)
embeddings = {'questions': _question_vectors}
PRODUCT_EMBEDDINGS = embeddings  # second name bound to the same dict object

embeddings['questions'].shape

In [None]:
class SemanticSearchEngine:
    """Cosine-similarity search over question embeddings.

    ``embeddings['questions']`` must hold one vector per row of ``df``, in the
    same row order. Vectors are L2-normalised before being added to an
    inner-product FAISS index, so the returned scores are cosine similarities.
    """

    def __init__(self, embeddings, df):
        self.embeddings = embeddings
        self.df = df
        self.index = faiss.IndexFlatIP(self.embeddings['questions'].shape[1])

        # np.array copies, so the caller's raw embeddings are not normalised in place.
        normalized_embeddings = np.array(self.embeddings['questions'], dtype=np.float32, order='C')
        faiss.normalize_L2(normalized_embeddings)
        self.index.add(normalized_embeddings)

    def _encode_query(self, query):
        """Preprocess and embed ``query``; return a normalised (1, dim) float32 array."""
        processed_query = StandardizedFAQEntry.preprocess_text(query)
        query_embedding = np.array(embedding_model.encode([processed_query]), dtype=np.float32, order='C')
        faiss.normalize_L2(query_embedding)
        return query_embedding

    def _row_result(self, position, similarity):
        """Build the fields shared by both search methods for the row at ``position``."""
        row = self.df.iloc[position]
        return {
            'similarity': float(similarity),
            'question': row['Question'],
            'answer': row['Answer'],
            'asin': row['asin'],
            'title': row['title'],
            'index': int(position)
        }

    def search_similar_questions(self, query, top_k=5, similarity_threshold=0.5):
        """Search all questions.

        Args:
            query: raw customer question.
            top_k: maximum number of hits to consider.
            similarity_threshold: minimum cosine similarity for a hit to be returned.

        Returns:
            A list of dicts (similarity, question, answer, asin, title, category,
            index), best match first. ``index`` is the row position in ``df``.
        """
        # Never ask for more neighbours than there are vectors.
        k = min(top_k, len(self.embeddings['questions']))
        if k <= 0:
            return []

        similarities, indices = self.index.search(self._encode_query(query), k)

        results = []
        for similarity, idx in zip(similarities[0], indices[0]):
            # FAISS marks "no result" slots with -1; iloc[-1] would silently be the last row.
            if idx < 0 or similarity < similarity_threshold:
                continue
            hit = self._row_result(idx, similarity)
            hit['category'] = self.df.iloc[idx]['category_name']
            results.append(hit)

        return results

    def search_by_product(self, asin: str, query, top_k=3, fallback_threshold=0.5):
        """Search only the questions that belong to product ``asin``.

        Args:
            asin: product identifier to restrict the search to.
            query: raw customer question.
            top_k: maximum number of hits to consider.
            fallback_threshold: minimum cosine similarity for a hit to be returned.

        Returns:
            A list of dicts (similarity, question, answer, asin, title, index),
            best match first; empty when the product has no rows. ``index`` is
            the row position in ``df``.
        """
        # Use row POSITIONS, not index labels: the embeddings array and .iloc are
        # both positional, so labels are only correct for a default 0..n-1 index.
        product_positions = np.flatnonzero((self.df['asin'] == asin).to_numpy())
        if product_positions.size == 0 or top_k <= 0:
            return []

        # Fancy indexing plus np.array gives an independent copy that is safe to normalise.
        product_embeddings = np.array(self.embeddings['questions'][product_positions], dtype=np.float32, order='C')
        temp_index = faiss.IndexFlatIP(product_embeddings.shape[1])
        faiss.normalize_L2(product_embeddings)
        temp_index.add(product_embeddings)

        similarities, indices = temp_index.search(
            self._encode_query(query), min(top_k, len(product_positions))
        )

        results = []
        for similarity, idx in zip(similarities[0], indices[0]):
            if idx < 0 or similarity < fallback_threshold:
                continue
            # idx is relative to the per-product index; map it back to a df row position.
            results.append(self._row_result(product_positions[idx], similarity))

        return results

In [None]:
class QueryMonitor:
    """In-memory log of customer queries and whether each was resolved."""

    def __init__(self):
        # One dict per logged query; a query's id is its position in this list.
        self.query_log = []

    def log_query(self, query, asin: str):
        """Record a new, not-yet-resolved query and return its id (list position)."""
        new_id = len(self.query_log)
        self.query_log.append({
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'processed_query': StandardizedFAQEntry.preprocess_text(query),
            'asin': asin,
            'resolved': False,
            'response': None,
        })
        return new_id

    def update_query_result(self, query_id: int, response: dict):
        """Attach ``response`` to a logged query.

        The query counts as resolved when the response status is 'resolved' or 'generated'.
        """
        entry = self.query_log[query_id]
        entry['response'] = response
        entry['resolved'] = response.get('status') in ('resolved', 'generated')

    def get_unresolved_queries(self):
        """Return the log entries that are not marked resolved."""
        return list(filter(lambda entry: not entry.get('resolved', False), self.query_log))

    def get_query_stats(self):
        """Return total, resolved and unresolved counts plus the resolution rate."""
        total_queries = len(self.query_log)
        if not total_queries:
            # Empty log: report zeros instead of dividing by zero.
            return {'total_queries': 0, 'resolved': 0, 'resolution_rate': 0, 'unresolved_queries': 0}

        resolved = len([entry for entry in self.query_log if entry.get('resolved', False)])
        return {
            'total_queries': total_queries,
            'resolved': resolved,
            'resolution_rate': resolved / total_queries,
            'unresolved_queries': total_queries - resolved,
        }

In [None]:
class FAQGenerator:
    """Produces answers with a Gemini model and records them as new FAQ entries."""

    def __init__(self, faq_database, gemini_model):
        self.faq_database = faq_database
        self.gemini_model = gemini_model
        # Optional; assigned later through set_search_engine().
        self.search_engine = None
        # Every result returned by generate_answer_with_context(), oldest first.
        self.generation_history = []

    def set_search_engine(self, search_engine):
        """Attach the search engine used to find similar FAQs for a product."""
        self.search_engine = search_engine

    def _build_prompt(self, question, product_info, context):
        """Fill the fixed prompt template with the question, product fields and FAQ context."""
        return f"""
You are a helpful e-commerce customer service AI assistant.
Answer the customer's question based on the provided product information and similar FAQs.

CUSTOMER QUESTION: {question}

PRODUCT INFORMATION:
- Product Name: {product_info.get('title', 'N/A')}
- Category: {product_info.get('category_name', 'N/A')}
- Price: ${product_info.get('price', 'N/A')}
- Rating: {product_info.get('stars', 'N/A')} stars ({product_info.get('reviews', 0)} reviews)

{context}

GUIDELINES:
1. Be helpful, accurate, and concise
2. If you don't have specific information, say so clearly
3. Keep the answer under 200 words
4. Use a friendly, professional tone
5. For unknown questions, advise contacting support

Answer:"""

    def generate_answer_with_gemini(self, question, product_info, similar_faqs):
        """Ask the Gemini model for an answer.

        Any exception raised by the model call is printed and the result of
        ``generate_fallback_answer`` is returned instead.
        """
        faq_context = self.prepare_context(similar_faqs)
        prompt = self._build_prompt(question, product_info, faq_context)

        try:
            reply = self.gemini_model.generate_content(prompt)
            return reply.text.strip()
        except Exception as e:
            print(f"Gemini API error: {e}")
            return self.generate_fallback_answer(question, product_info, similar_faqs)

    def prepare_context(self, similar_faqs):
        """Format up to three similar FAQs as a numbered Q/A list; '' when there are none."""
        if not (similar_faqs and len(similar_faqs) > 0):
            return ""

        pieces = ["SIMILAR QUESTIONS AND ANSWERS:\n"]
        for number, faq in enumerate(similar_faqs[:3], 1):
            pieces.append(f"{number}. Q: {faq['question']}\n   A: {faq['answer']}\n\n")
        return "".join(pieces)

    def generate_fallback_answer(self, question, product_info, similar_faqs):
        """Return an answer that does not need the model.

        Uses the first similar FAQ's answer when one exists, otherwise a generic
        message mentioning the product title.
        """
        if not (similar_faqs and len(similar_faqs) > 0):
            return f"Thank you for your question about {product_info.get('title', 'this product')}. I'm currently unable to provide a specific answer, but I recommend checking the product page for detailed information or contacting our customer support team for personalized assistance."

        best_match = similar_faqs[0]
        return f"Based on similar questions about this product: {best_match['answer']} (Note: This is from a similar question as our AI assistant is temporarily unavailable)"

    def generate_answer_with_context(self, question, asin: str, df):
        """Generate an answer for ``question`` about product ``asin`` and store it as an FAQ.

        Returns an ``{'error', 'answer'}`` dict when ``df`` has no row for the ASIN.
        Otherwise returns the new entry's dict extended with ``product_title``,
        ``generated_at`` and ``similar_faqs_count``; the entry is also added to the
        FAQ database and the result appended to ``generation_history``.
        """
        matching_rows = df[df['asin'] == asin]
        if matching_rows.empty:
            return {'error': 'Product not found', 'answer': 'Sorry, I could not find any information about this product.'}

        first_row = matching_rows.iloc[0]
        product_info = {
            'title': first_row['title'],
            'category_name': first_row['category_name'],
            'price': first_row.get('price'),
            'stars': first_row.get('stars'),
            'reviews': first_row.get('reviews', 0)
        }

        if self.search_engine:
            similar_faqs = self.search_engine.search_by_product(asin, question, top_k=3)
        else:
            similar_faqs = []
        answer = self.generate_answer_with_gemini(question, product_info, similar_faqs)

        # Higher confidence when the answer could draw on similar FAQs.
        confidence = 0.8 if similar_faqs else 0.5
        faq_entry = StandardizedFAQEntry(
            question=question,
            answer=answer,
            asin=asin,
            confidence=confidence
        )
        self.faq_database.add_faq(asin, faq_entry.to_dict())

        result = faq_entry.to_dict()
        result['product_title'] = product_info['title']
        result['generated_at'] = datetime.now().isoformat()
        result['similar_faqs_count'] = len(similar_faqs)

        self.generation_history.append(result)
        return result

In [None]:
class DynamicFAQSystem:
    """Answers customer queries from existing Q&A data, generating a new FAQ when nothing matches.

    Wires together the semantic search engine, the query monitor, the FAQ
    generator and the FAQ database over the Q&A/product frame ``df``.
    """

    def __init__(self, df, embeddings, faq_database, gemini_model):
        self.df = df
        self.embeddings = embeddings
        self.faq_database = faq_database
        self.search_engine = SemanticSearchEngine(embeddings, df)
        self.query_monitor = QueryMonitor()
        self.faq_generator = FAQGenerator(faq_database, gemini_model)
        self.faq_generator.set_search_engine(self.search_engine)

    def _lookup_product_info(self, asin: str):
        """Return the product fields used in the prompt, or placeholders for an unknown ASIN."""
        product_rows = self.df[self.df['asin'] == asin]
        if product_rows.empty:
            return {'title': 'Unknown Product', 'category_name': 'Unknown', 'price': 'N/A', 'stars': 'N/A', 'reviews': 0}

        product_row = product_rows.iloc[0]
        return {
            'title': product_row['title'],
            'category_name': product_row['category_name'],
            'price': product_row.get('price'),
            'stars': product_row.get('stars'),
            'reviews': product_row.get('reviews', 0)
        }

    def _answer_from_similar_faqs(self, query, asin: str, similar_faqs):
        """Build a 'resolved' response from the similar FAQs found for the query."""
        product_info = self._lookup_product_info(asin)
        best_match = similar_faqs[0]

        try:
            answer = self.faq_generator.generate_answer_with_gemini(query, product_info, similar_faqs)

            response = {
                'status': 'resolved',
                'method': 'gemini_processed',
                'question': query,
                'answer': answer,
                'confidence': best_match['similarity'],
                'source_asin': best_match['asin'],
                'similar_faqs': similar_faqs[:3],
                'processed_by_gemini': True
            }

            self.faq_database.update_usage(best_match['asin'], best_match['question'])

        except Exception as e:
            # Report the failure instead of hiding it, then serve the best existing FAQ verbatim.
            print(f"Answer generation failed, returning the best matching FAQ: {e}")
            response = {
                'status': 'resolved',
                'method': 'existing_faq_fallback',
                'question': best_match['question'],
                'answer': best_match['answer'],
                'confidence': best_match['similarity'],
                'source_asin': best_match['asin'],
                'similar_faqs': similar_faqs[:3],
                'processed_by_gemini': False
            }
            self.faq_database.update_usage(best_match['asin'], best_match['question'])

        return response

    def _answer_by_generation(self, query, asin: str):
        """Build the response for a query that has no similar FAQ."""
        generated_faq = self.faq_generator.generate_answer_with_context(query, asin, self.df)

        if 'error' in generated_faq:
            # Unknown product: nothing was generated. Reporting 'generated' with
            # confidence None would count the query as resolved and break callers
            # that format the confidence as a number.
            return {
                'status': 'unresolved',
                'method': 'product_not_found',
                'question': query,
                'answer': generated_faq.get('answer'),
                'confidence': 0.0,
                'error': generated_faq['error'],
                'generated_faq': generated_faq
            }

        return {
            'status': 'generated',
            'method': 'ai_generated',
            'question': query,
            'answer': generated_faq.get('answer'),
            'confidence': generated_faq.get('confidence'),
            'generated_faq': generated_faq
        }

    def process_customer_query(self, query, asin: str):
        """Answer ``query`` about product ``asin`` and log the outcome.

        The product's own questions are searched first (``CONFIG['fallback_threshold']``),
        then all questions (``CONFIG['similarity_threshold']``). If either search
        returns hits, the answer is produced from them; otherwise a new FAQ is generated.

        Returns:
            A response dict that always has ``status``, ``method``, ``question``,
            ``answer`` and ``confidence``. ``status`` is 'resolved', 'generated',
            or 'unresolved' when the product is unknown and nothing matched.
        """
        query_id = self.query_monitor.log_query(query, asin)

        similar_faqs = self.search_engine.search_by_product(asin, query, top_k=3, fallback_threshold=CONFIG['fallback_threshold'])
        if not similar_faqs:
            similar_faqs = self.search_engine.search_similar_questions(query, top_k=5, similarity_threshold=CONFIG['similarity_threshold'])

        if similar_faqs:
            response = self._answer_from_similar_faqs(query, asin, similar_faqs)
        else:
            response = self._answer_by_generation(query, asin)

        self.query_monitor.update_query_result(query_id, response)
        return response

    def get_system_stats(self):
        """Return query statistics together with FAQ database and generation counts."""
        return {
            'query_stats': self.query_monitor.get_query_stats(),
            'faq_database_size': self.faq_database.get_total_faqs(),
            'products_with_faqs': self.faq_database.get_products_count(),
            'generated_faqs': len(self.faq_generator.generation_history)
        }

In [None]:
# Start with an empty FAQ store and build the system over the merged Q&A frame.
faq_db = FAQDatabase()
faq_system = DynamicFAQSystem(
    df=merged_df,
    embeddings=embeddings,
    faq_database=faq_db,
    gemini_model=gemini_model,
)

In [None]:
# merged_df holds one row per question/answer pair, so its first rows may all belong
# to the same product. De-duplicate first so the test cases cover distinct products.
sample_asins = merged_df['asin'].drop_duplicates().head(10).tolist()
assert sample_asins, "merged_df contains no rows, so there is no product to test against"


def _asin_at(position):
    """Return ``sample_asins[position]``, or the first ASIN when fewer are available."""
    return sample_asins[position] if len(sample_asins) > position else sample_asins[0]


# (position in sample_asins, customer query, description)
_TEST_QUERIES = [
    (0, 'How long does shipping take?', 'Common shipping question'),
    (0, 'What is the warranty on this item?', 'Warranty question (likely to need generation)'),
    (1, 'Is this product waterproof?', 'Product feature question'),
    (1, "Can I return this if I don't like it?", 'Return policy question'),
    (0, 'What colors are available?', 'Product variation question'),

    # --- Less Usual but Valid Customer Questions ---
    (2, 'Does it come with a user manual in Spanish?', 'Language / documentation question'),
    (3, 'Is it compatible with 220V power outlets?', 'Compatibility with electrical standard'),
    (4, 'What materials is it made from?', 'Material composition question'),
    (5, 'How do I assemble it? Is any tool included?', 'Assembly / tools question'),
    (6, 'Can I schedule delivery for a specific date?', 'Special delivery scheduling'),
    (1, 'asdasd asd 123 $$$', 'Gibberish input'),
    (0, 'Who won the World Cup in 2018?', 'Totally unrelated question'),
    (3, 'What is the environmental impact of producing this item?', 'Sustainability question'),
    (5, 'Is this better than the previous model?', 'Comparative product question'),
    (6, 'If I buy 3, can I get a discount?', 'Bulk purchase discount inquiry'),
]

test_cases = [
    {'query': query, 'asin': _asin_at(position), 'description': description}
    for position, query, description in _TEST_QUERIES
]

In [None]:
# Run every test case through the FAQ system, printing each outcome and
# collecting a success/failure record per case.
results = []
for i, test_case in enumerate(test_cases, start=1):
    print(f"Test {i}: {test_case['description']}")
    print(f"Query: '{test_case['query']}'")
    print(f"Product ASIN: {test_case['asin']}")

    product_rows = merged_df[merged_df['asin'] == test_case['asin']]
    if not product_rows.empty:
        product_info = product_rows.iloc[0]
        print(f"Product: {product_info['title']}")
    else:
        product_info = None
        print("Product not found")

    try:
        response = faq_system.process_customer_query(
            test_case['query'],
            test_case['asin']
        )

        print(f"Status: {response['status'].upper()}")
        print(f"Method: {response['method']}")
        # The key can be present with value None (e.g. product not found), which
        # .get(..., 0) does not replace and ':.3f' cannot format.
        confidence = response.get('confidence')
        print(f"Confidence: {(confidence if confidence is not None else 0):.3f}")

        if response['status'] == 'resolved':
            print(f"Source ASIN: {response['source_asin']}")

        question = response.get('question', 'No question provided')
        print(f"Question: {question}")
        answer = response.get('answer', 'No answer provided')
        print(f"Answer: {answer}")

        results.append({
            'test_case': test_case,
            'response': response,
            'success': True
        })

    except Exception as e:
        # Keep going with the remaining cases, but record the failure and its message.
        print(f"Error: {e}")
        results.append({
            'test_case': test_case,
            'response': None,
            'success': False,
            'error': str(e)
        })

    print("\n\n")

print("System Statistics:")
try:
    stats = faq_system.get_system_stats()
    print(f"Total queries processed: {stats['query_stats']['total_queries']}")
    print(f"Queries resolved: {stats['query_stats']['resolved']}")
    print(f"Resolution rate: {stats['query_stats']['resolution_rate']:.2%}")
    print(f"FAQ database size: {stats['faq_database_size']}")
    print(f"Products with FAQs: {stats['products_with_faqs']}")
    print(f"Generated FAQs: {stats['generated_faqs']}")
except Exception as e:
    print(f"Error: {e}")

successful_tests = sum(1 for r in results if r['success'])
print("\nTest Summary:")
print(f"Successful tests: {successful_tests}/{len(results)}")
if successful_tests < len(results):
    print(f"Failed tests: {len(results) - successful_tests}")
