In [8]:
import json
import os
# import chromadb
from sentence_transformers import SentenceTransformer
import numpy as np
from dataclasses import dataclass
import os
from google import genai
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chat_models import init_chat_model
import langchain
from langchain_community.document_loaders import TextLoader
# from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import CharacterTextSplitter
# from langchain_chroma import Chroma
# from langchain_community.vectorstores import FAISS
import json
from dataclasses import dataclass
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
# from transformers import AutoTokenizer
from langchain.prompts import ChatPromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationSummaryBufferMemory


# Path to the listings metadata file. It is consumed line by line, each line
# being parsed as one standalone JSON object (JSON-lines layout).
file_path = "listings/metadata/listings_0.json"

# with open(file_path, "r", encoding="utf-8") as f:
#     data = [json.loads(line) for line in f]  # Read each line as a separate JSON object

# n_data_samples = len(data)
# print(n_data_samples)  # Number of JSON objects in the file

In [2]:
# Gemini API key taken from the environment rather than hard-coded;
# os.getenv returns None when GEMINI_API_KEY is not set.
api_key = os.getenv('GEMINI_API_KEY')
# client = genai.Client(api_key=api_key)

# For information on the available models, see:
# https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models

In [3]:
@dataclass
class FileAndMeatadata:
    """One listing record: its flattened text plus the ids kept as metadata.

    Instances are built by ``VectorStore._json_to_str``. The three id fields
    are taken from the record with a ``None`` default, so despite the ``str``
    annotations they are ``None`` when the key is missing from the record.
    """

    # Lower-cased, space-joined string values of the record; this is the text
    # that is chunked and embedded.
    file_string: str
    # Value of the record's 'item_id' key (None if absent).
    item_id: str
    # Value of the record's 'main_image_id' key (None if absent).
    main_image_id: str
    # Value of the record's 'other_image_id' key (None if absent).
    # NOTE(review): the value is stored exactly as found in the JSON; it may
    # not be a plain string (e.g. several ids) - confirm against the data.
    other_image_id: str

class VectorStore:
    """FAISS-backed similarity search over a JSON-lines file of listings.

    Construction does all the work: the file is read, every record is
    flattened to plain text, the text is chunked and embedded, and the
    resulting FAISS index is exposed as ``self.vectorstore``.
    """

    # Record keys kept as per-chunk metadata instead of being embedded.
    _METADATA_KEYS = ("item_id", "main_image_id", "other_image_id")
    # Record keys dropped entirely (neither embedded nor kept as metadata).
    _IGNORED_KEYS = ("model_number", "marketplace", "domain_name")

    def __init__(self, file_path,
                 embedding_model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
                 chunk_size=1000,
                 chunk_overlap=200):
        """Load ``file_path``, embed its records and build the FAISS index.

        Args:
            file_path: Path to a UTF-8 file with one JSON object per line.
            embedding_model_name: Model name passed to ``HuggingFaceEmbeddings``.
            chunk_size: Maximum chunk length, in characters.
            chunk_overlap: Overlap between consecutive chunks, in characters.
        """
        self._embedding_model_name = embedding_model_name
        self._file_path = file_path
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
        self._file_and_metadata_list = self._load_and_process_json()
        self.vectorstore = self._chunk_and_embed()

    def perform_search(self, query: str, top_k=1):
        """Return the ``top_k`` stored chunks most similar to ``query``.

        Args:
            query: Free-text search query.
            top_k: Number of documents to return.

        Returns:
            The documents returned by the FAISS store's ``similarity_search``;
            each carries the chunk text and the metadata stored with it.
        """
        # TODO: pre-process the query if it is too long. This needs a
        # tokenizer for the embedding model, which is not loaded yet.
        return self.vectorstore.similarity_search(query, k=top_k)

    def _load_and_process_json(self) -> "list[FileAndMeatadata]":
        """Read the JSON-lines file given to the constructor.

        Returns:
            One ``FileAndMeatadata`` per non-blank line, in file order.
        """
        # Use the path stored on the instance, not a module-level global, so
        # the class works with any path and outside the notebook.
        with open(self._file_path, "r", encoding="utf-8") as f:
            # Skip blank lines (e.g. a trailing newline at the end of the
            # file); json.loads would raise on them.
            return [self._json_to_str(json.loads(line)) for line in f if line.strip()]

    def _json_to_str(self, file: dict) -> "FileAndMeatadata":
        """Convert one decoded record into a ``FileAndMeatadata``.

        The id keys become metadata (``None`` when missing), the ignored keys
        are discarded, and every remaining string value is flattened into
        ``file_string``. The input dict is left unmodified.
        """
        excluded = set(self._METADATA_KEYS) | set(self._IGNORED_KEYS)
        # Filter into a new dict instead of popping from the caller's dict.
        text_fields = {key: value for key, value in file.items() if key not in excluded}
        return FileAndMeatadata(
            file_string=self._flatten_json(text_fields),
            item_id=file.get("item_id"),
            main_image_id=file.get("main_image_id"),
            other_image_id=file.get("other_image_id"),
        )

    def _flatten_json(self, y):
        """Flatten nested JSON into one lower-cased, space-separated string.

        Dicts and lists are walked depth-first in order; only ``str`` leaves
        are kept. Dict keys and non-string leaves (numbers, booleans,
        ``None``) are ignored.
        """
        def iter_strings(node):
            if isinstance(node, dict):
                for value in node.values():
                    yield from iter_strings(value)
            elif isinstance(node, list):
                for element in node:
                    yield from iter_strings(element)
            elif isinstance(node, str):
                yield node.lower()

        return " ".join(iter_strings(y))

    def _chunk_and_embed(self) -> "FAISS":
        """Chunk every record's text, embed it and build the FAISS store.

        Each chunk is stored with the record's position in the file
        (``item_idx``) and its three ids as metadata.

        Returns:
            The FAISS vector store built from all chunks.

        Raises:
            ValueError: If the input file produced no records to index.
        """
        # The flattened text is joined with single spaces and never contains
        # the splitter's default "\n\n" separator, so with the default the
        # text would come back as one over-long chunk. Split on spaces.
        text_splitter = CharacterTextSplitter(
            separator=" ",
            chunk_size=self._chunk_size,
            chunk_overlap=self._chunk_overlap,
        )

        texts = []
        metadatas = []
        # TODO: smarter chunking based on the number of tokens instead of the
        # number of characters.
        for item_idx, item in enumerate(self._file_and_metadata_list):
            if len(item.file_string) > self._chunk_size:
                chunks = text_splitter.split_text(item.file_string)
            else:
                # Short records are indexed as-is.
                chunks = [item.file_string]
            for chunk in chunks:
                texts.append(chunk)
                # A fresh dict per chunk so entries never share state.
                metadatas.append({
                    "item_idx": item_idx,
                    "item_id": item.item_id,
                    "main_image_id": item.main_image_id,
                    "other_image_id": item.other_image_id,
                })

        if not texts:
            # Fail with a clear message instead of an opaque error raised
            # while the index is being built from zero embeddings.
            raise ValueError(f"No records to index in {self._file_path!r}")

        return FAISS.from_texts(texts, self._embedding_model, metadatas=metadatas)

