## 📦 Overview for `embed_bge_m3.py`

This script loads a text embedding model, reads node data from a JSONL export of the Neo4j graph, computes vector embeddings in
batches, and writes them to a JSONL file for use in retrieval-augmented generation (RAG).

The input JSONL file is generated by `neo4j_exporter.py` and contains pre-extracted node data (ID, text, and labels)
from a Neo4j graph. This avoids the need for a live Neo4j connection at embedding time.

All configuration values, such as model type, batch size, normalization, output file path, and logging, are read from
`digitaiCore/config.yaml`.



### 🧰 Standard Library Modules

- `os`: Handles file paths and directory operations (e.g., joining paths, checking if files exist).

- `json`: Reads and writes JSON and JSONL files (e.g., for embeddings or ID maps).

- `time`: Used to track execution duration (e.g., embedding time for logging or profiling) and to pause between batches.

- `logging`: Outputs progress, errors, and debug messages to a log file for monitoring and troubleshooting.

---

### 🤖 AI & NLP Modules

- `torch`
  PyTorch deep learning framework — required to run the BGE-M3 embedding model (supports GPU, CPU, or MPS on macOS).

- `sentence_transformers.SentenceTransformer`
  High-level API to load and run sentence-level embedding models like BGE-M3.
  Converts input text into numerical vector representations used for similarity search.

---

### 🔧 Project-Specific Module

- `digitaiCore.config_loader.ConfigLoader`
  Loads configuration from `config.yaml` with dot-notation access.
  Used to retrieve paths, model settings, batch sizes, and logging options consistently across the project.
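The source of `ConfigLoader` is not shown here, so the following is only a minimal sketch of how dot-notation lookup over a nested dictionary can work. `DotConfig` and its `default` parameter are hypothetical, YAML loading is omitted, and the sample values are illustrative rather than the project's real config.

```python
from typing import Any, Mapping


class DotConfig:
    """Hypothetical stand-in for ConfigLoader: resolves "a.b.c" keys in a nested dict."""

    def __init__(self, data: Mapping[str, Any]) -> None:
        self._data = data

    def get(self, key: str, default: Any = None) -> Any:
        node: Any = self._data
        for part in key.split("."):
            # Return the default as soon as a path segment is missing
            if not isinstance(node, Mapping) or part not in node:
                return default
            node = node[part]
        return node


# Illustrative values only
config = DotConfig({"embedding": {"model": "BAAI/bge-m3", "batch_size": 32}})
print(config.get("embedding.batch_size"))   # 32
print(config.get("embedding.missing", 8))   # 8
```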

In [None]:
import os
import json
import time
import torch
import logging
from sentence_transformers import SentenceTransformer
from digitaiCore.config_loader import ConfigLoader

# Initialization

## Load Config 💿

In [None]:
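# Repository root: one directory above this script (__file__ is only defined when run as a .py script)
repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
config_path = os.path.join(repo_root, "digitaiCore", "config.yaml")  # Path to the shared config file
config = ConfigLoader(config_path)  # Load config; values are read with dot-notation keys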

## Set Performance Parameters 🛠️
### Control the number of threads PyTorch uses and whether Hugging Face tokenizers run in parallel
- This helps prevent CPU overloading 🔥 or stalls 😴 during parallel batch encoding.
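The `.lower()` call matters because a YAML boolean loads as Python `True`/`False`, and `str()` turns it into `"True"`/`"False"`, while the environment variable is conventionally set to `"true"`/`"false"`. A small sketch of that conversion follows; the helper name is hypothetical. It is generally safest to set the variable before any tokenizer is created.

```python
import os


def tokenizers_parallelism_value(setting: object) -> str:
    """Hypothetical helper: turn a config value (e.g. YAML true/false) into "true"/"false"."""
    # str(False) == "False", so lower-case it for the environment variable
    return str(setting).lower()


# Set before the SentenceTransformer model (and its tokenizer) is loaded
os.environ["TOKENIZERS_PARALLELISM"] = tokenizers_parallelism_value(False)
print(os.environ["TOKENIZERS_PARALLELISM"])  # false
```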

In [None]:
torch.set_num_threads(config.get("performance.num_threads"))
torch.set_num_interop_threads(config.get("performance.interop_threads"))
os.environ["TOKENIZERS_PARALLELISM"] = str(config.get("performance.tokenizers_parallelism")).lower()

## Set Up Logging 📝
Whether logging is enabled, and where the log file is written, are both controlled in `config.yaml`
- Logging is enabled by default and is highly recommended
    - It records batch-processing progress and where errors occurred
        - The embeddings file is very large, so finding errors by hand is impractical; the log makes verification and diagnosis possible
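The script configures the root logger with `logging.basicConfig`. As an alternative sketch of the same idea, a dedicated logger with its own `FileHandler` avoids touching global logging state. The logger name and the `setup_file_logger` helper are hypothetical, and upper-casing the level name is an added convenience so that `info` and `INFO` both work.

```python
import logging
import os
import tempfile


def setup_file_logger(log_path: str, level_name: str, fmt: str) -> logging.Logger:
    """Create a logger that writes to log_path (sketch; the script itself uses basicConfig)."""
    os.makedirs(os.path.dirname(log_path), exist_ok=True)  # ensure the logs/ directory exists
    logger = logging.getLogger("embed_bge_m3_sketch")      # hypothetical logger name
    logger.setLevel(getattr(logging, level_name.upper()))  # accept "info" as well as "INFO"
    handler = logging.FileHandler(log_path, encoding="utf-8")
    handler.setFormatter(logging.Formatter(fmt))
    logger.addHandler(handler)
    return logger


log_file = os.path.join(tempfile.mkdtemp(), "logs", "bgem3.log")
logger = setup_file_logger(log_file, "info", "%(asctime)s %(levelname)s %(message)s")
logger.info("Embedded batch 0 to 32")
logger.debug("not written: below INFO")  # filtered out by the INFO level
```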


In [None]:
if config.get("logging.enabled"):
    log_path = os.path.join(repo_root, config.get("logging.bgem3Log"))  # Uses bgem3Log from config
    os.makedirs(os.path.dirname(log_path), exist_ok=True)  # Ensure the logs/ directory exists

    logging.basicConfig(
        filename=log_path,
        level=getattr(logging, config.get("logging.level")),
        format=config.get("logging.format")
    )
    logging.info("=== Embedding Script Start ===")


# 🐍 Breaking Down the Code:

## Load in SentenceTransformer Model
The embedding model specified in `config.yaml` converts natural-language text into numerical vectors.
- In our case, the embedding model is `BGE-M3`

In [None]:
model_name = config.get("embedding.model")
try:
    model = SentenceTransformer(model_name)
    if config.get("logging.enabled"):
        logging.info(f"Loaded model: {model_name}")
except Exception as e:
    logging.exception("Model load failed")
    raise SystemExit(f"[FATAL] Could not load model: {e}")

## Load Node Texts from the Neo4j JSONL Export
Reading a static export removes the need to query Neo4j for every node each time the embedding model runs.
- The export file is created by `neo4j_exporter.py`

Each line in the file must be a JSON object with `id`, `text`, and an optional `labels` field.
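A minimal sketch of reading that format, assuming string IDs (the real export may use other ID types). The sample records are made up, and `load_text_nodes` is a hypothetical helper. Unlike the script, it skips blank lines and raises a `ValueError` with a line number when `id` is missing.

```python
import json
from typing import Iterable, List, Tuple


def load_text_nodes(lines: Iterable[str]) -> List[Tuple[str, str]]:
    """Return (id, text) pairs for records with body text; skip blank lines and structure-only nodes."""
    nodes: List[Tuple[str, str]] = []
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate empty lines, e.g. an extra blank line at the end of the file
        record = json.loads(line)
        if "id" not in record:
            raise ValueError(f"line {line_no}: record has no 'id'")
        if record.get("text"):  # missing, None, or "" text marks a structure-only node
            nodes.append((record["id"], record["text"]))
    return nodes


sample = [
    '{"id": "n1", "text": "Section 2.1 body text", "labels": ["Section"]}',
    '{"id": "n2", "text": "", "labels": ["Chapter"]}',
    "",
    '{"id": "n3", "text": "Another paragraph"}',
]
print(load_text_nodes(sample))
# [('n1', 'Section 2.1 body text'), ('n3', 'Another paragraph')]
```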

## Load 💿 and validate ✅ the Neo4j JSONL path specified in the config

In [None]:
input_path = os.path.join(repo_root, config.get("dataPaths.neo4jExport"))
if not os.path.exists(input_path):
    raise SystemExit(f"[FATAL] Input file not found: {input_path}")

### Read all nodes and keep those with body text
- Only nodes that contain actual body text are kept; structure-only nodes are skipped
    - The number of loaded nodes is logged if logging is enabled

In [None]:
nodes = []
with open(input_path, "r", encoding="utf-8") as f:
    for line in f:
        if not line.strip():
            continue  # Skip empty lines (e.g., an extra blank line at the end of the file)
        record = json.loads(line)
        # Only embed nodes that contain actual body text; structure-only nodes are skipped
        if record.get("text"):
            nodes.append((record["id"], record["text"]))

if config.get("logging.enabled"):
    logging.info(f"Loaded {len(nodes)} nodes with text from {input_path}")

### If no text nodes were loaded, log an error and exit
- This avoids wasting CPU/GPU time or writing an empty output file

In [None]:
if not nodes:
    if config.get("logging.enabled"):
        logging.error("No text nodes found in the input file. Cannot proceed.")
    raise SystemExit("[FATAL] No text nodes found in the input JSONL file. Check your export or path.")

## Configure Embedding Settings

### Read batching, normalization, and throttle settings from `config.yaml`

In [None]:
# These are controlled through config.yaml and affect batching, normalization, and resource pacing.
batch_size = config.get("embedding.batch_size")     # Number of documents per batch
normalize = config.get("embedding.normalize")       # If True, performs L2 normalization (cosine similarity prep)
throttle = config.get("embedding.throttle")         # Optional pause between batches (in seconds)
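
With `normalize` enabled, each embedding is scaled to unit length, so the dot product of two embeddings equals their cosine similarity. The pure-Python sketch below illustrates only that math; the helper names are hypothetical and are not part of sentence-transformers.

```python
import math
from typing import List, Sequence


def l2_normalize(vec: Sequence[float]) -> List[float]:
    """Scale a vector to unit length (L2 norm of 1)."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        return list(vec)  # leave an all-zero vector unchanged instead of dividing by zero
    return [x / norm for x in vec]


def dot(u: Sequence[float], v: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(u, v))


def cosine(u: Sequence[float], v: Sequence[float]) -> float:
    return dot(u, v) / (math.sqrt(dot(u, u)) * math.sqrt(dot(v, v)))


a, b = [3.0, 4.0], [1.0, 0.0]
unit_a, unit_b = l2_normalize(a), l2_normalize(b)
print(unit_a)               # [0.6, 0.8]
print(dot(unit_a, unit_b))  # 0.6, same as the cosine similarity of a and b
print(cosine(a, b))         # 0.6
```

This is why normalized vectors are convenient for retrieval: a plain dot-product search over them ranks results by cosine similarity.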



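### Resolve Output File Path
- Embeddings are written to a separate path (`dataPaths.bgem3Embeddings`), so the original Neo4j export is never overwritten and can be embedded again if needed
- The output file is opened in `"w"` mode, so an existing embeddings file at that path is replaced on each run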
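If earlier embedding runs should be kept instead of replaced, one option is to pick a fresh file name before opening the output. The sketch below is a hypothetical helper, not something the script currently does, and the file name is illustrative. It does not guard against two processes choosing the same name at once.

```python
import os
import tempfile


def next_available_path(path: str) -> str:
    """Return path if it does not exist, else path with _1, _2, ... inserted before the extension."""
    if not os.path.exists(path):
        return path
    root, ext = os.path.splitext(path)
    n = 1
    while os.path.exists(f"{root}_{n}{ext}"):
        n += 1
    return f"{root}_{n}{ext}"


# Usage: simulate an earlier run's output already being on disk
out_dir = tempfile.mkdtemp()
first = os.path.join(out_dir, "bgem3_embeddings.jsonl")  # hypothetical file name
open(first, "w").close()
print(os.path.basename(next_available_path(first)))  # bgem3_embeddings_1.jsonl
```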

In [None]:
output_path = os.path.join(repo_root, config.get("dataPaths.bgem3Embeddings"))
os.makedirs(os.path.dirname(output_path), exist_ok=True)
print(f"[DEBUG] Writing embeddings to: {output_path}")

## Run Embedding Model
- Text is embedded in batches for efficiency (the batch size is set in the config file)
- For each batch of node texts, compute sentence embeddings
    - Write each result as a new JSONL line

`model.encode` generates embeddings with the SentenceTransformer model. Because `convert_to_numpy=True` is passed, it returns a NumPy array; each row is converted to a Python list with `.tolist()` so it can be serialized to JSON.
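The batching and JSONL-writing pattern can be tried without loading BGE-M3. In this sketch, `fake_encode` is a hypothetical stand-in for `model.encode` that returns plain lists (so `list(vec)` replaces `.tolist()`), and output goes to an in-memory buffer instead of the real file.

```python
import io
import json
from typing import Callable, List, Sequence, TextIO, Tuple


def write_embeddings(
    nodes: Sequence[Tuple[str, str]],
    encode: Callable[[List[str]], List[List[float]]],
    batch_size: int,
    out: TextIO,
) -> int:
    """Embed (id, text) pairs batch by batch, write one JSON object per line, return lines written."""
    written = 0
    for i in range(0, len(nodes), batch_size):
        batch = nodes[i:i + batch_size]
        ids, texts = zip(*batch)       # unzip [(id, text), ...] into an ids tuple and a texts tuple
        vectors = encode(list(texts))  # stand-in for model.encode(...)
        for node_id, vec in zip(ids, vectors):
            json.dump({"id": node_id, "embedding": list(vec)}, out)
            out.write("\n")            # one record per line makes the file JSONL
            written += 1
    return written


def fake_encode(texts: List[str]) -> List[List[float]]:
    """Hypothetical encoder returning a 2-dimensional vector per text."""
    return [[float(len(t)), 1.0] for t in texts]


buf = io.StringIO()
count = write_embeddings([("n1", "abc"), ("n2", "de"), ("n3", "f")], fake_encode, 2, buf)
print(count)                           # 3
print(buf.getvalue().splitlines()[1])  # {"id": "n2", "embedding": [2.0, 1.0]}
```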


In [None]:
with open(output_path, "w", encoding="utf-8") as f:
    for i in range(0, len(nodes), batch_size):
        batch = nodes[i:i + batch_size]
        ids, texts = zip(*batch)

        try:
            batch_embeddings = model.encode(
                list(texts),
                batch_size=batch_size,
                convert_to_numpy=True,
                normalize_embeddings=normalize,
                show_progress_bar=True
            )

### Write each node ID and its embedding to the output file immediately (avoids memory buildup)
- Embedding error handling:
    - Step 1: Write the error to the log (if logging is enabled)
    - Step 2: Skip to the next batch

**NOTE:** `model.encode` is called once per batch, so if encoding fails, the entire batch is skipped and none of its nodes are written to the output file.

- Throttle (optional)
    - At the end of each batch, pause for `throttle` seconds
        - Reduces memory pressure and system load on slower machines
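The error-handling and throttle behavior can be sketched without the real model. This is a self-contained illustration under assumptions: `run_batches` and the failing stand-in `flaky_encode` are hypothetical, and errors are collected in a list rather than sent to `logging`. Note where the pause sits: after the `try`/`except`, so a failed batch hits `continue` and moves on, while a successful batch still pauses.

```python
import time
from typing import Callable, List, Sequence, Tuple


def run_batches(
    texts: Sequence[str],
    encode: Callable[[List[str]], List[List[float]]],
    batch_size: int,
    throttle: float,
    errors: List[str],
) -> List[Tuple[str, List[float]]]:
    """Encode texts in batches; a failing batch is recorded in `errors` and skipped as a whole."""
    results: List[Tuple[str, List[float]]] = []
    for i in range(0, len(texts), batch_size):
        batch = list(texts[i:i + batch_size])
        try:
            vectors = encode(batch)
        except Exception as e:
            errors.append(f"Embedding failed for batch {i} to {i + len(batch)}: {e}")
            continue  # nothing from this batch is kept
        results.extend(zip(batch, vectors))
        if throttle:
            time.sleep(throttle)  # optional pause, reached only after a successful batch
    return results


def flaky_encode(batch: List[str]) -> List[List[float]]:
    """Hypothetical encoder that fails whenever a batch contains the text "bad"."""
    if "bad" in batch:
        raise RuntimeError("simulated encoder failure")
    return [[float(len(t))] for t in batch]


errors: List[str] = []
kept = run_batches(["a", "bb", "bad", "c", "dd"], flaky_encode, batch_size=2, throttle=0.0, errors=errors)
print([t for t, _ in kept])  # ['a', 'bb', 'dd']
print(errors)                # ['Embedding failed for batch 2 to 4: simulated encoder failure']
```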

In [None]:
            for node_id, emb in zip(ids, batch_embeddings):
                json.dump({"id": node_id, "embedding": emb.tolist()}, f)
                f.write("\n")

            if config.get("logging.enabled"):
                logging.info(f"Embedded batch {i} to {i + len(batch)}")
        except Exception as e:
            if config.get("logging.enabled"):
                logging.error(f"Embedding failed for batch {i} to {i + len(batch)}: {e}")
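            continue  # Skip this batch and move on to the next one

        # Optional pause between batches; it must sit outside the except block,
        # otherwise it is unreachable after `continue` and never runs
        if throttle:
            time.sleep(throttle)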

### (If Logging Enabled) Log the script completion and file path to confirm success

In [None]:
if config.get("logging.enabled"):
    logging.info(f"Saved embeddings to {output_path}")
    logging.info("=== Embedding Script End ===")

print("✅ Embedding complete.")
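To sanity-check the output file after a run, a small reader can count records and confirm every vector has the same length. This is a hypothetical helper, and the duplicate-ID check assumes node IDs in the export are unique. Comparing its count with the "Loaded N nodes" log line shows whether any batches were skipped.

```python
import json
from typing import Iterable, Optional, Set, Tuple


def check_embeddings(lines: Iterable[str]) -> Tuple[int, int]:
    """Return (record_count, dimension) for a JSONL embeddings file.

    Raises ValueError on a duplicate id or a vector whose length differs from the first one.
    """
    count = 0
    dim: Optional[int] = None
    seen: Set[object] = set()
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        record = json.loads(line)
        node_id, emb = record["id"], record["embedding"]
        if node_id in seen:
            raise ValueError(f"line {line_no}: duplicate id {node_id!r}")
        if dim is None:
            dim = len(emb)  # the first vector sets the expected dimension
        elif len(emb) != dim:
            raise ValueError(f"line {line_no}: expected {dim} values, got {len(emb)}")
        seen.add(node_id)
        count += 1
    return count, dim or 0


good = ['{"id": "n1", "embedding": [0.1, 0.2, 0.3]}', '{"id": "n2", "embedding": [0.0, 1.0, 0.0]}']
print(check_embeddings(good))  # (2, 3)
```

Against the real output, `with open(output_path, encoding="utf-8") as f: print(check_embeddings(f))` would run the same checks over the whole file.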