In [None]:
pip install chromadb

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# NOTE(review): the later cells open their files through relative paths, not
# through /content/drive — confirm this mount is still required.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import zipfile
import os
import google.generativeai as genai
from chromadb import Documents, EmbeddingFunction, Embeddings
import os
import chromadb

In [None]:
import os
from getpass import getpass

# Never store the API key in the notebook itself: anything typed here ends up in
# version control and in every shared copy. Reuse GEMINI_API_KEY when the
# environment already provides it, otherwise prompt for it without echoing.
# NOTE: a key that was previously committed in this cell must be treated as
# leaked and revoked in the Google AI console.
if not os.environ.get("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("Enter your Gemini API key: ")

In [None]:
# Archive holding the persisted Chroma database (path is relative to the
# current working directory).
zip_file_path = 'chroma_database_product.zip'

# Destination directory where the contents will be extracted
destination_dir = 'chroma_database/'

# exist_ok=True creates the directory when missing and is a no-op when it is
# already there, so the cell can be re-run without a separate existence check.
os.makedirs(destination_dir, exist_ok=True)

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(destination_dir)

In [None]:
class GeminiEmbeddingFunction(EmbeddingFunction):
    """
    Embedding function that delegates to the Gemini embedding API.

    Instances are callable. A call forwards the given documents to
    `genai.embed_content` (model "models/embedding-001") and hands back the
    "embedding" entry of the response.

    The API key is looked up in the GEMINI_API_KEY environment variable on
    every call, and `genai` is (re)configured with it before the request.

    Parameters (of the call):
    - input (Documents): The documents to embed.

    Returns:
    - Embeddings: The "embedding" value of the API response.

    Raises:
    - ValueError: If GEMINI_API_KEY is unset or empty.

    Example:
    >>> embed = GeminiEmbeddingFunction()
    >>> vectors = embed(["Document 1", "Document 2", "Document 3"])
    """
    def __call__(self, input: Documents) -> Embeddings:
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("Gemini API Key not provided. Please provide GEMINI_API_KEY as an environment variable")

        genai.configure(api_key=api_key)

        # NOTE(review): the same task_type and title are sent for every input;
        # confirm this is also what is wanted when embedding search queries.
        response = genai.embed_content(
            model="models/embedding-001",
            content=input,
            task_type="retrieval_document",
            title="Custom query",
        )
        return response["embedding"]

In [None]:
def load_chroma_collection(path, name):
    """
    Open a persisted Chroma database and fetch one of its collections.

    The collection is retrieved with a fresh GeminiEmbeddingFunction attached
    as its embedding function.

    Parameters:
    - path (str): Directory of the persistent Chroma database.
    - name (str): Name of the collection to fetch.

    Returns:
    - The collection object returned by `get_collection`.
    """
    client = chromadb.PersistentClient(path=path)
    embedder = GeminiEmbeddingFunction()
    return client.get_collection(name=name, embedding_function=embedder)

# Open the "reviews_collections" collection from the 'chroma_database' directory;
# `db` is the handle the retrieval helpers below receive.
db=load_chroma_collection(path='chroma_database', name="reviews_collections")

In [None]:
def get_relevant_passage(query, db, n_results):
  """
  Query the collection with a single text and return its matching documents.

  Parameters:
  - query (str): Text to search for.
  - db: Collection-like object exposing `query(query_texts=..., n_results=...)`.
  - n_results (int): Number of results requested from the collection.

  Returns:
  - The 'documents' entry for the first (and only) query text.
  """
  results = db.query(query_texts=[query], n_results=n_results)
  documents_for_query = results['documents'][0]
  return documents_for_query

In [None]:
def make_rag_prompt(query, relevant_passage):
  """
  Build the text prompt sent to the generative model.

  The passage is flattened first: single and double quotes are dropped and
  newlines become spaces. The query is inserted unchanged.

  Parameters:
  - query (str): The masked sentence to be completed.
  - relevant_passage (str): Retrieved context to include in the prompt.

  Returns:
  - str: The fully formatted prompt.
  """
  # One pass over the passage: delete both quote characters, newline -> space.
  cleanup_table = str.maketrans({"'": None, '"': None, "\n": " "})
  cleaned_passage = relevant_passage.translate(cleanup_table)

  template = """ You are a  bot specialized in writing amazon product reviews. Please REPLACE the [MASK] tokens in the following passage to create a coherent product review. You can write review about any product. Ensure that the filled-in words align with the context of a amazon product review. Additionally, transform the passage so that it resembles a product review on amazon. Enhance the passage with your own insights and knowledge of typical amazon product review content. You should try to keep the original textual structure intact.
For example if the original query is "I love this [MASK] and I would go to eat food here someday." Your answer should be "I love this earphone and I would definitely buy this at amazon."
Transform the passage just like the example given above.  keep the structure of original passage almost intact.
  QUESTION: '{query}'
  PASSAGE: '{relevant_passage}'

  ANSWER:
  """

  return template.format(query=query, relevant_passage=cleaned_passage)


In [None]:
import google.generativeai as genai
def generate_answer_api(prompt):
    """
    Send a prompt to the 'gemini-pro' model and return the generated text.

    The API key is read from the GEMINI_API_KEY environment variable and
    `genai` is configured with it before the model is created.

    Parameters:
    - prompt (str): Prompt forwarded to `generate_content`.

    Returns:
    - The `.text` attribute of the model response.

    Raises:
    - ValueError: If GEMINI_API_KEY is unset or empty.
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise ValueError("Gemini API Key not provided. Please provide GEMINI_API_KEY as an environment variable")

    genai.configure(api_key=api_key)
    response = genai.GenerativeModel('gemini-pro').generate_content(prompt)
    return response.text

In [None]:
def generate_answer(db, query, n_results=3):
    """
    Answer a masked-review query with retrieval-augmented generation.

    Retrieves the `n_results` most relevant chunks for `query`, merges them
    into one passage, builds the prompt and returns the model's answer.

    Parameters:
    - db: Collection handed to `get_relevant_passage`.
    - query (str): The masked sentence to complete.
    - n_results (int): How many chunks to retrieve (default 3).

    Returns:
    - The text returned by `generate_answer_api` for the built prompt.
    """
    relevant_text = get_relevant_passage(query, db, n_results=n_results)

    # Join with a space: an empty separator glues the last word of one chunk
    # to the first word of the next and corrupts the context shown to the model.
    passage = " ".join(relevant_text)

    prompt = make_rag_prompt(query, relevant_passage=passage)
    return generate_answer_api(prompt)

In [None]:
# Rebind `db` to the collection used by the cells below.
# NOTE(review): these are the same arguments as the load performed right after
# load_chroma_collection is defined — confirm whether both loads are needed.
db=load_chroma_collection(path='chroma_database', #replace with path of your persistent directory
                          name="reviews_collections") #replace with the collection name



In [None]:
# Example: run one masked review through the full retrieve -> prompt -> generate
# pipeline and print the model's answer.
answer = generate_answer(db,"Love this! Perfect [MASK] for an [MASK] family!Very [MASK] quality. [MASK] [MASK] [MASK] [MASK] [MASK]")
print(answer)

**Mask Filling**

In [None]:
import numpy as np
import pandas as pd

In [None]:
# Load the masked reviews. fill_masks (defined below) reads the
# 'masked_sentences', 'Label' and 'Review' columns of this frame.
df = pd.read_csv('masked_restauranttoproduct.csv')

In [None]:
# Preview the first rows to check the file loaded as expected.
df.head()

In [None]:
import pandas as pd

def fill_masks(df, collection=None, max_attempts=5):
    """
    Fill the [MASK] tokens of every row of `df` using `generate_answer`.

    Each masked sentence is passed to `generate_answer` again and again until
    the returned text contains no '[MASK]'. A row is reported and skipped when
    generation raises, or when a mask is still present after `max_attempts`
    generations (without this bound a model that keeps echoing a mask would
    loop forever and make unlimited API calls).

    Parameters:
    - df: Table indexable by column name that provides the 'masked_sentences',
      'Label' and 'Review' columns.
    - collection: Collection passed to `generate_answer`. Defaults to the
      notebook-level `db`.
    - max_attempts (int): Maximum number of generations per row (default 5).

    Returns:
    - tuple of four equally long lists, holding only the rows that were
      processed successfully:
      (filled sentences, labels, original reviews, masked sentences).
    """
    if collection is None:
        collection = db  # fall back to the collection loaded earlier in the notebook

    lis_fill = []
    lis_label = []
    lis_text = []
    lis_masked = []
    errors = 0

    for masked, label, review in zip(df['masked_sentences'], df['Label'], df['Review']):
        sentence = masked
        try:
            attempts = 0
            while '[MASK]' in sentence:
                if attempts >= max_attempts:
                    raise RuntimeError(
                        f"[MASK] still present after {max_attempts} generation attempts"
                    )
                sentence = generate_answer(collection, sentence)
                attempts += 1

            # Append to all four lists together so they stay index-aligned.
            lis_fill.append(sentence)
            lis_label.append(label)
            lis_text.append(review)
            lis_masked.append(masked)
            print(len(lis_text))  # running count of completed rows
        except Exception as e:  # best effort: one bad row must not abort the whole run
            errors += 1
            if "blocked" in str(e).lower():  # Check for keywords indicating a blocked prompt (optional)
                print("** Potential Blocked Prompt Encountered (custom check):", e)
            else:
                print("An error occurred:", e)
            print("Continuing with the next iteration.")

    if errors > 0:
        print(f"Encountered {errors} errors during processing.")

    return lis_fill, lis_label, lis_text, lis_masked

# Run mask filling over the whole frame. Rows that raise are skipped inside
# fill_masks, so the four lists are aligned with each other but can be shorter
# than df.
filled_sentences, labels, texts, masked_sentences = fill_masks(df)


In [None]:
# Column layout of the output table, built from the four lists returned by
# fill_masks.
dic = {
    'generated_text':filled_sentences,
    'label':labels,
    'original_text':texts,
    'masked_sentences':masked_sentences
}

In [None]:
# Assemble the result columns into a single DataFrame.
data = pd.DataFrame(dic)

In [None]:
# A cell only renders its last expression, so print the shape explicitly and
# keep head() last to get the rich table display (previously the head() result
# was computed and silently discarded).
print(data.shape)
data.head()

In [None]:
# Persist the generated reviews next to the notebook.
# NOTE(review): to_csv also writes the row index as an extra unnamed first
# column by default — confirm downstream readers expect that, otherwise pass
# index=False.
data.to_csv('restaurant_to_product_rag.csv')