In [105]:
import hashlib
import json
import os
import random
import time

import dotenv
from playwright.async_api import async_playwright

In [106]:
# Configuration: values read by the scraping cells below.
# `amazon_home` is the page opened right after the browser starts.
amazon_home = "https://www.amazon.com/"
# Example of a product URL that can be assigned to `product_to_search`:
# "https://www.amazon.com/DYPER-Ingredients-Alternative-Plant-Based-Hypoallergenic/dp/B09LNMG12S/"
product_to_search = "huggies" # either a product name or a full product URL
product_url = False # set to True when `product_to_search` holds a URL instead of a name
max_items = 15 # name search only: highest search-result position that is opened and scraped

In [107]:
# Load environment variables via python-dotenv; login() later reads
# AMAZON_EMAIL and AMAZON_PASSWORD with os.getenv.
# The return value is kept in `env` but is not used by the visible cells.
env = dotenv.load_dotenv()

In [108]:
# Launch Browser
async def start_browser(headless=False):
    """Start Playwright, launch Chromium and open a fresh page.

    Args:
        headless: Forwarded to ``chromium.launch``. Defaults to ``False``
            (visible browser window), which is the previous behaviour.

    Returns:
        Tuple ``(p, browser, context, page)``: the started Playwright driver,
        the launched browser, a new browser context and a new page in that
        context. The caller is responsible for ``await browser.close()`` and
        ``await p.stop()`` when done.

    Raises:
        Whatever Playwright raises if the launch or the context/page creation
        fails. Anything started so far is shut down before re-raising.
    """
    p = await async_playwright().start()
    browser = None
    try:
        browser = await p.chromium.launch(headless=headless)
        context = await browser.new_context()
        page = await context.new_page()
    except BaseException:
        # Do not leak the browser or the driver when start-up fails part-way.
        if browser is not None:
            await browser.close()
        await p.stop()
        raise
    return p, browser, context, page

# Execute this block to start the browser.
# `p`, `browser`, `context` and `page` become notebook-level globals; login(),
# get_reviews() and the search cell use `page` directly.
p, browser, context, page = await start_browser()
# Open the Amazon home page. As the cell's last expression, the returned
# Response object is what the cell displays.
await page.goto(amazon_home)

<Response url='https://www.amazon.com/' request=<Request url='https://www.amazon.com/' method='GET'>>

In [109]:
# Login Manually
async def login():
    """Sign in to Amazon through the notebook-level ``page``.

    Reads the credentials from the ``AMAZON_EMAIL`` and ``AMAZON_PASSWORD``
    environment variables and walks the sign-in form: sign-in link, e-mail,
    continue, password, submit.

    Raises:
        RuntimeError: if either environment variable is missing or empty. The
            check runs before the page is touched, so a missing credential is
            reported by name instead of failing inside ``fill(None)`` half-way
            through the form.
    """
    credentials = {
        "AMAZON_EMAIL": os.getenv("AMAZON_EMAIL"),
        "AMAZON_PASSWORD": os.getenv("AMAZON_PASSWORD"),
    }
    missing = [name for name, value in credentials.items() if not value]
    if missing:
        raise RuntimeError(
            "Missing environment variable(s): " + ", ".join(missing)
            + ". Define them (for example in the .env file) before calling login()."
        )

    # click on sign in button
    sign_in_link = await page.wait_for_selector("//a[@data-nav-role='signin']")
    await sign_in_link.click()

    # enter email
    email_input = await page.wait_for_selector("input[type='email']")
    await email_input.fill(credentials["AMAZON_EMAIL"])

    # click on continue
    continue_button = await page.wait_for_selector("//input[@id='continue']")
    await continue_button.click()

    # enter password
    password_input = await page.wait_for_selector("input[type='password']")
    await password_input.fill(credentials["AMAZON_PASSWORD"])

    # click on sign in
    submit_button = await page.wait_for_selector("//input[@id='signInSubmit']")
    await submit_button.click()

# Execute this cell to log in. login() drives the global `page`, so the
# browser cell above must have been run first.
await login()

In [110]:
# Helper Functions
def _stable_review_id(title):
    """Return a digit-only id derived from ``title`` that is stable across kernel restarts."""
    digest = hashlib.sha1(title.encode("utf-8")).hexdigest()
    return str(int(digest[:16], 16))


async def _text_or_empty(selector):
    """Return the inner text of the first element matching ``selector`` on ``page``, or ``""``."""
    element = await page.query_selector(selector)
    return await element.inner_text() if element else ""


