In [1]:
import getpass
import html
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from bson.objectid import ObjectId
from google.cloud import translate_v2 as translate
from pymongo import MongoClient, UpdateOne
from tqdm import tqdm

In [2]:
# Tuning knobs shared by the translation and embedding passes:
#   BATCH_SIZE  - documents fetched per MongoDB query (.limit) and written per bulk_write()
#   MAX_WORKERS - threads issuing concurrent API calls within one batch
BATCH_SIZE, MAX_WORKERS = 100, 5

In [3]:
# MongoDB connection string. It is taken from the MONGODB_URI environment variable
# when that variable is present. Otherwise the user is prompted with getpass rather
# than input(): the URI usually embeds credentials, and a value typed into input()
# remains visible in (and is saved with) the notebook's cell output.
MONGODB_URI = (
    os.environ["MONGODB_URI"]
    if "MONGODB_URI" in os.environ
    else getpass.getpass("MongoDB Connection String: ")
)

In [4]:
# Handle to the `reviews` collection: both the translation and the embedding passes
# read their pending documents from it and write their results back to it.
client = MongoClient(MONGODB_URI)
db = client.get_database("genai_inventory_classification")
collection = db.get_collection("reviews")

## Translation service

In [32]:
# Google Cloud Translation client (the v2 API, per the `translate_v2` import), used by
# translate_text(). NOTE(review): it is built without explicit credentials, so it
# presumably relies on Application Default Credentials; confirm this for the
# environment the notebook runs in.
translate_client = translate.Client()

In [36]:
def translate_text(text, target="en", source="pt"):
    """Translate ``text`` with Google Cloud Translation and return plain text.

    Args:
        text: Text to translate. ``None`` or an empty string yields ``None``
            without calling the API.
        target: Target language code. Defaults to English (``"en"``), the
            notebook's original behaviour.
        source: Source language code. Defaults to Portuguese (``"pt"``). It is
            passed explicitly rather than left to the API's auto-detection.

    Returns:
        The translated string with HTML entities (e.g. ``&#39;``, ``&amp;``)
        decoded, or ``None`` for empty input.
    """
    if not text:
        return None
    result = translate_client.translate(text, target_language=target, source_language=source)
    # The API returns HTML-escaped text, so unescape it before it is stored.
    return html.unescape(result["translatedText"])

In [37]:
def process_batch(documents):
    """Translate a batch of reviews in parallel and persist them with one bulk_write().

    For each document, ``titleOriginal`` and ``messageOriginal`` are translated by
    ``translate_text`` into ``title`` and ``message``. A missing or empty source
    field is stored as ``None`` without calling the API. All fields of one document
    are combined into a single ``UpdateOne``, so each document is written at most
    once per call.

    Args:
        documents: Iterable of dicts with ``_id`` and, optionally,
            ``titleOriginal`` / ``messageOriginal``.

    Returns:
        list: ``_id`` values (in input order) of documents for which at least one
        translation raised. Fields that did translate are still written. The failed
        field stays absent, so the document keeps matching a "title/message does
        not exist" filter and can be retried later.
    """
    documents = list(documents)
    fields_by_id = {}  # doc_id -> {field: value} to $set
    failed_ids = set()

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_field = {}

        # Submit translation tasks; fields with nothing to translate are set to None directly.
        for doc in documents:
            doc_id = doc["_id"]
            fields = fields_by_id.setdefault(doc_id, {})
            for source_key, target_key in (("titleOriginal", "title"), ("messageOriginal", "message")):
                original = doc.get(source_key)
                if original:
                    future = executor.submit(translate_text, original)
                    future_to_field[future] = (doc_id, target_key)
                else:
                    fields[target_key] = None

        # Collect results as they finish (this runs on the calling thread only).
        for future in as_completed(future_to_field):
            doc_id, field = future_to_field[future]
            try:
                fields_by_id[doc_id][field] = future.result()
            except Exception as e:
                failed_ids.add(doc_id)
                # tqdm.write keeps the caller's progress bar intact; print() would garble it.
                tqdm.write(f"Error translating {field} for document {doc_id}: {e}")

    updates = [
        UpdateOne({"_id": doc_id}, {"$set": fields})
        for doc_id, fields in fields_by_id.items()
        if fields  # every field of this document failed: nothing to write
    ]
    if updates:
        # ordered=False: a failing write does not prevent the remaining updates.
        collection.bulk_write(updates, ordered=False)

    return [doc["_id"] for doc in documents if doc["_id"] in failed_ids]

