In [None]:
# Colab-only setup: attach the user's Google Drive under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Embeddings

In [None]:
from embeddings.base import BaseEmbedding, APIBaseEmbedding, EmbeddingConfig
from embeddings.sentenceTransformer import SentenceTransformerEmbedding
from embeddings.openai import OpenAIEmbedding
from embeddings.google import GoogleEmbedding

In [None]:
from pydantic.v1 import BaseModel, Field, validator
from typing import Any, Optional

class EmbeddingConfig(BaseModel):
    """Validated settings object carrying the name of the embedding model."""

    name: str = Field(..., description="The name of the SentenceTransformer model")

    @validator('name')
    def check_model_name(cls, value):
        """Accept only string names that still contain text after stripping."""
        is_usable = isinstance(value, str) and bool(value.strip())
        if is_usable:
            return value
        raise ValueError("Model name must be a non-empty string")

class BaseEmbedding():
    """Shared parent of all embedding backends; it only records the model name."""

    name: str

    def __init__(self, name: str):
        """Keep the model identifier on the instance."""
        super().__init__()
        self.name = name

    def encode(self, text: str):
        """Placeholder for subclasses to override; this version always raises."""
        message = "The encode method must be implemented by subclasses"
        raise NotImplementedError(message)


class APIBaseEmbedding(BaseEmbedding):
    """Parent of the remote-API backends: adds an endpoint URL and an API key."""

    baseUrl: str
    apiKey: str

    def __init__(self, name: str = None, baseUrl: str = None, apiKey: str = None):
        """Hand the model name to the parent, then store the connection settings."""
        super().__init__(name)
        self.apiKey = apiKey
        self.baseUrl = baseUrl

In [None]:
import os
from typing import Optional, Union, List
from fastembed import TextEmbedding
from embeddings import BaseEmbedding

class FastEmbedding(BaseEmbedding):
    """Local embedding backend built on fastembed's ``TextEmbedding``."""

    def __init__(
            self,
            # Multilingual model
            name: str = 'BAAI/bge-m3',
            max_length:int = 512
        ):
        """Load the fastembed model identified by ``name``.

        Args:
            name: Model identifier forwarded to fastembed.
            max_length: Forwarded unchanged to ``TextEmbedding``.

        Raises:
            ValueError: If fastembed fails to construct the model.
        """
        super().__init__(name=name)

        try:
            # TextEmbedding selects its model through the ``model_name``
            # keyword; the previous ``name=`` keyword did not select it, so
            # the requested model was not the one being loaded.
            self.embedding_model = TextEmbedding(
                model_name=name,
                max_length=max_length
            )
        except Exception as e:
            raise ValueError(
                f"Fastembed failed to initialize. Error: {e}"
            ) from e

    def encode(self, docs: List[str]):
        """Embed ``docs`` and return one list of floats per document.

        Raises:
            ValueError: If the underlying model call fails.
        """
        try:
            embeds = self.embedding_model.embed(docs)
            # Each item yielded by embed() is converted with .tolist() so the
            # result is plain Python lists.
            embeddings: List[List[float]] = [e.tolist() for e in embeds]
            return embeddings
        except Exception as e:
            raise ValueError(
                f"Failed to get embeddings. Error details: {e}"
            ) from e

In [None]:


from embeddings import APIBaseEmbedding
import os
import openai
from typing import Any, List, Optional
from dotenv import load_dotenv
# Load variables from a .env file (if one is found) before os.getenv is consulted below.
load_dotenv()