async def _scroll_until_found(selector, max_scrolls=20):
    """Scroll down in 1000px steps until ``selector`` matches; return the element or ``None``."""
    for _ in range(max_scrolls):
        await page.evaluate("window.scrollBy(0, 1000)")
        # Non-blocking pause (time.sleep would stall the event loop Playwright runs on).
        await page.wait_for_timeout(500)
        element = await page.query_selector(selector)
        if element:
            await element.scroll_into_view_if_needed()
            return element
    return None


async def get_reviews(url, max_reviews=float("inf")): # returns a dictionary of reviews
    """Scrape the customer reviews of one Amazon product.

    Uses the notebook-level ``page``. Opens ``url``, scrolls to the
    "see all reviews" link and follows it, then walks the review list page by
    page until no element matches the next-page XPath or ``max_reviews``
    reviews have been collected.

    Args:
        url: Product page URL.
        max_reviews: Upper bound on the number of reviews to collect.
            Defaults to no limit, which is the previous behaviour.

    Returns:
        dict with the keys ``product_name`` (``""`` when the title element is
        not found), ``url`` (``url`` without its query string) and ``reviews``
        (list of dicts with the keys ``review_id``, ``title``, ``name``,
        ``date``, ``stars``, ``product_specs``, ``verified_purchase``,
        ``helpful_statement`` and ``review``).

    Note:
        ``review_id`` is derived from the review title only, so two reviews
        with the same title share an id. Unlike the built-in ``hash()``, the
        value no longer changes between kernel sessions.
    """
    review_dictionary = {}

    # go to the product page
    await page.goto(url)

    # get product name
    product_name = await _text_or_empty("span#productTitle")
    review_dictionary["product_name"] = product_name

    # url, without the query string
    review_dictionary["url"] = url.split("?")[0]

    print(f"Product Name = {product_name}")

    # Scroll to the reviews section and open the full review list
    see_all_link = await _scroll_until_found("//a[@data-hook='see-all-reviews-link-foot']")
    if see_all_link:
        await see_all_link.click()

    print("Scraping Reviews")

    # Scroll until the first review is on screen
    await _scroll_until_found("(//div[@id='cm_cr-review_list']//ul//li)[1]")

    review_index = 1  # 1-based position inside the current review page
    current_page = 1
    reviews = []

    print("=================")
    while len(reviews) < max_reviews:
        print(f'Getting review {review_index} on page = {current_page}')

        # Every field of the current review lives below this prefix.
        item_xpath = f"(//div[@id='cm_cr-review_list']//ul//li)[{review_index}]//*"

        # The reviewer name doubles as the "does this review exist" probe.
        name_element = await page.query_selector(item_xpath + "//a[@class='a-profile']/div[2]/span")
        if not name_element:
            # no more reviews on this page, click on next page
            next_page_button = await page.query_selector("//ul[@class='a-pagination'][1]//li[@class='a-last']")
            if not next_page_button:
                # all reviews are done
                print("no next page button found")
                break
            await next_page_button.click()
            await page.wait_for_timeout(random.randint(1, 3) * 1000)
            review_index = 1
            current_page += 1
            print("Moving to next page")
            continue

        await name_element.scroll_into_view_if_needed()
        name = await name_element.inner_text()

        title = await _text_or_empty(item_xpath + "//a[@data-hook='review-title']//span[2]")
        date = await _text_or_empty(item_xpath + "//span[@data-hook='review-date']")
        stars = await _text_or_empty(item_xpath + "//i[@data-hook='review-star-rating']")
        product_specs = await _text_or_empty(item_xpath + "//a[@data-hook='format-strip']")
        verified_badge = await page.query_selector(item_xpath + "//span[@data-hook='avp-badge']")

        # if read more button, click, so the full body is present before it is read
        read_more_button = await page.query_selector(item_xpath + "//a[@aria-label='Read more of this review']")
        if read_more_button:
            await read_more_button.click()

        helpful_statement = await _text_or_empty(item_xpath + "//span[@data-hook='helpful-vote-statement']")
        review = await _text_or_empty(item_xpath + "//span[@data-hook='review-body']")

        reviews.append({
            "review_id": _stable_review_id(title),
            "title": title,
            "name": name,
            "date": date,
            # keep only the first space-separated token of the rating text
            "stars": str(stars).split(" ")[0],
            "product_specs": product_specs,
            "verified_purchase": bool(verified_badge),
            "helpful_statement": helpful_statement,
            "review": review,
        })
        review_index += 1

        # small random pause between reviews (0 or 1 second)
        await page.wait_for_timeout(random.randint(0, 1) * 1000)

    review_dictionary["reviews"] = reviews
    return review_dictionary

