# Scrape Google reviews

This script scrapes Google reviews data from a CSV file of Google Maps links.

- It first selects the reviews button and sorts by newest first. It then scrolls to at least the target date (in this case the last 6 months) and calculates relevant metrics.
- It scrolls by selecting an xpath on the reviews and hitting the down key until it is reached. A div number in the xpath is incremented following each successful scroll until either the target date is met or the same date is detected x number of times and the program moves on.
- If an xpath isn't found (likely because the div number is out of range) the program tries again with a closer div number until it can continue. If Google asks the user to sign in, the program presses the back button and re-sorts the reviews by newest first so it can continue.

## Environment

In [None]:
import pandas as pd # tabular data 
import numpy as np # linear algebra
from selenium import webdriver # browser automation
from webdriver_manager.chrome import ChromeDriverManager # downloads binaries for working with chrome based on your browser
from selenium.webdriver.chrome.service import Service # allows chrome driver to be started and stopped 
from selenium.webdriver.chrome.options import Options # allows for the setting of options
from selenium.webdriver.support.ui import WebDriverWait # allows for waiting untill content has loaded
from selenium.webdriver.support import expected_conditions as EC # provides conditions that need to be met to know the page has loaded
from selenium.webdriver.common.by import By # allows you to find and interact with different elements on a page
from selenium.webdriver.common.keys import Keys # allows you to send simulated key strokes
from selenium.webdriver.common.action_chains import ActionChains # a mechanism to build sequences of user actions and execute them in a single go
import time # allows for pauses to sumulate human behavior
from bs4 import BeautifulSoup # parses text from html content
import re # strings and regex
from datetime import datetime # for working with dates

# Chrome options shared by every driver created by create_driver().
options = Options() # creates an options object
# NOTE(review): "detach" is usually described as keeping the browser open after
# the driver process ends, not as opening a new window - confirm the intent.
options.add_experimental_option("detach", True) # sets chrome to be launched in a new window

# Load the table of Google Maps links; the final loop reads its `name`, `link`
# and `sub_div` columns.
links = pd.read_csv('links.csv', encoding='utf-8')
# Label-based slice: keeps only the row whose index label is 162.
links = links.loc[162:162,:]
# Bare expression: presumably there to display the selection as the cell output.
links

## Key parameters

In [None]:
# the overall review count is divided by this number to estimate the scrolls per sprint
scroll_divider = 15

# the starting div number in the xpath that the simulated user selects to scroll to
starting_div_num = 6

# the amount by which the div number in the xpath is increased after every successful scroll
div_num_incrementor = 6

# the maximum number of scroll sprints - a larger number is beneficial in case there are many reviews
retry_num = 50

# the number of times the same date has to be detected at the end of a sprint
# before the program assumes there is nothing left to scroll and stops
final_same_date_instances = 3

## Extract data

### Prepare Chrome

Opens a Google maps link in Chrome, selects reviews and sorts by newest first.

In [None]:
def create_driver(url):
    """Open Chrome with the shared ``options`` and load ``url``.

    Returns the Selenium driver controlling the new browser.
    """
    chrome_service = Service(ChromeDriverManager().install())
    browser = webdriver.Chrome(service=chrome_service, options=options)
    browser.get(url)
    return browser

def click_reviews(driver):
    """Navigate to the reviews by clicking the reviews button.

    Waits up to 2 seconds for the button to be present, clicks it and then
    pauses for one second.
    """
    locator = (
        By.XPATH,
        '//*[@id="QA0Szd"]/div/div/div[1]/div[2]/div/div[1]/div/div/div[3]/div/div/button[2]/div[2]/div[2]',
    )
    wait = WebDriverWait(driver, 2)
    reviews_button = wait.until(EC.presence_of_element_located(locator))
    reviews_button.click()
    time.sleep(1)

