### Import Libraries and environment variables setup

In [6]:
import load_dotenv
import asyncio
import nest_asyncio
nest_asyncio.apply()
load_dotenv.load_dotenv()
from playwright.async_api import async_playwright
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool
from langchain_core.documents import Document
from langchain_google_community import GoogleSearchAPIWrapper
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.constants import Send
from langchain_community.document_transformers import Html2TextTransformer
from langgraph.graph import StateGraph, START, END
from IPython.display import display, Image
from sentence_transformers import SentenceTransformer, util
from typing import Annotated, List, TypedDict, operator, Dict, Optional, Tuple
from pydantic import BaseModel, Field

### LLM Setup

In [7]:
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.3,
)

In [8]:
llm.invoke("hi").content

'Hello! How can I assist you today?'

### Structured output llms definition

In [25]:
class Product1(BaseModel):
    product_id: Optional[str] = Field(None, description="Unique product identifier.")
    name: Optional[str] = Field(None, description="Standardized product name.")
    brand: Optional[str] = Field(None, description="Brand information.")
    model_number: Optional[str] = Field(None, description="Product model number.")
    description: Optional[str] = Field(None, description="Product description.")
    specifications: Optional[Dict[str, str]] = Field(None, description="Detailed product specifications.")
    product_variants: Optional[List[Dict]] = Field(None, description="Product variants detail."),
    images: Optional[List[str]] = Field(None, description="List of high-quality product image URLs.")
    category: Optional[str] = Field(None, description="Product category.")
    subcategory: Optional[str] = Field(None, description="Product subcategory.")
    price_range: Optional[Tuple[float, float]] = Field(None, description="Pricing reference range (min, max).")
    price_unit: Optional[str] = Field(None, description="Currency unit for pricing.")
    ref_link: Optional[str] = Field(None, description="Reference link to the product page.")

class Product(BaseModel):
    name: str = Field(..., description="Name of the product.")
    description: str = Field(..., description="Description of the product.")
    model_number: Optional[str] = Field(None, description="Model number of the product.")
    specifications: Optional[Dict[str, str]] = Field(None, description="Specifications of the product.")
    images: Optional[List[str]] = Field(None, description="List of image URLs for the product.")
    price: Optional[float] = Field(None, description="Price of the product.")
    price_unit: Optional[str] = Field(None, description="Currency unit for the price.")
    ref_link: Optional[str] = Field(None, description="Reference link to the product page.")
    
 
class ProductListing(BaseModel):
    products: List[Product] = Field(None, description="List of products.")

class SearchQuery(BaseModel):
    search_query: str = Field(None, title="Search Query", description="User search query to find the product listing pages for given product category")
    justification: str = Field(None, title="Justification", description="Justification for the search query")
 
product_llm = llm.with_structured_output(Product)
search_query_llm = llm.with_structured_output(SearchQuery)

In [26]:
import pandas as pd
from typing import List
def save_products_in_csv(products: List[Product], file_path: str) -> None:
    """
    Save the product list to a CSV file.
    """
    df = pd.DataFrame([product.model_dump() for product in products])
    df.to_csv(file_path, index=False)

### Langgraph State Definition

In [27]:
# Graph State
class State(TypedDict):
    category_name: str
    category_type: str
    search_query: str
    product_listing_links: list
    product_detail_page_links: Annotated[list, operator.add]
    products: List[Product]
    completed_products: Annotated[list, operator.add] # all workers write to this in parallel

class ProductDetailPageLinkWorkerState(TypedDict):
    category_name: str
    product_listing_link: str
    product_detail_page_links: Annotated[list, operator.add]

class WorkerState(TypedDict):
    link: str
    products: List[Product]
    completed_products: Annotated[list, operator.add]

### Langgraph Function and Tools Define

In [28]:
search = GoogleSearchAPIWrapper()
def top_results(query):
    return search.results(query, 10, {
        "gl": "in",
        "cr": "countryIN",
        "hl": "en",
    })

tool = Tool(
    name="Google Search Snippets",
    description="Search Google for recent results.",
    func=top_results,
)

In [29]:
async def scrape(link: str):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(link, wait_until="domcontentloaded", timeout=0)
        # await page.goto(link, wait_until="networkidle", timeout=0)

        html = await page.content()
        await browser.close()
        return html