# create output json file
def save_reviews(reviews, filename=None):
    """Merge ``reviews`` into the JSON file ``filename`` and rewrite the file.

    Runs on every call. Top-level keys already in the file are kept unless
    ``reviews`` contains the same key, in which case the new value wins.

    Args:
        reviews: dict to merge into the stored data.
        filename: Target JSON file. When omitted, it resolves at call time to
            ``f"{product_to_search}_reviews.json"``. It was previously frozen
            to whatever ``product_to_search`` held when this cell was run.

    A file that is missing, unreadable as JSON, or whose top-level value is
    not an object is treated as empty. The new content is written to a
    temporary sibling file and moved into place, so an interrupted write does
    not leave a truncated JSON file behind.
    """
    if filename is None:
        filename = f"{product_to_search}_reviews.json"

    existing_data = {}
    if os.path.exists(filename):
        with open(filename, "r", encoding="utf-8") as file:
            try:
                loaded = json.load(file)
            except (json.JSONDecodeError, UnicodeDecodeError):
                loaded = {}
        # Only a JSON object can be merged with .update(); anything else is discarded.
        if isinstance(loaded, dict):
            existing_data = loaded

    existing_data.update(reviews)  # new keys are added, existing keys are overwritten

    tmp_filename = f"{filename}.tmp"
    with open(tmp_filename, "w", encoding="utf-8") as file:
        json.dump(existing_data, file, indent=4)
    os.replace(tmp_filename, filename)
    

In [111]:
# Scrape only one product (given URL)
product_reviews = {}
if product_url:
    product_reviews = await get_reviews(product_to_search)

In [112]:
# Scrape products with given name
products_to_search = ['huggies', 'mypura', 'alppi baby', 'rascals baby', 'believe baby', 'parasol co', 'millie moon', 'ecoiginals', 'everylife']

for product_to_search in products_to_search:

    product_to_search = product_to_search + ' diapers'

    try:
        product_reviews = {}
        if not product_url:

            product_id = 1

            while product_id <= max_items:

                # go to product search page
                await page.goto(f"https://www.amazon.com/s?k={product_to_search.replace(' ', '+')}")   

                # select sort by best sellers
                try:
                    sort_select_xpath = "//select[@id='s-result-sort-select']//..//..//..//form"
                    sort_select = await page.query_selector(sort_select_xpath)
                    await sort_select.click()

                    # best sellers xpath
                    best_sellers_xpath = "//a[@id='s-result-sort-select_5']"
                    best_sellers = await page.query_selector(best_sellers_xpath)
                    await best_sellers.click()
                except:
                    print("Could not sort by best sellers")

                time.sleep(random.randint(2, 3))

                # click on product
                product_listing = f"(//div[@role='listitem'][{product_id}])[1]//a"
                product = await page.query_selector(product_listing)
                if product:
                    await product.click()
                else:
                    print(f"Product not found: id = {product_id}")
                    product_id += 1
                    continue

                # get page url
                url = page.url
                time.sleep(random.randint(1, 3))

                current_product_reviews = await get_reviews(url)
                
                # if product name to search not in title, skip
                # if product_to_search.lower() not in current_product_reviews["product_name"].lower():
                #     print(f"Product not found: id = {product_id}")
                #     product_id += 1
                #     continue
                product_reviews[str(product_id)] = current_product_reviews

                # update in json file
                save_reviews(product_reviews, filename=f"{product_to_search}_reviews.json")

                product_id += 1
    except Exception as e:
        print(f"Error: {e}")
        continue

Product Name = Huggies Simply Clean Unscented Baby Diaper Wipes, 11 Flip-Top Packs (704 Wipes Total), Packaging May Vary
Scraping Reviews
Getting review 1 on page = 1
Getting review 2 on page = 1
Getting review 3 on page = 1
Getting review 4 on page = 1
Getting review 5 on page = 1
Getting review 6 on page = 1
Getting review 7 on page = 1
Getting review 8 on page = 1
Getting review 9 on page = 1
Getting review 10 on page = 1
Getting review 11 on page = 1
Moving to next page
Getting review 1 on page = 2
Getting review 2 on page = 2
Getting review 3 on page = 2
Getting review 4 on page = 2
Getting review 5 on page = 2
Getting review 6 on page = 2
Getting review 7 on page = 2
Getting review 8 on page = 2
Getting review 9 on page = 2
Getting review 10 on page = 2
Getting review 11 on page = 2
Moving to next page
Getting review 1 on page = 3
Getting review 2 on page = 3
Getting review 3 on page = 3
Getting review 4 on page = 3
Getting review 5 on page = 3
Getting review 6 on page = 3
Gettin

In [87]:
# close
await browser.close()