##### Connect to Elasticsearch

In [148]:
import json
from pathlib import Path
from elasticsearch import Elasticsearch, helpers
from fuzzywuzzy import process
from dotenv import load_dotenv
import os

base_dir = "/Users/sairamlingineni/Documents/lilohq/"

load_dotenv(os.path.join(base_dir, ".env"))

ES_HOST = os.getenv("ES_HOST")
ES_API_KEY = os.getenv("ES_API_KEY")

es = Elasticsearch(
    hosts=[ES_HOST],
    api_key=ES_API_KEY,
)

# Check connection
if es.ping():
    print("Connected to Elasticsearch")
else:
    print("Connection failed")

Connected to Elasticsearch


##### CREATE SYNOMYS USING THE ES `_synonyms` API

In [149]:
syn_path = f"{base_dir}data/synonyms.json"

with open(syn_path, "r") as f:
    syn_json = json.load(f)

syn_set = [{"synonyms": f"{a}, {b}"} for a, b in syn_json]

resp = es.synonyms.put_synonym(
    id="product-synonyms",
    synonyms_set=syn_set
)
print(resp)

{'result': 'updated', 'reload_analyzers_details': {'_shards': {'total': 6, 'successful': 6, 'failed': 0}, 'reload_details': [{'index': 'products', 'reloaded_analyzers': ['search_analyzer'], 'reloaded_node_ids': ['vCLd-XP4TUajog5Gn6sPOg', 'ZBjHBxzCTWmEuqPwXWmo9Q']}]}}


##### Create the inference ( Here using Elastic `Elser` model and default `semantic_text` fields)

In [None]:

resp = es.inference.put(
    task_type="sparse_embedding",
    inference_id="product-elser-model",
    inference_config={
        "service": "elser",
        "service_settings": {
            "num_allocations": 1,
            "num_threads": 4
        }
    },
)
print(resp)


'\'\nresp = es.inference.put(\n    task_type="sparse_embedding",\n    inference_id="product-elser-model",\n    inference_config={\n        "service": "elser",\n        "service_settings": {\n            "num_allocations": 1,\n            "num_threads": 4\n        }\n    },\n)\nprint(resp)\n'

##### Create a pipeline for Products

In [151]:
pipeline_file = Path(base_dir) / "data" / "products_pipeline.json"
with open(pipeline_file, "r", encoding="utf-8") as f:
    processors = json.load(f)
resp = es.ingest.put_pipeline(
    id="product_pipeline",
    processors=processors
)
print(resp)

{'acknowledged': True}


##### Create `products` and `products_errors` template #####

In [152]:
template_file = Path(base_dir) / "data" / "products_template.json"
with open(template_file, "r", encoding="utf-8") as f:
    template_json = json.load(f)
resp = es.indices.put_index_template(
    name="products_template",
    index_patterns=["products"],
    template=template_json,
    priority=1000
    )
print(resp)

{'acknowledged': True}


In [153]:
template_file = Path(base_dir) / "data" / "products_errors_template.json"
with open(template_file, "r", encoding="utf-8") as f:
    template_json = json.load(f)
resp = es.indices.put_index_template(
    name="products_errors_template",
    index_patterns=["products_errors"],
    template=template_json,
    priority=1000
    )
print(resp)

{'acknowledged': True}


##### CREATE `PRODUCT` and `PRODUCT_ERRORS` INDEX

In [None]:

esp = es.indices.create(
    index="products",
)
print(resp)

'\nesp = es.indices.create(\n    index="products",\n)\nprint(resp)\n'

In [None]:

esp = es.indices.create(
    index="products_errors",
)
print(resp)


'\nesp = es.indices.create(\n    index="products_errors",\n)\nprint(resp)\n\n'

##### Read the Products from the `data/products.json`

In [156]:
products_file = Path(base_dir) / "data" / "products.json"
with open(products_file, "r", encoding="utf-8") as f:
    products = json.load(f)

##### Create a Normalizer function 

In [157]:
def normalize_attributes(raw_attributes, canonical_keys, threshold=60): # adjusted threshold
    """
    Normalize attribute keys using fuzzy matching against canonical keys.
    Keeps unknown keys in lowercase if no close match is found.
    """
    normalized = {}
    new_attributes = []
    for k, v in raw_attributes.items():
        best_match, score = process.extractOne(k.lower(), canonical_keys)
        if score >= threshold:
            normalized[best_match] = v
        else:
            normalized[k.lower()] = v 
            new_attributes.append(k)
    return normalized, new_attributes

##### Fix the attributes

In [158]:
attributes_file = Path(base_dir) / "data" / "attributes.json"
with open(attributes_file, "r", encoding="utf-8") as f:
    canonical_att = json.load(f)
print(canonical_att)

for product in products:
    if "attributes" in product:
        normalized_attrs, new_attr = normalize_attributes(
            product["attributes"], canonical_att
        )

        product["attributes"] = normalized_attrs 

        if new_attr:
            product["new_attributes"] = list(set(new_attr))
            print(product["attributes"] , product["new_attributes"])
        