def click_sort(driver):
    """Pull up the sort options for the reviews.

    The sort button is looked for at two xpaths: the ``div[7]`` variant first
    and, if waiting for or clicking that one fails, the ``div[5]`` variant.
    Each attempt waits up to 2 seconds for the element. A failure of the
    fallback attempt propagates to the caller. A one-second pause follows a
    successful click.
    """
    primary_xpath = '//*[@id="QA0Szd"]/div/div/div[1]/div[2]/div/div[1]/div/div/div[2]/div[7]/div[2]/button/span/span'
    fallback_xpath = '//*[@id="QA0Szd"]/div/div/div[1]/div[2]/div/div[1]/div/div/div[2]/div[5]/div[2]/button/span/span'

    def _wait_and_click(xpath):
        # waits for the element to be present, then clicks it
        button = WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )
        button.click()

    try:
        _wait_and_click(primary_xpath)
    except Exception:
        # `Exception` rather than a bare except, so that Ctrl-C
        # (KeyboardInterrupt) still stops the run instead of triggering
        # the fallback.
        _wait_and_click(fallback_xpath)

    time.sleep(1)
    
def select_newest_reviews(driver):
    """Select the 'newest' entry of the sort menu.

    Waits up to 2 seconds for the second ``div`` inside ``#action-menu``,
    clicks it and pauses for one second.
    """
    option_locator = (By.XPATH, '//*[@id="action-menu"]/div[2]')
    newest_option = WebDriverWait(driver, 2).until(
        EC.presence_of_element_located(option_locator)
    )
    newest_option.click()
    time.sleep(1)

def sort_by_newest_reviews(driver):
    """Navigate to the reviews and sort them by newest.

    Runs the three steps in order: open the reviews, open the sort menu,
    pick the newest-first option.
    """
    steps = (click_reviews, click_sort, select_newest_reviews)
    for step in steps:
        step(driver)

### Estimate scrolls

Estimates the optimum number of scrolls, performed as a series of 'sprints', to efficiently scroll to the target date based on the overall review count.

In [None]:
def extract_html(soup, tag, _class):
    """Return the text of every ``tag`` element in ``soup`` with class ``_class``.

    The texts are returned as a list, in the order ``soup.find_all`` yields
    the elements.
    """
    texts = []
    for element in soup.find_all(tag, class_=_class):
        texts.append(element.text)
    return texts

def get_overall_review_count(driver):
    """Extract the overall review count from the current page.

    Scans the text of every ``div`` with class ``fontBodySmall`` and uses the
    first one containing ``reviews``. Its first whitespace-separated token,
    with commas removed, is converted to an int; if that fails, the first
    character of the text is converted instead.

    Returns the count, or ``None`` (after printing a message) when no
    matching element is found.
    """
    candidates = extract_html(get_soup(driver), 'div', 'fontBodySmall')
    for text in candidates:
        if not re.search('reviews', text):
            continue
        first_token = text.split()[0]
        try:
            return int(first_token.replace(',', ''))
        except:
            # fall back to the leading character of the text
            return int(text[0])
    print('no data found')
    return None

def get_scroll_count(driver):
    """Calculate the number of scrolls per sprint, with a lower limit of 10.

    The overall review count is divided by ``scroll_divider`` and rounded.
    When the review count could not be determined
    (``get_overall_review_count`` returned ``None``), the lower limit is used
    instead of failing on the division.

    Returns the scroll count as an int and prints it.
    """
    minimum_scrolls = 10
    overall_review_count = get_overall_review_count(driver)

    if overall_review_count is None:
        # no count available: fall back to the minimum sprint length
        scrolls = minimum_scrolls
    else:
        scrolls = max(minimum_scrolls, round(overall_review_count / scroll_divider))

    print('scrolls: ' + str(scrolls))
    return scrolls

### Scroll down

Scrolls down after page load and opens all 'More' links to get the entire review and response texts. Div numbers in xpaths are incremented by 6 at a time. New elements are located, clicked on and scrolled to. If a div number isn't found, it is reduced by 1 next time to ensure it doesn't accidentally become out of range.

In [None]:
def find_div(driver, sub_div_num, div_num):
    """Wait up to 3 seconds for the div at the given position and return it.

    ``sub_div_num`` and ``div_num`` fill the last two ``div[...]`` steps of
    the xpath. An exception raised by the wait propagates to the caller.
    """
    base = '//*[@id="QA0Szd"]/div/div/div[1]/div[2]/div/div[1]/div/div/div[2]'
    locator = (By.XPATH, f'{base}/div[{sub_div_num}]/div[{div_num}]')
    return WebDriverWait(driver, 3).until(EC.presence_of_element_located(locator))

