# Google Maps Reviews Scraper
Jalankan sel-sel di bawah ini secara berurutan.


## 1. Install Dependencies
Sel ini menginstal playwright, pandas, dan nest_asyncio, lalu mengunduh browser Chromium yang dipakai Playwright di Google Colab.


In [None]:
# fake-useragent is imported by the core script (BrowserManager) and must be installed too.
!pip install playwright pandas nest_asyncio fake-useragent
!playwright install chromium


## 2. Setup Asyncio
Google Colab sudah berjalan di dalam loop asyncio, sehingga kita perlu mengaplikasikan `nest_asyncio`.


In [None]:
import nest_asyncio
# Allow nested event-loop use; the notebook text above states that Colab
# already runs inside an asyncio loop.
nest_asyncio.apply()


## 3. Core Script
Berikut adalah gabungan semua modul dan file konfigurasi (utils, scraper, data processor).


In [None]:
import json
import pandas as pd
import re, time, random, os, logging
from datetime import datetime, timedelta
from playwright.sync_api import sync_playwright

# Selector configuration for the Google Maps UI, kept as JSON text and parsed
# once into SELECTORS. Text-based selectors target the Indonesian UI labels.
# "sort_newest" uses :has-text() inside a plain CSS list: a comma list cannot
# be combined with ">>" chaining, because everything between two ">>" is
# handed to a single selector engine.
SELECTORS_JSON = """
{
    "search": {
        "input": "input[name='q']",
        "recommendation_item": "div.Nv2PK",
        "recommendation_link": "a.hfpxzc"
    },
    "place_details": {
        "name": "h1.DUwDvf",
        "rating": "div.F7nice span[aria-hidden='true']",
        "reviews_count": "div.F7nice span[aria-label*='ulasan']",
        "address": "[data-item-id='address']",
        "website": "[data-item-id='authority']",
        "phone": "[data-item-id^='phone']"
    },
    "reviews": {
        "tab_button": "button[role='tab'][aria-label*='Ulasan'], button:has-text('Ulasan'), div[role='tab']:has-text('Ulasan')",
        "sort_button": "button[aria-label='Urutkan ulasan'], button[data-value='Urutkan']",
        "sort_newest": "div[role='menuitem']:has-text('Terbaru'), div[role='menuitemradio']:has-text('Terbaru')",
        "container": "div.m6QErb.DxyBCb.kA9KIf.dS8AEf[tabindex='-1'], div[role='main'] > div.m6QErb[tabindex='-1']",
        "item": "div.jftiEf",
        "author": "div.d4r55",
        "rating": "span.kvMYJc",
        "text": "span.wiI7pd",
        "date": "span.rsqaWe",
        "more_button": "button.w8nwRe",
        "more_reviews_button": "button[aria-label^='Ulasan lainnya']",
        "owner_reply": "div.wiI7pd"
    },
    "xpath_fallbacks": {
        "name": "xpath=//h1[contains(@class, 'DUwDvf')]",
        "address": "xpath=//button[@data-item-id='address']",
        "website": "xpath=//a[@data-item-id='authority']",
        "phone": "xpath=//button[starts-with(@data-item-id, 'phone')]",
        "rating": "xpath=//div[contains(@class, 'F7nice')]//span[@aria-hidden='true']",
        "reviews_count": "xpath=//div[contains(@class, 'F7nice')]//span[contains(@aria-label, 'ulasan')]",
        "sort_button": "xpath=//button[@aria-label='Urutkan ulasan']",
        "sort_newest": "xpath=(//div[@role='menuitem'])[2]"
    }
}
"""
SELECTORS = json.loads(SELECTORS_JSON)









# load_dotenv is never imported in this cell, so calling it directly raises
# NameError. It is presumably python-dotenv's loader (TODO confirm); treat the
# package as optional and fall back to the plain process environment.
try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

if load_dotenv is not None:
    load_dotenv()

def random_delay(min_val=None, max_val=None):
    """Sleep for a uniformly random number of seconds.

    A bound left as ``None`` is read from the ``MIN_DELAY`` / ``MAX_DELAY``
    environment variable, falling back to 2 and 5 seconds respectively.
    """
    if min_val is None:
        lower = float(os.getenv("MIN_DELAY", 2))
    else:
        lower = min_val

    if max_val is None:
        upper = float(os.getenv("MAX_DELAY", 5))
    else:
        upper = max_val

    pause = random.uniform(lower, upper)
    time.sleep(pause)