class GoogleEmbedding(APIBaseEmbedding):
    """Embedding backend that queries a Vertex AI text-embedding model."""

    def __init__(
        self,
        name: str = "textembedding-gecko@003",
        dimensions: int = 768,
        token_limit: int = 8192,
        baseUrl: str = None,
        apiKey: str = None,
        projectId: str = None,
        location: str = None,
    ):
        """Initialise the Vertex AI SDK and load the pretrained model ``name``.

        ``projectId``, ``location`` and ``baseUrl`` fall back to the
        GOOGLE_PROJECT_ID, GOOGLE_LOCATION (default "us-central1") and
        GOOGLE_BASE_URL environment variables. ``dimensions`` and
        ``token_limit`` are accepted but not used by this constructor.

        Raises:
            ImportError: If the Google Cloud / Vertex AI packages are missing.
            ValueError: If no project id is available or SDK setup fails.
        """
        super().__init__(name=name, baseUrl=baseUrl, apiKey=apiKey)
        self.name = name

        # Imported lazily so the dependency is only needed for this backend.
        try:
            from google.cloud import aiplatform
            from vertexai.language_models import TextEmbeddingModel
        except ImportError:
            install_hint = (
                "To use GoogleEmbedding, please install the Google Cloud and Vertex AI libraries "
                "You can do this with the following command: "
                "`pip install google-cloud-aiplatform vertexai-language-models`"
            )
            raise ImportError(install_hint)

        resolved_project = projectId or os.getenv("GOOGLE_PROJECT_ID")
        if resolved_project is None:
            raise ValueError("Google Project ID cannot be null.")

        resolved_location = location or os.getenv("GOOGLE_LOCATION", "us-central1")
        endpoint = baseUrl or os.getenv("GOOGLE_BASE_URL")

        try:
            aiplatform.init(
                project=resolved_project,
                location=resolved_location,
                api_endpoint=endpoint,
            )
            self.client = TextEmbeddingModel.from_pretrained(self.name)
        except Exception as err:
            raise ValueError(
                f"Failed to initialize Google AI Platform client. Error: {err}"
            ) from err

    def encode(self, docs: List[str]):
        """Return the ``values`` of each embedding produced for ``docs``.

        Raises:
            ValueError: If the API call fails.
        """
        try:
            response = self.client.get_embeddings(docs)
            vectors = []
            for item in response:
                vectors.append(item.values)
            return vectors
        except Exception as e:
            raise ValueError(f"Google AI Platform API call failed. Error: {e}") from e


In [None]:
import os
from typing import Optional, Union, List
from mistralai.client import MistralClient
from embeddings import APIBaseEmbedding

class MistralEmbedding(APIBaseEmbedding):
    """Embedding backend that talks to the Mistral API through ``MistralClient``."""

    def __init__(
            self,
            name: str = "mistral-embed",
            apiKey: str = None,
        ):
        """Create the Mistral client.

        The key comes from ``apiKey`` or, failing that, the MISTRAL_KEY
        environment variable.

        Raises:
            ValueError: If no key is available or the client cannot be built.
        """
        super().__init__(name=name, apiKey=apiKey)

        resolved_key = apiKey or os.getenv("MISTRAL_KEY")
        self.apiKey = resolved_key
        if not resolved_key:
            raise ValueError("The Mistral API key must not be 'None'.")

        try:
            self.client = MistralClient(api_key=self.apiKey)
        except Exception as e:
            raise ValueError(
                f"Mistral API client failed to initialize. Error: {e}"
            ) from e

    def encode(self, docs: List[str]):
        """Request embeddings for ``docs`` and return one vector per response item.

        Raises:
            ValueError: If the request or the response handling fails.
        """
        try:
            response = self.client.embeddings(input=docs, model=self.name)
            vectors = []
            for item in response.data:
                vectors.append(item.embedding)
            return vectors
        except Exception as e:
            raise ValueError(
                f"Failed to get embeddings. Error details: {e}"
            ) from e

In [None]:
import os
from typing import Optional, Union, List
from pydantic.v1 import BaseModel, Field, PrivateAttr
from embeddings import APIBaseEmbedding
import openai
from dotenv import load_dotenv
# Load variables from a .env file (if one is found) before os.getenv is consulted below.
load_dotenv()

