## 1E. Web Scraping Script from Google Maps 

Last Updated: 2 Sep 2025 <br>
Description: This script scrapes reviews from Google Maps using Playwright, scrolls the review panel repeatedly to load further batches of reviews, stores them in a SQLite database, and exports the data as CSV and JSON files.

In [None]:
# Install the Playwright package first: pip install playwright
# Then run `playwright install` in a terminal to download the browser binaries

#### Import Libraries

In [None]:
# Import Libraries
import asyncio
from playwright.async_api import async_playwright
import sqlite3
import time
from datetime import datetime
import pandas as pd
import json
import random

In [None]:
# ===============================
# Setup SQLite Database
# ===============================
# Path of the SQLite file that run() opens with sqlite3.connect(); the reviews
# table is created inside it on first use.
DB_NAME = "google_reviews.db"
# Google Maps place page that run() navigates to before opening the Reviews tab.
# Replace this value to scrape a different place.
url = "https://www.google.com/maps/place/Marina+Bay+Sands+Singapore/@1.2837629,103.8565316,17z/data=!3m2!4b1!5s0x31da19042de382df:0x5bbfe003fe5e690!4m9!3m8!1s0x31da19ee4cc09203:0x26c9afefa555dd7!5m2!4m1!1i2!8m2!3d1.2837575!4d103.8591065!16zL20vMGRkOTAz?entry=ttu&g_ep=EgoyMDI1MDgyNS4wIKXMDSoASAFQAw%3D%3D"

In [None]:
# Random delay functions
def get_random_delay(min_seconds, max_seconds):
    """Draw a pause length, in seconds, uniformly from the given range.

    Args:
        min_seconds: Lower bound of the range.
        max_seconds: Upper bound of the range.

    Returns:
        A float produced by ``random.uniform(min_seconds, max_seconds)``.
    """
    chosen = random.uniform(min_seconds, max_seconds)
    return chosen

In [None]:
async def random_wait(min_seconds, max_seconds):
    """Sleep for a randomly chosen duration and report how long it was.

    Args:
        min_seconds: Lower bound of the pause, in seconds.
        max_seconds: Upper bound of the pause, in seconds.

    Returns:
        The number of seconds that was passed to ``asyncio.sleep``.
    """
    pause = get_random_delay(min_seconds, max_seconds)
    await asyncio.sleep(pause)
    return pause

In [None]:
async def random_mouse_movement(page):
    """Move the mouse to a few random points inside the viewport (best effort).

    Performs between 2 and 5 moves, pausing 0.1-0.5 s after each one. Does
    nothing when ``page.viewport_size`` is falsy.

    Args:
        page: Object exposing ``viewport_size`` (a mapping with ``'width'`` and
            ``'height'``, or ``None``) and an awaitable ``mouse.move(x, y)``.

    Returns:
        None. Ordinary errors are reported with ``print`` and otherwise ignored,
        because this is cosmetic behaviour that must not abort a scrape.
        Cancellation and ``KeyboardInterrupt`` are NOT swallowed.
    """
    try:
        viewport = page.viewport_size
        if not viewport:
            return

        # Keep a 100 px margin from the right/bottom edge, but clamp at 0 so a
        # viewport smaller than the margin does not make randint() raise.
        max_x = max(0, viewport['width'] - 100)
        max_y = max(0, viewport['height'] - 100)

        for _ in range(random.randint(2, 5)):
            x = random.randint(0, max_x)
            y = random.randint(0, max_y)
            await page.mouse.move(x, y)
            await asyncio.sleep(random.uniform(0.1, 0.5))
    except Exception as exc:
        # `except Exception` rather than a bare `except`: asyncio.CancelledError
        # and KeyboardInterrupt derive from BaseException and must propagate.
        print(f"Mouse movement skipped: {exc}")

In [None]:
async def _card_text(card, selector, default):
    """Return the inner text of the first node under ``card`` matching ``selector``, else ``default``."""
    try:
        node = await card.query_selector(selector)
        if node:
            return await node.inner_text()
    except Exception:
        # Best effort: one unreadable field must not discard the rest of the card.
        pass
    return default


