## Embed CSV Files with Google Gemini

https://python.langchain.com/docs/integrations/text_embedding/google_generative_ai/

https://github.com/mongodb-developer/GenAI-Showcase/blob/50535ba52c872ed03a975bf180f01f84696e7cc9/notebooks/agents/agentic_rag_factory_safety_assistant_with_langgraph_langchain_mongodb.ipynb

# Google Generative AI Embeddings (AI Studio & Gemini API)

Connect to Google's generative AI embeddings service using the `GoogleGenerativeAIEmbeddings` class, found in the [langchain-google-genai](https://pypi.org/project/langchain-google-genai/) package.

This will help you get started with Google's Generative AI embedding models (like Gemini) using LangChain. For detailed documentation on `GoogleGenerativeAIEmbeddings` features and configuration options, please refer to the [API reference](https://python.langchain.com/v0.2/api_reference/google_genai/embeddings/langchain_google_genai.embeddings.GoogleGenerativeAIEmbeddings.html).

## Overview
### Integration details

import { ItemTable } from "@theme/FeatureTables";

<ItemTable category="text_embedding" item="Google Gemini" />

## Setup

To access Google Generative AI embedding models you'll need to create a Google Cloud project, enable the Generative Language API, get an API key, and install the `langchain-google-genai` integration package.

### Credentials