In [50]:
# Reviews that still lack an English title or message. (Named so it does not shadow
# the builtin `filter`.)
translation_filter = {"$or": [{"title": {"$exists": False}}, {"message": {"$exists": False}}]}
# To debug a single review, narrow the filter, e.g.:
# translation_filter = {"$and": [{"_id": ObjectId("67cf17d7c1af94d1b49cbed1")}, translation_filter]}
total_docs = collection.count_documents(translation_filter)
print(f"Total documents to process: {total_docs}")

# Page through the pending documents in _id order, resuming after the last _id seen.
# Re-running the same query on every iteration would refetch any document whose
# translation failed (it still matches the filter), looping forever and overrunning
# the progress bar. With the _id cursor, each document is attempted at most once per
# run, and failures stay pending for the next run.
# Assumes all _id values share one BSON type (ObjectIds here).
last_id = None
failed_count = 0
with tqdm(total=total_docs, desc="Translating Reviews") as pbar:
    while True:
        page_filter = (
            translation_filter
            if last_id is None
            else {"$and": [translation_filter, {"_id": {"$gt": last_id}}]}
        )
        documents = list(
            collection.find(page_filter, {"_id": 1, "titleOriginal": 1, "messageOriginal": 1})
            .sort("_id", 1)
            .limit(BATCH_SIZE)
        )

        if not documents:
            break  # No more documents to process

        # `or []` also accepts a process_batch that returns None.
        failed_count += len(process_batch(documents) or [])
        last_id = documents[-1]["_id"]
        pbar.update(len(documents))  # Update progress bar

print("Translation completed.")
if failed_count:
    print(f"{failed_count} document(s) had translation errors and remain pending; re-run this cell to retry.")

Total documents to process: 5158


Translating Reviews: 100%|██████████| 5158/5158 [02:11<00:00, 39.22it/s]

Translation completed.





## Embedding service

In [5]:
# Bedrock runtime client; generate_embedding() invokes the Cohere embed model through it.
aws_region = "us-east-1"
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=aws_region)

In [16]:
def generate_embedding(text, model_id="cohere.embed-english-v3", input_type="search_document"):
    """Generate a float embedding for ``text`` with a Cohere embed model on AWS Bedrock.

    Args:
        text: Text to embed. ``None``, an empty string or whitespace-only text
            returns ``None`` without calling Bedrock.
        model_id: Bedrock model id to invoke (defaults to the model used for the
            reviews).
        input_type: Cohere ``input_type``. Use ``"search_document"`` (the default)
            for stored documents and ``"search_query"`` when embedding a query that
            will be searched against them.

    Returns:
        The embedding as a list of floats, or ``None`` if the input is blank or the
        request/response handling raised. Errors are reported with ``tqdm.write`` so
        an active progress bar is not garbled; they are not re-raised.
    """
    # Guard None as well as blank strings; `None.strip()` would raise AttributeError.
    if text is None or not text.strip():
        return None

    payload = {
        "texts": [text],
        "input_type": input_type,
        "embedding_types": ["float"],
    }

    try:
        response = bedrock_runtime.invoke_model(
            body=json.dumps(payload),
            modelId=model_id,
            accept='*/*',
            contentType="application/json",
        )
        response_body = json.loads(response["body"].read())
        # A single input text was sent, so take the first (only) float vector.
        return response_body["embeddings"]["float"][0]
    except Exception as e:
        tqdm.write(f"Error generating embedding: {e}")
        return None

