In [1]:
pip install playwright==1.25.2

Collecting playwright==1.25.2
  Downloading playwright-1.25.2-py3-none-macosx_10_13_x86_64.whl (30.6 MB)
[K     |████████████████████████████████| 30.6 MB 3.8 MB/s eta 0:00:01
[?25hCollecting greenlet==1.1.2
  Downloading greenlet-1.1.2-cp39-cp39-macosx_10_14_x86_64.whl (92 kB)
[K     |████████████████████████████████| 92 kB 4.1 MB/s  eta 0:00:01
[?25hCollecting websockets==10.1
  Downloading websockets-10.1-cp39-cp39-macosx_10_9_x86_64.whl (96 kB)
[K     |████████████████████████████████| 96 kB 8.4 MB/s eta 0:00:01
[?25hCollecting pyee==8.1.0
  Downloading pyee-8.1.0-py2.py3-none-any.whl (12 kB)
Installing collected packages: websockets, pyee, greenlet, playwright
  Attempting uninstall: greenlet
    Found existing installation: greenlet 1.1.1
    Uninstalling greenlet-1.1.1:
      Successfully uninstalled greenlet-1.1.1
Successfully installed greenlet-1.1.2 playwright-1.25.2 pyee-8.1.0 websockets-10.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
from playwright.sync_api import sync_playwright
from dataclasses import dataclass, asdict, field
import pandas as pd
import argparse

In [5]:
@dataclass
class Business:
    """holds business data"""

    name: str = None
    address: str = None
    website: str = None
    phone_number: str = None
    reviews_count: int = None
    reviews_average: float = None


@dataclass
class BusinessList:
    """holds list of Business objects,
    and save to both excel and csv
    """

    business_list: list[Business] = field(default_factory=list)

    def dataframe(self):
        """transform business_list to pandas dataframe

        Returns: pandas dataframe
        """
        return pd.json_normalize(
            (asdict(business) for business in self.business_list), sep="_"
        )
    
    def main():
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=False)
            page = browser.new_page()

            page.goto("https://www.google.com/maps", timeout=60000)
        # wait is added for dev phase. can remove it in production
            page.wait_for_timeout(5000)

            page.locator('//input[@id="searchboxinput"]').fill(search_for)
            page.wait_for_timeout(3000)

            page.keyboard.press("Enter")
            page.wait_for_timeout(5000)

        # scrolling
            page.hover('//a[contains(@href, "https://www.google.com/maps/place")]')

        # this variable is used to detect if the bot
        # scraped the same number of listings in the previous iteration
            previously_counted = 0
            while True:
                page.mouse.wheel(0, 10000)
                page.wait_for_timeout(3000)

                if (
                page.locator(
                    '//a[contains(@href, "https://www.google.com/maps/place")]'
                ).count()
                >= total
            ):
                    listings = page.locator(
                    '//a[contains(@href, "https://www.google.com/maps/place")]'
                ).all()[:total]
                    listings = [listing.locator("xpath=..") for listing in listings]
                    print(f"Total Scraped: {len(listings)}")
                    break
                else:
                # logic to break from loop to not run infinitely
                # in case arrived at all available listings
                    if (
                    page.locator(
                        '//a[contains(@href, "https://www.google.com/maps/place")]'
                    ).count()
                    == previously_counted
                ):
                        listings = page.locator(
                        '//a[contains(@href, "https://www.google.com/maps/place")]'
                    ).all()
                        print(f"Arrived at all available\nTotal Scraped: {len(listings)}")
                        break
                    else:
                        previously_counted = page.locator(
                        '//a[contains(@href, "https://www.google.com/maps/place")]'
                    ).count()
                        print(
                        f"Currently Scraped: ",
                        page.locator(
                            '//a[contains(@href, "https://www.google.com/maps/place")]'
                        ).count(),
                    )

        business_list = BusinessList()

        # scraping
        for listing in listings:
            listing.click()
            page.wait_for_timeout(5000)

            name_xpath = '//div[contains(@class, "fontHeadlineSmall")]'
            address_xpath = '//button[@data-item-id="address"]//div[contains(@class, "fontBodyMedium")]'
            website_xpath = '//a[@data-item-id="authority"]//div[contains(@class, "fontBodyMedium")]'
            phone_number_xpath = '//button[contains(@data-item-id, "phone:tel:")]//div[contains(@class, "fontBodyMedium")]'
            reviews_span_xpath = '//span[@role="img"]'

            business = Business()

            if listing.locator(name_xpath).count() > 0:
                business.name = listing.locator(name_xpath).inner_text()
            else:
                business.name = ""
            if page.locator(address_xpath).count() > 0:
                business.address = page.locator(address_xpath).inner_text()
            else:
                business.address = ""
            if page.locator(website_xpath).count() > 0:
                business.website = page.locator(website_xpath).inner_text()
            else:
                business.website = ""
            if page.locator(phone_number_xpath).count() > 0:
                business.phone_number = page.locator(phone_number_xpath).inner_text()
            else:
                business.phone_number = ""
            if listing.locator(reviews_span_xpath).count() > 0:
                business.reviews_average = float(
                    listing.locator(reviews_span_xpath)
                    .get_attribute("aria-label")
                    .split()[0]
                    .replace(",", ".")
                    .strip()
                )
                business.reviews_count = int(
                    listing.locator(reviews_span_xpath)
                    .get_attribute("aria-label")
                    .split()[2]
                    .strip()
                )
            else:
                business.reviews_average = ""
                business.reviews_count = ""

            business_list.business_list.append(business)

        # saving to both excel and csv just to showcase the features.

        browser.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--search", type=str)
    parser.add_argument("-t", "--total", type=int)
    args = parser.parse_args()

    if args.search:
        search_for = args.search
    else:
        # in case no arguments passed
        # the scraper will search by defaukt for:
        search_for = "london restaurants"

    # total number of products to scrape. Default is 10
    if args.total:
        total = args.total
    else:
        total = 1000

    main()

usage: ipykernel_launcher.py [-h] [-s SEARCH] [-t TOTAL]
ipykernel_launcher.py: error: unrecognized arguments: --ip=127.0.0.1 --stdin=9003 --control=9001 --hb=9000 --Session.signature_scheme="hmac-sha256" --Session.key=b"80cffc79-8448-4b22-bc4a-a79acf8d117f" --shell=9002 --transport="tcp" --iopub=9004 --f=/Users/mayaschweizer/Library/Jupyter/runtime/kernel-v2-8715CNVRfBzPgmig.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