In [30]:
def compare_strings(str1: str, str2: str, model_name: str = "all-MiniLM-L6-v2") -> float:
    """
    Compare two strings using a Hugging Face Sentence Transformer model and return the similarity score.
    
    Args:
        str1 (str): First string.
        str2 (str): Second string.
        model_name (str): Name of the pre-trained sentence transformer model.
                          Default is "all-MiniLM-L6-v2".

    Returns:
        float: Cosine similarity score between the two strings (range: 0 to 1).
    """
    model = SentenceTransformer(model_name)
    embeddings = model.encode([str1, str2], convert_to_tensor=True)
    similarity = util.pytorch_cos_sim(embeddings[0], embeddings[1]).item()
    return similarity

### Langgraph Nodes Define

In [37]:
# Nodes
class CategoryType(BaseModel):
    category_type: str = Field(None, title="Category Type", description="Type of category one of [generic, brand]")

def check_brand_category_type(state: State):
    print("started.....")
    result = llm.with_structured_output(CategoryType).invoke(f"Check if {state['category_name']} is a brand product category or generic category.")
    print(result)
    return {"category_type": result.category_type}
    
def continue_or_stop(state: State):
    if state["category_type"] == "brand":
        return "get_search_engine_query"
    else:
        return END

def get_search_engine_query(state: State):
    search_query = llm.with_structured_output(SearchQuery).invoke(
        f"Get a search query for {state['category_name']} category which is optimized for search engines to get relevant product listing pages."
        )
    print(search_query)
    return {"search_query": search_query.search_query}

class ProductListingPageLinks(BaseModel):
    links: List[str] = Field(None, description="Product listing page links relevant to provided brand category name.")

def get_product_listing_page_links(state: State):
    product_listing_links = state.get('product_listing_links')
    if product_listing_links is None or len(product_listing_links) == 0:
        results = tool.run(state["search_query"])
        print(results)
        links = llm.with_structured_output(ProductListingPageLinks).invoke(
            f"Filter out product listing page links relevant to {state['category_name']} category in search results. results: {results}"
        )
        product_listing_links = links.links
        print("google search product listing links: ", product_listing_links)
    return {"product_listing_links": product_listing_links}

class PdpLinkPages(BaseModel):
    pdp_links: List[str] = Field(None, description="Product detail page links relevant to provided product listing page links.")

def scrap_and_get_pdp_links(worker_state: ProductDetailPageLinkWorkerState):
    try:
        html = asyncio.run(scrape(worker_state["product_listing_link"]))
        html_transformer = Html2TextTransformer(ignore_links=False)
        docs_transformed = html_transformer.transform_documents([Document(page_content=html)])
        content = docs_transformed[0].page_content
        # print(content)
        
        product_detail_page_links = llm.with_structured_output(PdpLinkPages).invoke(
            [
                SystemMessage(content="You are proudct detail page url collector from given raw content."),
                HumanMessage(content=f"Give proper formatted list of product detail page links in following content belonging to {worker_state['category_name']} category if present else empty list: {content}")
            ]
        )
        domain_name = worker_state["product_listing_link"].split("/")[2]
        product_detail_page_links.pdp_links = [link if "http" in link else f"https://{domain_name}{link}" for link in product_detail_page_links.pdp_links]
        return {"product_detail_page_links": product_detail_page_links.pdp_links}
    except Exception as e:
        print(str(e))
        return {"error": str(e)}

def get_product_detail_page_links(state: State):
    if len(state["product_listing_links"]) > 0:
        print("Product Listing Page Links")
        print(state["product_listing_links"])
    return [Send("scrap_and_get_pdp_links", {"product_listing_link": link, "category_name": state["category_name"]}) for link in state["product_listing_links"]]

def scrap_and_get_products(worker_state: WorkerState):
    try:
        html = asyncio.run(scrape(worker_state["link"]))
        html_transformer = Html2TextTransformer(ignore_images=False)
        docs_transformed = html_transformer.transform_documents([Document(page_content=html)])
        content = docs_transformed[0].page_content
        
        product_result = product_llm.invoke(
            [
                SystemMessage(content="You are product detail extractor from given raw content."),
                HumanMessage(content=f"Give proper product detail in following content: {content}")
            ]
        )
        if product_result.images is not None:
            product_result.images = [image if "http" in image else f"https://{worker_state['link'].split('/')[2]}{image}" for image in product_result.images]
        
        product_result.ref_link = worker_state["link"]
        return {"completed_products": [product_result]}
    except Exception as e:
        print(str(e))
        return {"error": str(e)}

