In [1]:
import os
import json
import gzip
import time
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from fastembed.sparse.bm25 import Bm25

In [3]:
# Load the schema description that every later cell iterates over. It is
# consumed as data[database][table], where each table entry may carry
# 'grouped' and/or 'ungrouped' sections.
# The encoding is pinned because open() otherwise decodes with the locale's
# default, which is not guaranteed to be UTF-8 on every machine.
with open("/Users/paolocadei/Documents/Masters/Thesis/Spider2/2_final_structure_all.json", "r", encoding="utf-8") as f:
    data = json.load(f)

### BM25 Sparse Embeddings

In [5]:
import os
import json
import gzip
import time
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from fastembed.sparse.bm25 import Bm25

# === SETTINGS ===
# Destination archive for the embeddings (gzip-compressed JSON).
SAVE_PATH = "4_bm25_embeddings.json.gz"
# Checkpoint to disk whenever the running embedding count is a multiple of this.
SAVE_EVERY = 5000
# Number of texts handed to the model per submitted task.
BATCH_SIZE = 100
# Worker threads in the pool that runs those tasks.
# NOTE(review): all workers share the single model instance below; confirm
# that this many threads actually speeds up BM25 embedding.
MAX_WORKERS = 100

# === INITIALIZATION ===
bm25_embedding_model = Bm25("Qdrant/bm25")
# Pending work items: (database, table, location, col_name, col_description).
texts_to_embed = list()
# Running count of columns that already have an embedding.
total_embeddings_done = 0
# Nested result store; replaced by the saved archive further down if one exists.
new_embeddings = dict()

# === NUMPY SERIALIZATION FIX ===