To use Google Generative AI models, you must have an API key. You can create one in Google AI Studio. See the [Google documentation](https://ai.google.dev/gemini-api/docs/api-key) for instructions.

Once you have a key, set it as an environment variable `GOOGLE_API_KEY`:


In [1]:
import getpass
import os

#if not os.getenv("GOOGLE_API_KEY"):
#    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter your Google API key: ")

In [1]:
from dotenv import load_dotenv
import os

In [31]:
load_dotenv()  # This loads variables from .env into the environment


True

In [3]:
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

In [32]:
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

## Installation

In [5]:
%pip install --upgrade --quiet  langchain-google-genai


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [44]:
!pip install google-generativeai


Collecting google-ai-generativelanguage==0.6.9 (from google-generativeai)
  Downloading google_ai_generativelanguage-0.6.9-py3-none-any.whl.metadata (5.6 kB)
Downloading google_ai_generativelanguage-0.6.9-py3-none-any.whl (725 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m725.4/725.4 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-ai-generativelanguage
  Attempting uninstall: google-ai-generativelanguage
    Found existing installation: google-ai-generativelanguage 0.6.18
    Uninstalling google-ai-generativelanguage-0.6.18:
      Successfully uninstalled google-ai-generativelanguage-0.6.18
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
langchain-google-genai 2.1.5 requires google-ai-generativelanguage<0.7.0,>=0.6.18, but you have google-ai-generativelanguage 0.6.9 which is incompatible.[0m[3

In [4]:
import pandas as pd

In [75]:
GEMINI_EMBEDDING_MODEL = "models/embedding-001"#"models/gemini-embedding-exp-03-07"

In [42]:
from google.generativeai import GenerativeModel

  from .autonotebook import tqdm as notebook_tqdm


In [43]:
embedding_model = GenerativeModel(model_name=GEMINI_EMBEDDING_MODEL)

In [34]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings

embedding_model = GoogleGenerativeAIEmbeddings(
    model=GEMINI_EMBEDDING_MODEL,
    task_type="RETRIEVAL_DOCUMENT"
)


In [35]:
import sys

In [36]:
# Add the root directory of your project to Python path
project_root = os.path.abspath("../backend")
if project_root not in sys.path:
    sys.path.append(project_root)

## Connect to the MongoDB collection

In [9]:
from db.mongodb_client import mongodb_client

✅ MongoDB connection established.


In [10]:
# Names of the MongoDB database, collection and vector search index
DB_NAME = "diabetes_data"
COLLECTION_NAME = "records_embeddings"
VS_INDEX_NAME = "tabular_vector_index"

In [11]:
db = mongodb_client[DB_NAME]

In [12]:
# Connect to the MongoDB collection
collection = mongodb_client[DB_NAME][COLLECTION_NAME]

In [13]:

print("Client:", mongodb_client)
print("Databases:", mongodb_client.list_database_names())

Client: MongoClient(host=['ac-t38w2lp-shard-00-01.q4fmjuw.mongodb.net:27017', 'ac-t38w2lp-shard-00-00.q4fmjuw.mongodb.net:27017', 'ac-t38w2lp-shard-00-02.q4fmjuw.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, retrywrites=True, w='majority', appname='Search4Cure.diabetes', authsource='admin', replicaset='atlas-lunm1g-shard-0', tls=True, server_api=<pymongo.server_api.ServerApi object at 0x30774adb0>)
Databases: ['diabetes_data', 'admin', 'local']


## Google Cloud Credentials

In [37]:
path_to_credentials = ""

In [38]:
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_to_credentials


## Insert to MongoDB

In [16]:
import numpy as np


def combine_all_attributes(df, exclude_columns=None):
    """
    Combine all attributes (optionally excluding some) of a DataFrame row into a single column.

    Parameters:
    - df: pandas DataFrame
    - exclude_columns: list of column names to exclude from the combination

    Returns:
    - df: DataFrame with a new 'combined_info' column
    """
    exclude_columns = exclude_columns or []

    def combine_row(row):
        combined = []
        for attr in row.index:
            if attr in exclude_columns:
                continue
            value = row[attr]
            if isinstance(value, (pd.Series, np.ndarray, list)):
                # Handle array-like objects
                if len(value) > 0 and not pd.isna(value).all():
                    combined.append(f"{attr.capitalize()}: {value!s}")
            elif not pd.isna(value):
                combined.append(f"{attr.capitalize()}: {value!s}")
        return " ".join(combined)

    df["combined_info"] = df.apply(combine_row, axis=1)
    return df

In [17]:
from tqdm import tqdm
from datetime import datetime

In [53]:
import itertools

In [None]:

 # === Local Files Folder ===
LOCAL_FOLDER = "../data/data_new" 

for file_name in os.listdir(LOCAL_FOLDER):
    file_path = os.path.join(LOCAL_FOLDER, file_name)

    # Skip non-data files
    if not file_name.lower().endswith(('.csv', '.xlsx', '.xls', '.json')):
        print(f"Skipping unsupported file: {file_name}")
        continue
    
    datasets_col = db["datasets"]
    data_col = db["records_embeddings"]

    #Check if file already uploaded
    existing = datasets_col.find_one({"file_name": file_name})
    if existing:
         print(f"⏭️ Skipping {file_name} (already uploaded)")
         #continue

    print(f"📂 Processing {file_name}...")
    # Initialize metadata tracking
    first_chunk = None
    total_rows = 0
    combined_missing = None

    # Read file into DataFrame
    try:
        if file_name.endswith(".csv"):
            chunk_iter = pd.read_csv(file_path, chunksize=500)
        elif file_name.endswith((".xlsx", ".xls")):
            chunk_iter = pd.read_excel(file_path, chunksize=500)
        elif file_name.endswith(".json"):
            chunk_iter = pd.read_json(file_path, lines=True, chunksize=500)
    except Exception as e:
        print(f"❌ Failed to read {file_name}: {e}")
        continue
    dataset_id = None

    # === Insert Metadata ===
    start_index = 1500

    # Skip the first `start_index // chunksize` chunks
    chunks_to_embed = itertools.islice(chunk_iter, start_index // 500, None)
    #for chunk_idx, chunk in enumerate(chunk_iter):
    # Now embed starting from chunk N
    for chunk_idx, chunk in enumerate(tqdm(chunks_to_embed, desc="Embedding", initial=start_index, total=None)):       
        if first_chunk is None:
            first_chunk = chunk.copy()
            combined_missing = chunk.isnull().sum()
            dataset_doc = {
                "file_name": file_name,
                "upload_date": datetime.now(),
                "n_columns": chunk.shape[1],
                "columns": chunk.columns.tolist(),
                "missing_values": chunk.isnull().sum().to_dict(),
                "file_type": os.path.splitext(file_name)[-1].replace(".", ""),
                "file_path": file_path,
                "column_types": chunk.dtypes.astype(str).to_dict(),
            }
            dataset_id = datasets_col.insert_one(dataset_doc).inserted_id
            print(f"✅ Inserted metadata for {file_name}")

        total_rows += len(chunk)
    
        # === Combine all attributes ===
        chunk = combine_all_attributes(chunk, exclude_columns=[])
        print(chunk[["combined_info"]].head(2))  # preview
    
        duplicated_data = []
        for row in tqdm(chunk.itertuples(index=False), total=len(chunk), desc="Embedding"):
            duplicated_rows = get_embedding(row._asdict())
            duplicated_data.extend(duplicated_rows)

        if duplicated_data:
            df_dup = pd.DataFrame(duplicated_data)
            print(df_dup.head(2))
            try:
                total_inserted_tabular = insert_df_to_mongodb(df_dup, data_col, dataset_id)
                print(
                    f"📌 Chunk {chunk_idx + 1}: {len(df_dup)} documents inserted."
                )
            except Exception as e:
                print(f"❌ Error inserting chunk {chunk_idx + 1}: {e}")

    if dataset_id:
        datasets_col.update_one(
            {"_id": dataset_id},
            {"$set": {"n_rows": total_rows}}
        )
        print(f"✅ Finalized metadata for {file_name}: {total_rows} rows")

    # # Combine all columns into 'combined_info'
    # df = combine_all_attributes(df, exclude_columns=[])  
    # print(df[["combined_info"]].head(2))  # preview

    # # Apply the function and expand the dataset
    # duplicated_data = []
    # for _, row in tqdm(
    #     df.iterrows(),
    #     desc="Generating embeddings and duplicating rows",
    #     total=len(df),
    # ):
    #     duplicated_rows = get_embedding(row)
    #     duplicated_data.extend(duplicated_rows)

    # # Create a new DataFrame from the duplicated data
    # df = pd.DataFrame(duplicated_data)
    #print(df.head(2))

    # === Insert Data Records ===
    # Insert dataframe to mongodb
    # try:
    #     total_inserted_tabular = insert_df_to_mongodb(df, data_col, dataset_id)
    #     print(
    #         f"{file_name} data ingestion completed. Total documents inserted: {total_inserted_tabular}"
    #     )
    # except Exception as e:
    #     print(f"An error occurred while inserting {file_name}: {e}")
    #     print("Pandas version:", pd.__version__)
    # print_dataframe_info(df, {file_name})

# Final summary
print(f"\n✅ Ingestion process completed for all {total_inserted_tabular} files.")
# print("\nInsertion Summary:")
# print(
#     f"Tabular files inserted: {total_inserted_tabular if 'total_inserted_tabular' in locals() else 'Failed'}"
# )


     
        
    # data_records = df.to_dict(orient="records")
    # for record in data_records:
    #     record["dataset_id"] = dataset_id

    # if data_records:
    #     data_col.insert_many(data_records)
    #     print(f"✅ Uploaded data records for {file_name}")

In [69]:

 
for file_name in os.listdir(LOCAL_FOLDER):
    file_path = os.path.join(LOCAL_FOLDER, file_name)

    # Skip non-data files
    if not file_name.lower().endswith(('.csv', '.xlsx', '.xls', '.json')):
        print(f"Skipping unsupported file: {file_name}")
        continue
    
    datasets_col = db["datasets"]
    data_col = db["records_embeddings"]

    #Check if file already uploaded
    existing = datasets_col.find_one({"file_name": file_name})
    if existing:
         print(f"⏭️ Skipping {file_name} (already uploaded)")
         #continue

    print(f"📂 Processing {file_name}...")
    # Initialize metadata tracking
    first_chunk = None
    total_rows = 0
    combined_missing = None

    # Read file into DataFrame
    try:
        if file_name.endswith(".csv"):
            chunk_iter = pd.read_csv(file_path, chunksize=500)
        elif file_name.endswith((".xlsx", ".xls")):
            chunk_iter = pd.read_excel(file_path, chunksize=500)
        elif file_name.endswith(".json"):
            chunk_iter = pd.read_json(file_path, lines=True, chunksize=500)
    except Exception as e:
        print(f"❌ Failed to read {file_name}: {e}")
        continue
    dataset_id = None

    # === Insert Metadata ===
    start_index = 1500

    # Skip the first `start_index // chunksize` chunks
    chunks_to_embed = itertools.islice(chunk_iter, start_index // 500, None)
    #for chunk_idx, chunk in enumerate(chunk_iter):
    # Now embed starting from chunk N
    for chunk_idx, chunk in enumerate(tqdm(chunks_to_embed, desc="Embedding", initial=start_index, total=None)):       
        if first_chunk is None:
            first_chunk = chunk.copy()
            combined_missing = chunk.isnull().sum()
            dataset_doc = {
                "file_name": file_name,
                "upload_date": datetime.now(),
                "n_columns": chunk.shape[1],
                "columns": chunk.columns.tolist(),
                "missing_values": chunk.isnull().sum().to_dict(),
                "file_type": os.path.splitext(file_name)[-1].replace(".", ""),
                "file_path": file_path,
                "column_types": chunk.dtypes.astype(str).to_dict(),
            }
            dataset_id = datasets_col.insert_one(dataset_doc).inserted_id
            print(f"✅ Inserted metadata for {file_name}")

        total_rows += len(chunk)
    
        # === Combine all attributes ===
        chunk = combine_all_attributes(chunk, exclude_columns=[])
        print(chunk[["combined_info"]].head(2))  # preview
    
        duplicated_data = []
        for row in tqdm(chunk.itertuples(index=False), total=len(chunk), desc="Embedding"):
            duplicated_rows = get_embedding(row._asdict())
            duplicated_data.extend(duplicated_rows)

        if duplicated_data:
            df_dup = pd.DataFrame(duplicated_data)
            print(df_dup.head(2))
            try:
                total_inserted_tabular = insert_df_to_mongodb(df_dup, data_col, dataset_id)
                print(
                    f"📌 Chunk {chunk_idx + 1}: {len(df_dup)} documents inserted."
                )
            except Exception as e:
                print(f"❌ Error inserting chunk {chunk_idx + 1}: {e}")

    if dataset_id:
        datasets_col.update_one(
            {"_id": dataset_id},
            {"$set": {"n_rows": total_rows}}
        )
        print(f"✅ Finalized metadata for {file_name}: {total_rows} rows")

    # # Combine all columns into 'combined_info'
    # df = combine_all_attributes(df, exclude_columns=[])  
    # print(df[["combined_info"]].head(2))  # preview

    # # Apply the function and expand the dataset
    # duplicated_data = []
    # for _, row in tqdm(
    #     df.iterrows(),
    #     desc="Generating embeddings and duplicating rows",
    #     total=len(df),
    # ):
    #     duplicated_rows = get_embedding(row)
    #     duplicated_data.extend(duplicated_rows)

    # # Create a new DataFrame from the duplicated data
    # df = pd.DataFrame(duplicated_data)
    #print(df.head(2))

    # === Insert Data Records ===
    # Insert dataframe to mongodb
    # try:
    #     total_inserted_tabular = insert_df_to_mongodb(df, data_col, dataset_id)
    #     print(
    #         f"{file_name} data ingestion completed. Total documents inserted: {total_inserted_tabular}"
    #     )
    # except Exception as e:
    #     print(f"An error occurred while inserting {file_name}: {e}")
    #     print("Pandas version:", pd.__version__)
    # print_dataframe_info(df, {file_name})

# Final summary
print(f"\n✅ Ingestion process completed for all {total_inserted_tabular} files.")
# print("\nInsertion Summary:")
# print(
#     f"Tabular files inserted: {total_inserted_tabular if 'total_inserted_tabular' in locals() else 'Failed'}"
# )


     
        
    # data_records = df.to_dict(orient="records")
    # for record in data_records:
    #     record["dataset_id"] = dataset_id

    # if data_records:
    #     data_col.insert_many(data_records)
    #     print(f"✅ Uploaded data records for {file_name}")

Skipping unsupported file: .DS_Store
⏭️ Skipping diabetic_data.csv (already uploaded)
📂 Processing diabetic_data.csv...
✅ Inserted metadata for diabetic_data.csv
                                       combined_info
0  Encounter_id: 2278392 Patient_nbr: 8222157 Rac...
1  Encounter_id: 149190 Patient_nbr: 55629189 Rac...


Embedding: 100%|██████████| 500/500 [39:38<00:00,  4.76s/it]    


   encounter_id  patient_nbr       race  gender      age weight  \
0       2278392      8222157  Caucasian  Female   [0-10)      ?   
1        149190     55629189  Caucasian  Female  [10-20)      ?   

   admission_type_id  discharge_disposition_id  admission_source_id  \
0                  6                        25                    1   
1                  1                         1                    7   

   time_in_hospital  ... _42 _43  _44  _45  _46  change  diabetesMed  \
0                 1  ...  No  No   No   No   No      No           No   
1                 3  ...  No  No   No   No   No      Ch          Yes   

   readmitted                                      combined_info  \
0          NO  Encounter_id: 2278392 Patient_nbr: 8222157 Rac...   
1         >30  Encounter_id: 149190 Patient_nbr: 55629189 Rac...   

                                           embedding  
0  [0.019425964, -0.0024473513, -0.09022015, -0.0...  
1  [0.019444333, -0.004087155, -0.07224274, 3.688...

Embedding: 100%|██████████| 500/500 [19:05<00:00,  2.29s/it]  


   encounter_id  patient_nbr             race  gender      age weight  \
0       4255176      2139525        Caucasian  Female  [60-70)      ?   
1       4255452     99109602  AfricanAmerican  Female  [60-70)      ?   

   admission_type_id  discharge_disposition_id  admission_source_id  \
0                  6                        25                    7   
1                  1                         6                    7   

   time_in_hospital  ... _42 _43  _44  _45  _46  change  diabetesMed  \
0                10  ...  No  No   No   No   No      No          Yes   
1                10  ...  No  No   No   No   No      No           No   

   readmitted                                      combined_info  \
0          NO  Encounter_id: 4255176 Patient_nbr: 2139525 Rac...   
1          NO  Encounter_id: 4255452 Patient_nbr: 99109602 Ra...   

                                           embedding  
0  [0.022118663, 0.0037105205, -0.07894335, -0.00...  
1  [0.020221915, 0.0010955177, -0.

Embedding: 100%|██████████| 500/500 [16:35<00:00,  1.99s/it]  


   encounter_id  patient_nbr       race gender      age weight  \
0       7556418      4282317  Caucasian   Male  [50-60)      ?   
1       7564920        94527  Caucasian   Male  [70-80)      ?   

   admission_type_id  discharge_disposition_id  admission_source_id  \
0                  1                         1                    7   
1                  1                         1                    7   

   time_in_hospital  ... _42 _43  _44  _45  _46  change  diabetesMed  \
0                 6  ...  No  No   No   No   No      No          Yes   
1                 2  ...  No  No   No   No   No      No          Yes   

   readmitted                                      combined_info  \
0          NO  Encounter_id: 7556418 Patient_nbr: 4282317 Rac...   
1         >30  Encounter_id: 7564920 Patient_nbr: 94527 Race:...   

                                           embedding  
0  [0.018956222, 0.002368126, -0.0839956, -0.0046...  
1  [0.019801792, 0.0050626164, -0.075723045, 0.00...  


Embedding:  86%|████████▌ | 429/500 [2:08:11<21:12, 17.93s/it]    


DeadlineExceeded: 504 Deadline Exceeded

In [77]:

# === Local Files Folde7r ===
LOCAL_FOLDER = "../data/data_new2" 
for file_name in os.listdir(LOCAL_FOLDER):
    file_path = os.path.join(LOCAL_FOLDER, file_name)

    # Skip non-data files
    if not file_name.lower().endswith(('.csv', '.xlsx', '.xls', '.json')):
        print(f"Skipping unsupported file: {file_name}")
        continue
    
    datasets_col = db["datasets"]
    data_col = db["records_embeddings"]

    #Check if file already uploaded
    existing = datasets_col.find_one({"file_name": file_name})
    if existing:
         print(f"⏭️ Skipping {file_name} (already uploaded)")
         #continue

    print(f"📂 Processing {file_name}...")
    # Initialize metadata tracking
    first_chunk = None
    total_rows = 0
    combined_missing = None

    # Read file into DataFrame
    try:
        if file_name.endswith(".csv"):
            chunk_iter = pd.read_csv(file_path, chunksize=500)
        elif file_name.endswith((".xlsx", ".xls")):
            chunk_iter = pd.read_excel(file_path, n_rows=500)
        elif file_name.endswith(".json"):
            chunk_iter = pd.read_json(file_path, lines=True, chunksize=500)
    except Exception as e:
        print(f"❌ Failed to read {file_name}: {e}")
        continue
    dataset_id = None

    # === Insert Metadata ===
    start_index = 1000

    # Skip the first `start_index // chunksize` chunks
    #chunks_to_embed = itertools.islice(chunk_iter, start_index // 500, None)
    
    # Now embed starting from chunk N
    #for chunk_idx, chunk in enumerate(tqdm(chunks_to_embed, desc="Embedding", initial=start_index, total=None)):    
    for chunk_idx, chunk in enumerate(chunk_iter):
        if first_chunk is None:
            first_chunk = chunk.copy()
            combined_missing = chunk.isnull().sum()
            dataset_doc = {
                "file_name": file_name,
                "upload_date": datetime.now(),
                "n_columns": chunk.shape[1],
                "columns": chunk.columns.tolist(),
                "missing_values": chunk.isnull().sum().to_dict(),
                "file_type": os.path.splitext(file_name)[-1].replace(".", ""),
                "file_path": file_path,
                "column_types": chunk.dtypes.astype(str).to_dict(),
            }
            dataset_id = datasets_col.insert_one(dataset_doc).inserted_id
            print(f"✅ Inserted metadata for {file_name}")

        total_rows += len(chunk)
    
        # === Combine all attributes ===
        chunk = combine_all_attributes(chunk, exclude_columns=[])
        print(chunk[["combined_info"]].head(2))  # preview
    
        duplicated_data = []
        for row in tqdm(chunk.itertuples(index=False), total=len(chunk), desc="Embedding"):
            duplicated_rows = get_embedding(row._asdict())
            duplicated_data.extend(duplicated_rows)

        if duplicated_data:
            df_dup = pd.DataFrame(duplicated_data)
            print(df_dup.head(2))
            try:
                total_inserted_tabular = insert_df_to_mongodb(df_dup, data_col, dataset_id)
                print(
                    f"📌 Chunk {chunk_idx + 1}: {len(df_dup)} documents inserted."
                )
            except Exception as e:
                print(f"❌ Error inserting chunk {chunk_idx + 1}: {e}")

    if dataset_id:
        datasets_col.update_one(
            {"_id": dataset_id},
            {"$set": {"n_rows": total_rows}}
        )
        print(f"✅ Finalized metadata for {file_name}: {total_rows} rows")

    # # Combine all columns into 'combined_info'
    # df = combine_all_attributes(df, exclude_columns=[])  
    # print(df[["combined_info"]].head(2))  # preview

    # # Apply the function and expand the dataset
    # duplicated_data = []
    # for _, row in tqdm(
    #     df.iterrows(),
    #     desc="Generating embeddings and duplicating rows",
    #     total=len(df),
    # ):
    #     duplicated_rows = get_embedding(row)
    #     duplicated_data.extend(duplicated_rows)

    # # Create a new DataFrame from the duplicated data
    # df = pd.DataFrame(duplicated_data)
    #print(df.head(2))

    # === Insert Data Records ===
    # Insert dataframe to mongodb
    # try:
    #     total_inserted_tabular = insert_df_to_mongodb(df, data_col, dataset_id)
    #     print(
    #         f"{file_name} data ingestion completed. Total documents inserted: {total_inserted_tabular}"
    #     )
    # except Exception as e:
    #     print(f"An error occurred while inserting {file_name}: {e}")
    #     print("Pandas version:", pd.__version__)
    # print_dataframe_info(df, {file_name})

# Final summary
print(f"\n✅ Ingestion process completed for all {total_inserted_tabular} files.")
# print("\nInsertion Summary:")
# print(
#     f"Tabular files inserted: {total_inserted_tabular if 'total_inserted_tabular' in locals() else 'Failed'}"
# )


     
        
    # data_records = df.to_dict(orient="records")
    # for record in data_records:
    #     record["dataset_id"] = dataset_id

    # if data_records:
    #     data_col.insert_many(data_records)
    #     print(f"✅ Uploaded data records for {file_name}")

📂 Processing GALLSTONE_dataset-uci_2025.xlsx...
❌ Failed to read GALLSTONE_dataset-uci_2025.xlsx: read_excel() got an unexpected keyword argument 'n_rows'
⏭️ Skipping berm_hipdata.csv (already uploaded)
📂 Processing berm_hipdata.csv...
✅ Inserted metadata for berm_hipdata.csv
                                       combined_info
0  Diabetes_status: T1D Age: 41.7303217 B_cells_p...
1  Diabetes_status: T1D Age: 20.62422998 B_cells_...


Embedding: 100%|██████████| 500/500 [2:18:10<00:00, 16.58s/it]  


  Diabetes_Status        Age  B_cells_pct_of_Lymphocytes  \
0             T1D  41.730322                        5.02   
1             T1D  41.730322                        5.02   

   Transitional_pct_of_B_cells  Naive_pct_of_B_cells  \
0                         5.22                  68.3   
1                         5.22                  68.3   

   Nonnegclassnegswitched_Memory_pct_of_B_cells  \
0                                          15.7   
1                                          15.7   

   Classnegswitched_Memory_pct_of_B_cells    _7  MNC_pct_of_Leukocytes  \
0                                    10.7  0.15                   42.2   
1                                    10.7  0.15                   42.2   

   Granulocyte_pct_of_Leukocytes  ...  _186  _187  _188  _189  _190  _191  \
0                           54.5  ...   6.0   1.8   0.5  2.89  1.74   0.3   
1                           54.5  ...   6.0   1.8   0.5  2.89  1.74   0.3   

   _192  _193                            

Embedding: 100%|██████████| 302/302 [3:30:06<00:00, 41.74s/it]    


  Diabetes_Status       Age  B_cells_pct_of_Lymphocytes  \
0             FDR  14.42026                         9.6   
1             FDR  14.42026                         9.6   

   Transitional_pct_of_B_cells  Naive_pct_of_B_cells  \
0                         1.56                  70.3   
1                         1.56                  70.3   

   Nonnegclassnegswitched_Memory_pct_of_B_cells  \
0                                          10.7   
1                                          10.7   

   Classnegswitched_Memory_pct_of_B_cells     _7  MNC_pct_of_Leukocytes  \
0                                    17.3  0.159                   50.3   
1                                    17.3  0.159                   50.3   

   Granulocyte_pct_of_Leukocytes  ...  _186  _187  _188  _189  _190  _191  \
0                           39.8  ...   NaN   NaN   NaN   NaN   NaN   NaN   
1                           39.8  ...   NaN   NaN   NaN   NaN   NaN   NaN   

   _192  _193                            

Embedding: 100%|██████████| 500/500 [17:52<00:00,  2.15s/it]  


   Pregnancies  Glucose  BloodPressure  SkinThickness  Insulin   BMI  \
0            6      148             72             35        0  33.6   
1            1       85             66             29        0  26.6   

   DiabetesPedigreeFunction  Age  Outcome  \
0                     0.627   50        1   
1                     0.351   31        0   

                                       combined_info  \
0  Pregnancies: 6.0 Glucose: 148.0 Bloodpressure:...   
1  Pregnancies: 1.0 Glucose: 85.0 Bloodpressure: ...   

                                           embedding  
0  [-0.0048858738, 0.0013947599, -0.0469905, 0.03...  
1  [0.0037936354, 0.0011160112, -0.044843256, 0.0...  
Inserted batch 1: 500 documents
📌 Chunk 1: 500 documents inserted.
                                         combined_info
500  Pregnancies: 2.0 Glucose: 117.0 Bloodpressure:...
501  Pregnancies: 3.0 Glucose: 84.0 Bloodpressure: ...


Embedding: 100%|██████████| 268/268 [08:27<00:00,  1.90s/it]


   Pregnancies  Glucose  BloodPressure  SkinThickness  Insulin   BMI  \
0            2      117             90             19       71  25.2   
1            3       84             72             32        0  37.2   

   DiabetesPedigreeFunction  Age  Outcome  \
0                     0.313   21        0   
1                     0.267   28        0   

                                       combined_info  \
0  Pregnancies: 2.0 Glucose: 117.0 Bloodpressure:...   
1  Pregnancies: 3.0 Glucose: 84.0 Bloodpressure: ...   

                                           embedding  
0  [-0.0038131082, -0.004052424, -0.04017623, 0.0...  
1  [0.0046240217, -0.00027125346, -0.044360377, 0...  
Inserted batch 1: 268 documents
📌 Chunk 2: 268 documents inserted.
✅ Finalized metadata for diabetes.csv: 768 rows

✅ Ingestion process completed for all 268 files.


In [78]:

# === Local Files Folde7r ===
LOCAL_FOLDER = "../data/arc" 
for file_name in os.listdir(LOCAL_FOLDER):
    file_path = os.path.join(LOCAL_FOLDER, file_name)

    # Skip non-data files
    if not file_name.lower().endswith(('.csv', '.xlsx', '.xls', '.json')):
        print(f"Skipping unsupported file: {file_name}")
        continue
    
    datasets_col = db["datasets"]
    data_col = db["records_embeddings"]

    #Check if file already uploaded
    existing = datasets_col.find_one({"file_name": file_name})
    if existing:
         print(f"⏭️ Skipping {file_name} (already uploaded)")
         #continue

    print(f"📂 Processing {file_name}...")
    # Initialize metadata tracking
    first_chunk = None
    total_rows = 0
    combined_missing = None

    # Read file into DataFrame
    try:
        if file_name.endswith(".csv"):
            chunk_iter = pd.read_csv(file_path, chunksize=500)
        elif file_name.endswith((".xlsx", ".xls")):
            chunk_iter = pd.read_excel(file_path, chunksize=500)
        elif file_name.endswith(".json"):
            chunk_iter = pd.read_json(file_path, lines=True, chunksize=500)
    except Exception as e:
        print(f"❌ Failed to read {file_name}: {e}")
        continue
    dataset_id = None

    # === Insert Metadata ===
    start_index = 500

    # Skip the first `start_index // chunksize` chunks
    chunks_to_embed = itertools.islice(chunk_iter, start_index // 500, None)
    
    # Now embed starting from chunk N
    #for chunk_idx, chunk in enumerate(chunk_iter):
    for chunk_idx, chunk in enumerate(tqdm(chunks_to_embed, desc="Embedding", initial=start_index, total=None)):        
        if first_chunk is None:
            first_chunk = chunk.copy()
            combined_missing = chunk.isnull().sum()
            dataset_doc = {
                "file_name": file_name,
                "upload_date": datetime.now(),
                "n_columns": chunk.shape[1],
                "columns": chunk.columns.tolist(),
                "missing_values": chunk.isnull().sum().to_dict(),
                "file_type": os.path.splitext(file_name)[-1].replace(".", ""),
                "file_path": file_path,
                "column_types": chunk.dtypes.astype(str).to_dict(),
            }
            dataset_id = datasets_col.insert_one(dataset_doc).inserted_id
            print(f"✅ Inserted metadata for {file_name}")

        total_rows += len(chunk)
    
        # === Combine all attributes ===
        chunk = combine_all_attributes(chunk, exclude_columns=[])
        print(chunk[["combined_info"]].head(2))  # preview
    
        duplicated_data = []
        for row in tqdm(chunk.itertuples(index=False), total=len(chunk), desc="Embedding"):
            duplicated_rows = get_embedding(row._asdict())
            duplicated_data.extend(duplicated_rows)

        if duplicated_data:
            df_dup = pd.DataFrame(duplicated_data)
            print(df_dup.head(2))
            try:
                total_inserted_tabular = insert_df_to_mongodb(df_dup, data_col, dataset_id)
                print(
                    f"📌 Chunk {chunk_idx + 1}: {len(df_dup)} documents inserted."
                )
            except Exception as e:
                print(f"❌ Error inserting chunk {chunk_idx + 1}: {e}")

    if dataset_id:
        datasets_col.update_one(
            {"_id": dataset_id},
            {"$set": {"n_rows": total_rows}}
        )
        print(f"✅ Finalized metadata for {file_name}: {total_rows} rows")

    # # Combine all columns into 'combined_info'
    # df = combine_all_attributes(df, exclude_columns=[])  
    # print(df[["combined_info"]].head(2))  # preview

    # # Apply the function and expand the dataset
    # duplicated_data = []
    # for _, row in tqdm(
    #     df.iterrows(),
    #     desc="Generating embeddings and duplicating rows",
    #     total=len(df),
    # ):
    #     duplicated_rows = get_embedding(row)
    #     duplicated_data.extend(duplicated_rows)

    # # Create a new DataFrame from the duplicated data
    # df = pd.DataFrame(duplicated_data)
    #print(df.head(2))

    # === Insert Data Records ===
    # Insert dataframe to mongodb
    # try:
    #     total_inserted_tabular = insert_df_to_mongodb(df, data_col, dataset_id)
    #     print(
    #         f"{file_name} data ingestion completed. Total documents inserted: {total_inserted_tabular}"
    #     )
    # except Exception as e:
    #     print(f"An error occurred while inserting {file_name}: {e}")
    #     print("Pandas version:", pd.__version__)
    # print_dataframe_info(df, {file_name})

# Final summary
print(f"\n✅ Ingestion process completed for all {total_inserted_tabular} files.")
# print("\nInsertion Summary:")
# print(
#     f"Tabular files inserted: {total_inserted_tabular if 'total_inserted_tabular' in locals() else 'Failed'}"
# )


     
        
    # data_records = df.to_dict(orient="records")
    # for record in data_records:
    #     record["dataset_id"] = dataset_id

    # if data_records:
    #     data_col.insert_many(data_records)
    #     print(f"✅ Uploaded data records for {file_name}")

⏭️ Skipping diabetes_012_health_indicators_BRFSS2015.csv (already uploaded)
📂 Processing diabetes_012_health_indicators_BRFSS2015.csv...


Embedding: 500it [00:00, ?it/s]

✅ Inserted metadata for diabetes_012_health_indicators_BRFSS2015.csv
                                         combined_info
500  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
501  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [15:10<00:00,  1.82s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       0.0        1.0  20.0     0.0     0.0   
1           2.0     1.0       0.0        1.0  33.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           0.0     1.0  ...      3.0       3.0   
1                   1.0           1.0     0.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       3.0       0.0  0.0   6.0        6.0     8.0   
1      10.0       1.0  1.0  12.0        6.0     2.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.020954559, -0.013000866, -0.07458009, 0.000...  
1  [0.021339381, -0.019570077, -0.07782391, 0.001...  

[2 rows x 24 columns]


Embedding: 501it [15:14, 914.21s/it]

Inserted batch 1: 500 documents
📌 Chunk 1: 500 documents inserted.
                                          combined_info
1000  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
1001  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:03<00:00,  1.81s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       0.0        1.0  25.0     0.0     0.0   
1           0.0     1.0       1.0        1.0  27.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      2.0       2.0   
1                   0.0           1.0     1.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       0.0  0.0   9.0        4.0     7.0   
1       0.0       0.0  0.0  12.0        4.0     3.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.02623243, -0.010038114, -0.077802695, 0.002...  
1  [0.019862829, -0.016088093, -0.08245769, -0.00...  

[2 rows x 24 columns]


Embedding: 502it [30:19, 908.91s/it]

Inserted batch 1: 500 documents
📌 Chunk 2: 500 documents inserted.
                                          combined_info
1500  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...
1501  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:06<00:00,  1.81s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       0.0        1.0  38.0     0.0     0.0   
1           2.0     1.0       1.0        1.0  29.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     0.0  ...      3.0       0.0   
1                   1.0           1.0     1.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       2.0       0.0  0.0   8.0        5.0     4.0   
1       0.0       1.0  1.0  13.0        5.0     8.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.02323466, -0.01180313, -0.077059835, -0.002...  
1  [0.020149186, -0.023070963, -0.07817461, 0.000...  

[2 rows x 24 columns]


Embedding: 503it [45:29, 909.40s/it]

Inserted batch 1: 500 documents
📌 Chunk 3: 500 documents inserted.
                                          combined_info
2000  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
2001  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [15:06<00:00,  1.81s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       0.0        1.0  24.0     0.0     0.0   
1           0.0     0.0       0.0        0.0  39.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      1.0       0.0   
1                   0.0           0.0     0.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       0.0       0.0  1.0  8.0        4.0     8.0   
1       0.0       0.0  0.0  7.0        5.0     1.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.025745455, -0.0100526195, -0.07606417, 0.00...  
1  [0.022724079, -0.014071822, -0.07715163, -0.00...  

[2 rows x 24 columns]


Embedding: 504it [1:00:39, 909.71s/it]

Inserted batch 1: 500 documents
📌 Chunk 4: 500 documents inserted.
                                          combined_info
2500  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...
2501  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:01<00:00,  1.80s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       0.0        0.0  40.0     1.0     0.0   
1           0.0     1.0       1.0        1.0  23.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     0.0  ...      3.0      30.0   
1                   0.0           1.0     1.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       2.0       0.0  0.0   3.0        4.0     5.0   
1       0.0       1.0  1.0  10.0        4.0     6.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.024271738, -0.019028971, -0.077365614, -0.0...  
1  [0.020461898, -0.01638552, -0.07490797, 0.0022...  

[2 rows x 24 columns]


Embedding: 505it [1:15:44, 908.13s/it]

Inserted batch 1: 500 documents
📌 Chunk 5: 500 documents inserted.
                                          combined_info
3000  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...
3001  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:00<00:00,  1.80s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       1.0        1.0  24.0     0.0     0.0   
1           2.0     1.0       1.0        1.0  25.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      1.0       3.0   
1                   1.0           1.0     0.0  ...      4.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       0.0  0.0  11.0        6.0     8.0   
1      20.0       0.0  0.0  11.0        6.0     6.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...   
1  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.021432409, -0.013125011, -0.07944139, -0.00...  
1  [0.022299005, -0.020816423, -0.075494304, 0.00...  

[2 rows x 24 columns]


Embedding: 506it [1:30:49, 906.81s/it]

Inserted batch 1: 500 documents
📌 Chunk 6: 500 documents inserted.
                                          combined_info
3500  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...
3501  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [15:07<00:00,  1.81s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       0.0        1.0  25.0     1.0     0.0   
1           0.0     1.0       0.0        1.0  22.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      3.0       0.0   
1                   0.0           1.0     0.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       7.0       0.0  1.0  8.0        6.0     8.0   
1       2.0       0.0  1.0  8.0        5.0     3.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.021391436, -0.015417598, -0.07517676, 0.000...  
1  [0.02419361, -0.010329009, -0.076397695, -0.00...  

[2 rows x 24 columns]


Embedding: 507it [1:46:02, 909.09s/it]

Inserted batch 1: 500 documents
📌 Chunk 7: 500 documents inserted.
                                          combined_info
4000  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...
4001  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:09<00:00,  1.82s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       1.0        1.0  27.0     0.0     0.0   
1           0.0     0.0       1.0        1.0  37.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     0.0  ...      2.0       0.0   
1                   1.0           0.0     1.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       0.0  0.0   3.0        4.0     7.0   
1       2.0       0.0  1.0  10.0        6.0     8.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.024849609, -0.012472721, -0.076638, 3.52983...  
1  [0.021583777, -0.014947215, -0.078158066, -0.0...  

[2 rows x 24 columns]


Embedding: 508it [2:01:16, 910.59s/it]

Inserted batch 1: 500 documents
📌 Chunk 8: 500 documents inserted.
                                          combined_info
4500  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...
4501  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [15:13<00:00,  1.83s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           2.0     1.0       0.0        1.0  31.0     1.0     0.0   
1           2.0     1.0       0.0        0.0  36.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     0.0  ...      3.0       0.0   
1                   0.0           1.0     0.0  ...      5.0      30.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       7.0       0.0  1.0  12.0        6.0     6.0   
1      30.0       1.0  0.0   9.0        5.0     2.0   

                                       combined_info  \
0  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.021027742, -0.021490235, -0.07777866, -0.00...  
1  [0.025439246, -0.018739127, -0.07689765, 0.000...  

[2 rows x 24 columns]


Embedding: 509it [2:16:34, 912.82s/it]

Inserted batch 1: 500 documents
📌 Chunk 9: 500 documents inserted.
                                          combined_info
5000  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...
5001  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:02<00:00,  1.81s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       0.0        1.0  30.0     0.0     0.0   
1           0.0     1.0       1.0        1.0  29.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      3.0       0.0   
1                   0.0           1.0     0.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       0.0  1.0  11.0        4.0     6.0   
1       0.0       0.0  0.0  10.0        4.0     8.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.020579929, -0.014105658, -0.0814815, -0.001...  
1  [0.021050557, -0.015296114, -0.076930575, 0.00...  

[2 rows x 24 columns]


Embedding: 510it [2:31:40, 910.84s/it]

Inserted batch 1: 500 documents
📌 Chunk 10: 500 documents inserted.
                                          combined_info
5500  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
5501  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:01<00:00,  1.80s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       0.0        1.0  18.0     0.0     0.0   
1           0.0     0.0       1.0        1.0  32.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      1.0       2.0   
1                   0.0           1.0     1.0  ...      3.0       1.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       2.0       0.0  0.0  3.0        6.0     7.0   
1       1.0       0.0  0.0  7.0        6.0     8.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.023904279, -0.009723195, -0.07740682, 0.001...  
1  [0.024270702, -0.009409835, -0.07504707, 0.000...  

[2 rows x 24 columns]


Embedding: 511it [2:46:47, 909.44s/it]

Inserted batch 1: 500 documents
📌 Chunk 11: 500 documents inserted.
                                          combined_info
6000  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 1.0 Ch...
6001  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [14:56<00:00,  1.79s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           2.0     1.0       1.0        1.0  40.0     1.0     0.0   
1           0.0     1.0       0.0        1.0  29.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      3.0       0.0   
1                   0.0           1.0     0.0  ...      3.0       2.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       1.0  0.0  10.0        5.0     5.0   
1       0.0       0.0  1.0  10.0        5.0     8.0   

                                       combined_info  \
0  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 1.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.021072999, -0.022104833, -0.07585906, -0.00...  
1  [0.021860268, -0.014979606, -0.07674988, 0.000...  

[2 rows x 24 columns]


Embedding: 512it [3:01:47, 906.74s/it]

Inserted batch 1: 500 documents
📌 Chunk 12: 500 documents inserted.
                                          combined_info
6500  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...
6501  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [1:30:24<00:00, 10.85s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       0.0        1.0  29.0     0.0     0.0   
1           0.0     0.0       0.0        1.0  22.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      2.0      10.0   
1                   0.0           1.0     1.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       0.0       0.0  1.0  7.0        5.0     8.0   
1       0.0       0.0  0.0  9.0        5.0     5.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.022379091, -0.010802044, -0.07558755, 0.003...  
1  [0.025000542, -0.012414336, -0.07771204, 0.002...  

[2 rows x 24 columns]


Embedding: 513it [4:32:14, 2276.09s/it]

Inserted batch 1: 500 documents
📌 Chunk 13: 500 documents inserted.
                                          combined_info
7000  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
7001  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [15:12<00:00,  1.83s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       0.0        1.0  26.0     1.0     0.0   
1           0.0     0.0       0.0        1.0  30.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      2.0       0.0   
1                   0.0           1.0     1.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       0.0       0.0  1.0  1.0        4.0     8.0   
1       1.0       0.0  0.0  5.0        6.0     8.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.02264808, -0.011972407, -0.075907245, 0.000...  
1  [0.023667276, -0.012453735, -0.07615579, 0.000...  

[2 rows x 24 columns]


Embedding: 514it [4:47:30, 1865.21s/it]

Inserted batch 1: 500 documents
📌 Chunk 14: 500 documents inserted.
                                          combined_info
7500  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...
7501  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [14:52<00:00,  1.78s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           2.0     1.0       0.0        1.0  36.0     0.0     0.0   
1           0.0     1.0       1.0        1.0  36.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           0.0     1.0  ...      2.0       0.0   
1                   0.0           1.0     0.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       4.0       0.0  1.0  9.0        6.0     8.0   
1       0.0       1.0  1.0  9.0        6.0     8.0   

                                       combined_info  \
0  Diabetes_012: 2.0 Highbp: 1.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.022964716, -0.0180326, -0.07549729, -0.0014...  
1  [0.022559827, -0.014871327, -0.07770404, -0.00...  

[2 rows x 24 columns]


Embedding: 515it [5:02:24, 1572.54s/it]

Inserted batch 1: 500 documents
📌 Chunk 15: 500 documents inserted.
                                          combined_info
8000  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...
8001  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [31:48<00:00,  3.82s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       1.0        1.0  33.0     0.0     0.0   
1           0.0     0.0       0.0        1.0  23.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      3.0       0.0   
1                   0.0           1.0     1.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       0.0  0.0  10.0        4.0     8.0   
1       0.0       0.0  0.0   4.0        5.0     8.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.023811907, -0.0103704175, -0.07728882, 0.00...  
1  [0.02427085, -0.00991575, -0.07487146, 0.00229...  

[2 rows x 24 columns]


Embedding: 516it [5:34:18, 1675.19s/it]

Inserted batch 1: 500 documents
📌 Chunk 16: 500 documents inserted.
                                          combined_info
8500  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
8501  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [54:59<00:00,  6.60s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       0.0        1.0  32.0     0.0     0.0   
1           0.0     0.0       0.0        0.0  23.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     0.0  ...      1.0       0.0   
1                   1.0           1.0     0.0  ...      2.0      15.0   

   PhysHlth  DiffWalk  Sex   Age  Education  Income  \
0       0.0       0.0  0.0  10.0        6.0     4.0   
1       0.0       0.0  1.0  10.0        6.0     5.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.025151163, -0.009721973, -0.07913976, -0.00...  
1  [0.02159506, -0.01475258, -0.07843608, 0.00333...  

[2 rows x 24 columns]


Embedding: 517it [6:29:19, 2164.22s/it]

Inserted batch 1: 500 documents
📌 Chunk 17: 500 documents inserted.
                                          combined_info
9000  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...
9001  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...


Embedding: 100%|██████████| 500/500 [15:00<00:00,  1.80s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     1.0       1.0        1.0  26.0     0.0     0.0   
1           0.0     0.0       1.0        1.0  24.0     1.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     0.0  ...      2.0      15.0   
1                   1.0           1.0     0.0  ...      3.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       0.0       0.0  0.0  8.0        6.0     8.0   
1       0.0       0.0  1.0  9.0        6.0     7.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 1.0 Highchol: 1.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...   

                                           embedding  
0  [0.021963626, -0.013149153, -0.07634742, 0.001...  
1  [0.024372015, -0.016603777, -0.07668463, -0.00...  

[2 rows x 24 columns]


Embedding: 518it [6:44:23, 1785.52s/it]

Inserted batch 1: 500 documents
📌 Chunk 18: 500 documents inserted.
                                          combined_info
9500  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...
9501  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding: 100%|██████████| 500/500 [14:55<00:00,  1.79s/it]


   Diabetes_012  HighBP  HighChol  CholCheck   BMI  Smoker  Stroke  \
0           0.0     0.0       1.0        1.0  35.0     0.0     0.0   
1           0.0     0.0       0.0        1.0  28.0     0.0     0.0   

   HeartDiseaseorAttack  PhysActivity  Fruits  ...  GenHlth  MentHlth  \
0                   0.0           1.0     1.0  ...      4.0       0.0   
1                   0.0           0.0     1.0  ...      2.0       0.0   

   PhysHlth  DiffWalk  Sex  Age  Education  Income  \
0       0.0       0.0  0.0  7.0        5.0     3.0   
1       0.0       0.0  0.0  6.0        5.0     5.0   

                                       combined_info  \
0  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 1.0 Ch...   
1  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...   

                                           embedding  
0  [0.0250642, -0.011924047, -0.07749991, -0.0003...  
1  [0.023737352, -0.01159393, -0.07697324, 0.0018...  

[2 rows x 24 columns]


Embedding: 519it [6:59:21, 1518.97s/it]

Inserted batch 1: 500 documents
📌 Chunk 19: 500 documents inserted.
                                           combined_info
10000  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...
10001  Diabetes_012: 0.0 Highbp: 0.0 Highchol: 0.0 Ch...


Embedding:  81%|████████  | 404/500 [12:09<02:53,  1.80s/it]
Embedding: 519it [7:11:31, 1362.72s/it]


KeyboardInterrupt: 

In [19]:
import pandas as pd
from pymongo.errors import BulkWriteError


def insert_df_to_mongodb(df, collection,dataset_id,  batch_size=1000):
    """
    Insert a pandas DataFrame into a MongoDB collection.

    Parameters:
    df (pandas.DataFrame): The DataFrame to insert
    collection (pymongo.collection.Collection): The MongoDB collection to insert into
    batch_size (int): Number of documents to insert in each batch

    Returns:
    int: Number of documents successfully inserted
    """
    total_inserted = 0

    # Convert DataFrame to list of dictionaries
    records = df.to_dict("records")

    for record in records:
        record["dataset_id"] = dataset_id

    # Insert in batches
    for i in range(0, len(records), batch_size):
        batch = records[i : i + batch_size]
        try:
            result = collection.insert_many(batch, ordered=False)
            total_inserted += len(result.inserted_ids)
            print(
                f"Inserted batch {i//batch_size + 1}: {len(result.inserted_ids)} documents"
            )
        except BulkWriteError as bwe:
            total_inserted += bwe.details["nInserted"]
            print(
                f"Batch {i//batch_size + 1} partially inserted. {bwe.details['nInserted']} inserted, {len(bwe.details['writeErrors'])} failed."
            )

    return total_inserted

In [28]:
def print_dataframe_info(df, df_name):
    print(f"\n{df_name} DataFrame info:")
    print(df.info())
    print(f"\nFirst few rows of the {df_name} DataFrame:")
    print(df.head())

In [20]:
import tiktoken

In [21]:
def num_tokens_from_string(string: str, encoding_name: str = "cl100k_base") -> int:
    """Returns the number of tokens in a text string."""
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens


In [22]:
MAX_TOKENS = 512      # Good default for embeddings
OVERLAP = 50          # Helps with context continuity


In [23]:
def chunk_text(text, max_tokens=MAX_TOKENS, overlap=OVERLAP):
    """
    Split the text into overlapping chunks based on token count.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(text)
    chunks = []
    for i in range(0, len(tokens), max_tokens - overlap):
        chunk_tokens = tokens[i : i + max_tokens]
        chunk = encoding.decode(chunk_tokens)
        chunks.append(chunk)
    return chunks

In [40]:
import time

In [45]:
import google.generativeai as genai

In [76]:
def get_embedding(input_data, model=GEMINI_EMBEDDING_MODEL):
    """
    Generate embeddings for the 'combined_attributes' column and duplicate the row for each chunk
    or generate embeddings for a given string.
    """
    if isinstance(input_data, str):
        text = input_data
    else:
        text = input_data["combined_info"]

    if not text.strip():
        print("Attempted to get embedding for empty text.")
        return []

    # Split text into chunks if it's too long
    chunks = chunk_text(text)

    # Embed each chunk
    chunk_embeddings = []
    for chunk in chunks:
        time.sleep(1.5)  # 1.5 seconds delay per request
        chunk = chunk.replace("\n", " ")
        response = genai.embed_content(
                model=model,
                content=chunk,
                task_type="retrieval_document",  # optional, depends on your use case
                request_options={"timeout": 60}  # increase timeout in seconds
            )
        embedding = response["embedding"]
        #embedding = embedding_model.embed_query(text=chunk)
        chunk_embeddings.append(embedding)

    if isinstance(input_data, str):
        # Return list of embeddings for string input
        return chunk_embeddings[0]
    # Create duplicated rows for each chunk with the respective embedding for row input
    duplicated_rows = []
    for embedding in chunk_embeddings:
        new_row = input_data.copy()
        new_row["embedding"] = embedding
        duplicated_rows.append(new_row)
    return duplicated_rows

## Create a vector search index

In [25]:
# Define the vector search index definition
vector_search_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "embedding": {
                "dimensions": 256,
                "similarity": "cosine",
                "type": "knnVector",
            },
            "recordId": {"type": "string"},
        },
    }
}

In [26]:
# Programmatically create vector search index for both colelctions
from pymongo.operations import SearchIndexModel


def setup_vector_search_index_with_filter(
    collection, index_definition, index_name="vector_index_with_filter"
):
    """
    Setup a vector search index for a MongoDB collection.

    Args:
    collection: MongoDB collection object
    index_definition: Dictionary containing the index definition
    index_name: Name of the index (default: "vector_index_with_filter")
    """
    new_vector_search_index_model = SearchIndexModel(
        definition=index_definition,
        name=index_name,
    )

    # Create the new index
    try:
        result = collection.create_search_index(model=new_vector_search_index_model)
        print(f"Creating index '{index_name}'...")
        # time.sleep(20)  # Sleep for 20 seconds
        print(f"New index '{index_name}' created successfully:", result)
    except Exception as e:
        print(f"Error creating new vector search index '{index_name}': {e!s}")

In [34]:
setup_vector_search_index_with_filter(
    collection, vector_search_index_definition
)

In [35]:
def vector_search(user_query, collection):
    """
    Perform a vector search in the MongoDB collection based on the user query.

    Args:
    user_query (str): The user's query string.
    collection (MongoCollection): The MongoDB collection to search.

    Returns:
    list: A list of matching documents.
    """

    # Generate embedding for the user query
    query_embedding = get_embedding(user_query)

    if query_embedding is None:
        return "Invalid query or embedding generation failed."

    # Define the vector search pipeline
    vector_search_stage = {
        "$vectorSearch": {
            "index": "vector_index_with_filter",
            "queryVector": query_embedding,
            "path": "embedding",
            "numCandidates": 150,  # Number of candidate matches to consider
            "limit": 5,  # Return top 4 matches
        }
    }

    unset_stage = {
        "$unset": "embedding"  # Exclude the 'embedding' field from the results
    }

    project_stage = {
        "$project": {
            "_id": 0,  # Exclude the _id field,
            "combined_info": 1,
            "score": {
                "$meta": "vectorSearchScore"  # Include the search score
            },
        }
    }

    pipeline = [vector_search_stage, unset_stage, project_stage]

    # Execute the search
    results = collection.aggregate(pipeline)
    return list(results)

In [36]:
def get_vector_search_result(query, collection):
    get_knowledge = vector_search(query, collection)
    search_results = []
    for result in get_knowledge:
        search_results.append(
            [result.get("score", "N/A"), result.get("combined_info", "N/A")]
        )
    return search_results

In [None]:
import tabulate

query = "Get me a record having high blood sugar"
source_information = get_vector_search_result(query, collection)

table_headers = ["Similarity Score", "Combined Information"]
table = tabulate.tabulate(source_information, headers=table_headers, tablefmt="grid")

combined_information = f"""Query: {query}

Continue to answer the query by using the Search Results:

{table}
"""

print(combined_information)

In [None]:
%pip install --quiet -U langchain langchain_mongodb langgraph langsmith motor langchain_anthropic # langchain-groq
     

In [None]:
# Programatically create search indexes


def create_collection_search_index(collection, index_definition, index_name):
    """
    Create a search index for a MongoDB Atlas collection.

    Args:
    collection: MongoDB collection object
    index_definition: Dictionary defining the index mappings
    index_name: String name for the index

    Returns:
    str: Result of the index creation operation
    """

    try:
        search_index_model = SearchIndexModel(
            definition=index_definition, name=index_name
        )

        result = collection.create_search_index(model=search_index_model)
        print(f"Search index '{index_name}' created successfully")
        return result
    except Exception as e:
        print(f"Error creating search index: {e!s}")
        return None


In [None]:
def print_collection_search_indexes(collection):
    """
    Print all search indexes for a given collection.

    Args:
    collection: MongoDB collection object
    """
    print(f"\nSearch indexes for collection '{collection.name}':")
    for index in collection.list_search_indexes():
        print(f"Index: {index['name']}")
     

In [None]:
collection_text_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "title": {"type": "string"},
            "description": {"type": "string"},
            "category": {"type": "string"},
            "steps.description": {"type": "string"},
        },
    }
}

create_collection_search_index(
    collection,
    collection_text_index_definition,
    "text_search_index",
)

# Print all indexes in the collection
print_collection_search_indexes(collection)
     

In [None]:
from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_mongodb.retrievers import MongoDBAtlasHybridSearchRetriever


In [None]:

ATLAS_VECTOR_SEARCH_INDEX = "csv_vector_index_with_filter"

In [None]:
# Vector Stores Intialisation
vector_store_csv_files = MongoDBAtlasVectorSearch.from_connection_string(
    connection_string=MONGO_URI,
    namespace=DB_NAME + "." + SAFETY_PROCEDURES_COLLECTION,
    embedding=embedding_model,
    index_name=ATLAS_VECTOR_SEARCH_INDEX,
    text_key="combined_info",
)

hybrid_search = MongoDBAtlasHybridSearchRetriever(
    vectorstore=vector_store_csv_files,
    search_index_name="text_search_index",
    top_k=5,
)

hybrid_search_result = hybrid_search.get_relevant_documents(query)
     


In [None]:
def hybrid_search_results_to_table(search_results):
    """
    Convert hybrid search results to a formatted markdown table.

    Args:
    search_results (list): List of Document objects containing search results

    Returns:
    str: Formatted markdown table of search results
    """
    # Extract relevant information from each result
    data = []
    for rank, doc in enumerate(search_results, start=1):
        metadata = doc.metadata
        data.append(
            {
                "Rank": rank,
                "Procedure ID": metadata["procedureId"],
                "Title": metadata["title"],
                "Category": metadata["category"],
                "Vector Score": round(metadata["vector_score"], 5),
                "Full-text Score": round(metadata["fulltext_score"], 5),
                "Total Score": round(metadata["score"], 5),
            }
        )

    # Create a DataFrame
    df = pd.DataFrame(data)

    # Generate markdown table
    table = tabulate.tabulate(df, headers="keys", tablefmt="pipe", showindex=False)

    return table

In [None]:
table = hybrid_search_results_to_table(hybrid_search_result)
print(table)
     


In [None]:
from langchain_mongodb.retrievers import MongoDBAtlasFullTextSearchRetriever

full_text_search = MongoDBAtlasFullTextSearchRetriever(
    collection=collection,
    search_index_name="text_search_index",
    search_field="description",
    top_k=5,
)
full_text_search_result = full_text_search.get_relevant_documents("Guidelines")

In [None]:
print(full_text_search_result)

## MongoDB Checkpointer

In [None]:
import pickle
from collections.abc import AsyncIterator
from contextlib import AbstractContextManager
from datetime import datetime, timezone
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Union

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
    SerializerProtocol,
)
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from motor.motor_asyncio import AsyncIOMotorClient
from typing_extensions import Self

class JsonPlusSerializerCompat(JsonPlusSerializer):
    def loads(self, data: bytes) -> Any:
        if data.startswith(b"\x80") and data.endswith(b"."):
            return pickle.loads(data)
        return super().loads(data)
    
class MongoDBSaver(AbstractContextManager, BaseCheckpointSaver):
    serde = JsonPlusSerializerCompat()

    client: AsyncIOMotorClient
    db_name: str
    collection_name: str

    def __init__(
        self,
        client: AsyncIOMotorClient,
        db_name: str,
        collection_name: str,
        *,
        serde: Optional[SerializerProtocol] = None,
    ) -> None:
        super().__init__(serde=serde)
        self.client = client
        self.db_name = db_name
        self.collection_name = collection_name
        self.collection = client[db_name][collection_name]

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        __exc_type: Optional[type[BaseException]],
        __exc_value: Optional[BaseException],
        __traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        return True
    
    async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        if config["configurable"].get("thread_ts"):
            query = {
                "thread_id": config["configurable"]["thread_id"],
                "thread_ts": config["configurable"]["thread_ts"],
            }
        else:
            query = {"thread_id": config["configurable"]["thread_id"]}

        doc = await self.collection.find_one(query, sort=[("thread_ts", -1)])
        if doc:
            return CheckpointTuple(
                config,
                self.serde.loads(doc["checkpoint"]),
                self.serde.loads(doc["metadata"]),
                (
                    {
                        "configurable": {
                            "thread_id": doc["thread_id"],
                            "thread_ts": doc["parent_ts"],
                        }
                    }
                    if doc.get("parent_ts")
                    else None
                ),
            )
        return None
    
    async def alist(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        query = {}
        if config is not None:
            query["thread_id"] = config["configurable"]["thread_id"]
        if filter:
            for key, value in filter.items():
                query[f"metadata.{key}"] = value
        if before is not None:
            query["thread_ts"] = {"$lt": before["configurable"]["thread_ts"]}

        cursor = self.collection.find(query).sort("thread_ts", -1)
        if limit:
            cursor = cursor.limit(limit)

        async for doc in cursor:
            yield CheckpointTuple(
                {
                    "configurable": {
                        "thread_id": doc["thread_id"],
                        "thread_ts": doc["thread_ts"],
                    }
                },
                self.serde.loads(doc["checkpoint"]),
                self.serde.loads(doc["metadata"]),
                (
                    {
                        "configurable": {
                            "thread_id": doc["thread_id"],
                            "thread_ts": doc["parent_ts"],
                        }
                    }
                    if doc.get("parent_ts")
                    else None
                ),
            )

    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: Optional[dict[str, Union[str, float, int]]],
    ) -> RunnableConfig:
        doc = {
            "thread_id": config["configurable"]["thread_id"],
            "thread_ts": checkpoint["id"],
            "checkpoint": self.serde.dumps(checkpoint),
            "metadata": self.serde.dumps(metadata),
        }
        if config["configurable"].get("thread_ts"):
            doc["parent_ts"] = config["configurable"]["thread_ts"]
        await self.collection.insert_one(doc)
        return {
            "configurable": {
                "thread_id": config["configurable"]["thread_id"],
                "thread_ts": checkpoint["id"],
            }
        }
    
    # Implement synchronous methods as well for compatibility
    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        raise NotImplementedError("Use aget_tuple for asynchronous operations")

    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ):
        raise NotImplementedError("Use alist for asynchronous operations")

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
    ) -> RunnableConfig:
        raise NotImplementedError("Use aput for asynchronous operations")

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: List[Tuple[str, Any]],
        task_id: str,
    ) -> None:
        """Asynchronously store intermediate writes linked to a checkpoint."""
        docs = []
        for channel, value in writes:
            doc = {
                "thread_id": config["configurable"]["thread_id"],
                "task_id": task_id,
                "channel": channel,
                "value": self.serde.dumps(value),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
            docs.append(doc)

        if docs:
            await self.collection.insert_many(docs)
     
    

## Tool Definitions

In [None]:
from typing import Any, Dict

from langchain.agents import tool


@tool
def css_files_vector_search_tool(query: str, k: int = 5):
    """
    Perform a vector similarity search on safety procedures.

    Args:
        query (str): The search query string.
        k (int, optional): Number of top results to return. Defaults to 5.

    Returns:
        list: List of tuples (Document, score), where Document is a safety procedure
              and score is the similarity score (lower is more similar).

    Note:
        Uses the global vector_store_safety_procedures for the search.
    """

    vector_search_results = vector_store_safety_procedures.similarity_search_with_score(
        query=query, k=k
    )
    return vector_search_results


In [None]:
@tool
def safety_procedures_full_text_search_tool(query: str, k: int = 5):
    """
    Perform a full-text search on safety procedures.

    Args:
        query (str): The search query string.
        k (int, optional): Number of top results to return. Defaults to 5.

    Returns:
        list: Relevant safety procedure documents matching the query.
    """

    full_text_search = MongoDBAtlasFullTextSearchRetriever(
        collection=safety_procedure_collection,
        search_index_name="text_search_index",
        search_field="description",
        top_k=k,
    )

    full_text_search_result = full_text_search.get_relevant_documents(query)
    


In [None]:
@tool
def safety_procedures_hybrid_search_tool(query: str):
    """
    Perform a hybrid (vector + full-text) search on safety procedures.

    Args:
        query (str): The search query string.

    Returns:
        list: Relevant safety procedure documents from hybrid search.

    Note:
        Uses both vector_store_safety_procedures and text_search_index.
    """

    hybrid_search = MongoDBAtlasHybridSearchRetriever(
        vectorstore=vector_store_safety_procedures,
        search_index_name="text_search_index",
        top_k=5,
    )

    hybrid_search_result = hybrid_search.get_relevant_documents(query)

    return hybrid_search_result

In [None]:
from typing import List

from pydantic import BaseModel, Field


class Step(BaseModel):
    stepNumber: int = Field(..., ge=1)
    description: str


class SafetyProcedure(BaseModel):
    procedureId: str
    title: str
    description: str
    category: str
    steps: List[Step]
    lastUpdated: datetime = Field(default_factory=datetime.now)

def create_safety_procedure_document(procedure_data: dict) -> dict:
    """
    Create a new safety procedure document from a dictionary, using Pydantic for validation.

    Args:
    procedure_data (dict): Dictionary representing the new safety procedure

    Returns:
    dict: Validated and formatted safety procedure document

    Raises:
    ValidationError: If the input data doesn't match the SafetyProcedure schema
    """
    try:
        # Create a SafetyProcedure instance, which will validate the data
        safety_procedure = SafetyProcedure(**procedure_data)

        # Convert the Pydantic model to a dictionary
        document = safety_procedure.dict()

        # Ensure steps are properly numbered
        for i, step in enumerate(document["steps"], start=1):
            step["stepNumber"] = i

        return document
    except Exception as e:
        raise ValueError(f"Invalid safety procedure data: {e!s}")


In [None]:
# Tool to add new safety procedures
@tool
def create_new_safety_procedures(new_procedure: dict):
    """
    Create and validate a new safety procedure document.

    Args:
        new_procedure (dict): Dictionary containing the new safety procedure data.

    Returns:
        dict: Validated and formatted safety procedure document.

    Raises:
        ValueError: If the input data is invalid or doesn't match the required schema.

    Note:
        Uses Pydantic for data validation via create_safety_procedure_document function.
    """
    new_safety_procedure_document = create_safety_procedure_document(new_procedure)
    return new_safety_procedure_document

In [None]:
safety_procedure_collection_tools = [
    safety_procedures_vector_search_tool,
    safety_procedures_full_text_search_tool,
    safety_procedures_hybrid_search_tool,
    create_new_safety_procedures,
]


## LLM Defintion

## Agent Definition

In [None]:

from datetime import datetime

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder


def create_agent(llm, tools, system_message: str):
    """Create an agent."""

    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, collaborating with other assistants."
                " Use the provided tools to progress towards answering the question."
                " If you are unable to fully answer, that's OK, another assistant with different tools "
                " will help where you left off. Execute what you can to make progress."
                " If you or any of the other assistants have the final answer or deliverable,"
                " prefix your response with FINAL ANSWER so the team knows to stop."
                " You have access to the following tools: {tool_names}.\n{system_message}"
                "\nCurrent time: {time}.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    prompt = prompt.partial(system_message=system_message)
    prompt = prompt.partial(time=lambda: str(datetime.now()))
    prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))

    return prompt | llm.bind_tools(tools)

In [None]:
# Chatbot agent and node
toolbox = []

# Add tools
toolbox.extend(safety_procedure_collection_tools)
toolbox.extend(accident_report_collection_tools)

# Create Agent
chatbot_agent = create_agent(
    llm,
    toolbox,
    system_message="""
      You are an advanced Factory Safety Assistant Agent specializing in managing and providing information about safety procedures and accident reports in industrial settings. Your key responsibilities include:

      1. Searching and retrieving safety procedures and accident reports:
        - Use the provided search tools to find relevant safety procedures and accident reports based on user queries
        - Interpret and explain safety procedures and accident reports in detail
        - Provide context and additional information related to specific safety protocols and past incidents

      2. Creating new safety procedures and accident reports:
        - When provided with appropriate information, use the create_new_safety_procedures tool to generate new safety procedure documents
        - Use the create_new_accident_report tool to document new accidents or incidents
        - Ensure all necessary details are included in new procedures and reports

      3. Answering safety-related queries:
        - Respond to questions about safety protocols, best practices, regulations, and past incidents
        - Offer explanations and clarifications on complex safety issues
        - Provide step-by-step guidance on implementing safety procedures and handling incidents

      4. Assisting with safety compliance and incident prevention:
        - Help identify relevant safety procedures for specific tasks or situations
        - Advise on how to adhere to safety guidelines and regulations
        - Suggest improvements or updates to existing safety procedures based on past incidents
        - Analyze accident reports to identify trends and recommend preventive measures

      5. Supporting safety training and awareness:
        - Explain the importance and rationale behind safety procedures
        - Offer tips and best practices for maintaining a safe work environment
        - Help users understand the potential risks and consequences of not following safety procedures
        - Use past incident reports to illustrate the importance of safety measures

        6. Providing Structured Safety Advice:
   When users ask for safety procedures advice, provide information in the following structured format:

   Safety Procedure Advice:
   a. Relevant Procedure:
      - Title: [Procedure Title]
      - ID: [Procedure ID]
      - Description: [Brief description of the procedure]
      - Key Steps:
        1. [Step 1]
        2. [Step 2]
        3. [...]

   b. Related Incidents (Past 2 Years):
      - Incident 1:
        - IncidentID: [ID of the Incident document]
        - Date: [Date of incident]
        - Description: [Brief description of the incident]
        - Root Cause(s): [Identified root cause(s)]
      - Incident 2:
        - [Same structure as Incident 1]
      - [Additional incidents if applicable]

   c. Possible Root Causes:
      - [List of potential root causes based on the procedure and related incidents]

   d. Additional Safety Recommendations:
      - [Any extra safety tips or precautions based on the procedure and incident history]

   e. References:
      - Safety Procedure: [Reference to the specific safety procedure document]
      - Incident Reports: [References to the relevant incident reports]

When providing this structured advice:
- Use the safety procedure search tools to find the most relevant procedure.
- Utilize the accident report search tools to identify related incidents from the past two years in the same region.
- Analyze the incident reports to identify common or significant root causes.
- Provide additional recommendations based on your analysis of both the procedure and the incident history.
- Always include clear references to the source documents for both procedures and incident reports.


      When creating a new safety procedure, ensure you have all required information and use the create_new_safety_procedures tool. The required fields are:
      - procedureId
      - title
      - description
      - category
      - steps (a list of step objects, each with a stepNumber and description)

      When creating a new accident report, use the create_new_accident_report tool. Ensure you gather all necessary information about the incident.

      Provide detailed, accurate, and helpful information to support factory workers, managers, and safety officers in maintaining a safe work environment and properly documenting incidents. If you cannot find specific information or if the information requested is not available, clearly state this and offer to assist in creating a new procedure or report if appropriate.

      When discussing safety matters, always prioritize the well-being of workers and adherence to safety regulations. Use information from accident reports to reinforce the importance of following safety procedures and to suggest improvements in safety protocols.

      DO NOT MAKE UP ANY INFORMATION.
    """,
)
     

## State Definition

In [None]:
import operator
from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage


class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    sender: str

## Node Definition

In [None]:
import functools

from langchain_core.messages import AIMessage, ToolMessage


def agent_node(state, agent, name):
    result = agent.invoke(state)
    if isinstance(result, ToolMessage):
        pass
    else:
        result = AIMessage(**result.dict(exclude={"type", "name"}), name=name)
    return {
        "messages": [result],
        # track the sender so we know who to pass to next.
        "sender": name,
    }

In [None]:
from langgraph.prebuilt import ToolNode

chatbot_node = functools.partial(
    agent_node, agent=chatbot_agent, name="Diabetes Research Assistant Agent( DRAA)"
)
tool_node = ToolNode(toolbox, name="tools")
     

## Agentic Workflow Definition

In [None]:
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import tools_condition

workflow = StateGraph(AgentState)

workflow.add_node("chatbot", chatbot_node)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("chatbot")
workflow.add_conditional_edges("chatbot", tools_condition, {"tools": "tools", END: END})

workflow.add_edge("tools", "chatbot")

In [None]:
from motor.motor_asyncio import AsyncIOMotorClient

mongo_client = AsyncIOMotorClient(MONGO_URI)
mongodb_checkpointer = MongoDBSaver(mongo_client, DB_NAME, "state_store")

graph = workflow.compile(checkpointer=mongodb_checkpointer)

In [None]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

In [None]:
import re


def sanitize_name(name: str) -> str:
    """Sanitize the name to match the pattern '^[a-zA-Z0-9_-]+$'."""
    return re.sub(r"[^a-zA-Z0-9_-]", "_", name)

In [None]:
import asyncio

from langchain_core.messages import HumanMessage


async def chat_loop():
    config = {"configurable": {"thread_id": "0"}}

    while True:
        user_input = await asyncio.get_event_loop().run_in_executor(
            None, input, "User: "
        )
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break

        sanitized_name = (
            sanitize_name("Human") or "Anonymous"
        )  # Fallback if sanitized name is empty
        state = {"messages": [HumanMessage(content=user_input, name=sanitized_name)]}

        print("Assistant: ", end="", flush=True)

        max_retries = 3
        retry_delay = 1

        for attempt in range(max_retries):
            try:
                async for chunk in graph.astream(state, config, stream_mode="values"):
                    if chunk.get("messages"):
                        last_message = chunk["messages"][-1]
                        if isinstance(last_message, AIMessage):
                            last_message.name = (
                                sanitize_name(last_message.name or "AI") or "AI"
                            )
                            print(last_message.content, end="", flush=True)
                    elif isinstance(last_message, ToolMessage):
                        print(f"\n[Tool Used: {last_message.name}]")
                        print(f"Tool Call ID: {last_message.tool_call_id}")
                        print(f"Content: {last_message.content}")
                        print("Assistant: ", end="", flush=True)
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"\nAn unexpected error occurred: {e!s}")
                    print(f"\nRetrying in {retry_delay} seconds...")
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    print(f"\nMax retries reached. OpenAI API error: {e!s}")
                    break

        print("\n")  # New line after the complete response

In [None]:
# For Jupyter notebooks and IPython environments
import nest_asyncio

nest_asyncio.apply()

# Run the async function
await chat_loop()