async def _extract_review(card):
    """Read one review card and return ``(name, rating, review_text, date, review_source)``."""
    # Click the card's "More" button when present (presumably it expands
    # truncated review text - confirm against the live page).
    try:
        more_button = await card.query_selector("//button[contains(., 'More')]")
        if more_button:
            await random_wait(0.5, 1.5)
            await more_button.click()
            await random_wait(1, 3)
    except Exception:
        pass

    name = await _card_text(card, ".d4r55", "N/A")

    # The rating text is looked up inside its own container element.
    rating = "N/A"
    try:
        rating_container = await card.query_selector(".DU9Pgb")
        if rating_container:
            rating = await _card_text(rating_container, ".fontBodyLarge", "N/A")
    except Exception:
        pass

    review_text = await _card_text(card, ".wiI7pd", "")
    date = await _card_text(card, ".xRkPPb", "N/A")
    review_source = await _card_text(card, ".qmhsmd", "N/A")
    return name, rating, review_text, date, review_source


async def _open_reviews_tab(page):
    """Click the Reviews tab; return True on success, False when no selector matched."""
    print("Looking for Reviews button...")
    try:
        # Try the XPath approach from Selenium with random delay before click
        await random_wait(1, 3)
        await page.click("//button[contains(., 'reviews') or contains(., 'Reviews')]", timeout=10000)
        print("✓ Clicked Reviews tab using XPath")
        return True
    except Exception as e:
        print(f"XPath click failed: {e}, trying alternative selectors...")

    review_selectors = [
        "button[aria-label*='Reviews']",
        "button[aria-label*='reviews']",
        "button:has-text('Reviews')",
        "button:has-text('reviews')",
    ]
    for selector in review_selectors:
        try:
            await random_wait(0.5, 2)
            await page.click(selector, timeout=3000)
            print(f"✓ Clicked Reviews tab using: {selector}")
            return True
        except Exception:
            continue

    print("✗ Could not find Reviews button")
    return False


async def _find_scroll_container(page):
    """Return the element to scroll for loading more reviews, or ``page`` itself as a last resort."""
    print("Looking for scrollable container...")
    try:
        # Use the exact same XPath as Selenium
        container = await page.wait_for_selector(
            "//div[@class='m6QErb DxyBCb kA9KIf dS8AEf XiKgde ']",
            timeout=10000
        )
        print("✓ Found scrollable container using Selenium XPath")
        return container
    except Exception:
        print("✗ Could not find exact scrollable container, trying alternatives...")

    # Alternative selectors for scrollable area; the first one that matches
    # anything wins.
    alternative_selectors = [
        "div[role='main']",
        "div[class*='scroll']",
        "div[class*='review']",
        "div[data-review-id]",
    ]
    for selector in alternative_selectors:
        elements = await page.query_selector_all(selector)
        if elements:
            print(f"✓ Using alternative container: {selector}")
            return elements[0]

    print("✗ No scrollable container found, using page body")
    return page


async def _detect_total_reviews(page):
    """Return the review count shown on the page as an int, or None when it cannot be read."""
    try:
        reviews_count_element = await page.query_selector(
            "//div[contains(@class, 'fontBodySmall') and contains(text(), 'reviews')]"
        )
        if reviews_count_element:
            reviews_count_text = await reviews_count_element.inner_text()
            # The first space-separated token is parsed as the count, e.g. "1,234" -> 1234.
            total_reviews = int(reviews_count_text.split(" ")[0].replace(",", ""))
            print(f"✓ Total reviews detected: {total_reviews}")
            return total_reviews
    except Exception:
        print("✗ Could not detect total review count")
    return None