def hit_down_until_div_reached(driver, div): # clicks on and scrolls to an element
    """Click ``div`` and then send one PAGE_DOWN key press.

    Args:
        driver: Selenium driver the actions are performed on.
        div: element the pointer is moved to and clicked.
    """
    action = ActionChains(driver)
    # move the pointer onto the element and click it
    action.move_to_element(div).click().perform() 
    # NOTE(review): the same ActionChains instance is reused here - confirm
    # whether the already-performed move/click is replayed by this second
    # perform() before restructuring these two lines.
    action.send_keys(Keys.PAGE_DOWN).perform()

def press_down_key(driver, sub_div_num, div_num):
    """Scroll to the div at ``div_num`` and return the div number to try next.

    On success the div number is increased by ``div_num_incrementor``. If the
    div cannot be found or scrolled to, a message is printed and the div
    number is reduced by 1 so the next attempt targets a closer element.
    """
    try:
        div = find_div(driver, sub_div_num, div_num)
        hit_down_until_div_reached(driver, div)
    except Exception:
        # `Exception` rather than a bare except: a Ctrl-C during the wait must
        # stop the scraper, not be reported as a failed scroll.
        print('failed to scroll' + ' - div num: ' + str(div_num))
        return div_num - 1
    else:
        return div_num + div_num_incrementor

def click_more_buttons(driver):
    """Click every button in the reviews container whose text is 'More'.

    Waits up to 10 seconds for the container. This is best effort: if the
    container is missing or a click fails, the remaining buttons are skipped
    and the function returns silently.
    """
    container_selector = '#QA0Szd > div > div > div.w6VYqd > div:nth-child(2) > div > div.e07Vkf.kA9KIf > div > div > div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde'
    try:
        parent_container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, container_selector))
        )
        more_buttons = parent_container.find_elements(
            By.XPATH, ".//button[contains(., 'More')]"
        )
        for button in more_buttons:
            # the xpath matches any button *containing* 'More'; only click exact matches
            if button.text.strip() == 'More':
                button.click()
    except Exception:
        # Deliberately ignored so a failed expansion never aborts a scroll
        # sprint. `Exception` (not a bare except) keeps Ctrl-C working.
        pass

def scroll_down(div_num, sub_div_num, driver, scrolls):
    """Execute one scroll sprint and return the resulting div number.

    Repeats ``scrolls`` times: scroll to the current div, expand all 'More'
    buttons, then pause for half a second.
    """
    current_div = div_num
    for _ in range(scrolls):
        current_div = press_down_key(driver, sub_div_num, current_div)
        click_more_buttons(driver)
        time.sleep(0.5)
    return current_div

### Check dates

At the end of each scroll sprint, the date of the last review found on the page is checked against the target date. If the target date is not met, the current and previous dates are compared to see whether they are the same. If so, we have probably reached the end of the reviews (or a prolonged server error).

In [None]:
def get_soup(driver):
    """Parse the driver's current page source with BeautifulSoup's 'html.parser'."""
    return BeautifulSoup(driver.page_source, 'html.parser')

def get_dates(driver):
    """Return the text of every ``span`` with class ``rsqaWe`` on the current page."""
    parsed_page = get_soup(driver)
    date_texts = extract_html(parsed_page, 'span', 'rsqaWe')
    return date_texts

def get_date_quantifier(driver):
    """Return the second word of the last date on the page.

    Raises ``IndexError`` when the page has no dates or the last date has
    fewer than two words.
    """
    latest = get_dates(driver)[-1]
    words = latest.split()
    return words[1]
    
def target_date_met(driver, div_num):
    """Check whether the last date on the page has reached the target date.

    The target is met when the last date reads '6 months ago' to
    '11 months ago', or contains the word 'year' or 'years'.

    Args:
        driver: Selenium driver showing the reviews.
        div_num: unused; kept so existing callers keep working.

    Returns:
        True when the target date is met, otherwise False. A status message
        is printed in both cases.

    Raises:
        IndexError: when no dates are found on the page. ``extract_data``
            catches this and recovers.
    """
    current_date = get_dates(driver)[-1]
    words = current_date.split()
    target_months = {f'{months} months ago' for months in range(6, 12)}

    if current_date in target_months or 'year' in words or 'years' in words:
        print()
        print('target date met: finished scrolling')
        return True

    print('target date not met - current date ' + current_date)
    return False