def parse_relative_date(date_str):
    """Convert an Indonesian relative date string to an absolute ``YYYY-MM-DD`` date.

    Handles strings such as ``'15 jam lalu'``, ``'3 minggu lalu'`` and the
    unnumbered "se-" forms (``'seminggu lalu'``, ``'setahun lalu'``), which
    mean exactly one unit. Months count as 30 days and years as 365 days.

    Args:
        date_str: Raw relative date text; may be ``None`` or empty.

    Returns:
        The computed date as a ``YYYY-MM-DD`` string, today's date when the
        text contains 'baru' or no known unit, or ``None`` for empty input.
    """
    if not date_str:
        return None

    now = datetime.now()
    text = date_str.lower()

    # 'Baru' / 'Baru saja' is treated as today.
    if 'baru' in text:
        return now.strftime("%Y-%m-%d")

    match = re.search(r'(\d+)', text)
    if match:
        number = int(match.group(1))
    elif re.search(r'\bse(?:menit|jam|hari|minggu|bulan|tahun)\b', text):
        # "se-" prefix = one unit; without this a year-old review would be
        # dated today because no digit is present.
        number = 1
    else:
        number = 0

    units = (
        ('menit', timedelta(minutes=1)),
        ('jam', timedelta(hours=1)),
        ('hari', timedelta(days=1)),
        ('minggu', timedelta(weeks=1)),
        ('bulan', timedelta(days=30)),
        ('tahun', timedelta(days=365)),
    )
    for keyword, unit in units:
        if keyword in text:
            return (now - number * unit).strftime("%Y-%m-%d")

    # Unknown wording: fall back to today.
    return now.strftime("%Y-%m-%d")

def extract_place_id_from_url(url):
    """Return the place identifier embedded in a Google Maps URL, or ``None``.

    Two patterns are tried in order:
      1. the token following ``!1s`` in the ``data=`` segment (either a
         ``ChIJ...`` id or the hex ``0x...:0x...`` form), terminated by ``!``
         or the end of the string;
      2. any ``ChIJ`` token of at least ten further characters anywhere in the URL.
    """
    candidate_patterns = (
        r'!1s([a-zA-Z0-9_:-]+)(?:!|$)',
        r'(ChIJ[a-zA-Z0-9_-]{10,})',
    )
    for pattern in candidate_patterns:
        found = re.search(pattern, url)
        if found is not None:
            return found.group(1)
    return None

def extract_lat_long_from_url(url):
    """Return ``(latitude, longitude)`` parsed from the ``@lat,lng`` part of a URL.

    Example: ``/@-6.2381042,106.7661399,17z/`` yields
    ``(-6.2381042, 106.7661399)``. Both values must be decimal numbers; when
    no such pair is present the result is ``(None, None)``.
    """
    found = re.search(r'@(-?\d+\.\d+),(-?\d+\.\d+)', url)
    if found is None:
        return None, None
    latitude, longitude = (float(part) for part in found.groups())
    return latitude, longitude

def load_selectors(filepath=None):
    """Return the selector configuration parsed from ``SELECTORS_JSON``.

    ``filepath`` is accepted but not used: the module-level ``SELECTORS`` dict
    is returned regardless of its value.
    """
    return SELECTORS