In [4]:
# Example usage:
# Constructing the store does all the heavy work up front: the listings file
# is read, every record is flattened and embedded, and the FAISS index is
# built (see VectorStore.__init__), so this cell can take a while to run.
vector_store = VectorStore(file_path)

  self._embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)


In [5]:
# Smoke test of the retrieval step: fetch the single closest chunk
# (perform_search defaults to top_k=1).
query = "iphone cover"
results = vector_store.perform_search(query)
# Bare last expression so the notebook displays the text of the best match.
results[0].page_content

'inches centimeters inches centimeters inches centimeters en_gb eono en_gb 【personalized design】full-body and 360 degree protection iphone 11case with translucent matte back and flexible shockproof bumper tpu silicone frame,keep your new iphone 11 secure. compatible with iphone 11 pro-6.1" (2019) only. en_gb 【fashion translucent】: the back of this iphone 11 case is made up of translucent hard pc, so it can allow the logo of iphone to be seen through it. it also prevents watermark, fingerprints , giving your iphone 11 a clean look. en_gb 【raised edges protect screen and camera lens】edges of this iphone 11 case are a bit raised, giving you extra protection for screen and lens of your iphone 11 and stop the screen and camera lens getting damaged in case if you drop your iphone 11 device on a flat surface. en_gb 【heavy duty protection】 this iphone 11 case has dual layer structure offers iphone 11 maximum full-body rugged protection . made of hard back case and tpu silicone sides and it giv

In [15]:
# class RAGShoppingAssistant:
#     def __init__(self, vectorstore):
#         self.vectorstore = vectorstore
#         self._llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", api_key=os.getenv('GEMINI_API_KEY'))
#         self._memory = ConversationSummaryBufferMemory.from_llm(llm=self._llm, memory_key="chat_history", return_messages=True)

#         self._rag_chain = self._setup_rag_chain()

#     def _setup_rag_chain(self):
#         """Sets up the RAG chain with conversation memory."""
#         template = """Use the following pieces of context to answer the question at the end.
#         If you don't know the answer, just say you don't know, don't try to make up an answer.
#         Also, keep the conversation going, and remember the previous questions and answers.
#         {context}
#         Question: {question}
#         {chat_history}
#         Answer:"""
#         prompt = ChatPromptTemplate.from_template(template)
#         retriever = self.vectorstore.as_retriever()
#         rag_chain = ConversationalRetrievalChain.from_llm(
#             self._llm,
#             retriever,
#             memory=self._memory,
#             return_source_documents=True,
#         )
#         return rag_chain

#     def chat_with_assistant(self, question):
#         result = self._rag_chain({"question": question})
#         print(f"Answer: {result['answer']}")
#         for doc in result['source_documents']:
#             print(f"  - {doc.page_content} (Metadata: {doc.metadata})")
#         return result['question']


# assistant = RAGShoppingAssistant(vector_store)

In [None]:
# References for building the RAG pipeline:
# Background reading only: https://python.langchain.com/docs/tutorials/rag/
# Follow this one for the implementation (RAG with chat history): https://python.langchain.com/docs/tutorials/qa_chat_history/

In [None]:
# Basic RAG: search over text, return text and images.

In [None]:
# RAG with text & image embedding.