In [1]:
import os
import json
import pandas as pd
import numpy as np
import time

from dotenv import load_dotenv
load_dotenv()

#openai embeddings
from openai import OpenAI
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


In [2]:
def retrieve_data_dict_json(directory_path: str, files_to_match: list[str]) -> dict[str, list]:
    """_summary_

    Args:
        directory_path (str): _description_
        files_to_match (list[str]): _description_

    Returns:
        dict: _description_
    """
    inputs = []
    names = []
    for filename in os.listdir(directory_path):
        full_path = os.path.join(directory_path, filename)
        if os.path.isfile(full_path):
            if len(files_to_match) > 0:
                if filename in files_to_match:
                    with open(full_path) as f:
                        inputs.append(json.load(f))
                        names.append(full_path.replace(directory_path, ""))
            else:
                with open(full_path) as f:
                    inputs.append(json.load(f))
                    names.append(full_path.replace(directory_path, ""))

    return {"data": inputs, "names": names}

In [3]:
#all data dicts
data_dictionaries = retrieve_data_dict_json(
    directory_path="/Users/jonathan/Documents/WORK/NEO4J-TESTS/data_dicts_v4/tenants/group_iii/silver/data_dict", 
    files_to_match=[])

In [5]:
# add categories_mapping
df = pd.read_csv("./categories_mapping.csv")

In [7]:
def make_context(inputs: list[dict]) -> list[str]:
    """parse data dictionaries and prepare context for query
    """
    res = []
    for datadict in inputs:
        # search consolidated category
        try:
            category = df[df["table_name"] == datadict['table']]['consolidated_category'].values[0]
        except:
            print(f"mapping category not found for {datadict['table']}")
            category = "None"
        ctx = ""
        table_name = f"{datadict['catalog']}.{datadict['schema']}.{datadict['table']}"
        table_description = '.'.join(datadict["description"].split(".")[1:])
        table_analysis = f"{",".join([x for x in datadict['table_analysis']])}" # not used
        table_primary_keys = ",".join([x for x in datadict["primary_key"]])

        ctx = f"table name:{table_name}\n"
        ctx += f"table description: {table_description}\n"
        ctx += f"table_primary_keys:{table_primary_keys}\n"
        cols = ",".join([x.get('column_name') for x in datadict.get("columns") if "__hevo__" not in x.get('column_name')]) # filter hevo tables
        ctx += f"columns available:{cols}\n"
        ctx += f"category: {category}"
        res.append(ctx)
    return res

In [8]:
data_to_embed = make_context(inputs=data_dictionaries['data'])

mapping category not found for group_iii_2023_and_2024_sales_xlsx_2023_cleaned


In [10]:
# example
print(data_to_embed[0])

table name:group_iii.silver.group_iii_purchase_xlsx_g_3_p_order
table description:  It covers data from various dates and contains information about purchase orders, including details such as order number, date, vendor, status, and quantities. This data is valuable for tracking purchase orders, analyzing vendor performance, and managing inventory levels.
table_primary_keys:OrderNbr,LineNbr
columns available:Type,OrderNbr,Date,PromisedOn,Vendor,VendorName,Status,TotalOrderQty,OpenQuantity,CreatedBy,InventoryID,Description,Description_2,ShipTo,AccountName,OrderQty_2,QtyOnReceipts,ExtCost,UnitCost,LineNbr,BranchID,EntityType
category: Procurement & Inventory


In [11]:
# get table names ; used later when inserting the data into collection
# table_name is needed as a searchable field 
tables_names = [f"{x['catalog']}.{x['schema']}.{x['table']}" for x in data_dictionaries['data']]

In [12]:
# get categories; used later when inserting the data into collection
# category is needed as a searchable field 
categories= []
for x in tables_names:
    name = x.replace("group_iii.silver.","")
    try:
        category = df[df["table_name"] == name]['consolidated_category'].values[0]
    except:
        print(f"mapping category not found for {x}")
        category = "None"
    categories.append(category)
    

mapping category not found for group_iii.silver.group_iii_2023_and_2024_sales_xlsx_2023_cleaned


In [13]:
from pymilvus import MilvusClient
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection

# create a client
# client = MilvusClient(uri="http://localhost:19530")

# create a client on production
client = MilvusClient(uri="http://localhost:8000", db_name="ontology")

try:
    collections = client.list_collections()
    print(f"Successfully connected to Milvus. Collections: {collections}")
except Exception as e:
    print(f"Failed to connect to Milvus or retrieve collections: {e}")

Successfully connected to Milvus. Collections: ['group_iii_data_dicts_table_20251104', 'group_iii_data_dicts_table', 'group_iii_data_dicts_column', 'ontology_tables', 'ontology_columns', 'group_iii_data_dicts_column_20251104', 'test_collection']


