In [1]:

import json
import os
from pathlib import Path
import re
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, TypeVar, Union
from langchain.storage import LocalFileStore
from langchain_core.documents import Document

from langchain.storage.exceptions import InvalidKeyException
from langchain_core.stores import BaseStore


class CIAMDocumentFileStore(BaseStore[str, Document]):
    """Key/``Document`` store that mirrors an in-memory dict to JSON files.

    Every document is written as one JSON file below ``root_path`` (the key is
    the file's path relative to the root) and is also kept in ``self.store``,
    which serves all reads. Files already present under the root are loaded
    into memory when the store is constructed.
    """

    def __init__(
        self,
        root_path: Union[str, Path],
        *,
        chmod_file: Optional[int] = None,
        chmod_dir: Optional[int] = None,
        update_atime: bool = False,
    ) -> None:
        """Implement the BaseStore interface for the local file system.

        Args:
            root_path (Union[str, Path]): The root path of the file store. All keys are
                interpreted as paths relative to this root. The directory does not
                need to exist yet; it is created by the first ``mset``.
            chmod_file: (optional, defaults to `None`) If specified, sets permissions
                for newly created files, overriding the current `umask` if needed.
            chmod_dir: (optional, defaults to `None`) If specified, sets permissions
                for newly created dirs, overriding the current `umask` if needed.
            update_atime: (optional, defaults to `False`) If `True`, updates the
                filesystem access time (but not the modified time) when a stored
                document is returned by ``mget``.

        Raises:
            json.JSONDecodeError: If a file under ``root_path`` is not valid JSON.
            KeyError: If a stored JSON object has neither ``page_content`` nor
                ``content``.
        """
        # os.path.abspath (unlike Path.absolute) also collapses ".." segments, so
        # the containment check in _get_full_path compares two normalised paths.
        self.root_path = Path(os.path.abspath(root_path))
        self.chmod_file = chmod_file
        self.chmod_dir = chmod_dir
        self.update_atime = update_atime

        # In-memory index; every read is served from here.
        self.store: Dict[str, Document] = {}
        self._load_existing_documents()

    def _load_existing_documents(self) -> None:
        """Populate ``self.store`` from the JSON files found under the root."""
        if not self.root_path.is_dir():
            # Nothing persisted yet; start with an empty store.
            return

        # Keys may contain "/" (see _get_full_path), so walk sub-directories too.
        for file_path in sorted(self.root_path.rglob("*")):
            if not file_path.is_file():
                continue
            # The key is the path relative to the root, not the bare file name
            # resolved against the current working directory.
            key = file_path.relative_to(self.root_path).as_posix()
            self.store[key] = self._read_document(file_path)

    @staticmethod
    def _read_document(file_path: Path) -> Document:
        """Deserialize one stored JSON file into a ``Document``."""
        payload = json.loads(file_path.read_text(encoding="utf-8"))
        # Files written before the key names were aligned use "content".
        if "page_content" in payload:
            text = payload["page_content"]
        else:
            text = payload["content"]
        return Document(
            page_content=text,
            id=payload.get("id"),
            metadata=payload.get("metadata") or {},
        )

    def _get_full_path(self, key: str) -> Path:
        """Get the full path for a given key relative to the root path.

        Args:
            key (str): The key relative to the root path.

        Returns:
            Path: The full path for the given key.

        Raises:
            InvalidKeyException: If the key contains characters outside
                ``[a-zA-Z0-9_.-/]`` or resolves to a location outside the root.
        """
        if not re.match(r"^[a-zA-Z0-9_.\-/]+$", key):
            raise InvalidKeyException(f"Invalid characters in key: {key}")
        full_path = os.path.abspath(self.root_path / key)
        common_path = os.path.commonpath([str(self.root_path), full_path])
        if common_path != str(self.root_path):
            raise InvalidKeyException(
                f"Invalid key: {key}. Key should be relative to the full path."
                f"{self.root_path} vs. {common_path} and full path of {full_path}"
            )

        return Path(full_path)

    def mset(self, key_value_pairs: Sequence[Tuple[str, Document]]) -> None:
        """Set the values for the given keys.

        Each document is written to disk first and only then recorded in memory,
        so an invalid key or an unserialisable document leaves both untouched.

        Args:
            key_value_pairs: A sequence of key-value pairs.

        Returns:
            None

        Raises:
            InvalidKeyException: If a key is not a valid path below the root.
            TypeError: If a document's metadata is not JSON serialisable.
        """
        for key, value in key_value_pairs:
            full_path = self._get_full_path(key)
            # "page_content" is the key the loader reads back.
            serialized = json.dumps(
                {
                    "metadata": value.metadata,
                    "page_content": value.page_content,
                    "id": value.id,
                }
            )
            self._mkdir_for_store(full_path.parent)
            full_path.write_text(serialized, encoding="utf-8")
            if self.chmod_file is not None:
                os.chmod(full_path, self.chmod_file)
            self.store[key] = value

    def mget(self, keys: Sequence[str]) -> List[Optional[Document]]:
        """Get the values associated with the given keys.

        Args:
            keys: A sequence of keys.

        Returns:
            A sequence of optional values associated with the keys.
            If a key is not found, the corresponding value will be None.
        """
        values: List[Optional[Document]] = []
        for key in keys:
            document = self.store.get(key)
            if document is not None and self.update_atime:
                self._touch_access_time(key)
            values.append(document)
        return values

    def _touch_access_time(self, key: str) -> None:
        """Set the access time of the key's file to now, keeping its mtime."""
        import time  # local import: `time` is not imported at file level

        full_path = self._get_full_path(key)
        if full_path.exists():
            # update access time only; preserve modified time
            os.utime(full_path, (time.time(), os.stat(full_path).st_mtime))

    def _mkdir_for_store(self, dir: Path) -> None:
        """Makes a store directory path (including parents) with specified permissions

        This is needed because `Path.mkdir()` is restricted by the current `umask`,
        whereas the explicit `os.chmod()` used here is not.

        Args:
            dir: (Path) The store directory to make

        Returns:
            None
        """
        if not dir.exists():
            # Create parents one level at a time so chmod_dir reaches each new dir.
            self._mkdir_for_store(dir.parent)
            dir.mkdir(exist_ok=True)
            if self.chmod_dir is not None:
                os.chmod(dir, self.chmod_dir)

    def mdelete(self, keys: Sequence[str]) -> None:
        """Delete the given keys and their associated values.

        The document is removed from memory and its file is removed from disk,
        so a deleted key does not come back when the store is re-opened.
        Unknown keys are ignored.

        Args:
            keys (Sequence[str]): A sequence of keys to delete.

        Raises:
            InvalidKeyException: If a key is not a valid path below the root.
        """
        for key in keys:
            self.store.pop(key, None)
            full_path = self._get_full_path(key)
            if full_path.is_file():
                full_path.unlink()

    def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]:  # type: ignore
        """Get an iterator over keys that match the given prefix.

        Args:
            prefix (str, optional): The prefix to match. Defaults to None.

        Yields:
            Iterator[str]: An iterator over keys that match the given prefix.
        """
        if prefix is None:
            yield from self.store.keys()
        else:
            for key in self.store.keys():
                if key.startswith(prefix):
                    yield key