['bulk_pack', 'power_in_hp', 'color', 'material', 'diameter_mm', 'pack_size', 'notes', 'flow_rate_lpm', 'hp', 'voltage']


##### Normalization for Category

In [159]:
def normalize_category(raw_category, canonical_categories, threshold=60):
    """
    Normalize hierarchical product categories.
    
    Fixes:
      - spelling errors
      - level-by-level mismatch
      - wrong order
      - spacing
    Returns:
      normalized_category (str)
      is_new (bool)
    """

    raw_parts = [p.strip().lower() for p in raw_category.split(">")]
    #print("Raw parts:", raw_parts)

    # Split canonical categories into 2D lists
    canonical_split = [c.split(" > ") for c in canonical_categories]
    flat_no_duplicates = list(dict.fromkeys(item for sublist in canonical_split for item in sublist))

    reconstructed = []
    # Fuzzy-match each level independently for spelling corrections
    for raw_part in raw_parts:
            if raw_part != "":
                #print("Raw part:", raw_part)
                best, score = process.extractOne(raw_part, flat_no_duplicates)
                if score >= threshold:
                    reconstructed.append(best)
                    #print("Normalizing category1:", raw_part, "->", best)
                else:
                    reconstructed.append(raw_part) 
                    #print("Normalizing category2:", raw_part) # keep raw if no good match
    # Join reconstructed path
    reconstructed_str = " > ".join(reconstructed)
    #print("Reconstructed category:", reconstructed_str)

    # Fuzzy match full reconstructed category to canonical list for order fixes
    best_cat, best_score = process.extractOne(reconstructed_str, canonical_categories)

    if best_score >= threshold:
        return best_cat
    return reconstructed_str



#### Fix Category structure

In [160]:
category_file = Path(base_dir) / "data" / "categories.json"
with open(category_file, "r", encoding="utf-8") as f:
    canonical_categories = json.load(f)
#print(canonical_categories)
for product in products:
    if "category" in product:
       #print(product["category"])
       normalized_attrs = normalize_category(product["category"], canonical_categories)
       product["category"] = normalized_attrs
      # print("Normalized category:", normalized_attrs)

##### Delete the _id in the products

In [161]:
for product in products:
    del product["_id"]
print(products[60])
print(len(products))  # Print first 2 products to verify changes

{'vendor': 'Cordillera Steel', 'sku': 'W4E74DIEV', 'title': 'Industrial Lubricant industrial grade stainless steel', 'description': 'high-flow OEM abrasive bulk pack aftermarket industrial grade moulding bulk pack polycarbonate OEM tomato colored polycarbonate colour red aftermarket cables', 'unit_of_measure': 'gal', 'category': 'Electrical > Cables > Power Cords', 'attributes': {'hp': 3, 'pack_size': '12 pcs', 'diameter_mm': '50 mm'}, 'region_availability': ['CL'], 'supplier_rating': 2.8, 'inventory_status': 'out_of_stock', 'bulk_pack_size': '24 pcs'}
10000


In [162]:
#small_set = products[60]  # for testing
es_bulk = es.options(request_timeout=7000)
def gendata(products):
    for p in products:
        yield {
            "_op_type": "index",
            "_index": "products",
            "_source": p
        }
try:
    success, failed = helpers.bulk(
        client=es_bulk,
        actions=gendata(products),
        chunk_size=100,
        raise_on_error=False,
        raise_on_exception=False
    )

    print(f"Successfully indexed: {success}")
    print(f"Failed to index: {len(failed)}")

    if failed:
        print("\n--- FAILED DOCUMENTS ---")
        for f in failed:
            print(f)

except Exception as e:
    print(f"Bulk ingest crashed: {e}")

 


Successfully indexed: 10000
Failed to index: 0


#### Get the Counts

In [163]:
resp = es.search(
   index="products,products_errors",
    size="0",
    aggs={
        "version": {
            "terms": {
                "field": "_index"
            },
            "aggs": {
                "NAME": {
                    "terms": {
                        "field": "_version"
                    }
                }
            }
        }
    },)

print(resp)
for bucket in resp['aggregations']['version']['buckets']:
    index_name = bucket['key']
    print(f"index: {index_name}")
    
    for v in bucket['NAME']['buckets']:
        version = v['key']
        count = v['doc_count']
        print(f"    version {version}: {count}")

{'took': 1, 'timed_out': False, '_shards': {'total': 6, 'successful': 6, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 9735, 'relation': 'eq'}, 'max_score': None, 'hits': []}, 'aggregations': {'version': {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 0, 'buckets': [{'key': 'products', 'doc_count': 9344, 'NAME': {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 0, 'buckets': [{'key': 1, 'doc_count': 9079}, {'key': 2, 'doc_count': 265}]}}, {'key': 'products_errors', 'doc_count': 391, 'NAME': {'doc_count_error_upper_bound': 0, 'sum_other_doc_count': 0, 'buckets': [{'key': 1, 'doc_count': 391}]}}]}}}
index: products
    version 1: 9079
    version 2: 265
index: products_errors
    version 1: 391


#### Concludes the Ingestion Process