In [14]:
def get_embedding(text: str, model: str="text-embedding-3-large"):
    resp = openai_client.embeddings.create(
        model=model,
        input=text
    )
    # The actual output structure depends on version; example:
    return resp.data[0].embedding

In [17]:
# all the table content will be embedded 
embeddings = [get_embedding(data) for data in data_to_embed]

In [15]:
# Create schema
schema = client.create_schema(
    enable_dynamic_field=True,
    auto_id=True
)

fields = [
    FieldSchema(
        name="id", 
        dtype=DataType.INT64, 
        description="a unique ID",
        is_primary=True, 
        auto_id=False
    ),
    FieldSchema(
        name="vector", 
        description="the embedding vector",
        dtype=DataType.FLOAT_VECTOR, 
        dim=3072
    ),
    FieldSchema(
        name="text", 
        description="the content of the vector embedding",
        dtype=DataType.VARCHAR, 
        max_length=8192, 
        enable_analyzer=True, # Enables text analysis (tokenization)
        enable_match=True # Enables inverted indexing for phrase matching)
    ),
    FieldSchema(
        name="name", 
        description="the name of the column",
        dtype=DataType.VARCHAR, 
        max_length=8192, 
        enable_analyzer=True, # Enables text analysis (tokenization)
        enable_match=True # Enables inverted indexing for phrase matching)
    ),
    FieldSchema(
        name="category", 
        description="the category mapped to this table",
        dtype=DataType.VARCHAR, 
        max_length=8192, 
        enable_analyzer=True, # Enables text analysis (tokenization)
        enable_match=True # Enables inverted indexing for phrase matching)
    )
]

schema = CollectionSchema(
    fields,
    description="(tesintg only) tables informations including name, description, columns available and primary keys. query enabled"
)

# Create collection
collection_name = "testing_group_iii_data_dicts_table_20251104"

# Create an index file
client.create_collection(
    collection_name=collection_name,
    schema=schema, 
)

In [16]:
# Set up the index parameters
index_params = client.prepare_index_params()

# Add an index on the vector field.
index_params.add_index(
    field_name="vector",
    metric_type="COSINE",
    index_type="IVF_FLAT",
    index_name="vector_index",
    params={ "nlist": 128 }
)

# Create an index file
client.create_index(
    collection_name=collection_name,
    index_params=index_params,
    sync=True # Whether to wait for index creation to complete before returning. Defaults to True.
)

In [18]:
# inserting data
data_to_collection = [
    {
        "id": i,
        "name": tables_names[i],
        "vector": embeddings[i], 
        "text": data_to_embed[i], 
        "category": categories[i]
    }
    for i in range(len(data_to_embed))
]

print("Data has", len(data_to_collection), "entities, each with fields: ", data_to_collection[0].keys())
print("Vector dim:", len(data_to_collection[0]["vector"]))

Data has 760 entities, each with fields:  dict_keys(['id', 'name', 'vector', 'text', 'category'])
Vector dim: 3072


In [19]:
res = client.insert(
    collection_name=collection_name,
    data=data_to_collection
)

In [20]:
# After final entity is inserted, it is best to call flush to have no growing segments left in memory
# https://milvus.io/api-reference/pymilvus/v2.2.x/MilvusClient/Collection/flush().md
client.flush(collection_name=collection_name)

# load the collection to make it available
client.load_collection(collection_name=collection_name)

In [21]:
# semantic matching vector embedding

def embed_text(texts, model="text-embedding-3-large"):
    """
    Convert raw text into embedding vectors using OpenAI embeddings.
    """
    if isinstance(texts, str):
        texts = [texts]  # Convert single string to list for consistency

    response = openai_client.embeddings.create(
        input=texts,
        model=model
    )
    return [item.embedding for item in response.data]


query: str = "stock_iq"
query_vector = embed_text(texts=query)

search_params = {
    "metric_type": "COSINE",
    "params": {"nprobe": 128}
}
top_k = 5


# Single vector search
res = client.search(
    collection_name=collection_name,
    data=query_vector,
    search_params=search_params,
    limit=top_k,
    output_fields=["text", "name"]
)

In [22]:
for i, hits in enumerate(res):
    for hit in hits:
        print(f"name:{hit.entity.get('name')}")
        print(f"distance: {hit.get('distance')}")
        print("")

name:group_iii.silver.stock_iq_forecast_2024_2025_xlsx_sheet
distance: 0.5004482865333557

name:group_iii.silver.stock_iq_lost_sales_xlsx_sheet
distance: 0.42270272970199585

name:group_iii.silver.insitestatus
distance: 0.3762144446372986

name:group_iii.silver.initemstats
distance: 0.372540146112442

name:group_iii.silver.inturnovercalcitem
distance: 0.367465615272522



In [23]:
# direct filtering
# https://milvus.io/docs/boolean.md
filter = "name == 'group_iii.silver.initemstats'"
res2 = client.query(
    collection_name=collection_name,
    filter=filter,
    output_fields=["text", "name", "id"],
)

