## CREATE

In [None]:
import os
from elasticsearch import Elasticsearch

# Elasticsearch client: endpoint and credentials are read from the environment
# (ELASTICSEARCH_URL, ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD).
_es_basic_auth = (
    os.environ.get("ELASTICSEARCH_USER"),
    os.environ.get("ELASTICSEARCH_PASSWORD"),
)

es = Elasticsearch(
    os.environ.get("ELASTICSEARCH_URL"),
    basic_auth=_es_basic_auth,
    # NOTE(review): TLS certificate verification is disabled here; confirm this
    # is only used against a local/dev cluster.
    verify_certs=False,
)

# Name of the index that holds the product catalog.
INDEX_NAME = "flipkart_products"

# Index definition: analysis settings plus the explicit field mappings.
mapping = {
    "settings": {
        "analysis": {
            "analyzer": {
                # NOTE(review): despite its name this analyzer has no edge-ngram
                # filter (standard tokenizer + lowercase only) and no field in
                # the mappings below refers to it.
                "edge_ngram_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase"],
                },
            },
        },
    },
    "mappings": {
        "properties": {
            # Identifiers
            # NOTE(review): catalog_id is declared here but the ingestion code
            # in this file does not populate it.
            "catalog_id": {"type": "integer"},
            "product_id": {"type": "keyword"},

            # Full-text product name
            "name": {"type": "text", "analyzer": "standard"},

            # Category hierarchy, stored as exact-match keywords
            "category": {"type": "keyword"},
            "sub_category": {"type": "keyword"},
            "sub_sub_category": {"type": "keyword"},
            "sub_sub_sub_category": {"type": "keyword"},

            "brand": {"type": "keyword"},

            # Prices
            "price": {"type": "integer"},
            "discounted_price": {"type": "float"},

            "item_image_url": {"type": "keyword"},

            "description": {"type": "text"},

            # Prefix-friendly field for type-ahead suggestions
            "auto_complete_field": {"type": "search_as_you_type"},

            # Indexed dense vector for cosine-similarity search
            "embedding": {
                "type": "dense_vector",
                "dims": 384,
                "index": True,
                "similarity": "cosine",
            },
        },
    },
}


# Create the index only when it is missing, so re-running this cell is harmless.
# Settings and mappings are passed as explicit keyword arguments (the same
# keyword style used for `document=` when indexing) rather than one opaque body.
if es.indices.exists(index=INDEX_NAME):
    print("⚠️ Index already exists")
else:
    es.indices.create(
        index=INDEX_NAME,
        settings=mapping["settings"],
        mappings=mapping["mappings"],
    )
    print("✅ Index created")


## INGEST

In [None]:
import ast
import pandas as pd
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from time import time

# Embedding model: its encode() output is stored in each document's
# "embedding" field during ingestion.
# NOTE(review): the vector length must equal the `dims` (384) declared for the
# "embedding" field in the index mapping — confirm for this model.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Load the product CSV. The path is relative, so it is resolved against the
# current working directory. The ingestion loop reads the columns pid,
# product_name, brand, retail_price, discounted_price, image, description and
# product_category_tree.
df = pd.read_csv("data/flipkart_com-ecommerce_sample.csv")

# Helper functions
def parse_category_hierarchy(cat_tree):
    """
    Split a product_category_tree value into up to four category levels.

    The value is parsed as a Python literal; its first element is split on
    ">>" and each piece is stripped. Levels beyond the fourth are dropped and
    missing levels are returned as None. Any parsing failure yields None for
    every level.
    """
    level_names = (
        "category",
        "sub_category",
        "sub_sub_category",
        "sub_sub_sub_category",
    )

    try:
        first_path = ast.literal_eval(cat_tree)[0]
        segments = [segment.strip() for segment in first_path.split(">>")]
    except Exception:
        segments = []

    # Truncate to the known levels, then pad the tail with None.
    padded = segments[:len(level_names)]
    padded += [None] * (len(level_names) - len(padded))
    return dict(zip(level_names, padded))

def extract_first_image(image_field):
    """
    Return the first entry of a literal-encoded image list.

    Returns None when the parsed value is empty or when the field cannot be
    parsed or indexed.
    """
    try:
        parsed = ast.literal_eval(image_field)
        if not parsed:
            return None
        return parsed[0]
    except Exception:
        return None

