# Index Product Data

In [1]:
%pip install --upgrade pip pyyaml fastparquet > /dev/null

Note: you may need to restart the kernel to use updated packages.


Let us start by extracting product data from the XML files. 

In [1]:
import glob
SOURCE_DIR = "/workspace/datasets/product_data/products"
FILES = glob.glob(f"{SOURCE_DIR}/*.xml")
print(f"Number of XML files: {len(FILES)}")
FILES[:5]

Number of XML files: 256


['/workspace/datasets/product_data/products/products_0001_2570_to_430420.xml',
 '/workspace/datasets/product_data/products/products_0002_430439_to_518210.xml',
 '/workspace/datasets/product_data/products/products_0003_518229_to_606384.xml',
 '/workspace/datasets/product_data/products/products_0004_606428_to_722720.xml',
 '/workspace/datasets/product_data/products/products_0005_722800_to_846222.xml']

Let us read `mappings.yaml` to get the xpath selectors associated with each field to extract.

In [2]:
import yaml
with open("/workspace/datasets/mappings.yaml", "r") as handle:
    mappings = yaml.safe_load(handle)
mappings.keys()

dict_keys(['accessories', 'active', 'artistName', 'bestBuyItemId', 'bestSellingRank', 'categoryLeaf', 'categoryPath', 'categoryPathCount', 'categoryPathIds', 'class', 'classId', 'color', 'condition', 'crossSell', 'customerReviewAverage', 'customerReviewCount', 'department', 'departmentId', 'depth', 'description', 'digital', 'features', 'frequentlyPurchasedWith', 'height', 'homeDelivery', 'image', 'inStoreAvailability', 'inStorePickup', 'longDescription', 'longDescriptionHtml', 'manufacturer', 'modelNumber', 'name', 'onSale', 'onlineAvailability', 'productId', 'quantityLimit', 'regularPrice', 'relatedProducts', 'releaseDate', 'salePrice', 'salesRankLongTerm', 'salesRankMediumTerm', 'salesRankShortTerm', 'shippingCost', 'shippingWeight', 'shortDescription', 'shortDescriptionHtml', 'sku', 'startDate', 'subclass', 'subclassId', 'type', 'url', 'weight', 'width'])

## Extract Records

Let us write a function to extract product records from a XML file and return a pandas dataframe.

In [3]:
import pandas as pd
from lxml import etree
def extract_records(file: str, mappings: dict) -> pd.DataFrame:
    """Extract details from XML file
  
    Args:
        file (str): Path to the XML file containing details.
        mappings (dict): A dictionary of mappings to extract
  
    Returns:
        pd.DataFrame: A pandas dataframe with records
    """
    nodes = etree.parse(file).getroot().findall("./product") 
    records = pd.DataFrame([
      {k: node.xpath(v) for k, v in mappings.items()} 
      for node in nodes
      if len(node.xpath("productId/text()")) > 0
    ])
    return records

records = extract_records(FILES[1], mappings)
records.head()