def convert_numpy(obj):
    """Recursively replace array-like values with plain Python lists.

    Walks dicts, lists and tuples. Tuples come back as lists, which is how
    JSON encodes them anyway; walking them matters because an array nested
    inside a tuple would otherwise reach ``json.dump`` unconverted.

    Args:
        obj: Any value; typically a nested structure of dicts and lists.

    Returns:
        A structure of the same shape in which every object exposing
        ``tolist()`` (e.g. an ``np.ndarray``) has been replaced by the
        result of that call. Anything else is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_numpy(v) for v in obj]
    if hasattr(obj, 'tolist'):  # duck-typed: np.ndarray and similar
        return obj.tolist()
    return obj

# === SAFE LOAD FUNCTIONS ===

def safe_load_json(path):
    """Load a gzip-compressed JSON object, falling back to an empty dict.

    Args:
        path: Location of the ``.json.gz`` file.

    Returns:
        The decoded dict, or ``{}`` when the file is missing, unreadable,
        not valid gzip/JSON, or does not hold a JSON object at its root.
        A warning naming the reason is printed in each fallback case.
    """
    import zlib  # local import: only needed to name the corrupt-stream error

    try:
        with gzip.open(path, 'rt', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, EOFError, ValueError, zlib.error) as exc:
        # OSError: missing file or bad gzip header; EOFError: truncated
        # archive; ValueError: invalid JSON or invalid UTF-8; zlib.error:
        # damaged compressed stream. Unrelated errors are allowed to propagate.
        print(f"⚠️ Warning: {path} is corrupted or missing ({exc!r}). Starting fresh.")
        return {}

    # Callers index the result by database name, so any other root type
    # would only fail later with a less helpful error.
    if not isinstance(loaded, dict):
        print(f"⚠️ Warning: {path} does not contain a JSON object. Starting fresh.")
        return {}
    return loaded

def save_embeddings_to_file():
    """Write ``new_embeddings`` to ``SAVE_PATH`` as gzip-compressed JSON.

    The data is first written to ``SAVE_PATH + ".tmp"`` and then moved over
    the real file with ``os.replace``, so an interrupted write does not
    truncate the previous checkpoint. If anything goes wrong (including a
    keyboard interrupt) the partial temporary file is removed before the
    error is re-raised.
    """
    tmp_save_path = SAVE_PATH + ".tmp"
    try:
        with gzip.open(tmp_save_path, 'wt', encoding='utf-8') as f:
            json.dump(convert_numpy(new_embeddings), f)
        os.replace(tmp_save_path, SAVE_PATH)
    except BaseException:
        # BaseException on purpose: a KeyboardInterrupt in the middle of a
        # long dump must not leave a half-written .tmp file behind.
        try:
            os.remove(tmp_save_path)
        except OSError:
            pass  # nothing to clean up (never created, or already moved)
        raise

# === LOAD EXISTING EMBEDDINGS ===
if os.path.exists(SAVE_PATH):
    new_embeddings = safe_load_json(SAVE_PATH)
    # safe_load_json prints its own warning and returns {} when the file
    # cannot be read, so only announce a resume when data actually came back.
    if new_embeddings:
        print(f"🔄 Loaded existing embeddings from {SAVE_PATH}.")
else:
    print("🆕 No existing embeddings found. Starting fresh.")

# === EMBEDDING FUNCTION ===
def get_bm25_embeddings_batch(texts):
    """Embed a batch of texts with the shared BM25 model.

    Args:
        texts: The passages to embed, forwarded to ``passage_embed``.

    Returns:
        A list holding everything ``passage_embed`` yielded for ``texts``.
    """
    sparse_vectors = bm25_embedding_model.passage_embed(texts)
    return list(sparse_vectors)

# === GATHER TEXTS TO EMBED ===

# Walk the schema, mirror its nesting inside new_embeddings, and sort every
# described column into "already embedded" (counted) or "still to embed"
# (queued in texts_to_embed together with where its result must be stored).
for database, db_tables in data.items():
    db_store = new_embeddings.setdefault(database, {})

    for table, table_spec in db_tables.items():
        table_store = db_store.setdefault(table, {'grouped': {}, 'ungrouped': {}})

        # GROUPED: each template maps to a list of group entries.
        for template, group_entries in table_spec.get('grouped', {}).items():
            group_slots = table_store['grouped'].setdefault(template, [])

            for group_index, group_entry in enumerate(group_entries):
                column_descriptions = group_entry['details']['description']

                # Pad with empty placeholders so that position group_index
                # exists and lines up with the entry in the source data.
                shortfall = group_index + 1 - len(group_slots)
                group_slots.extend(
                    {'details': {'column_embeddings': {}}} for _ in range(shortfall)
                )

                for col_name, col_description in column_descriptions.items():
                    # Columns without a description have nothing to embed.
                    if not col_description:
                        continue

                    if col_name in group_slots[group_index]['details']['column_embeddings']:
                        total_embeddings_done += 1
                        continue
                    texts_to_embed.append((database, table, ('grouped', template, group_index), col_name, col_description))

        # UNGROUPED: one entry per key, no positional alignment needed.
        for ungrouped_key, ungrouped_entry in table_spec.get('ungrouped', {}).items():
            table_store['ungrouped'].setdefault(
                ungrouped_key, {'details': {'column_embeddings': {}}}
            )

            column_descriptions = ungrouped_entry['details']['description']

            for col_name, col_description in column_descriptions.items():
                if not col_description:
                    continue

                if col_name in table_store['ungrouped'][ungrouped_key]['details']['column_embeddings']:
                    total_embeddings_done += 1
                    continue
                texts_to_embed.append((database, table, ('ungrouped', ungrouped_key), col_name, col_description))

# === DISPLAY PROGRESS COUNTS ===
# Summarise how much work is already done and how much is queued.
pending_count = len(texts_to_embed)
total_columns = total_embeddings_done + pending_count
print(f"\n📋 Total columns: {total_columns}")
print(f"✅ Already embedded: {total_embeddings_done}")
print(f"📝 To embed now: {pending_count}\n")

# === BATCHING ===
# Consecutive slices of at most BATCH_SIZE work items each.
batches = []
for offset in range(0, pending_count, BATCH_SIZE):
    batches.append(texts_to_embed[offset:offset + BATCH_SIZE])

print(f"📦 Total batches created: {len(batches)}")

# === PARALLEL EXECUTION ===
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
futures = []

start_time = time.time()

try:
    with tqdm(total=len(texts_to_embed), desc="Embedding columns (BM25)") as pbar:
        # Queue every batch up front. Each future is paired with its batch so
        # the results can be routed back to the right column afterwards.
        for batch in batches:
            texts_only = [item[4] for item in batch]  # item[4] is the description text
            future = executor.submit(get_bm25_embeddings_batch, texts_only)
            futures.append((future, batch))

        # Consume results in submission order.
        for future, batch in futures:
            # Deliberately not named `embeddings`: that notebook-level name is
            # used by the verification cells and must not be overwritten here.
            batch_embeddings = future.result()
            for (database, table, location, col_name, _), embedding in zip(batch, batch_embeddings):

                emb_obj = embedding.as_object()

                if location[0] == 'grouped':
                    template, index = location[1], location[2]
                    new_embeddings[database][table]['grouped'][template][index]['details']['column_embeddings'][col_name] = emb_obj
                else:
                    ungrouped_key = location[1]
                    new_embeddings[database][table]['ungrouped'][ungrouped_key]['details']['column_embeddings'][col_name] = emb_obj

                total_embeddings_done += 1
                pbar.update(1)

                # Periodic checkpoint so a crash costs at most SAVE_EVERY embeddings.
                if total_embeddings_done % SAVE_EVERY == 0:
                    save_embeddings_to_file()
finally:
    elapsed = time.time() - start_time

    # On an error or interrupt, drop work that has not started yet (cancel()
    # is a no-op for running or finished futures), then release the workers.
    for queued_future, _ in futures:
        queued_future.cancel()
    executor.shutdown(wait=True)

    # === FINAL SAVE ===
    # Runs on failure too, so progress since the last checkpoint is kept.
    save_embeddings_to_file()

print(f"\n✅ Completed {len(texts_to_embed)} new BM25 embeddings in {elapsed:.2f} seconds.")
print(f"📈 Total embeddings now stored: {total_embeddings_done}/{total_columns}")


🔄 Loaded existing embeddings from 4_bm25_embeddings.json.gz.

📋 Total columns: 134095
✅ Already embedded: 134095
📝 To embed now: 0

📦 Total batches created: 0


Embedding columns (BM25): 0it [00:00, ?it/s]



✅ Completed 0 new BM25 embeddings in 0.00 seconds.
📈 Total embeddings now stored: 134095/134095


In [4]:
import gzip
import json

# Read the saved BM25 archive back in to confirm it decodes cleanly.
# NOTE(review): this opens "bm25_embeddings.json.gz", while the embedding
# cell writes to "4_bm25_embeddings.json.gz" — confirm which file is meant.
with gzip.open("bm25_embeddings.json.gz", mode='rt', encoding='utf-8') as archive:
    embeddings = json.load(archive)

print("✅ File loaded successfully.")
first_databases = list(embeddings)[:3]
print(f"🔢 Top-level databases: {first_databases}")


✅ File loaded successfully.
🔢 Top-level databases: ['NEW_YORK', 'SEC_QUARTERLY_FINANCIALS', 'NHTSA_TRAFFIC_FATALITIES_PLUS']


In [5]:
# Drill down to the first database -> first table -> first grouped template
# -> first group entry, and pretty-print the embeddings stored for it.
first_db_name = next(iter(embeddings))
example = embeddings[first_db_name]
first_table_name = next(iter(example))
example_table = example[first_table_name]
example_grouped = example_table['grouped']
first_template = next(iter(example_grouped))
first_entry = example_grouped[first_template][0]
print("📄 Example embedding object:")
column_embeddings_json = json.dumps(first_entry['details']['column_embeddings'], indent=2)
print(column_embeddings_json)


📄 Example embedding object:
{
  "ehail_fee": {
    "values": [
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652,
      1.6348330914368652
    ],
    "indices": [
      572975545,
      1479387819,
      1034183227,
      70273945,
      804460016,
      799141647,
      1598178082,
      292767579,
      962346254,
      94597757,
      1634657082,
      1883650607,
      2095749492
    ]
  },
  "mta_tax": {
    "values": [
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404,
      1.6477472205968404
    ],
    "indices": [
      764297089,
      1181

In [6]:
# Cross-check: every column with a non-empty description in `data` must have
# an entry at the matching position in `embeddings`.
missing = 0

for db in data:
    for table in data[db]:
        for group_type in ['grouped', 'ungrouped']:
            entries = data[db][table].get(group_type, {})
            for key, entry in entries.items():
                if group_type == 'grouped':
                    for i, g in enumerate(entry):
                        for col, description in g['details']['description'].items():
                            # The embedding step skips columns without a
                            # description, so they are not "missing".
                            if not description:
                                continue
                            try:
                                embeddings[db][table]['grouped'][key][i]['details']['column_embeddings'][col]
                            except (KeyError, IndexError):
                                # IndexError: the stored group list is shorter
                                # than the source list for this template.
                                print(db, table, i, col)
                                missing += 1
                else:
                    for col, description in entry['details']['description'].items():
                        if not description:
                            continue
                        try:
                            embeddings[db][table]['ungrouped'][key]['details']['column_embeddings'][col]
                        except KeyError:
                            # Report ungrouped gaps too, like the grouped branch.
                            print(db, table, key, col)
                            missing += 1

print("✅ All good!" if missing == 0 else f"⚠️ {missing} columns missing embeddings.")


✅ All good!


### Late Interaction Embeddings

Note: generating these embeddings with `fastembed.late_interaction.LateInteractionTextEmbedding` is not feasible on a MacBook Pro (the run below was estimated at about 4.5 hours and was interrupted).

In [3]:
from fastembed import LateInteractionTextEmbedding

# Show the late-interaction models that can be loaded. This is the last
# expression in the cell, so the notebook displays its return value (one dict
# per model, with fields such as 'model', 'dim' and 'size_in_GB').
LateInteractionTextEmbedding.list_supported_models()

[{'model': 'colbert-ir/colbertv2.0',
  'sources': {'hf': 'colbert-ir/colbertv2.0',
   'url': None,
   '_deprecated_tar_struct': False},
  'model_file': 'model.onnx',
  'description': 'Late interaction model',
  'license': 'mit',
  'size_in_GB': 0.44,
  'additional_files': [],
  'dim': 128,
  'tasks': {}},
 {'model': 'answerdotai/answerai-colbert-small-v1',
  'sources': {'hf': 'answerdotai/answerai-colbert-small-v1',
   'url': None,
   '_deprecated_tar_struct': False},
  'model_file': 'vespa_colbert.onnx',
  'description': 'Text embeddings, Unimodal (text), Multilingual (~100 languages), 512 input tokens truncation, 2024 year',
  'license': 'apache-2.0',
  'size_in_GB': 0.13,
  'additional_files': [],
  'dim': 96,
  'tasks': {}},
 {'model': 'jinaai/jina-colbert-v2',
  'sources': {'hf': 'jinaai/jina-colbert-v2',
   'url': None,
   '_deprecated_tar_struct': False},
  'model_file': 'onnx/model.onnx',
  'description': 'New model that expands capabilities of colbert-v1 with multilingual and 

In [3]:
import os
import json
import gzip
import time
import numpy as np
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from fastembed.late_interaction import LateInteractionTextEmbedding

# === SETTINGS ===
# Destination archive for the embeddings (gzip-compressed JSON).
SAVE_PATH = "colbert_embeddings.json.gz"
# Checkpoint to disk whenever the running embedding count is a multiple of this.
SAVE_EVERY = 100
# Number of texts handed to the model per submitted task.
BATCH_SIZE = 1
# Worker threads in the pool that runs those tasks.
MAX_WORKERS = 5

# === INITIALIZATION ===
late_embedding_model = LateInteractionTextEmbedding("answerdotai/answerai-colbert-small-v1")
# Pending work items: (database, table, location, col_name, col_description).
texts_to_embed = list()
# Running count of columns that already have an embedding.
total_embeddings_done = 0
# Nested result store; replaced by the saved archive further down if one exists.
new_embeddings = dict()

# === NUMPY SERIALIZATION FIX ===
def convert_numpy(obj):
    """Recursively replace NumPy values with plain Python equivalents.

    Walks dicts, lists and tuples. Tuples come back as lists, which is how
    JSON encodes them anyway; walking them matters because an array nested
    inside a tuple would otherwise reach ``json.dump`` unconverted.

    Args:
        obj: Any value; typically a nested structure of dicts and lists.

    Returns:
        A structure of the same shape in which every object exposing
        ``tolist()`` has been replaced by the result of that call. Anything
        else is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_numpy(v) for v in obj]
    # One duck-typed check covers np.ndarray as well as NumPy scalars, so a
    # separate isinstance(obj, np.ndarray) branch is not needed.
    if hasattr(obj, 'tolist'):
        return obj.tolist()
    return obj