def same_date_detected(driver, previous_date):
    """Compare the last date on the page with ``previous_date``.

    Returns a ``(same, date)`` tuple: ``(False, <last date>)`` when the dates
    differ, ``(True, '')`` when they are equal. Because an empty string is
    handed back on a match, a caller that stores it as the new previous date
    will not get a match on the immediately following comparison.
    """
    latest = get_dates(driver)[-1]
    if latest != previous_date:
        return False, latest
    return True, ''

### Extract data

Loops through up to 50 scroll sprints to get at least six months' worth of reviews (the target date). Breaks at the end of each sprint if the target date is reached or the same date has been detected the configured number of times. If the same date is detected, a False value for the quarter_achieved variable is returned, indicating quarterly percentage change on year metrics need not be calculated below.

In [None]:
def extract_data(driver, sub_div_num):
    """Scroll through the reviews until the target date and return the page data.

    Runs up to ``retry_num`` scroll sprints. After each sprint the last date
    on the page is checked:

    * target date met: the data is returned with the final flag ``True``;
    * same date as before: counted, and once ``final_same_date_instances``
      is reached the data is returned with the flag ``False``;
    * the check raises (e.g. no dates on the page): the browser goes back,
      the reviews are re-sorted by newest and scrolling resumes.

    If every sprint is used up without either condition, the data collected
    so far is returned with the flag ``False`` instead of ``None``, so callers
    can always unpack five values.

    Returns:
        ``(driver, page_source, soup, dates, target_reached)``.
    """
    scrolls = get_scroll_count(driver)
    div_num, previous_date, same_date_instances = starting_div_num, 'today', 0

    for _ in range(retry_num):
        div_num = scroll_down(div_num, sub_div_num, driver, scrolls)
        try:
            if target_date_met(driver, div_num):
                return driver, driver.page_source, get_soup(driver), get_dates(driver), True

            same_date_as_previous, previous_date = same_date_detected(driver, previous_date)
            if same_date_as_previous:
                same_date_instances += 1
                if same_date_instances == final_same_date_instances:
                    print('Same date detected: breaking')
                    return driver, driver.page_source, get_soup(driver), get_dates(driver), False
        except Exception:
            # `Exception` rather than a bare except so Ctrl-C still stops the run.
            # Recovery: go back and re-sort the reviews by newest.
            # (alternative to driver.back(): driver.execute_script("window.history.go(-1)"))
            driver.back()
            click_sort(driver)
            select_newest_reviews(driver)
            # NOTE(review): restart position after recovery - the value 30 is not explained here
            div_num = 30

    # every sprint used without reaching the target date or the same-date limit
    print('Retry limit reached: breaking')
    return driver, driver.page_source, get_soup(driver), get_dates(driver), False

# Manual walkthrough for a single link from test_links.csv, kept as an inert
# string literal so it is not executed. It runs the extraction step only.
""" DEBUGGING CODE
links_df = pd.read_csv('test_links.csv', encoding='utf-8')
name = links_df.name[0]
url = links_df.link[0]
sub_div_num = links_df.sub_div[0]

print('working on: ' + name)
driver = create_driver(url)
time.sleep(1)
sort_by_newest_reviews(driver)
driver, page_content, soup, dates, quarter_achieved = extract_data(driver, sub_div_num)
time.sleep(1)

dates[len(dates)-1]"""

## Calculate metrics

### Split html

The html content is first split into separate target-period (latest 6 months) and quarterly (latest 3 months) variables. If there are no reviews older than these time periods, everything is returned.

In [None]:
def get_target_soup(page_content, soup):
    """Limit the html page content to the part before the first older review date.

    ``page_content`` is cut at the first date marker found, trying in order
    '7 months' to '11 months', then 'a year', then '2 years' to '11 years'.
    The part before the marker is parsed and returned. When no marker is
    present, ``soup`` is returned unchanged.
    """
    for months in range(7, 12):
        try:
            head = page_content.split(f'rsqaWe">{months} months')[0]
            if head != page_content:
                return BeautifulSoup(head, 'html.parser')
        except:
            pass

    year_markers = ['rsqaWe">a year']
    year_markers.extend(f'rsqaWe">{years} years' for years in range(2, 12))
    for marker in year_markers:
        head = page_content.split(marker)[0]
        if head != page_content:
            return BeautifulSoup(head, 'html.parser')
    return soup