In [18]:
def process_batch(documents):
    """Embed a batch of translated reviews in parallel and store the vectors with bulk_write().

    NOTE: this re-defines the translation cell's ``process_batch``; from this cell
    onward the name refers to the embedding step.

    For each document, ``title`` and ``message`` (missing/None treated as ``""``)
    are joined with a space and embedded via ``generate_embedding``. The result is
    written to ``emb``. Documents with no usable text get ``emb: None`` without an
    API call.

    Args:
        documents: Iterable of dicts with ``_id`` and, optionally, ``title`` /
            ``message``.

    Returns:
        list: ``_id`` values (in input order) whose embedding failed. This covers
        ``generate_embedding`` returning ``None`` for non-blank text, or raising.
        Documents that got ``None`` back are still stored as ``emb: None``.
        TODO: an ``{"emb": {"$exists": False}}`` filter will not pick them up again,
        so they need a separate retry pass.
    """
    documents = list(documents)
    updates = []
    failed_ids = set()

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_id = {}

        for doc in documents:
            doc_id = doc["_id"]
            title = doc.get("title")
            message = doc.get("message")
            combined_text = f"{title or ''} {message or ''}".strip()

            if not combined_text:
                # Nothing to embed (both fields missing or blank): skip the AWS call.
                updates.append(UpdateOne({"_id": doc_id}, {"$set": {"emb": None}}))
            else:
                future_to_id[executor.submit(generate_embedding, combined_text)] = doc_id

        # Collect results (on the calling thread).
        for future in as_completed(future_to_id):
            doc_id = future_to_id[future]
            try:
                embedding = future.result()
            except Exception as e:
                failed_ids.add(doc_id)
                tqdm.write(f"Error processing document {doc_id}: {e}")
                continue
            if embedding is None:
                # generate_embedding reports its own errors and returns None for them.
                failed_ids.add(doc_id)
            updates.append(UpdateOne({"_id": doc_id}, {"$set": {"emb": embedding}}))

    # Perform bulk update in MongoDB; ordered=False so one bad write does not stop the rest.
    if updates:
        collection.bulk_write(updates, ordered=False)

    return [doc["_id"] for doc in documents if doc["_id"] in failed_ids]

In [19]:
# Reviews that have not been embedded yet. (Named so it does not shadow the builtin `filter`.)
embedding_filter = {"emb": {"$exists": False}}
# To debug a single review, narrow the filter, e.g.:
# embedding_filter = {"$and": [{"_id": ObjectId("67cf17d7c1af94d1b49cbed1")}, embedding_filter]}
total_docs = collection.count_documents(embedding_filter)
print(f"Total documents to process: {total_docs}")

# Page through the pending documents in _id order, resuming after the last _id seen.
# Re-running the same query on every iteration would refetch any document that was
# left unwritten (e.g. a task raised) and never terminate. With the _id cursor, each
# document is attempted at most once per run.
# Assumes all _id values share one BSON type (ObjectIds here).
last_id = None
failed_count = 0
with tqdm(total=total_docs, desc="Generating Embeddings") as pbar:
    while True:
        page_filter = (
            embedding_filter
            if last_id is None
            else {"$and": [embedding_filter, {"_id": {"$gt": last_id}}]}
        )
        documents = list(
            collection.find(page_filter, {"_id": 1, "title": 1, "message": 1})
            .sort("_id", 1)
            .limit(BATCH_SIZE)
        )

        if not documents:
            break  # No more documents to process

        # `or []` also accepts a process_batch that returns None.
        failed_count += len(process_batch(documents) or [])
        last_id = documents[-1]["_id"]
        pbar.update(len(documents))  # Update progress bar
        time.sleep(1)  # Small delay to avoid excessive API calls

print("Embedding generation completed.")
if failed_count:
    print(f"{failed_count} document(s) could not be embedded; see the errors above.")

Total documents to process: 5158


Generating Embeddings: 100%|██████████| 5158/5158 [03:19<00:00, 25.79it/s]

Embedding generation completed.