# === SAFE LOAD FUNCTIONS ===
def safe_load_json(path):
    """Load a gzip-compressed JSON object, falling back to an empty dict.

    Args:
        path: Location of the ``.json.gz`` file.

    Returns:
        The decoded dict, or ``{}`` when the file is missing, unreadable,
        not valid gzip/JSON, or does not hold a JSON object at its root.
        A warning naming the reason is printed in each fallback case.
    """
    import zlib  # local import: only needed to name the corrupt-stream error

    try:
        with gzip.open(path, 'rt', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, EOFError, ValueError, zlib.error) as exc:
        # OSError: missing file or bad gzip header; EOFError: truncated
        # archive; ValueError: invalid JSON or invalid UTF-8; zlib.error:
        # damaged compressed stream. Unrelated errors are allowed to propagate.
        print(f"⚠️ Warning: {path} is corrupted or missing ({exc!r}). Starting fresh.")
        return {}

    # Callers index the result by database name, so any other root type
    # would only fail later with a less helpful error.
    if not isinstance(loaded, dict):
        print(f"⚠️ Warning: {path} does not contain a JSON object. Starting fresh.")
        return {}
    return loaded

def save_embeddings_to_file():
    """Write ``new_embeddings`` to ``SAVE_PATH`` as gzip-compressed JSON.

    The data is first written to ``SAVE_PATH + ".tmp"`` and then moved over
    the real file with ``os.replace``, so an interrupted write does not
    truncate the previous checkpoint. If anything goes wrong (including a
    keyboard interrupt) the partial temporary file is removed before the
    error is re-raised.
    """
    tmp_save_path = SAVE_PATH + ".tmp"
    try:
        with gzip.open(tmp_save_path, 'wt', encoding='utf-8') as f:
            json.dump(convert_numpy(new_embeddings), f)
        os.replace(tmp_save_path, SAVE_PATH)
    except BaseException:
        # BaseException on purpose: a KeyboardInterrupt in the middle of a
        # long dump must not leave a half-written .tmp file behind.
        try:
            os.remove(tmp_save_path)
        except OSError:
            pass  # nothing to clean up (never created, or already moved)
        raise