def assign_workers_to_links(state: State):
    print("Product Detail Page Links")
    print(state["product_detail_page_links"])
    return [Send("scrap_and_get_products", {"link": link}) for link in state["product_detail_page_links"]]

def synthesizer(state: State):
    unique_products = [] # based on model number
    for product in state["completed_products"]:
        if product.model_number is None or product.model_number not in [p.model_number for p in unique_products]:
            unique_products.append(product)
            
    # unique_products1 = [] 
    # compare products based on name and description and save them in unique_products1
    # for product in unique_products:
    #     if product.name is None or product.description is None:
    #         continue
    #     if len(unique_products1) == 0:
    #         unique_products1.append(product)
    #     else:
    #         for p in unique_products1:
    #             similarity = compare_strings(product.name, p.name)
    #             if similarity < 0.8:
    #                 unique_products1.append(product)
    #                 break
    
    return {
        "products": unique_products
    }

### Compiling Langgraph

In [38]:
# Build workflow
b2b_products_repo_builder = StateGraph(State)

# Add the nodes
b2b_products_repo_builder.add_node("check_brand_category_type", check_brand_category_type)
b2b_products_repo_builder.add_node("get_search_engine_query", get_search_engine_query)
b2b_products_repo_builder.add_node("get_product_listing_page_links", get_product_listing_page_links)
b2b_products_repo_builder.add_node("scrap_and_get_pdp_links", scrap_and_get_pdp_links)
b2b_products_repo_builder.add_node("scrap_and_get_products", scrap_and_get_products)
b2b_products_repo_builder.add_node("synthesizer", synthesizer)

# Add edges to connect nodes
b2b_products_repo_builder.add_edge(START, "check_brand_category_type")
b2b_products_repo_builder.add_conditional_edges('check_brand_category_type', continue_or_stop, {
    "get_search_engine_query": 'get_search_engine_query',
    END: END
})
b2b_products_repo_builder.add_edge("get_search_engine_query", "get_product_listing_page_links")
b2b_products_repo_builder.add_conditional_edges(
    "get_product_listing_page_links", get_product_detail_page_links, ["scrap_and_get_pdp_links"]
)
b2b_products_repo_builder.add_conditional_edges(
    "scrap_and_get_pdp_links", assign_workers_to_links, ["scrap_and_get_products"]
)
b2b_products_repo_builder.add_edge("scrap_and_get_products", "synthesizer")
b2b_products_repo_builder.add_edge("synthesizer", END)

# Compile the workflow
b2b_products_repo = b2b_products_repo_builder.compile()

# Show the workflow
# display(Image(b2b_products_repo.get_graph().draw_mermaid_png()))

In [39]:
state = b2b_products_repo.invoke({
    "category_name": "Honda Water Pumps",
})