Unnamed: 0,accessories,active,artistName,bestBuyItemId,bestSellingRank,categoryLeaf,categoryPath,categoryPathCount,categoryPathIds,class,...,shortDescription,shortDescriptionHtml,sku,startDate,subclass,subclassId,type,url,weight,width
0,[],[false],"[Redbone,Leon]",[425502],[],[cat02005],"[Best Buy, Movies & Music, Music, Folk]",4.0,"[cat00000, abcat0600000, cat02001, cat02005]",[COMPACT DISC],...,[],[],[430439],[1989-11-24],[ROCK],[1001],[Music],[],[],[]
1,[],[false],[Shadowfax],[184387],[],[cat02008],"[Best Buy, Movies & Music, Music, New Age]",4.0,"[cat00000, abcat0600000, cat02001, cat02008]",[COMPACT DISC],...,[],[],[430448],[1990-07-10],[JAZZ-CONTEMPORARY],[1002],[Music],[],[],[]
2,[],[true],[The Pointer Sisters],[321638],[],[cat02011],"[Best Buy, Movies & Music, Music, R&B & Soul]",4.0,"[cat00000, abcat0600000, cat02001, cat02011]",[COMPACT DISC],...,[],[],[430457],[1989-01-06],[R&B],[1007],[Music],[http://www.bestbuy.com/site/Greatest+Hits+%5B...,[],[]
3,[],[false],"[Penn,Michael]",[320474],[],[cat02010],"[Best Buy, Movies & Music, Music, Rock]",4.0,"[cat00000, abcat0600000, cat02001, cat02010]",[COMPACT DISC],...,[],[],[430466],[1989-05-31],[ROCK],[1001],[Music],[],[],[]
4,[],[false],"[Parker,Graham]",[320476],[],[cat02010],"[Best Buy, Movies & Music, Music, Rock]",4.0,"[cat00000, abcat0600000, cat02001, cat02010]",[COMPACT DISC],...,[],[],[430475],[1990-11-07],[R&B],[1007],[Music],[],[],[]


## Save Records

Let us now write a function to batch the records and save each batch as a parquet file. We batch the records so that we can index the batches in parallel, while keeping the batch size manageable.

In [4]:
from pathlib import Path
def save_records(records: pd.DataFrame, file: str, batch_size: int = 2000) -> int:
    """Save product records in batches to pickle files.

    Args:
        records (pd.DataFrame): Records to save.
        file (str): The path to save the file.
        batch_size (int, optional): The number of records in a batch. Defaults to 2000.
    Returns:
        int: The number of records extracted and saved.
    """
    for idx, start in enumerate(range(0, len(records), batch_size)):
        batch = records.iloc[start : start + batch_size]
        batch.to_parquet(f"{file}-{idx}.parquet")
    return len(records)

save_records(records, f"/tmp/{Path(FILES[0]).stem}", batch_size = 2000)
glob.glob("/tmp/*.parquet")

['/tmp/products_0001_2570_to_430420-2.parquet',
 '/tmp/products_0001_2570_to_430420-1.parquet',
 '/tmp/products_0001_2570_to_430420-0.parquet']

## Extract and Save Records

Let us compose the two functions we wrote earlier to extact and save product records.

In [5]:
def extract_and_save_records(file: str, output_dir: str, mappings: dict, batch_size:int = 2000) -> int:
    """Extract product records from XML files and save them in batches to pickle files.

    Args:
        file (str): XML file to extract records from.
        output_dir (str):  The directory to save the pickle files.
        mappings (dict): A dictionary of mappings to extract.
        batch_size (int, optional): The maximum number of records in a batch. Defaults to 2000.

    Returns:
        int: The number of records extracted and saved.
    """
    records = extract_records(file, mappings)
    output_file = Path(output_dir) / Path(file).stem
    save_records(records, output_file, batch_size)
    return len(records)

Path("/tmp/test-1").mkdir(parents=True, exist_ok=True)
extract_and_save_records(FILES[0], "/tmp/test-1", mappings, 2000)
glob.glob("/tmp/test-1/*")

['/tmp/test-1/products_0001_2570_to_430420-2.parquet',
 '/tmp/test-1/products_0001_2570_to_430420-1.parquet',
 '/tmp/test-1/products_0001_2570_to_430420-0.parquet']

## Extract and Save All Records

We can now write a function that loops through all the XML files and calls the `extract_and_save_records` function on each of them. We can use `ProcessPoolExecutor()` from `concurrent.futures` to parallelize the ingesion pipeline and speed it up significantly.

In [7]:
import os
from concurrent.futures import ProcessPoolExecutor
from itertools import repeat
from functools import partial
from tqdm.contrib.concurrent import process_map
def extract_and_save_records_all(source_dir: str, output_dir: str, mappings: dict, batch_size:int = 2000):
    """Extract product records from XML files and save them in batches

    Args:
        source_dir (str): _description_
        output_dir (str): _description_
        mappings (dict): _description_
        batch_size (int, optional): _description_. Defaults to 2000.
    """
    os.mkdir(output_dir)
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    files = glob.glob(source_dir + "/*.xml")
    extract_and_save_records_from_file = partial(
        extract_and_save_records, 
        mappings = mappings, 
        output_dir = output_dir, 
        batch_size = batch_size
    )
    # with ProcessPoolExecutor(max_workers=8) as pool:
    #     records = list(tqdm(pool.map(extract_and_save_records_from_file, files), total=len(files)))
    records = process_map(extract_and_save_records_from_file, files, max_workers = 8)
    print(f"Extracted {sum(records)} records from {len(files)} files")

OUTPUT_DIR = "/workspace/datasets/products"
extract_and_save_records_all(SOURCE_DIR, OUTPUT_DIR, mappings, batch_size = 2000)

  from .autonotebook import tqdm as notebook_tqdm


100%|██████████| 256/256 [10:06<00:00,  2.37s/it]


Extracted 1275077 records from 256 files


## Initialize Client

In [2]:
from opensearchpy import OpenSearch
from IPython.display import JSON
import json

def print_json(x):
    print(json.dumps(x, indent = 2))
    
client = OpenSearch(
    hosts = [{"host": "localhost", "port": 9200}],
    http_auth = ("admin", "admin"),
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
)
print_json(client.info())

{
  "name": "fc81333c71df",
  "cluster_name": "docker-cluster",
  "cluster_uuid": "_Xsvjrs0TJOF3p6QlsWnJA",
  "version": {
    "distribution": "opensearch",
    "number": "2.9.0",
    "build_type": "tar",
    "build_hash": "1164221ee2b8ba3560f0ff492309867beea28433",
    "build_date": "2023-07-18T21:23:29.367080729Z",
    "build_snapshot": false,
    "lucene_version": "9.7.0",
    "minimum_wire_compatibility_version": "7.10.0",
    "minimum_index_compatibility_version": "7.0.0"
  },
  "tagline": "The OpenSearch Project: https://opensearch.org/"
}


In [25]:
INDEX_PRODUCT = 'bbuy_products'

In [57]:
client.indices.delete(INDEX_PRODUCT)

{'acknowledged': True}

In [58]:
import yaml
with open("../week1/mappings_product.yml", "r") as w:
  mappings_product = yaml.safe_load(w)

In [59]:
body = yaml.safe_load("""
settings:
  index:
    query:
      default_field: body
""")
body['mappings'] = mappings_product[INDEX_PRODUCT]['mappings']
response = client.indices.create(INDEX_PRODUCT, body = body)
print_json(response)

{
  "acknowledged": true,
  "shards_acknowledged": true,
  "index": "bbuy_products"
}


In [None]:
# mappings = client.indices.get_mapping(INDEX_PRODUCT)
# with open("../week1/mappings_product.yml", "w") as w:
#     yaml.safe_dump(mappings, w, indent = 2)

In [60]:
import uuid
import pandas as pd
from opensearchpy.helpers import bulk
from functools import partial
from tqdm.contrib.concurrent import process_map
from itertools import repeat

def index_batch(batch: pd.DataFrame, index_name:str):
    records = batch.to_dict(orient = "records")
    docs = [{"_id": record["sku"][0], "_index": index_name, "_source": record, } for record in records]
    bulk(client, docs, request_timeout=60)
    return len(batch)


def index_batches(batches, index_name:str):
    index_batch_partial = partial(index_batch, index_name = index_name)
    results = process_map(index_batch_partial, batches, max_workers=8)
    print(f"Indexed {sum(results)} records")

In [61]:
import glob
OUTPUT_DIR = "/workspace/datasets/products"
output_files = sorted(glob.glob(f"{OUTPUT_DIR}/*.parquet"))[:5]
output_files

['/workspace/datasets/products/products_0001_2570_to_430420-0.parquet',
 '/workspace/datasets/products/products_0001_2570_to_430420-1.parquet',
 '/workspace/datasets/products/products_0001_2570_to_430420-2.parquet',
 '/workspace/datasets/products/products_0002_430439_to_518210-0.parquet',
 '/workspace/datasets/products/products_0002_430439_to_518210-1.parquet']

In [62]:
index_batches(batches, INDEX_PRODUCT)

100%|██████████| 5/5 [00:01<00:00,  3.49it/s]

Indexed 9000 records





In [63]:
client.indices.refresh(index = INDEX_PRODUCT)
client.cat.count(index = INDEX_PRODUCT, format="json")

[{'epoch': '1696891196', 'timestamp': '22:39:56', 'count': '9000'}]

In [64]:
query_url = "https://3000-ramnathv-searchfundamen-7vo4f3nho9y.ws-us105.gitpod.io/search/query?&query=*&filter.name=regularPrice&regularPrice.type=range&regularPrice.displayName=Price&regularPrice.from=30.01&regularPrice.to=40.0&filter.name=department.keyword&department.keyword.type=terms&department.keyword.key=VIDEO/COMPACT%20DISC"

In [66]:
from urllib.parse import urlparse, parse_qs
parsed_url = urlparse(query_url)
query_params = parse_qs(parsed_url.query)
query_params

{'query': ['*'],
 'filter.name': ['regularPrice', 'department.keyword'],
 'regularPrice.type': ['range'],
 'regularPrice.displayName': ['Price'],
 'regularPrice.from': ['30.01'],
 'regularPrice.to': ['40.0'],
 'department.keyword.type': ['terms'],
 'department.keyword.key': ['VIDEO/COMPACT DISC']}

In [70]:
parsed_url.query

'&query=*&filter.name=regularPrice&regularPrice.type=range&regularPrice.displayName=Price&regularPrice.from=30.01&regularPrice.to=40.0&filter.name=department.keyword&department.keyword.type=terms&department.keyword.key=VIDEO/COMPACT%20DISC'

In [68]:
filters = []
for _filter in query_params['filter.name']:
    if _filter == 'regularPrice':
        range = dict(regularPrice = dict(
            gte = query_params['regularPrice.from'][0],
            lt = query_params['regularPrice.to'][0]
        ))
        filters.append(dict(range = range))
    elif _filter == "department.keyword":
        term = dict(
            department = query_params["department.keyword.type"][0]
        )
        filters.append(dict(term = term))
print(filters)

[{'range': {'regularPrice': {'gte': '30.01', 'lt': '40.0'}}}, {'term': {'department': 'terms'}}]


In [None]:
Filters: [
    {'range': {'regularPrice': {'gte': '30.01', 'lt': '40.0'}}}, 
    {'term': {'department': 'VIDEO/COMPACT DISC'}}
]

In [None]:
Filters Input
['department.keyword']
Filters: [{'term': {'department': 'PHOTO/COMMODITIES'}}]
Display Filters
['department.keyword: PHOTO/COMMODITIES']
Applied Filters
&filter.name=department.keyword&department.keyword.type=terms&department.keyword.displayName=department.keyword&department.keyword.fieldName=department&department.keyword.key=PHOTO/COMMODITIES