# === LOAD EXISTING EMBEDDINGS ===
if os.path.exists(SAVE_PATH):
    new_embeddings = safe_load_json(SAVE_PATH)
    # safe_load_json prints its own warning and returns {} when the file
    # cannot be read, so only announce a resume when data actually came back.
    if new_embeddings:
        print(f"🔄 Loaded existing embeddings from {SAVE_PATH}.")
else:
    print("🆕 No existing embeddings found. Starting fresh.")

# === EMBEDDING FUNCTION ===
def get_late_interaction_embeddings_batch(texts):
    """Embed a batch of texts with the shared late-interaction model.

    Args:
        texts: The passages to embed, forwarded to ``passage_embed``.

    Returns:
        A list holding everything ``passage_embed`` yielded for ``texts``.
    """
    token_embeddings = late_embedding_model.passage_embed(texts)
    return list(token_embeddings)

# === GATHER TEXTS TO EMBED ===
# Walk the schema, mirror its nesting inside new_embeddings, and sort every
# described column into "already embedded" (counted) or "still to embed"
# (queued in texts_to_embed together with where its result must be stored).
for database, db_tables in data.items():
    db_store = new_embeddings.setdefault(database, {})

    for table, table_spec in db_tables.items():
        table_store = db_store.setdefault(table, {'grouped': {}, 'ungrouped': {}})

        # GROUPED: each template maps to a list of group entries.
        for template, group_entries in table_spec.get('grouped', {}).items():
            group_slots = table_store['grouped'].setdefault(template, [])

            for group_index, group_entry in enumerate(group_entries):
                column_descriptions = group_entry['details']['description']

                # Pad with empty placeholders so that position group_index
                # exists and lines up with the entry in the source data.
                shortfall = group_index + 1 - len(group_slots)
                group_slots.extend(
                    {'details': {'column_embeddings': {}}} for _ in range(shortfall)
                )

                for col_name, col_description in column_descriptions.items():
                    # Columns without a description have nothing to embed.
                    if not col_description:
                        continue

                    if col_name in group_slots[group_index]['details']['column_embeddings']:
                        total_embeddings_done += 1
                        continue
                    texts_to_embed.append((database, table, ('grouped', template, group_index), col_name, col_description))

        # UNGROUPED: one entry per key, no positional alignment needed.
        for ungrouped_key, ungrouped_entry in table_spec.get('ungrouped', {}).items():
            table_store['ungrouped'].setdefault(
                ungrouped_key, {'details': {'column_embeddings': {}}}
            )

            column_descriptions = ungrouped_entry['details']['description']

            for col_name, col_description in column_descriptions.items():
                if not col_description:
                    continue

                if col_name in table_store['ungrouped'][ungrouped_key]['details']['column_embeddings']:
                    total_embeddings_done += 1
                    continue
                texts_to_embed.append((database, table, ('ungrouped', ungrouped_key), col_name, col_description))