def setup_logger(log_folder="logs"):
    """Return the shared ``gmaps_scraper`` logger, configuring it on first use.

    On the first call a UTF-8 file handler (``scrape_log_<timestamp>.txt``
    inside ``log_folder``) and a console handler are attached. Later calls
    return the same logger without adding handlers, so ``log_folder`` only
    selects the log file on the first call.

    Args:
        log_folder: Directory for the log file; created if missing.

    Returns:
        The configured ``logging.Logger`` at INFO level.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(log_folder, exist_ok=True)

    logger = logging.getLogger("gmaps_scraper")
    logger.setLevel(logging.INFO)

    # Handlers already attached: nothing else to do (avoids duplicate output).
    if logger.handlers:
        return logger

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_folder, f"scrape_log_{timestamp}.txt")
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    console_handler = logging.StreamHandler()
    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger

import random

from fake_useragent import UserAgent
import os


# Module-level logger used by the classes and functions defined below.
logger = setup_logger()

class BrowserManager:
    """Own the Playwright driver, Chromium browser, context and page.

    ``start_browser()`` launches Chromium and returns a page;
    ``get_new_context()`` swaps in a fresh context/page on the same browser;
    ``close_browser()`` releases the browser and stops the Playwright driver.
    """

    # Injected into every page so ``navigator.webdriver`` reads as undefined.
    _STEALTH_SCRIPT = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"

    def __init__(self, headless=True):
        """Store launch options; nothing is started until ``start_browser()``."""
        self.headless = headless
        self.ua = UserAgent()
        # Handle returned by sync_playwright().start(); kept so it can be stopped.
        self._playwright = None
        self.browser = None
        self.context = None
        self.page = None

    def _open_context(self, user_agent=None):
        """Create a context and page with a random viewport; return ``(width, height)``.

        A random user agent is drawn when ``user_agent`` is not given.
        """
        viewport_width = random.randint(1280, 1920)
        viewport_height = random.randint(720, 1080)

        self.context = self.browser.new_context(
            viewport={'width': viewport_width, 'height': viewport_height},
            user_agent=user_agent or self.ua.random
        )
        self.page = self.context.new_page()
        self.page.add_init_script(self._STEALTH_SCRIPT)
        return viewport_width, viewport_height

    def start_browser(self):
        """Start a fresh browser instance with anti-detection args.

        Returns:
            The newly opened page.

        Raises:
            Exception: Whatever Playwright raised while starting; anything
                started before the failure is shut down first.
        """
        try:
            self._playwright = sync_playwright().start()
            # One user agent for both the launch flag and the context, so the
            # two never disagree.
            user_agent = self.ua.random
            self.browser = self._playwright.chromium.launch(
                headless=self.headless,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                    "--disable-setuid-sandbox",
                    "--disable-infobars",
                    "--window-position=0,0",
                    "--user-agent=" + user_agent
                ]
            )

            viewport_width, viewport_height = self._open_context(user_agent)

            logger.info(f"Browser started successfully. Viewport: {viewport_width}x{viewport_height}")
            return self.page
        except Exception as e:
            logger.error(f"Failed to start browser: {e}")
            # Do not leave a half-started driver or browser process behind.
            try:
                self.close_browser()
            except Exception as cleanup_error:
                logger.warning(f"Cleanup after failed start also failed: {cleanup_error}")
            raise

    def close_browser(self):
        """Close the browser and stop the Playwright driver.

        Safe to call when nothing was started and safe to call twice.
        """
        try:
            if self.browser:
                self.browser.close()
                logger.info("Browser closed.")
        finally:
            self.browser = None
            self.context = None
            self.page = None
            driver, self._playwright = self._playwright, None
            if driver:
                driver.stop()

    def get_new_context(self):
        """Replace the current context with a fresh one to clear session data.

        Returns:
            The page opened in the new context.

        Raises:
            RuntimeError: If the browser has not been started.
        """
        if self.browser is None:
            raise RuntimeError("Browser is not running; call start_browser() first.")

        if self.context:
            self.context.close()

        viewport_width, viewport_height = self._open_context()
        logger.info(f"Fresh context created. Viewport: {viewport_width}x{viewport_height}")
        return self.page







# Re-binds the same named logger: setup_logger() looks up "gmaps_scraper" and
# only attaches handlers when none exist yet.
logger = setup_logger()

class DataProcessor:
    """Clean scraped reviews and write them (and error reports) to CSV files."""

    # Column order used for the exported review CSV.
    _EXPORT_COLUMNS = [
        'place_id', 'place_url', 'nama_tempat', 'latitude', 'longitude', 'address', 'description', 'is_spending',
        'reviews', 'total_reviews', 'competitors', 'website', 'can_claim', 'owner', 'featured_image',
        'main_category', 'categories', 'total_rating', 'review_rating', 'workday_timing', 'is_temporarily_closed',
        'is_permanently_closed', 'closed_on', 'phone', 'review_id', 'review_keywords',
        'author_name', 'tanggal_review', 'isi_review', 'balasan_pemilik', 'tanggal_balasan', 'ingestion_time'
    ]

    def __init__(self, output_folder="output_data", min_date="2025-04-01"):
        """Prepare the output folder and the per-session file timestamp.

        Args:
            output_folder: Directory for CSV output; created if missing.
            min_date: Reviews dated before this ``YYYY-MM-DD`` value are
                dropped by ``process_reviews``. ``None`` or ``""`` disables
                the filter.
        """
        self.output_folder = output_folder
        self.min_date = min_date
        os.makedirs(output_folder, exist_ok=True)
        self.session_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    @staticmethod
    def _clean_text(value):
        """Return ``value`` stripped, mapping ``None`` (or any falsy value) to ``''``."""
        return (value or '').strip()

    def process_reviews(self, raw_reviews, place_details=None):
        """Clean and format review data.

        Args:
            raw_reviews: Iterable of review dicts as produced by the scraper.
            place_details: Optional dict of place-level fields copied onto
                every row.

        Returns:
            A list of flat row dicts; reviews older than ``min_date`` are skipped.
        """
        if place_details is None:
            place_details = {}

        processed = []
        for review in raw_reviews:
            parsed_date = parse_relative_date(review.get('tanggal_raw'))

            # ISO dates compare correctly as strings.
            if self.min_date and parsed_date and parsed_date < self.min_date:
                continue

            row = {
                'place_id': review.get('place_id'),
                'place_url': review.get('place_url'),
                'nama_tempat': review.get('nama_tempat'),
                'latitude': place_details.get('latitude'),
                'longitude': place_details.get('longitude'),
                'description': place_details.get('description'),
                'is_spending': place_details.get('is_spending'),
                'reviews': place_details.get('ulasan_total'),
                'competitors': place_details.get('competitors'),
                'website': place_details.get('website'),
                'can_claim': place_details.get('can_claim'),
                'owner': place_details.get('owner'),
                'featured_image': place_details.get('featured_image'),
                'main_category': place_details.get('main_category'),
                'categories': place_details.get('categories'),
                'total_rating': place_details.get('rating_total'),
                'total_reviews': place_details.get('ulasan_total'),
                'review_rating': review.get('rating_ulasan'),
                'workday_timing': place_details.get('workday_timing'),
                'is_temporarily_closed': place_details.get('is_temporarily_closed'),
                'is_permanently_closed': place_details.get('is_permanently_closed'),
                'closed_on': place_details.get('closed_on'),
                'phone': place_details.get('telepon'),
                'address': place_details.get('alamat'),
                'review_keywords': place_details.get('review_keywords'),
                # Scraped text can be None even when the key exists, so
                # .get(key, '') alone is not enough before .strip().
                'author_name': self._clean_text(review.get('author_name')),
                'tanggal_review': parsed_date,
                'isi_review': self._clean_text(review.get('isi_review')),
                'balasan_pemilik': self._clean_text(review.get('balasan_pemilik')),
                'tanggal_balasan': parse_relative_date(review.get('tanggal_balasan_raw')),
                'review_id': review.get('review_id'),
                'ingestion_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

            processed.append(row)

        return processed

    def export_to_csv(self, data, name_prefix="gmaps_scrape", mode="a"):
        """Write rows to ``<name_prefix>_<session timestamp>.csv``.

        Args:
            data: List of row dicts; nothing is written when empty.
            name_prefix: File name prefix.
            mode: File mode passed to pandas (``"a"`` appends, ``"w"`` overwrites).

        Returns:
            The path written, or ``None`` when there was no data.
        """
        if not data:
            logger.warning("No data to export.")
            return None

        filename = f"{name_prefix}_{self.session_timestamp}.csv"
        filepath = os.path.join(self.output_folder, filename)

        df = pd.DataFrame(data)
        # Keep only columns that actually exist to prevent KeyError.
        existing_cols = [c for c in self._EXPORT_COLUMNS if c in df.columns]
        df = df[existing_cols]

        # The header is written once per file: on creation or on overwrite.
        write_header = not os.path.exists(filepath) or mode == 'w'
        df.to_csv(filepath, index=False, mode=mode, header=write_header, encoding='utf-8-sig')
        logger.info(f"Data exported to {filepath}")
        return filepath

    def export_errors(self, errors):
        """Write the error list to ``error_report_<timestamp>.csv``.

        Returns:
            The path written, or ``None`` when there were no errors.
        """
        if not errors:
            return None

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"error_report_{timestamp}.csv"
        filepath = os.path.join(self.output_folder, filename)

        df = pd.DataFrame(errors)
        df.to_csv(filepath, index=False, encoding='utf-8-sig')
        logger.info(f"Error report exported to {filepath}")
        return filepath


import random


# extract_lat_long_from_url and the other helpers are defined earlier in this
# cell, so no import is needed here.


logger = setup_logger()

class GoogleMapsScraper:
    """Scrape place details and reviews from the Google Maps web UI.

    The scraper drives an already-opened Playwright ``page`` and resolves
    elements through the selector configuration from ``load_selectors()``.
    Text-based selectors target the Indonesian UI ("Ulasan", "Terbaru", ...).
    """

    # Rating summary text such as "4.8(2,530)" or "4,8 (2.530)":
    # group 1 is the rating, group 2 the review count.
    _RATING_BLOCK_RE = re.compile(r"([\d\,]+[\.\,]?[\d]*)\s*\(([\d\,\.]+)\)")

    def __init__(self, page):
        self.page = page
        self.selectors = load_selectors()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _digits_to_int(text, default=0):
        """Return the digits found in ``text`` as an int, or ``default`` if none."""
        digits = "".join(filter(str.isdigit, text))
        return int(digits) if digits else default

    @staticmethod
    def _strip_label_prefix(aria):
        """Drop a leading ``"Label:"`` from an aria-label value.

        Only the first colon is treated as the label separator, so URLs in
        the value survive ("Situs Web: https://x" -> "https://x"). A value
        that is itself a bare URL is returned unchanged.
        """
        _label, sep, value = aria.partition(":")
        if not sep or value.startswith("//"):
            return aria.strip()
        return value.strip()

    @classmethod
    def _parse_rating_block(cls, text):
        """Split rating summary text into ``(rating, review_count)``.

        Returns ``(None, None)`` when the text does not have the
        ``rating(count)`` layout. The rating is a string with ``.`` as the
        decimal separator; the count is an int, or ``None`` if it has no digits.
        """
        match = cls._RATING_BLOCK_RE.search(text or "")
        if not match:
            return None, None
        rating = match.group(1).replace(',', '.')
        count = cls._digits_to_int(match.group(2), default=None)
        return rating, count

    def _first_text(self, selector):
        """Return the stripped text of the first match, or ``None``.

        ``None`` is returned both when nothing matches and when the element
        has no text. The match count is checked first so a missing element
        never waits for the default timeout.
        """
        loc = self.page.locator(selector)
        if loc.count() == 0:
            return None
        text = loc.first.text_content()
        return text.strip() if text else None

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search_place(self, name):
        """Search for a place and navigate to its details.

        Returns:
            ``True`` when the search ran without raising (even if neither a
            result list nor a detail page was recognised; a warning is logged
            in that case), ``False`` when an exception occurred.
        """
        try:
            logger.info(f"Searching for: {name}")
            search_input = self.selectors['search']['input']

            # Type with human-like delay
            self.page.fill(search_input, "")
            self.page.type(search_input, name, delay=random.randint(50, 150))
            self.page.press(search_input, "Enter")

            # Wait for search results or direct redirect
            self.page.wait_for_load_state("networkidle")

            result_item = self.selectors['search']['recommendation_item']
            result_link = self.selectors['search']['recommendation_link']

            # Poll for either a result list or a detail-page URL.
            for _ in range(5):
                if self.page.locator(result_item).count() > 0:
                    logger.info("Multiple results found. Selecting the first one.")
                    self.page.click(result_item + " " + result_link)
                    self.page.wait_for_load_state("networkidle")
                    random_delay(2, 4)
                    break
                if "!1s" in self.page.url or "ChIJ" in self.page.url:
                    break
                random_delay(1, 2)
            else:
                logger.warning(f"Neither a result list nor a place page was detected for: {name}")

            return True
        except Exception as e:
            logger.error(f"Error during search: {e}")
            return False

    # ------------------------------------------------------------------
    # Place details
    # ------------------------------------------------------------------

    def _extract_from_html(self, html_content, pattern, group=1, default=None):
        """Return ``group`` of the first regex match in ``html_content``.

        The search is case-insensitive and dot matches newlines. ``default``
        is returned when there is no match or the search raises.
        """
        try:
            match = re.search(pattern, html_content, re.DOTALL | re.IGNORECASE)
            if match:
                return match.group(group)
            return default
        except Exception as e:
            logger.debug(f"Error extracting with pattern: {e}")
            return default

    def _get_place_id(self, html_content, metadata=None):
        """Return the place id from ``metadata`` or from a ``ChIJ...`` token in the HTML."""
        if metadata and metadata.get('place_id'):
            return metadata['place_id']

        # Fall back to searching HTML for ChIJ pattern
        return self._extract_from_html(html_content, r'(ChIJ[a-zA-Z0-9_-]{20,})', 1)

    def get_place_details(self, name):
        """Extract basic info about the currently open place.

        Args:
            name: The searched place name, used as ``nama_tempat`` and as the
                fallback for ``actual_name``.

        Returns:
            A dict of place fields. Extraction is best-effort: if it fails
            part-way, the fields gathered so far are returned.
        """
        page_html = self.page.content()
        place_id = self._get_place_id(page_html) or extract_place_id_from_url(self.page.url)
        lat, lng = extract_lat_long_from_url(self.page.url)

        details = {
            'place_id': place_id,
            'place_url': self.page.url,
            'nama_tempat': name,
            'latitude': lat,
            'longitude': lng,
            'rating_total': None,
            'ulasan_total': None,
            'alamat': None,
            'website': None,
            'telepon': None,
            'description': None,
            'is_spending': False,
            'competitors': None,
            'can_claim': False,
            'owner': None,
            'featured_image': None,
            'main_category': None,
            'categories': None,
            'workday_timing': None,
            'is_temporarily_closed': False,
            'is_permanently_closed': False,
            'closed_on': None,
            'review_keywords': None
        }

        try:
            sel = self.selectors['place_details']

            # Wait for the name heading so the panel has rendered.
            try:
                self.page.wait_for_selector(sel['name'], timeout=10000)
            except Exception:
                logger.warning("Main name element not found within timeout.")

            # Name shown on the page (may differ from the search term).
            details['actual_name'] = self._first_text(sel['name']) or name

            # Rating
            rating_text = self._first_text(sel['rating'])
            if rating_text:
                details['rating_total'] = rating_text.replace(',', '.')

            # Reviews count: prefer the aria-label, fall back to visible text.
            reviews_count_loc = self.page.locator(sel['reviews_count'])
            if reviews_count_loc.count() > 0:
                count_source = (
                    reviews_count_loc.first.get_attribute("aria-label")
                    or reviews_count_loc.first.text_content()
                )
                if count_source:
                    details['ulasan_total'] = self._digits_to_int(count_source)

            # Optional info: visible text first, aria-label for icon-only buttons.
            for key, selector in [('alamat', 'address'), ('website', 'website'), ('telepon', 'phone')]:
                loc = self.page.locator(sel[selector])
                if loc.count() == 0:
                    continue
                txt = (loc.first.text_content() or "").strip()
                if len(txt) >= 5:
                    details[key] = txt
                    continue
                aria = loc.first.get_attribute("aria-label")
                if aria:
                    # Clean "Alamat: ", "Telepon: ", etc.
                    details[key] = self._strip_label_prefix(aria)
                else:
                    details[key] = txt or None

            # Combined rating/count block, e.g. "4.8(2,530)".
            block_text = self._first_text("div.F7nice")
            if block_text:
                rating, count = self._parse_rating_block(block_text)
                if rating is not None:
                    details['rating_total'] = rating
                    if count is not None:
                        details['ulasan_total'] = count
                elif not details['rating_total']:
                    # Raw text is used only when nothing cleaner was found.
                    details['rating_total'] = block_text

            # Description
            description = self._first_text("div.PYvSYb")
            if description:
                details['description'] = description

            # Is Spending (Sponsored)
            details['is_spending'] = self.page.locator("div:has-text('Disponsori')").count() > 0

            # Can Claim
            details['can_claim'] = self.page.locator("a[data-item-id='merchant']").count() > 0

            # Categories
            cat_loc = self.page.locator("button.DkEaL")
            if cat_loc.count() > 0:
                cats = [c.strip() for c in cat_loc.all_text_contents() if c.strip()]
                if cats:
                    details['main_category'] = cats[0]
                    details['categories'] = ", ".join(cats)

            # Featured Image
            img_loc = self.page.locator("button.aoRNLd img")
            if img_loc.count() > 0:
                details['featured_image'] = img_loc.first.get_attribute("src")

            # Closure status
            details['is_temporarily_closed'] = self.page.locator("span:has-text('Tutup sementara')").count() > 0
            details['is_permanently_closed'] = self.page.locator("span:has-text('Tutup permanen')").count() > 0

            # Workday timing: the aria-label of the opening-hours toggle.
            timing_loc = self.page.locator("div[aria-label*='Sembunyikan jam buka'], div[aria-label*='Tampilkan jam buka']")
            if timing_loc.count() > 0:
                details['workday_timing'] = timing_loc.first.get_attribute("aria-label")

            # --- XPath fallbacks for fields that are still empty ---
            xf = self.selectors.get('xpath_fallbacks', {})
            fallback_fields = (
                ('alamat', 'address'),
                ('website', 'website'),
                ('telepon', 'phone'),
                ('rating_total', 'rating'),
            )
            for key, xf_key in fallback_fields:
                if not details.get(key) and xf.get(xf_key):
                    details[key] = self._first_text(xf[xf_key]) or details[key]

            if not details.get('ulasan_total') and xf.get('reviews_count'):
                count_txt = self._first_text(xf['reviews_count'])
                if count_txt:
                    details['ulasan_total'] = self._digits_to_int(count_txt, default=details['ulasan_total'])

            logger.info(f"Extracted details for: {details['actual_name']}")
            return details
        except Exception as e:
            logger.warning(f"Some details could not be extracted: {e}")
            return details

    # ------------------------------------------------------------------
    # Reviews
    # ------------------------------------------------------------------

    def _open_reviews_tab(self, sel):
        """Click the first visible reviews tab; return whether one was clicked."""
        tab_locators = [
            self.page.locator(sel['tab_button']),
            self.page.locator("button:has-text('Ulasan')"),
            self.page.locator("div[role='tab']:has-text('Ulasan')")
        ]
        for loc in tab_locators:
            if loc.count() > 0 and loc.first.is_visible():
                logger.info(f"Clicking reviews tab using locator: {loc}")
                loc.first.click()
                return True
        return False

    def _sort_by_newest(self, sel):
        """Open the sort menu and pick 'Terbaru'; log a warning if not possible."""
        xf = self.selectors.get('xpath_fallbacks', {})

        sort_btn = self.page.locator(sel['sort_button'])
        if sort_btn.count() == 0 and xf.get('sort_button'):
            sort_btn = self.page.locator(xf['sort_button'])

        if sort_btn.count() == 0 or not sort_btn.first.is_visible():
            logger.warning("Sort button not found.")
            return

        logger.info("Opening sort menu.")
        sort_btn.first.click()
        random_delay(2, 3)

        # Best effort: the option lookup below copes with a menu that is
        # slow to appear.
        try:
            self.page.wait_for_selector("div[role='menuitemradio'], div[role='menuitem']", timeout=5000)
        except Exception as e:
            logger.debug(f"Sort menu items did not appear in time: {e}")

        # Try the configured selector first, then progressively looser ones.
        candidates = [
            sel['sort_newest'],
            "div[role='menuitemradio']:has-text('Terbaru'), div[role='menuitem']:has-text('Terbaru')",
            "text=Terbaru",
        ]
        if xf.get('sort_newest'):
            candidates.append(xf['sort_newest'])

        for candidate in candidates:
            newest_opt = self.page.locator(candidate)
            if newest_opt.count() > 0:
                logger.info(f"Selecting 'Terbaru' sort option.")
                newest_opt.first.click()
                # Fixed delay rather than networkidle while reviews refresh.
                random_delay(4, 6)
                return

        logger.warning("Sort option 'Terbaru' not found.")

    def _find_reviews_container(self):
        """Return a locator for the scrollable reviews container, or ``None``."""
        candidates = (
            self.page.locator("div.m6QErb.DxyBCb.kA9KIf.dS8AEf[tabindex='-1']").first,
            self.page.locator("div[role='main']").locator("..").locator("div.m6QErb[tabindex='-1']").first,
            self.page.locator("div[role='main']").first,
        )
        for candidate in candidates:
            if candidate.count() > 0:
                return candidate
        return None

    def _scroll_reviews(self, container, sel):
        """Scroll the container until the list stops growing or a limit is hit.

        Limits come from the environment: ``SCROLL_RETRY`` (times ten gives
        the scroll cap, default 5) and ``MAX_REVIEWS`` (default 100;
        0 disables the count cutoff).
        """
        max_scrolls = int(os.getenv("SCROLL_RETRY", "5")) * 10
        max_reviews = int(os.getenv("MAX_REVIEWS", "100"))
        last_height = container.evaluate("node => node.scrollHeight")
        scroll_count = 0

        while scroll_count < max_scrolls:
            container.evaluate("node => node.scrollBy(0, 5000)")
            # Also try to bring the last review into view (best effort).
            try:
                last_item = self.page.locator(sel['item']).last
                if last_item.count() > 0:
                    last_item.scroll_into_view_if_needed(timeout=1000)
            except Exception as e:
                logger.debug(f"Could not scroll last review into view: {e}")

            random_delay(1.5, 3)

            # 'Ulasan lainnya' button (pagination)
            more_reviews_btn = self.page.locator(sel['more_reviews_button'])
            if more_reviews_btn.count() > 0 and more_reviews_btn.first.is_visible():
                logger.info("Clicking 'Ulasan lainnya' button.")
                more_reviews_btn.first.click()
                random_delay(2, 4)

            new_height = container.evaluate("node => node.scrollHeight")
            if new_height == last_height:
                # One more attempt before concluding the list has ended.
                container.evaluate("node => node.scrollBy(0, 5000)")
                random_delay(1, 2)
                new_height = container.evaluate("node => node.scrollHeight")
                if new_height == last_height:
                    logger.info("Reached the end of the list or stuck.")
                    break

            last_height = new_height
            scroll_count += 1

            current_count = self.page.locator(sel['item']).count()
            if max_reviews > 0 and current_count >= max_reviews:
                break

    def _extract_review(self, item, sel, place_id, name, place_url):
        """Build the raw review dict for one review element."""
        # Expand long text when a 'More' button is present.
        more_btn = item.locator(sel['more_button'])
        if more_btn.count() > 0:
            more_btn.first.click()
            random_delay(0.5, 1)

        text_loc = item.locator(sel['text'])
        # .first avoids a strict-mode error if a selector matches more than
        # one node inside the review.
        return {
            'place_id': place_id,
            'place_url': place_url,
            'nama_tempat': name,
            'review_id': item.get_attribute("data-review-id"),
            'author_name': item.locator(sel['author']).first.text_content(),
            'rating_ulasan': item.locator(sel['rating']).first.get_attribute("aria-label"),
            'tanggal_raw': item.locator(sel['date']).first.text_content(),
            'isi_review': text_loc.first.text_content() if text_loc.count() > 0 else "",
            # Owner replies are not extracted yet; both fields stay empty.
            'balasan_pemilik': "",
            'tanggal_balasan_raw': ""
        }

    def scrape_reviews(self, place_id, name, place_url=""):
        """Open the reviews tab, sort by newest, scroll, and collect reviews.

        Args:
            place_id: Identifier copied onto every review row.
            name: Place name copied onto every review row.
            place_url: Place URL copied onto every review row.

        Returns:
            A list of raw review dicts. It is empty when the tab or container
            cannot be found, and partial when an error interrupts scraping.
        """
        reviews = []
        try:
            sel = self.selectors['reviews']

            if not self._open_reviews_tab(sel):
                logger.error("Reviews tab not found or not clickable.")
                return []

            # Use explicit delay instead of networkidle which hangs on Google Maps
            random_delay(3, 5)

            self._sort_by_newest(sel)

            container = self._find_reviews_container()
            if container is None:
                logger.error(f"Review container not found.")
                return []

            self._scroll_reviews(container, sel)

            logger.info(f"Finished scrolling/clicking. Found {self.page.locator(sel['item']).count()} potential reviews.")

            for item in self.page.locator(sel['item']).all():
                try:
                    reviews.append(self._extract_review(item, sel, place_id, name, place_url))
                except Exception as ex:
                    logger.warning(f"Error extracting single review: {ex}")

            return reviews
        except Exception as e:
            logger.error(f"Error during review scraping: {e}")
            return reviews


# Re-binds the same named logger: setup_logger() looks up "gmaps_scraper" and
# only attaches handlers when none exist yet.
logger = setup_logger()

def run_scraper(places_list, max_reviews=100):
    """Scrape details and reviews for each place and export them to CSV.

    Each place is searched on Google Maps, its details and reviews are
    scraped, and the processed rows are appended to the session CSV.
    Failures are collected and written to an error report at the end. After
    every second place the browser context is replaced with a fresh one.

    Args:
        places_list: Sequence of place names to search for.
        max_reviews: Per-place review limit, published to the scraper through
            the ``MAX_REVIEWS`` environment variable (0 disables the limit).
    """
    os.environ["MAX_REVIEWS"] = str(max_reviews)

    logger.info(f"Starting scrape for {len(places_list)} places.")
    browser_mgr = BrowserManager(headless=True)
    processor = DataProcessor()

    errors = []

    try:
        page = browser_mgr.start_browser()
        scraper = GoogleMapsScraper(page)

        # enumerate gives the true position; list.index() would return the
        # first occurrence of a duplicated name and rescan the list each time.
        for position, place_name in enumerate(places_list):
            try:
                logger.info(f"--- Processing: {place_name} ---")
                page.goto("https://www.google.com/maps")
                page.wait_for_load_state("networkidle")

                if scraper.search_place(place_name):
                    random_delay(2, 4)
                    details = scraper.get_place_details(place_name)
                    place_id = details.get('place_id')
                    place_url = details.get('place_url', page.url)

                    raw_reviews = scraper.scrape_reviews(place_id, place_name, place_url)
                    processed_reviews = processor.process_reviews(raw_reviews, details)

                    processor.export_to_csv(processed_reviews)
                    logger.info(f"Successfully scraped and exported {len(processed_reviews)} reviews for {place_name}.")

                    if not place_id:
                        logger.warning(f"Note: Place ID was not found for {place_name}, but continuing with name/link.")
                else:
                    logger.warning(f"Place not found: {place_name}")
                    errors.append({"place_name": place_name, "error": "Search failed"})

                random_delay(5, 10)

                # Rotate the context after every second place.
                if position % 2 == 1:
                    logger.info("Switching to a fresh browser context.")
                    page = browser_mgr.get_new_context()
                    scraper.page = page

            except Exception as e:
                logger.error(f"Unexpected error processing {place_name}: {e}")
                errors.append({"place_name": place_name, "error": str(e)})

        processor.export_errors(errors)

    finally:
        browser_mgr.close_browser()
        logger.info("Scraping process completed. Silakan periksa file CSV di folder output_data/")



## 4. Jalankan Scraper
Masukkan nama tempat yang ingin di-scrape ke dalam list di bawah ini, lalu atur `max_reviews` sesuai kebutuhan Anda.


In [None]:

import concurrent.futures
import glob
import os
import pandas as pd

tempat_yang_ingin_di_scrape = [
    "SiCepat Ekspres Indonesia Pusat",
    "SiCepat Ekspres General Affair Office",
    "SiCepat Ekspres Menteng",
    "SiCepat Ekspres Kemayoran",
    "SiCepat Ekspres Kebayoran Lama"
]

# Run the scraper with a limit of 100 reviews per place (0 disables the limit).
# NOTE(review): Playwright's sync API is expected to refuse to start on a
# thread whose asyncio loop is already running, as the notebook kernel's is.
# Running in a worker thread avoids that; confirm in Colab.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(run_scraper, tempat_yang_ingin_di_scrape, max_reviews=100).result()

# Show the most recent review export. Only "gmaps_scrape_*" files are
# considered, because error_report_*.csv lives in the same folder and is
# written last.
list_of_files = glob.glob('output_data/gmaps_scrape_*.csv')
if list_of_files:
    latest_file = max(list_of_files, key=os.path.getctime)
    print(f"Data tersimpan di {latest_file}. Anda dapat mendownloadnya dari tab Files di sisi kiri layar.")
    df = pd.read_csv(latest_file)
    display(df.head())
else:
    print("Tidak ada file CSV hasil scraping di folder output_data/.")