started.....
category_type='brand'
search_query='Honda Water Pumps for Sale - Best Prices and Reviews' justification="This search query includes the brand name 'Honda' and the product type 'Water Pumps', which helps in targeting users specifically looking for Honda water pumps. The phrase 'for Sale' indicates intent to purchase, while 'Best Prices and Reviews' attracts users looking for competitive pricing and product evaluations, enhancing the likelihood of relevant product listing pages appearing in search results."
[{'title': 'Honda Portable Water Pumps', 'link': 'https://www.hondaindiapower.com/product-category/water-pumps', 'snippet': 'This pumpset discharge water 1100 litres per minute. This model is best suitable for water intensive crop. 32,800.00. Buy Now. Water Pumps. WB20XD. Honda Water\xa0...'}, {'title': 'Balwaan WP-33R 7HP Water Pump', 'link': 'https://www.balwaan.com/product/balwaan-wp33r-water-pump', 'snippet': 'Buy Balwaan 7HP Water Pump (WP33R) | Get Amazing offers on

In [48]:
save_products_in_csv(state["products"], f"brand-products/{state['category_name']}.csv")

In [47]:
dict(state['completed_products'][0])

{'name': 'Honda WS 20X Water Pump',
 'description': 'The Honda WS 20X Water Pump is a powerful 2 HP petrol water pump designed for agricultural and multi-purpose use. It features a 2-inch output and input, non-self priming capability, and can discharge water at a rate of 520 liters per minute. This pump operates on a 4-stroke engine and comes with a 2-year warranty, ensuring reliability and durability for various water pumping needs.',
 'model_number': 'YJIA-E30',
 'specifications': {'Rated Power': '1.5 kW (2 HP)',
  'Max Discharge': '520 L/min',
  'Max Head': '16 m',
  'Type': 'Non Self Priming Pump',
  'Dry Weight': '22 kg',
  'Engine Type': 'Air Cooled 4 stroke, O.H.V., Petrol Engine',
  'Displacement': '79.7 cc',
  'Fuel Type': 'Petrol'},
 'images': ['https://images-eu.ssl-images-amazon.com/images/I/410wFohvTIL._AC_UL232_SR232,232_.jpg'],
 'price': 19300.0,
 'price_unit': '₹',
 'ref_link': 'https://www.amazon.in/Honda-Water-Petrol-Output-Input/dp/B0D4LWS2KJ/ref=sr_1_1'}

In [28]:
products = [
    Product(
        product_id="1",
        name="JCB Machine",
        brand="JCB",
        model_number=None,
        description="JCB's range of rental-ready RS generators includes five Tier 4 Final-compliant models from 56 kW to 500 kW, designed for the North American rental market. Models include: G70RS, G125RS, G220RS, G400RS, G625RS",
        specifications={"spec1": "value1"},
        product_variants=[],
        images=["https://www.jcb.com/-/media/jcb/products/compact-excavators/19c-1e/19c-1e-compact-excavator-1.jpg"],
        category="Construction",
        subcategory="Excavator",
        price_range=(1000000, 2000000),
        price_unit="INR"
    ),
    Product(
        product_id="2",
        name="JCB Machine",
        brand="JCB",
        model_number=None,
        description="The JCB Electric Dumpster is a 100% electric powered machine designed for indoor and outdoor work in zero emission zones. It features low noise operation and is built to save time, energy and money.",
        specifications={"spec1": "value1"},
        product_variants=[],
        images=["https://www.jcb.com/-/media/jcb/products/compact-excavators/19c-1e/19c-1e-compact-excavator-1.jpg"],
        category="Construction",
        subcategory="Excavator",
        price_range=(1000000, 2000000),
        price_unit="INR"
    ),
]

In [37]:
compare_strings("sumit kushwah", "sumit kushwah")

1.0

In [31]:
unique_products = [] # based on model number
for product in products:
    if product.model_number is None or product.model_number not in [p.model_number for p in unique_products]:
        unique_products.append(product)
        
        
# unique prouducts where model_numer is none and compare with similarity greate than 90
unique_products = [] # based on model number
for product in products:
    if product.model_number is None and [p.model_number for p in unique_products]:
        if compare_strings(product.name + " " + product.description, p.name + " " + p.description) > 0.90:
            unique_products.append(product)

In [32]:
unique_products

[Product(product_id='1', name='JCB Machine', brand='JCB', model_number=None, description="JCB's range of rental-ready RS generators includes five Tier 4 Final-compliant models from 56 kW to 500 kW, designed for the North American rental market. Models include: G70RS, G125RS, G220RS, G400RS, G625RS", specifications={'spec1': 'value1'}, product_variants=[], images=['https://www.jcb.com/-/media/jcb/products/compact-excavators/19c-1e/19c-1e-compact-excavator-1.jpg'], category='Construction', subcategory='Excavator', price_range=(1000000.0, 2000000.0), price_unit='INR'),
 Product(product_id='2', name='JCB Machine', brand='JCB', model_number=None, description='The JCB Electric Dumpster is a 100% electric powered machine designed for indoor and outdoor work in zero emission zones. It features low noise operation and is built to save time, energy and money.', specifications={'spec1': 'value1'}, product_variants=[], images=['https://www.jcb.com/-/media/jcb/products/compact-excavators/19c-1e/19c

In [78]:
state['products']

[Product(product_id='JCB-AP-001', name='JCB Access Electric Scissors', brand='JCB', model_number='AP-ES-2023', description='JCB Access Electric Scissors range designed for rental companies and contractors worldwide. Features power to platform as standard. Certified to EN280 and complies with ANSI and CSA legislation.', specifications={'certification': 'EN280, ANSI, CSA compliant', 'type': 'Electric Scissors', 'power_supply': 'Electric', 'standard_feature': 'Power to platform'}, product_models=(FieldInfo(annotation=NoneType, required=False, default=None, description='Product vaiants detail.'),), images=['https://www.jcb.com/images/jcb-access-platforms.jpg'], category='Construction Equipment', subcategory='Access Platforms', price_range=(50000.0, 150000.0), price_unit='USD', confidence_scores={'name': 0.9, 'brand': 1.0, 'category': 0.9, 'specifications': 0.8, 'price_range': 0.6}),
 Product(product_id='JCB-BHL-2024', name='JCB Backhoe Loader', brand='JCB', model_number='BHL-2024', descrip

In [79]:
len(state['products'])

48

In [81]:
state['products'][1].dict()

/var/folders/6j/90fxt7w17_v5v0v_cdqhj18w0000gn/T/ipykernel_82714/4002298138.py:1: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  state['products'][1].dict()


{'product_id': 'JCB-BHL-2024',
 'name': 'JCB Backhoe Loader',
 'brand': 'JCB',
 'model_number': 'BHL-2024',
 'description': 'JCB Backhoe Loaders are versatile machines that combine excellent trenching, excavating, loading, lifting and material handling capabilities with the benefits of a single machine that is easy to maintain and simple to operate. Pioneered by JCB in 1953, they remain the world leader in Backhoe technology, innovation and capability.',
 'specifications': {'type': 'Backhoe Loader',
  'capabilities': 'Trenching, excavating, loading, lifting, material handling',
  'features': 'Easy maintenance, simple operation',
  'market_position': "World's most popular backhoe loader"},
 'product_models': (FieldInfo(annotation=NoneType, required=False, default=None, description='Product vaiants detail.'),),
 'images': ['https://www.jcb.com/images/jcb-backhoe-loader.jpg'],
 'category': 'Construction Equipment',
 'subcategory': 'Backhoe Loaders',
 'price_range': (75000.0, 150000.0),
 '

In [41]:
state = b2b_products_repo.invoke({
    "category_name": "Usha Sewing Machines",
    "product_listing_links": [
        "https://www.flipkart.com/sewing-machines/usha~brand/pr?sid=j9e,abm,0zg",
    ]
})

started.....
category_type='brand'
search_query='Usha sewing machines' justification='This is a straightforward search query targeting the Usha brand of sewing machines. It\'s simple, direct, and uses the exact brand name "Usha" along with the product category "sewing machines" which will help users find relevant results in search engines.'
Product Listing Links
['https://www.flipkart.com/sewing-machines/usha~brand/pr?sid=j9e,abm,0zg']
Product Detail Page Links
['https://www.flipkart.com/usha-craft-master-industria-manual-sewing-machine/p/itm03224df3ca385', 'https://www.flipkart.com/usha-anand-composite-h-manual-sewing-machine/p/itm3c3e1ade332e0', 'https://www.flipkart.com/usha-craft-master-delux-industrial-manual-sewing-machine/p/itmf22ad54c21b19', 'https://www.flipkart.com/usha-bandhan-composite-c-manual-sewing-machine/p/itmf7u4qfh7ftzew', 'https://www.flipkart.com/usha-aayush-manual-sewing-machine/p/itm8504b941c61ea', 'https://www.flipkart.com/usha-bandhan-dlx-composite-cover-manual

In [42]:
state['products']

[Product(product_id='USHA-CRAFT-MASTER-IND', name='USHA CRAFT MASTER Industria Manual Sewing Machine', brand='USHA', model_number='CRAFT MASTER Industria', description='Manual sewing machine with 1 built-in stitch pattern and sewing speed of 1800 SPM. Suitable for domestic use.', specifications={'Type': 'Manual', 'Number of Stitches': '1', 'Sewing Speed': '1800 SPM', 'Color': 'Black', 'Width': '23 cm', 'Height': '33 cm', 'Depth': '33 cm', 'Weight': '15.2 kg', 'Number of Buttonhole Styles': '0'}, images=['https://rukminim2.flixcart.com/image/416/416/xif0q/sewing-machine/f/t/h/craft-master-industria-0-1-usha-original-imah7df3xtp7thtq.jpeg', 'https://rukminim2.flixcart.com/image/128/128/xif0q/sewing-machine/7/j/e/craft-master-industria-0-1-usha-original-imah7df3hxsxn5kz.jpeg', 'https://rukminim2.flixcart.com/image/128/128/xif0q/sewing-machine/f/n/2/craft-master-industria-0-1-usha-original-imah7df3nkwy89vt.jpeg', 'https://rukminim2.flixcart.com/image/128/128/xif0q/sewing-machine/i/3/p/craf