# === DISPLAY PROGRESS ===
# Summarise how much work is already done and how much is queued.
pending_count = len(texts_to_embed)
total_columns = total_embeddings_done + pending_count
print(f"\n📋 Total columns: {total_columns}")
print(f"✅ Already embedded: {total_embeddings_done}")
print(f"📝 To embed now: {pending_count}\n")

# === BATCHING ===
# Consecutive slices of at most BATCH_SIZE work items each.
batches = []
for offset in range(0, pending_count, BATCH_SIZE):
    batches.append(texts_to_embed[offset:offset + BATCH_SIZE])
print(f"📦 Total batches created: {len(batches)}")

# === PARALLEL EXECUTION ===
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
futures = []

start_time = time.time()

try:
    with tqdm(total=len(texts_to_embed), desc="Embedding columns (ColBERT)") as pbar:
        # Queue every batch up front. Each future is paired with its batch so
        # the results can be routed back to the right column afterwards.
        for batch in batches:
            texts_only = [item[4] for item in batch]  # item[4] is the description text
            future = executor.submit(get_late_interaction_embeddings_batch, texts_only)
            futures.append((future, batch))

        # Consume results in submission order.
        for future, batch in futures:
            # Deliberately not named `embeddings`: that notebook-level name is
            # used by the verification cells and must not be overwritten here.
            batch_embeddings = future.result()
            for (database, table, location, col_name, _), embedding in zip(batch, batch_embeddings):

                # Results may be objects exposing as_object() or raw arrays;
                # either way, end up with something JSON-serialisable.
                if hasattr(embedding, "as_object"):
                    emb_obj = embedding.as_object()
                else:
                    emb_obj = convert_numpy(embedding)

                if location[0] == 'grouped':
                    template, index = location[1], location[2]
                    new_embeddings[database][table]['grouped'][template][index]['details']['column_embeddings'][col_name] = emb_obj
                else:
                    ungrouped_key = location[1]
                    new_embeddings[database][table]['ungrouped'][ungrouped_key]['details']['column_embeddings'][col_name] = emb_obj

                total_embeddings_done += 1
                pbar.update(1)

                # Periodic checkpoint so a crash costs at most SAVE_EVERY embeddings.
                # NOTE(review): each checkpoint rewrites the whole archive, so
                # the cost grows with the number of stored embeddings.
                if total_embeddings_done % SAVE_EVERY == 0:
                    save_embeddings_to_file()
finally:
    elapsed = time.time() - start_time

    # On an error or interrupt, drop work that has not started yet (cancel()
    # is a no-op for running or finished futures), then release the workers.
    for queued_future, _ in futures:
        queued_future.cancel()
    executor.shutdown(wait=True)

    # === FINAL SAVE ===
    # Runs on failure too, so progress since the last checkpoint is kept.
    save_embeddings_to_file()

print(f"\n✅ Completed {len(texts_to_embed)} ColBERT embeddings in {elapsed:.2f} seconds.")
print(f"📈 Total embeddings now stored: {total_embeddings_done}/{total_columns}")


🔄 Loaded existing embeddings from colbert_embeddings.json.gz.

📋 Total columns: 134095
✅ Already embedded: 900
📝 To embed now: 133195

📦 Total batches created: 133195


Embedding columns (ColBERT):   0%|          | 100/133195 [00:12<4:29:02,  8.25it/s]


KeyboardInterrupt: 