def get_quarterly_soup(page_content, soup):
    """Limit the html page content to the part before the first '4+ months' date.

    ``page_content`` is cut at the first marker found among '4 months' to
    '11 months'. The part before it is parsed and returned. When no marker is
    present, or anything fails along the way, ``soup`` is returned unchanged.
    """
    for months in range(4, 12):
        marker = f'rsqaWe">{months} months'
        try:
            head = page_content.split(marker)[0]
            if head == page_content:
                continue
            return BeautifulSoup(head, 'html.parser')
        except:
            pass
    return soup

### Calculate initial metrics

Calculates the initial metrics asked for by the client.

In [None]:
def get_target_reviews(target_soup):
    """Return the target review count.

    The count is the number of ``span`` elements with class ``rsqaWe`` in
    ``target_soup``. On any failure a message is printed and ``''`` is
    returned.
    """
    try:
        date_texts = extract_html(target_soup, 'span', 'rsqaWe')
        count = len(date_texts)
    except:
        print('Unable to calculate target reviews')
        count = ''
    return count

def get_quarterly_reviews(quarterly_soup):
    """Return the quarterly review count.

    The count is the number of ``span`` elements with class ``rsqaWe`` in
    ``quarterly_soup``. On any failure a message is printed and ``''`` is
    returned.
    """
    try:
        date_texts = extract_html(quarterly_soup, 'span', 'rsqaWe')
        count = len(date_texts)
    except:
        print('Unable to calculate quarterly reviews')
        count = ''
    return count

def get_average_stars(soup):
    """Return the average star rating found in ``soup``, rounded to 2 decimals.

    The html is split at every rating marker; the first character after a
    marker is read as the star count. The text before the first marker and
    the chunk after the last marker are skipped (NOTE(review): the last chunk
    is presumably the review the html was cut at - confirm).

    Returns ``''`` after printing a message when a rating cannot be parsed or
    when there are no ratings to average, instead of returning NaN for an
    empty list.
    """
    marker = '<div class="DU9Pgb"><span aria-label="'
    chunks = str(soup).split(marker)

    try:
        stars = [int(chunk[0]) for chunk in chunks[1:-1]]
    except (ValueError, IndexError):
        # non-digit (ValueError) or empty chunk (IndexError) after a marker
        print('Unable to calculate average stars')
        return ''

    if not stars:
        # np.mean([]) would return NaN and emit a RuntimeWarning
        print('Unable to calculate average stars')
        return ''

    return round(np.mean(stars), 2)

def get_replies(soup):
    """Return the number of ``div`` elements with class ``wiI7pd`` in ``soup``.

    On any failure a message is printed and ``''`` is returned.
    """
    try:
        reply_texts = extract_html(soup, 'div', 'wiI7pd')
        total = len(reply_texts)
    except:
        print('Unable to calculate replies')
        total = ''
    return total

def get_reply_rate(soup, reviews_count):
    """Return the reply rate for ``soup`` as a rounded percentage.

    The rate is replies divided by ``reviews_count``, times 100. On any
    failure (for example a zero or non-numeric count) a message is printed
    and 0 is returned.
    """
    try:
        fraction = get_replies(soup) / reviews_count
        rate = round(fraction * 100)
    except:
        print('Unable to calculate reply rate')
        rate = 0
    return rate
    

def get_initial_metrics(target_soup, quarterly_soup, soup):
    """Combine the initial metrics into one tuple.

    Returns ``(target_reviews, quarterly_reviews, average_stars,
    target_replies, target_reply_rate)``. ``soup`` is accepted but not used.
    """
    target_reviews = get_target_reviews(target_soup)
    quarterly_reviews = get_quarterly_reviews(quarterly_soup)
    average_stars = get_average_stars(target_soup)
    target_replies = get_replies(target_soup)
    target_reply_rate = get_reply_rate(target_soup, target_reviews)
    return (
        target_reviews,
        quarterly_reviews,
        average_stars,
        target_replies,
        target_reply_rate,
    )

