# Ingestion Data

## You'll need to install the following libraries if they are not already installed:

In [None]:
pip install elasticsearch sentence-transformers pyyaml

In [17]:
import json
import yaml
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer

In [None]:
# Step 1: Elasticsearch client setup using cloud configuration

In [3]:
def get_client_es():
    """
    Initializes Elasticsearch client using cloud_id and api_key from config.yml
    """
    with open("../config.yml", "r") as file:
        config = yaml.safe_load(file)
    return Elasticsearch(config["cloud_url"], api_key=config["api_key"])

# Step 2: Text Vectorization using SentenceTransformers


In [4]:
def get_text_vector(sentences):
    """
    Generates sentence embeddings using pre-trained model 'all-MiniLM-L6-v2'.
    """
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    embeddings = model.encode(sentences)
    return embeddings

# Step 3: Read JSON file containing the dataset


In [18]:
def read_json_file(file_path):
    """
    Reads and loads the dataset from a JSON file.
    """
    with open(file_path, "r") as file:
        data = json.load(file)
    return data

# Step 4: Chunk data for batch processing


In [19]:
def chunk_data(data, batch_size):
    """
    Yields chunks of data in batch sizes for bulk indexing in Elasticsearch.
    """
    for i in range(0, len(data), batch_size):
        yield data[i : i + batch_size]

# Step 5: Generate bulk actions for Elasticsearch indexing


In [20]:
def generate_bulk_actions(index_name, data_batch):
    """
    Generates bulk actions for Elasticsearch from data batches.
    Adds 'description_embeddings' by encoding the 'description' field.
    """
    for item in data_batch:
        document_id = item["id"]
        # item["description_embeddings"] = get_text_vector(item["description"])
        yield {"_index": index_name, "_id": document_id, "_source": item}

# Step 6: Indexing data in batches to Elasticsearch


In [24]:
import pandas as pd
def index_data_in_batches(file_path, index_name, batch_size=100):
    """
    Indexes data from the JSON file in batches using Elasticsearch helpers.bulk.
    """
    data = read_json_file(file_path)

    for batch in chunk_data(data, batch_size):
        #actions = generate_bulk_actions(index_name, batch)
        #success, failed = helpers.bulk(get_client_es(), actions)
        #print(f"Batch indexed: {success} successful, {failed} failed")
        df = pd.DataFrame(list(generate_bulk_actions("foo", batch)))
        print(df)


# main execution block
# if __name__ == '__main__':
#     index_data_in_batches("../files/dataset/products.json", "products-catalog", batch_size=100)

In [25]:
index_data_in_batches(
    "../files/dataset/products.json", "products-catalog-2", batch_size=100
)

   _index   _id                                            _source
0     foo  1048  {'id': '1048', 'brand': 'colourpop', 'name': '...
1     foo  1047  {'id': '1047', 'brand': 'colourpop', 'name': '...
2     foo  1046  {'id': '1046', 'brand': 'colourpop', 'name': '...
3     foo  1045  {'id': '1045', 'brand': 'colourpop', 'name': '...
4     foo  1044  {'id': '1044', 'brand': 'boosh', 'name': 'Lips...
..    ...   ...                                                ...
95    foo   953  {'id': '953', 'brand': 'nyx', 'name': 'Collect...
96    foo   952  {'id': '952', 'brand': 'nyx', 'name': 'Super F...
97    foo   951  {'id': '951', 'brand': 'nyx', 'name': 'Super S...
98    foo   950  {'id': '950', 'brand': 'nyx', 'name': 'Felt Ti...
99    foo   949  {'id': '949', 'brand': 'nyx', 'name': 'The Cur...

[100 rows x 3 columns]
   _index  _id                                            _source
0     foo  948  {'id': '948', 'brand': 'nyx', 'name': 'Colored...
1     foo  947  {'id': '947', 'brand': '

In [14]:
index_name = "products-catalog-2"
mapping = {
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "brand": {
                "type": "text",
                "fields": {"keyword": {"type": "keyword"}},
            },
            "name": {"type": "text"},
            "price": {"type": "float"},
            "price_sign": {"type": "keyword"},
            "currency": {"type": "keyword"},
            "image_link": {"type": "keyword"},
            "description": {"type": "text"},
            "description_embeddings": {"type": "dense_vector", "dims": 384},
            "rating": {"type": "keyword"},
            "category": {"type": "keyword"},
            "product_type": {"type": "keyword"},
            "tag_list": {"type": "keyword"},
        }
    },
}

def create_index(index_name, mapping):
    if not get_client_es().indices.exists(index=index_name):
        get_client_es().indices.create(index=index_name, body=mapping)
        print(f"Index '{index_name}' created successfully.")
    else:
        print(f"Index '{index_name}' already exists.")



In [15]:
create_index(index_name, mapping)

Index 'products-catalog-2' created successfully.