class OpenAIEmbedding(APIBaseEmbedding):
    """Embedding backend that calls the OpenAI embeddings endpoint."""

    def __init__(
            self,
            name: str = "text-embedding-3-small",
            dimensions: int = 768,
            token_limit: int = 8192,
            baseUrl: str = None,
            apiKey: str = None,
            orgId: str = None,
        ):
        """Create the OpenAI client.

        Args:
            name: Embedding model name sent with every request.
            dimensions: Value sent as the ``dimensions`` request parameter.
            token_limit: Accepted for interface compatibility; not used here.
            baseUrl: API base URL; falls back to OPENAI_BASE_URL.
            apiKey: API key; falls back to OPENAI_API_KEY.
            orgId: Organisation id; falls back to OPENAI_ORG_ID.

        Raises:
            ValueError: If no API key is available or the client cannot be built.
        """
        super().__init__(name=name, baseUrl=baseUrl, apiKey=apiKey)
        self.dimensions = dimensions
        self.apiKey = apiKey or os.getenv("OPENAI_API_KEY")
        self.orgId = orgId or os.getenv("OPENAI_ORG_ID")
        # The base URL must come from ``baseUrl``. It was previously derived
        # from ``orgId``, which pointed the client at the organisation id and
        # silently ignored an explicit ``baseUrl`` argument.
        self.baseUrl = baseUrl or os.getenv("OPENAI_BASE_URL")

        if not self.apiKey:
            raise ValueError("The OpenAI API key must not be 'None'.")

        try:
            self.client = openai.Client(
                base_url=self.baseUrl, api_key=self.apiKey, organization=self.orgId
            )
        except Exception as e:
            raise ValueError(
                f"OpenAI API client failed to initialize. Error: {e}"
            ) from e

    def encode(self, docs: List[str]):
        """Request embeddings for ``docs`` and return one vector per response item.

        Raises:
            ValueError: If the request or the response handling fails.
        """
        try:
            embeds = self.client.embeddings.create(
                    input=docs,
                    model=self.name,
                    dimensions=self.dimensions,
                )
            embeddings = [embeds_obj.embedding for embeds_obj in embeds.data]
            return embeddings
        except Exception as e:
            raise ValueError(
                f"Failed to get embeddings. Error details: {e}"
            ) from e

In [None]:
from pydantic.v1 import BaseModel, Field, validator
from embeddings import BaseEmbedding, EmbeddingConfig
from sentence_transformers import SentenceTransformer

class SentenceTransformerEmbedding(BaseEmbedding):
    """Embedding backend that delegates to a ``SentenceTransformer`` model."""

    def __init__(self, config: EmbeddingConfig):
        """Keep ``config`` and load the model named by ``config.name``."""
        model_name = config.name
        super().__init__(model_name)
        self.config = config
        self.embedding_model = SentenceTransformer(model_name)

    def encode(self, text: str):
        """Return whatever the wrapped model's ``encode`` yields for ``text``."""
        vector = self.embedding_model.encode(text)
        return vector

RAG

In [None]:
import pymongo
import google.generativeai as genai
from IPython.display import Markdown
import textwrap
from embeddings import SentenceTransformerEmbedding, EmbeddingConfig