def build_auto_complete(doc):
    """
    Build the text stored in the autocomplete field.

    Joins the product name, brand and the four category levels with single
    spaces, in that order. Missing values are skipped: None, empty strings
    and NaN (which is what pandas yields for an empty CSV cell).

    Args:
        doc: Mapping holding the product fields; absent keys are treated as
            missing values.

    Returns:
        The space-joined text, or "" when no field has a value.
    """
    field_names = (
        "name",
        "brand",
        "category",
        "sub_category",
        "sub_sub_category",
        "sub_sub_sub_category",
    )

    parts = []
    for field_name in field_names:
        value = doc.get(field_name)
        # NaN is truthy, so a plain truthiness test lets it through and the
        # join then fails on a float; `value != value` is true only for NaN.
        if not value or value != value:
            continue
        parts.append(str(value))
    return " ".join(parts)

def build_embedding_text(doc):
    """
    Build the sentence that is fed to the embedding model.

    The sentence has the fixed shape
    "Item *<name>* is of category hierarchy <c1> and <c2> and <c3> and <c4>".
    A field that is absent, None or NaN contributes an empty string, so the
    function never fails on (and never embeds the word "None" for) products
    with fewer than four category levels.

    Args:
        doc: Mapping holding "name" and the four category-level keys.

    Returns:
        The assembled sentence.
    """
    def _text(key):
        # dict.get's default only applies to absent keys; a key present with
        # a None value must be blanked explicitly. `value != value` is true
        # only for NaN.
        value = doc.get(key)
        if value is None or value != value:
            return ""
        return str(value)

    return " ".join([
        f'Item *{_text("name")}* is of category hierarchy',
        f'{_text("category")} and',
        f'{_text("sub_category")} and',
        f'{_text("sub_sub_category")} and',
        _text("sub_sub_sub_category"),
    ])

# Ingestion loop
#
# Indexes one document per CSV row, using the row's pid as the document id.
# A failure on one row is reported and counted but does not stop the run.
def _nan_to_none(value):
    """Return None for a pandas missing value (NaN/NaT/None), else the value."""
    return None if pd.isna(value) else value


success_count = 0
failure_count = 0
start_time = time()

# The context manager closes the progress bar even if the loop is interrupted.
with tqdm(
    df.iterrows(),
    total=len(df),
    desc="🚀 Ingesting Flipkart products",
    unit="doc",
    dynamic_ncols=True,
) as progress:
    for _, row in progress:
        pid = row["pid"]

        try:
            cats = parse_category_hierarchy(row["product_category_tree"])
            image_url = extract_first_image(row["image"])

            # Empty CSV cells arrive as NaN; send them as null instead, since
            # NaN is not valid JSON and is not a usable keyword/text value.
            doc = {
                "product_id": pid,
                "name": _nan_to_none(row["product_name"]),
                "brand": _nan_to_none(row["brand"]),
                "price": _nan_to_none(row["retail_price"]),
                "discounted_price": _nan_to_none(row["discounted_price"]),
                "item_image_url": image_url,
                "description": _nan_to_none(row["description"]),
                **cats,
            }

            # autocomplete field
            doc["auto_complete_field"] = build_auto_complete(doc)

            # embedding: keep the source sentence next to the vector
            embedding_text = build_embedding_text(doc)
            doc["embedding_text"] = embedding_text
            doc["embedding"] = model.encode(embedding_text).tolist()

            es.index(
                index=INDEX_NAME,
                id=pid,
                document=doc,
            )

            success_count += 1

        except Exception as e:
            # Deliberate best-effort boundary: log a truncated message and
            # carry on with the next row.
            failure_count += 1
            progress.write(f"❌ Failed PID {pid}: {str(e)[:120]}")

        # live stats on progress bar
        elapsed = time() - start_time
        rate = success_count / elapsed if elapsed > 0 else 0

        progress.set_postfix({
            "✅ indexed": success_count,
            "❌ failed": failure_count,
            "⚡ docs/sec": f"{rate:.2f}",
        })

print("\n================ INGESTION SUMMARY ================")
print(f"✅ Successfully indexed : {success_count}")
print(f"❌ Failed documents     : {failure_count}")
print(f"⏱️  Total time (sec)     : {round(time() - start_time, 2)}")
print("🚀 Flipkart catalog ingestion completed")