async def _scrape_reviews(page, conn):
    """Load the place page, scroll through its reviews and insert each new one into ``conn``.

    Returns False when the Reviews tab could not be opened (nothing was
    scraped), True otherwise. Navigates to the module-level ``url``.
    """
    cursor = conn.cursor()

    print("Navigating to URL...")
    await page.goto(url, wait_until='domcontentloaded')
    delay = await random_wait(3, 8)  # Random initial wait
    print(f"Waited {delay:.2f} seconds after page load")

    # Simulate some mouse movement
    await random_mouse_movement(page)

    if not await _open_reviews_tab(page):
        return False

    # Random wait after clicking reviews tab
    delay = await random_wait(4, 8)
    print(f"Waited {delay:.2f} seconds after clicking Reviews tab")

    scrollable_div = await _find_scroll_container(page)
    total_reviews = await _detect_total_reviews(page)

    # Scroll budget: one scroll per 10 reported reviews plus 100 spare attempts,
    # or a flat 200 when the total is unknown.
    max_scrolls = total_reviews // 10 + 100 if total_reviews else 200
    print(f"Max scroll attempts set to: {max_scrolls}")

    seen_keys = set()          # (name, review_text, date) of every review stored this run
    scroll_attempts = 0
    stuck_count = 0            # consecutive scrolls that produced no new cards
    max_stuck_attempts = 10
    last_review_count = 0      # number of cards already processed in earlier scrolls

    while scroll_attempts < max_scrolls and stuck_count < max_stuck_attempts:
        print(f"--- Scroll {scroll_attempts + 1}/{max_scrolls} ---")

        # Scroll down
        if scrollable_div is page:
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        else:
            await scrollable_div.evaluate("element => element.scrollTop = element.scrollHeight")

        # Random wait after scrolling (4-8 seconds instead of fixed 6)
        delay = await random_wait(4, 8)
        print(f"Waited {delay:.2f} seconds after scrolling")

        # Simulate occasional mouse movement
        if random.random() < 0.3:  # 30% chance
            await random_mouse_movement(page)

        # Find review cards
        review_cards = await page.query_selector_all(".jftiEf")
        print(f"Detected {len(review_cards)} review cards total")

        # Process only the cards that appeared since the previous scroll.
        new_cards = review_cards[last_review_count:]
        print(f"Processing {len(new_cards)} new review cards")

        new_extracted = 0
        for i, card in enumerate(new_cards):
            try:
                extraction_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Random small delay between processing cards
                if i % 3 == 0:  # Every 3rd card
                    await random_wait(0.1, 0.5)

                name, rating, review_text, date, review_source = await _extract_review(card)

                # Same columns as the table's primary key.
                key = (name, review_text, date)

                if key not in seen_keys and (name != "N/A" or rating != "N/A" or review_text):
                    cursor.execute("""
                        INSERT OR IGNORE INTO reviews
                        (reviewer_name, rating, review_text, date, review_source, extraction_timestamp)
                        VALUES (?, ?, ?, ?, ?, ?)
                    """, (name, rating, review_text, date, review_source, extraction_timestamp))
                    # Commit per row so an interrupted run keeps what it already scraped.
                    conn.commit()

                    seen_keys.add(key)
                    new_extracted += 1

            except Exception as e:
                print(f"Error processing review card: {e}")
                continue

        print(f"✓ Extracted {new_extracted} new reviews this scroll")
        last_review_count = len(review_cards)

        # Stuck detection
        if not new_cards:
            stuck_count += 1
            print(f"✗ Scroll stuck {stuck_count}/{max_stuck_attempts}. Waiting...")
            delay = await random_wait(8, 15)  # Longer random wait when stuck
            print(f"Waited {delay:.2f} seconds during stuck period")

            # Try to interact with last review
            if review_cards:
                try:
                    last_card = review_cards[-1]
                    await random_wait(1, 2)
                    await last_card.click(click_count=2)
                    print("✓ Double-clicked last review to refresh DOM")
                    await random_wait(3, 6)
                except Exception:
                    pass
        else:
            stuck_count = 0

        # Stop if we've collected all reviews
        if total_reviews and len(seen_keys) >= total_reviews:
            print("✓ Reached total reported reviews")
            break

        scroll_attempts += 1

    print(f"Finished scrolling. Total unique reviews collected: {len(seen_keys)}")
    return True


def _export_reviews(conn):
    """Dump the whole ``reviews`` table to ``../Data`` as CSV and JSON; report failures with print."""
    try:
        df = pd.read_sql_query("SELECT * FROM reviews", conn)
        df.to_csv("../Data/google_maps_reviews.csv", index=False, encoding="utf-8-sig")

        # Same directory as the CSV (the path previously lacked the "/" after "..").
        with open("../Data/google_maps_reviews.json", "w", encoding="utf-8") as f:
            json.dump(df.to_dict(orient="records"), f, ensure_ascii=False, indent=4)

        print(f"Exported {len(df)} unique reviews to CSV and JSON.")

    except Exception as e:
        print(f"Export error: {e}")