class RAG():
    """Retrieval helper: embeds a query, runs a MongoDB ``$vectorSearch`` and
    turns the hits into prompt text for an LLM."""

    def __init__(self,
            mongodbUri: str,
            dbName: str,
            dbCollection: str,
            llm,
            embeddingName: str ='keepitreal/vietnamese-sbert',
        ):
        """Connect to MongoDB and load the sentence-transformer embedding model.

        Args:
            mongodbUri: Connection string passed to ``pymongo.MongoClient``.
            dbName: Database name.
            dbCollection: Collection that is searched.
            llm: Object exposing ``generate_content(prompt)``.
            embeddingName: Model name used to embed queries.
        """
        self.client = pymongo.MongoClient(mongodbUri)
        self.db = self.client[dbName]
        self.collection = self.db[dbCollection]
        self.embedding_model = SentenceTransformerEmbedding(
            EmbeddingConfig(name=embeddingName)
        )
        self.llm = llm

    def get_embedding(self, text):
        """Return the embedding of ``text`` as a list, or ``[]`` for blank/None text."""
        if not text or not text.strip():
            return []

        embedding = self.embedding_model.encode(text)
        return embedding.tolist()

    def vector_search(
            self,
            user_query: str,
            limit=4):
        """
        Perform a vector search in the MongoDB collection based on the user query.

        Args:
        user_query (str): The user's query string.
        limit (int): Maximum number of documents to return.

        Returns:
        list: A list of matching documents; empty when the query is blank.
        """

        # Generate embedding for the user query
        query_embedding = self.get_embedding(user_query)

        # get_embedding signals "nothing to embed" with an empty list, never
        # None. Return an empty result instead of sending an empty vector to
        # MongoDB (and instead of a string, which callers cannot iterate as
        # documents).
        if not query_embedding:
            return []

        # Define the vector search pipeline
        vector_search_stage = {
            "$vectorSearch": {
                "index": "vector_index",
                "queryVector": query_embedding,
                "path": "embedding",
                "numCandidates": 400,
                "limit": limit,
            }
        }

        unset_stage = {
            "$unset": "embedding"
        }

        project_stage = {
            "$project": {
                "_id": 0,
                "title": 1,
                # "product_specs": 1,
                "color_options": 1,
                "current_price": 1,
                "product_promotion": 1,
                "score": {
                    "$meta": "vectorSearchScore"
                }
            }
        }

        pipeline = [vector_search_stage, unset_stage, project_stage]

        # Execute the search
        results = self.collection.aggregate(pipeline)

        return list(results)

    def enhance_prompt(self, query):
        """Build a numbered product list (name, price, optional promotion)
        from the top 10 search hits for ``query``.

        Hits without a ``current_price`` are skipped, so every listed entry
        has a price. Returns an empty string when nothing qualifies.
        """
        entries = []
        for result in self.vector_search(query, 10):
            price = result.get('current_price')
            if not price:
                continue

            entry = f"\n {len(entries) + 1}) Tên: {result.get('title')}, Giá: {price}"

            promotion = result.get('product_promotion')
            if promotion:
                entry += f", Ưu đãi: {promotion}"
            entries.append(entry)
        return "".join(entries)

    def generate_content(self, prompt):
        """Forward ``prompt`` to the configured LLM and return its response."""
        return self.llm.generate_content(prompt)

    @staticmethod
    def _to_markdown(text):
        """Render ``text`` as a Markdown block quote, turning '•' bullets into '*' items.

        Declared static because it takes no ``self``; without the decorator
        it could not be called on an instance.
        """
        text = text.replace('•', '  *')
        return Markdown(textwrap.indent(text, '> ', predicate=lambda _: True))

Reflection

In [None]:
from reflection.core import Reflection

In [None]:
class Reflection():
    """Rewrites the latest user question as a standalone question using chat history."""

    def __init__(self, llm, model: str = "gpt-4o"):
        """
        Args:
            llm: Client exposing ``chat.completions.create(model=..., messages=...)``.
            model: Chat model name sent with the request (previously fixed to "gpt-4o").
        """
        self.llm = llm
        self.model = model

    def _concat_and_format_texts(self, data):
        """Flatten history entries into ``"<role>: <texts> \\n"`` lines.

        Each entry is a dict with an optional ``role`` and an optional
        ``parts`` list of dicts carrying ``text``. Missing pieces are treated
        as empty rather than raising ``KeyError``, matching how ``role`` is read.
        """
        concatenatedTexts = []
        for entry in data:
            role = entry.get('role', '')
            parts = entry.get('parts') or []
            all_texts = ' '.join(str(part.get('text') or '') for part in parts)
            concatenatedTexts.append(f"{role}: {all_texts} \n")
        return ''.join(concatenatedTexts)


    def __call__(self, chatHistory, lastItemsConsidereds=100):
        """Ask the LLM to reformulate the latest question as a standalone one.

        Args:
            chatHistory: List of history entries (see ``_concat_and_format_texts``).
            lastItemsConsidereds: Only this many trailing entries are sent.

        Returns:
            The ``content`` of the first choice in the completion.
        """

        # Keep only the most recent entries when the history is long.
        if len(chatHistory) >= lastItemsConsidereds:
            chatHistory = chatHistory[len(chatHistory) - lastItemsConsidereds:]

        historyString = self._concat_and_format_texts(chatHistory)

        higherLevelSummariesPrompt = """Given a chat history and the latest user question which might reference context in the chat history, formulate a standalone question in Vietnamese which can be understood without the chat history. Do NOT answer the question, just reformulate it if needed and otherwise return it as is. {historyString}
        """.format(historyString=historyString)

        # Echo the exact prompt so the notebook shows what was sent.
        print(higherLevelSummariesPrompt)

        completion = self.llm.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "user",
                    "content": higherLevelSummariesPrompt
                }
            ]
        )

        return completion.choices[0].message.content