In [24]:
# here we see that only 1 document has been retrieved since it is an exact match
res2, len(res2)

(data: ["{'id': 278, 'text': 'table name:group_iii.silver.initemstats\\ntable description:  It covers data from the years 2023 to 2025. The table contains information about inventory items, including details such as CompanyID, InventoryID, SiteID, and various cost metrics. This data is valuable for tracking inventory levels, costs, and site-specific information, aiding in inventory management and financial analysis.\\ntable_primary_keys:CompanyID,InventoryID,SiteID\\ncolumns available:CompanyID,InventoryID,SiteID,QtyOnHand,TotalCost,MinCost,MaxCost,LastCost,LastCostDate,LastPurchaseDate,ValMethod,tstamp\\ncategory: Procurement & Inventory', 'name': 'group_iii.silver.initemstats'}"], extra_info: {},
 1)

In [25]:
# when filtering on categories, because 1 category contains many tables, we got several records
filter = "category == 'Procurement & Inventory'"
res3 = client.query(
    collection_name=collection_name,
    filter=filter,
    output_fields=["name", "id", "category"],
)

[x for x in res3]

[{'name': 'group_iii.silver.group_iii_purchase_xlsx_g_3_p_order',
  'id': 0,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.initemsitehistday',
  'id': 1,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.incategory',
  'id': 7,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.poreceipt',
  'id': 13,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.initemcustsalesstats',
  'id': 19,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.rqsetupapproval',
  'id': 46,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.insetup',
  'id': 51,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.initemclass',
  'id': 52,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.initemcost',
  'id': 58,
  'category': 'Procurement & Inventory'},
 {'name': 'group_iii.silver.inplantype',
  'id': 61,
  'category': 'Procurement & Inventory'},
 {'name':

In [26]:
# partial filtering, SQL like expression
# https://milvus.io/docs/boolean.md
filter_expression = "name like \"%initem%\" " 

res4 = client.query(
    collection_name=collection_name,
    filter=filter_expression,
    output_fields=["text", "name", "id", "category"],
)

for x in res4:
    print(f"{x.get("name")}", {x.get("category")})

group_iii.silver.initemsitehistday {'Procurement & Inventory'}
group_iii.silver.initemcustsalesstats {'Procurement & Inventory'}
group_iii.silver.initemclass {'Procurement & Inventory'}
group_iii.silver.initemcost {'Procurement & Inventory'}
group_iii.silver.initemsite {'Procurement & Inventory'}
group_iii.silver.initemstats {'Procurement & Inventory'}
group_iii.silver.initemclassrep {'Procurement & Inventory'}
group_iii.silver.initemcategory {'Sales'}
group_iii.silver.initemplan {'Procurement & Inventory'}
group_iii.silver.initemsitehistbycostcenterd {'Procurement & Inventory'}
group_iii.silver.initemrep {'Procurement & Inventory'}
group_iii.silver.initemsaleshist {'Procurement & Inventory'}
group_iii.silver.initemcosthist {'Procurement & Inventory'}
group_iii.silver.initemxref {'Procurement & Inventory'}
group_iii.silver.initemsitehist {'Procurement & Inventory'}
group_iii.silver.initembox {'Procurement & Inventory'}
group_iii.silver.initemclasscurysettings {'Procurement & Inventory'

In [27]:
# phrase matching: search for documents containing your query terms as an exact phrase
# https://milvus.io/docs/phrase-match.md
# The slop parameter controls the maximum number of positions allowed between matching tokens
filter = "PHRASE_MATCH(text, 'inventory levels')"

res5 = client.query(
    collection_name=collection_name,
    filter=filter,
    output_fields=["id", "text", "name", "category"]
)

[x for x in res5]

[{'id': 0,
  'text': 'table name:group_iii.silver.group_iii_purchase_xlsx_g_3_p_order\ntable description:  It covers data from various dates and contains information about purchase orders, including details such as order number, date, vendor, status, and quantities. This data is valuable for tracking purchase orders, analyzing vendor performance, and managing inventory levels.\ntable_primary_keys:OrderNbr,LineNbr\ncolumns available:Type,OrderNbr,Date,PromisedOn,Vendor,VendorName,Status,TotalOrderQty,OpenQuantity,CreatedBy,InventoryID,Description,Description_2,ShipTo,AccountName,OrderQty_2,QtyOnReceipts,ExtCost,UnitCost,LineNbr,BranchID,EntityType\ncategory: Procurement & Inventory',
  'name': 'group_iii.silver.group_iii_purchase_xlsx_g_3_p_order',
  'category': 'Procurement & Inventory'},
 {'id': 1,
  'text': 'table name:group_iii.silver.initemsitehistday\ntable description:  It covers data from the years 2023 to 2025. The table contains information about inventory movements, including