## Save data

### Create dataframe

Calculates and prints out metrics before storing them in a pandas dataframe.

In [None]:
def create_dataframe(page_content, soup, quarter_achieved):
    """Calculate the metrics, print them and store them in a one-row dataframe.

    Args:
        page_content: html source of the fully scrolled reviews page.
        soup: parsed version of ``page_content``.
        quarter_achieved: flag returned by ``extract_data``; stored as a
            string in the ``complete_data`` column.

    Returns:
        A dataframe with one row holding the dealer name, the scrape date,
        the completeness flag and the initial metrics.

    Note:
        The dealer name is read from the module-level variable ``name``,
        which the calling loop sets before each call.
    """
    target_soup = get_target_soup(page_content, soup)
    quarterly_soup = get_quarterly_soup(page_content, soup)

    metric_names = ['target_reviews', 'quarterly_reviews', 'average_stars', 'target_replies', 'target_reply_rate']
    metric_values = get_initial_metrics(target_soup, quarterly_soup, soup)
    # name -> value mapping replaces the former eval() lookups of local variables
    metrics = dict(zip(metric_names, metric_values))

    print()
    print('Initial metrics: ')
    print()
    for metric, value in metrics.items():
        print(metric + ': ' + str(value))
    print()

    df = pd.DataFrame()
    df.loc[0, 'dealer'] = name
    df.loc[0, 'scrape_date'] = datetime.today().strftime('%Y-%m-%d')
    df.loc[0, 'complete_data'] = str(quarter_achieved)

    for metric, value in metrics.items():
        df.loc[0, metric] = value

    return df

# Manual walkthrough for the first selected link, kept as an inert string
# literal so it is not executed. It runs extraction, the soup split, the
# metrics and the dataframe creation step by step.
""" DEBUGGING CODE
links_df = links.reset_index(drop=True)
name = links_df.name[0]
url = links_df.link[0]
sub_div_num = links_df.sub_div[0]

print('working on: ' + name)
driver = create_driver(url)
time.sleep(1)
sort_by_newest_reviews(driver)
time.sleep(1)
driver, page_content, soup, dates, quarter_achieved = extract_data(driver, sub_div_num)

if not quarter_achieved:
    print()
    print('quarter not achieved')
    print()

target_soup = get_target_soup(page_content, soup)
quarterly_soup = get_quarterly_soup(page_content, soup)

target_reviews, quarterly_reviews, average_stars, target_replies, target_reply_rate = get_initial_metrics(target_soup, quarterly_soup, soup)

df = create_dataframe(page_content, soup, quarter_achieved)"""

## Save dataframe

Each new row containing the metrics and review/reply texts is joined with the existing dataframe and saved before moving on to the next one. This prevents data loss should an error occur at some point during the execution.

In [None]:
def save_data_frame(df):
    """Append ``df`` to ``review_data.csv`` and save the file.

    The existing file is read, ``df`` is concatenated below it and the result
    is written back without the index. If the file does not exist yet, or
    exists but is empty, it is created from ``df`` alone, so the first run
    does not need a pre-made file.
    """
    try:
        old_df = pd.read_csv('review_data.csv', encoding='utf-8')
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # first run: nothing to append to
        new_df = df.reset_index(drop=True)
    else:
        new_df = pd.concat([old_df, df], axis=0).reset_index(drop=True)

    new_df.to_csv('review_data.csv', index=False)
    print()
    print('data saved')
    print()
    print('---')

# final loop - for every link: opens chrome, scrolls, calculates metrics, saves the data and closes chrome
for i in links.index:
    # `name` is module-level on purpose: create_dataframe() reads it
    name = links.loc[i, 'name']
    url = links.loc[i, 'link']
    sub_div_num = links.loc[i, 'sub_div']
    print()
    print('working on: ' + name)
    print()
    driver = create_driver(url)
    try:
        time.sleep(1)
        sort_by_newest_reviews(driver)
        time.sleep(1)
        driver, page_content, soup, dates, quarter_achieved = extract_data(driver, sub_div_num)
        print()
        df = create_dataframe(page_content, soup, quarter_achieved)
        save_data_frame(df)
    finally:
        # always close the browser, even when a step above raises
        driver.quit()