Semantic Router

In [None]:
from semantic_router.route import Route
from semantic_router.router import SemanticRouter
from semantic_router.samples import *

In [None]:
from typing import List
class Route():
    """A named routing target described by a list of sample utterances."""

    def __init__(
        self,
        name: str = None,
        samples:List = None
    ):
        """
        Args:
            name: Route identifier.
            samples: Example texts for this route; defaults to a new empty list.
        """

        self.name = name
        # Build the fallback list per instance. A literal ``[]`` default is
        # created once and would be shared by every Route made without samples.
        self.samples = samples if samples is not None else []

In [None]:
import numpy as np

class SemanticRouter():
    """Chooses the route whose sample embeddings are most similar to a query."""

    def __init__(self, embedding, routes):
        """Embed every route's samples once.

        Args:
            embedding: Object whose ``encode(list_of_texts)`` returns one
                vector per text.
            routes: Iterable of objects with ``name`` and ``samples``.
        """
        self.routes = routes
        self.embedding = embedding
        self.routesEmbedding = {}

        for route in self.routes:
            self.routesEmbedding[
                route.name
            ] = self.embedding.encode(route.samples)

    def get_routes(self):
        """Return the routes this router was built with."""
        return self.routes

    @staticmethod
    def _unit_rows(vectors):
        """Return ``vectors`` as a 2-D float array with every row scaled to unit length."""
        matrix = np.atleast_2d(np.asarray(vectors, dtype=float))
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        # Leave all-zero rows untouched instead of dividing by zero.
        norms[norms == 0] = 1.0
        return matrix / norms

    def guide(self, query):
        """Score every route against ``query`` and return the best match.

        The score of a route is the mean cosine similarity between the query
        embedding and each of the route's sample embeddings.

        Returns:
            tuple: ``(score, route_name)`` of the highest-scoring route.

        Raises:
            ValueError: If there is no route with sample embeddings to score.
        """
        if not self.routes:
            raise ValueError("SemanticRouter has no routes to choose from.")

        queryEmbedding = self._unit_rows(self.embedding.encode([query]))[0]
        scores = []

        # Calculate the cosine similarity of the query embedding with the sample embeddings of the router.
        # Rows are normalised individually: dividing the whole sample matrix by
        # its single overall norm is not cosine similarity and makes the score
        # shrink as a route gains samples.
        for route in self.routes:
            sampleEmbeddings = np.asarray(self.routesEmbedding[route.name], dtype=float)
            if sampleEmbeddings.size == 0:
                # A route without samples has nothing to compare against.
                continue
            similarities = self._unit_rows(sampleEmbeddings) @ queryEmbedding
            scores.append((np.mean(similarities), route.name))

        if not scores:
            raise ValueError("SemanticRouter has no route with sample embeddings.")

        # Highest score wins; ties fall back to comparing route names, as tuple ordering does.
        return max(scores)