In [2]:
from datetime import date
from uuid import UUID
from pydantic import BaseModel
from typing import List

# Directory of product images; image_path() below appends "<product_asin>.png" to it.
product_images_path = Path("../initial-training-sets/datasets/images/")

class RawReview(BaseModel):
    """Validation schema for one review entry nested in ``RAWProduct.reviews``."""

    id: str
    review_ref: str
    product_id: str
    review_content: str
    # May be None. NOTE(review): no default is declared, so whether the key may be
    # absent from the input depends on the installed pydantic version - confirm.
    review_title: Optional[str]
    date_written: date
    product_asin: str
    helpful_count: int
    rating_given: int
    review_page_url: str

class RAWProduct(BaseModel):
    """Validation schema for one product record, including its nested reviews.

    Instances are built with ``RAWProduct.model_validate(...)`` from the entries
    of the raw products JSON file.
    """

    id: str
    name: str
    description: str
    # Used throughout the notebook as the key that joins products with their
    # summaries and image descriptions.
    product_asin: str
    overall_ratings: float
    total_customers_that_rated: int
    price: float
    currency: str
    category: str
    sub_category: str
    product_page_url: str
    image_url: str
    reviews: List[RawReview]
    
    
class ProductCombinedInformation:
    """Wraps a validated product and renders it as text, a ``Document`` or an image path."""

    product: "RAWProduct"

    def __init__(self, product: "RAWProduct") -> None:
        """Store the product that every other method reads from."""
        self.product = product

    def get_document(self):
        """Build a ``Document`` whose content is ``info()`` and whose id is the product id.

        Returns:
            Document: page content from ``info()``; metadata carries the product
            id, ASIN, image URL and name.
        """
        return Document(
            page_content=self.info(),
            id=str(self.product.id),
            metadata={
                "product_id": self.product.id,
                "product_asin": self.product.product_asin,
                "image_url": self.product.image_url,
                "name": self.product.name,
            },
        )

    def info(self):
        """Render the product's key facts as a multi-line, human-readable string.

        Returns:
            str: one ``"Label: value \\n"`` line per field (id, name, description,
            ASIN, overall rating, number of raters, price with currency prefix).
        """
        return (
            f"Product ID: {str(self.product.id).strip()} \n"
            f"Product Name: {str(self.product.name).strip()} \n"
            f"Product Description: {str(self.product.description).strip()} \n"
            f"Product Asin: {self.product.product_asin} \n"
            # Label now has the same "Label: value" shape as every other line.
            f"Overall Ratings: {self.product.overall_ratings} \n"
            f"Total Customers that rated: {self.product.total_customers_that_rated} \n"
            # Was misspelled "Pric:", which ended up in every stored document.
            f"Price: {self.product.currency}{self.product.price} \n"
        )

    def image_path(self):
        """Return the expected image location: ``<product_images_path>/<asin>.png``."""
        return f"{product_images_path}/{self.product.product_asin}.png"
    