async def run():
    """Scrape Google Maps reviews into SQLite, then export them to CSV and JSON.

    Opens (and creates if needed) the ``reviews`` table in the SQLite file
    ``DB_NAME``, launches a visible Chromium window through Playwright, loads
    the module-level ``url``, opens the Reviews tab and scrolls until the scroll
    budget is used up, no new cards appear for 10 consecutive scrolls, or the
    reported total is reached. Every new review is inserted with
    ``INSERT OR IGNORE``. Afterwards the whole table is exported to
    ``../Data/google_maps_reviews.csv`` and ``../Data/google_maps_reviews.json``.

    Returns:
        None. Scraping errors are printed and, when possible, captured in
        ``error_screenshot.png``; export errors are printed. If the Reviews tab
        cannot be opened the function returns without exporting. The database
        connection is closed on every exit path.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS reviews (
                reviewer_name TEXT,
                rating TEXT,
                review_text TEXT,
                date TEXT,
                review_source TEXT,
                extraction_timestamp TEXT,
                PRIMARY KEY (reviewer_name, review_text, date)
            )
        """)
        conn.commit()

        reviews_tab_opened = True
        async with async_playwright() as p:
            # Launch browser with more human-like settings.
            # NOTE(review): '--disable-web-security' and '--no-sandbox' weaken
            # browser isolation; confirm they are really needed before reuse.
            browser = await p.chromium.launch(
                headless=False,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--disable-web-security',
                    '--disable-features=VizDisplayCompositor',
                    '--no-sandbox',
                    '--disable-setuid-sandbox'
                ]
            )
            try:
                # Create context with human-like user agent and viewport
                context = await browser.new_context(
                    viewport={'width': 1366, 'height': 768},
                    user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                )
                page = await context.new_page()

                try:
                    reviews_tab_opened = await _scrape_reviews(page, conn)
                except Exception as e:
                    print(f"Error during scraping: {e}")
                    # The screenshot is diagnostic only; if it fails too, keep
                    # going so the rows already stored still get exported.
                    try:
                        await page.screenshot(path='error_screenshot.png')
                        print("Error screenshot saved as 'error_screenshot.png'")
                    except Exception as screenshot_error:
                        print(f"Could not save error screenshot: {screenshot_error}")
            finally:
                await browser.close()

        if not reviews_tab_opened:
            # Nothing was scraped in this run, so there is nothing new to export.
            return

        _export_reviews(conn)
    finally:
        # Runs on the early return and on errors as well, so the SQLite handle
        # is never left open.
        conn.close()

In [22]:
# Run the script
# NOTE: a top-level `await` is only accepted by Jupyter/IPython, which already
# run an event loop. In a plain .py script this line is a SyntaxError; call
# `asyncio.run(run())` there instead.
if __name__ == "__main__":
    # If you're in Jupyter/async environment, use:
    await run()

✓ Extracted 20 new reviews this scroll
--- Scroll 26/6028 ---
Waited 7.41 seconds after scrolling
Detected 630 review cards total
Processing 20 new review cards
✓ Extracted 20 new reviews this scroll
--- Scroll 27/6028 ---
Waited 6.94 seconds after scrolling
Detected 670 review cards total
Processing 40 new review cards
✓ Extracted 40 new reviews this scroll
--- Scroll 28/6028 ---
Waited 6.13 seconds after scrolling
Detected 690 review cards total
Processing 20 new review cards
✓ Extracted 20 new reviews this scroll
--- Scroll 29/6028 ---
Waited 6.07 seconds after scrolling
Detected 710 review cards total
Processing 20 new review cards
✓ Extracted 20 new reviews this scroll
--- Scroll 30/6028 ---
Waited 4.82 seconds after scrolling
Detected 720 review cards total
Processing 10 new review cards
✓ Extracted 10 new reviews this scroll
--- Scroll 31/6028 ---
Waited 6.77 seconds after scrolling
Detected 740 review cards total
Processing 20 new review cards
✓ Extracted 20 new reviews this sc