In [3]:
# Input artifacts: product summaries, product image descriptions, raw products.
psi_file_path = Path("../V1_DATA/psi-llama3.1:8b-instruct-q5_k_m.json")
pid_file_path = Path("../V1_DATA/pid-llava:13b-v1.6-vicuna-q4_0.json")
raw_data_file_path = Path("../V1_DATA/raw_products.json")

# Fail early, before any parsing, if an input file is missing.
assert raw_data_file_path.exists()
assert pid_file_path.exists()
assert psi_file_path.exists()

product_summaries = json.loads(psi_file_path.read_text())
product_image_descriptions = json.loads(pid_file_path.read_text())
raw_products = json.loads(raw_data_file_path.read_bytes())


# Keep only the products that have a summary, keyed by ASIN like the other two dicts.
products_combined_infos = {}
for rp in raw_products:
    if rp["product_asin"] in product_summaries:
        products_combined_infos[rp["product_asin"]] = ProductCombinedInformation(
            product=RAWProduct.model_validate(rp)
        )


# All three collections must describe exactly the same products. Comparing the
# key sets (not just the sizes) guarantees that, once each collection is sorted
# by key below, position i refers to the same product in all three lists.
assert (
    set(product_image_descriptions)
    == set(product_summaries)
    == set(products_combined_infos)
), "summaries, image descriptions and raw products do not cover the same ASINs"

# From here on each name is a list of (asin, value) tuples sorted by ASIN.
product_image_descriptions = sorted(
    product_image_descriptions.items(), key=lambda x: x[0]
)
product_summaries = sorted(product_summaries.items(), key=lambda x: x[0])
products_combined_infos = sorted(
    products_combined_infos.items(), key=lambda x: x[0]
)

# product_summaries = product_summaries[0:10]
# product_image_descriptions = product_image_descriptions[0:10]
# products_combined_infos = products_combined_infos[0:10]

In [4]:

from langchain_core.vectorstores import VectorStore
# Helper function to add documents to both the vector store and the document store
def add_documents_no_split(
    id_key: str,
    summary_to_embeds: List[str],
    combined_product_infos: List[ProductCombinedInformation],
    docstore: CIAMDocumentFileStore,
    vectorstore: VectorStore,
):
    """Index one text per product in the vector store and store the parent documents.

    For position ``i``, ``summary_to_embeds[i]`` is embedded and linked (through
    ``metadata[id_key]``) to the parent document built from
    ``combined_product_infos[i].info()``, which is saved in ``docstore`` under
    the product id.

    Vector ids are derived from the product id *and* the embedded text. Calling
    this function several times for the same products with different texts
    (e.g. summaries, then image descriptions) therefore adds entries instead of
    reusing, and in an upserting store replacing, the ids of the earlier call.
    Re-indexing identical text still maps to the same id.

    Args:
        id_key: Metadata key under which the product id is stored on every document.
        summary_to_embeds: Texts to embed, aligned with ``combined_product_infos``.
        combined_product_infos: Products the texts belong to, in the same order.
        docstore: Store receiving the parent documents, keyed by product id.
        vectorstore: Vector store receiving the embedded texts.

    Raises:
        ValueError: If the two input lists differ in length.
    """
    import hashlib  # local import: not imported at notebook level

    # zip() would silently drop the surplus items of the longer list.
    if len(summary_to_embeds) != len(combined_product_infos):
        raise ValueError(
            "summary_to_embeds and combined_product_infos must have the same length: "
            f"{len(summary_to_embeds)} != {len(combined_product_infos)}"
        )
    if not combined_product_infos:
        # Nothing to index; avoid handing empty batches to the stores.
        return

    product_ids = []
    vector_ids = []
    docs = []
    parent_docs_contents = []
    for content_to_embed, product_info in zip(summary_to_embeds, combined_product_infos):
        product = product_info.product
        product_id = str(product.id)
        product_ids.append(product_id)

        # Deterministic and unique per (product, text) pair.
        digest = hashlib.sha256(content_to_embed.encode("utf-8")).hexdigest()[:16]
        vector_ids.append(f"{product_id}:{digest}")

        docs.append(
            Document(
                page_content=content_to_embed,
                metadata={
                    id_key: product_id,
                    "id": product_id,
                    "name": product.name,
                    "product_asin": product.product_asin,
                },
            )
        )
        parent_docs_contents.append(
            Document(
                page_content=product_info.info(),
                metadata={
                    id_key: product.id,
                    "id": product.id,
                    "name": product.name,
                    "product_asin": product.product_asin,
                },
            )
        )

    vectorstore.add_documents(docs, ids=vector_ids)
    docstore.mset(list(zip(product_ids, parent_docs_contents)))

In [5]:
# Vector Store for Document Embeddings
from langchain_chroma import Chroma
from langchain_experimental.open_clip import OpenCLIPEmbeddings
from langchain_openai import OpenAIEmbeddings

from langchain_community.embeddings.ollama import OllamaEmbeddings
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain.retrievers.multi_vector import MultiVectorRetriever, SearchType

text_embedding_model_name = "nomic-embed-text"

# The embedding model is served by Ollama. The previous throw-away
# `OpenAIEmbeddings()` instance was overwritten immediately, so it only added a
# dependency on OpenAI credentials; it has been dropped.
# for nomic-embed, embed_instruction is `search_document` to embed documents for RAG and `search_query` to embed the question
# NOTE(review): confirm how OllamaEmbeddings joins the instruction and the text,
# and whether nomic-embed-text expects the prefixes to end in ": ". If these
# strings change, clear the embedding cache below as well.
underlying_embedding = OllamaEmbeddings(
    model=text_embedding_model_name,
    embed_instruction="search_document",
    query_instruction="search_query",
)

# Embeddings are cached on disk, namespaced by model name.
embedding_store = LocalFileStore("/home/solomon/Documents/projects/ciam2rag/ciam2rag_core/V1_DATA/embedding-cache/")
cached_embedder = CacheBackedEmbeddings.from_bytes_store(
    underlying_embeddings=underlying_embedding,
    document_embedding_cache=embedding_store,
    namespace=underlying_embedding.model,
)

collection_name = f"fashion_store_mrag_v_{underlying_embedding.model}"
vectorstore = Chroma(
    collection_name=collection_name,
    embedding_function=cached_embedder,
    # https://docs.trychroma.com/guides#changing-the-distance-function
    # Cosine similarity: 1 means most similar, 0 orthogonal, -1 opposite.
    # NOTE(review): Chroma reports a distance for this space - confirm the
    # direction of the raw score before applying thresholds.
    collection_metadata={"hnsw:space": "cosine"},  # l2 is the default
    # embedding_function=OpenCLIPEmbeddings(model=None, preprocess=None, tokenizer=None, model_name=model_name, checkpoint=checkpoint)
)



In [6]:

# Parent-document store rooted at the docstores directory.
document_store = CIAMDocumentFileStore("/home/solomon/Documents/projects/ciam2rag/ciam2rag_core/V1_DATA/docstores/")

# Index the text summaries first, then the image descriptions. Both passes use
# the same products, in the same order, as parent documents.
for _texts_by_key in (product_summaries, product_image_descriptions):
    add_documents_no_split(
        id_key="product_id",
        summary_to_embeds=[text for _, text in _texts_by_key],
        combined_product_infos=[info for _, info in products_combined_infos],
        docstore=document_store,
        vectorstore=vectorstore,
    )

In [7]:
# Smoke-test retrieval with a natural-language query; no `k` is passed, so the
# vector store's default number of results is returned with relevance scores.
vectorstore.similarity_search_with_relevance_scores("Do you have some sun glasses for the sun?")

[(Document(metadata={'id': '8d01c144-a2a0-4f6c-8b89-e1114890d23d', 'name': 'AVENTO Polarized Sports Sunglasses UV400 Protection with Anti-Slip Function and Lightweight Frame - for Men and Women when Driving, Running, Baseball, Golf, Casual Sports and Activities', 'product_asin': 'B07G8WDT6H', 'product_id': '8d01c144-a2a0-4f6c-8b89-e1114890d23d'}, page_content=' The image is a product photograph of a pair of sunglasses. The sunglasses are designed to protect the eyes from sun glare and UV rays. They feature black frames, which likely provide a comfortable fit around the face, and possibly polarized lenses for enhanced visibility in bright conditions. These glasses could be used by outdoor enthusiasts or anyone who spends time in environments where sun protection is necessary.'),
  0.7747154831886292),
 (Document(metadata={'id': '5b4d42e2-dc3a-44de-930a-63fc58f2b276', 'name': 'Polarized Unisex Clip on Sunglasses for Eyeglasses-Good Flip up Clip Style Sunglasses for Myopia Glasses Outdoor

In [8]:
# for pci in products_combined_infos:
#     print(pci[1].product.name)

In [12]:
# Inspect the entry at index 4 of the sorted image descriptions: a (key, text) tuple.
product_image_descriptions[4]

('B0002M30XA',
 ' The image is a product photo of a pair of Converse All Stars Chuck Taylor sneakers. These sneakers are characterized by their iconic low-top design, white rubber toe cap and midsole, black outsole, and red stitching details. They feature a canvas upper with the signature logo patch on the side and laces for securing the footwear.\n\nThese Converse sneakers are versatile fashion items that can be worn casually, with jeans or shorts, and are also suitable for more formal occasions such as work or school environments when paired with dress pants or a skirt. They provide comfortable support for walking and can easily transition from day to night wear.')

In [16]:
# Same index in the sorted summaries, for side-by-side comparison of the two texts.
product_summaries[4]

('B0002M30XA',
 "Women's Star Ox M5039c' Sneakers are low-top canvas sneakers made of rubber soles and feature an iconic silhouette, ortholite insoles for comfort, and a diamond outsole tread. They are machine washable and suitable for unisex sizing.")

In [18]:
# Probe query for a women's low-top sneaker; k=20 requests the 20 best matches.
question = "The product is for women: I need an iconic low-top design sneakers."
vectorstore.similarity_search_with_relevance_scores(question, k=20)

[(Document(metadata={'id': '1b5fa23d-bbee-43fa-842f-049964c3c9e8', 'name': "Men's Arch Fit Banline Oxford", 'product_asin': 'B07YG2F3MC', 'product_id': '1b5fa23d-bbee-43fa-842f-049964c3c9e8'}, page_content=' The image is a product photo of a pair of athletic sneakers, specifically designed for women. They have a combination of dark and light gray color with details in a contrasting white and brown pattern. The sneakers feature a lace-up closure system, a knitted upper design, and appear to be made of textile materials. There are also visible logos on the side of one shoe, indicating the brand or manufacturer. These sneakers are designed for comfort and support during various athletic activities, such as running, walking, or casual wear. They likely provide a cushioned sole for added comfort and stability.'),
  0.7716027498245239),
 (Document(metadata={'id': '786c5d2b-c54d-439f-a745-4ac1fed62abe', 'name': 'Hike Footwear Barefoot Shoes Womens Mens Walking Running Gym Trainers Wide Fit No