In [1]:
import json, datetime, time, random, re, collections, logging, tldextract, os, chardet, ast
import pandas as pd
from seleniumwire import webdriver
from seleniumwire.utils import decode as decodesw
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from seleniumbase import Driver, SB

from sqlalchemy import create_engine, inspect, Table, MetaData, select, Column, String, ARRAY, func, Integer
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker, Session

# 0) Outline
To compare the sources used by Google vs. Perplexity, I first source 1800 search queries, starting from a list of base queries divided into 18 different categories. These base queries are passed to Google Keyword Planner and AnswerThePublic to collect similar queries. The results are then consolidated into the file suggestions.csv, from which a random choice of ten queries per base query is taken. This final list of 1800 queries is then used by the script in ./perplexity-google-script to scrape the sources from Google Search and Perplexity. The sources are stored in a Postgres database.

## I) Setting up Selenium and DB Connection

In [2]:
def create_selenium_driver(mode, proxy=None):
    '''
    Starts SeleniumBase, maximizes the browser window and returns the driver.
    You can choose between wire mode to capture requests or uc mode for anti-bot-detection.
    :param str mode: either wire or uc
    :param proxy: passed through unchanged to the SeleniumBase Driver
    :raises ValueError: if mode is neither 'wire' nor 'uc'
    '''
    # Validate first so that a typo does not silently hand None to the caller.
    if mode not in ('wire', 'uc'):
        raise ValueError(f"Unknown mode {mode!r}: expected 'wire' or 'uc'")
    if mode == 'wire':
        driver = Driver(wire=True, headed=True, proxy=proxy)
    else:
        driver = Driver(uc=True, headed=True, proxy=proxy)
    driver.maximize_window()
    return driver

In [2]:
class DBConnection():
    '''
    Connection to a single table in a Postgres database.
    When the table does not exist yet, its schema holds one row per search query
    (category, subcategory0, subcategory1, query) plus the source lists for
    Google and Perplexity; an existing table is reflected from the database.
    '''
    def __init__(self, host, port, db_name, username, password, table_name):
        '''
        Builds the engine, the ORM class for the table and a scoped session factory.
        :param str host: database host
        :param port: database port
        :param str db_name: database name
        :param str username: database user
        :param str password: database password
        :param str table_name: name of the table this object works on
        '''
        self.host = host
        self.port = port
        self.db_name = db_name
        self.username = username
        self.password = password
        # NOTE(review): the credentials are interpolated verbatim into the URL, so a
        # username/password containing characters such as '@' or '/' presumably has to be
        # URL-encoded by the caller - confirm against the SQLAlchemy URL documentation.
        self.engine = create_engine(f'postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.db_name}?sslmode=require')
        self.metadata = MetaData()
        self.table_name = table_name
        self.Base = declarative_base()
        self.TableClass = self.create_table_class()
        self.Session = scoped_session(sessionmaker(bind=self.engine))

    def get_engine(self):
        '''
        Returns the engine
        '''
        return self.engine

    def _table_exists(self):
        '''
        Returns True if the table already exists in the database.
        Goes through the inspector instead of passing a fresh engine.connect()
        to the dialect, which left that connection open after every check.
        '''
        return inspect(self.engine).has_table(self.table_name)

    def create_table_class(self):
        '''
        Creates an SQLAlchemy table class using the Base class.
        Uses the schema specified here if the table does not yet exist.
        If it exists, the table schema is read from the db.
        '''
        if not self._table_exists():
            class TableClass(self.Base):
                __tablename__ = self.table_name
                id = Column(Integer, primary_key=True)
                category = Column(String)
                subcategory0 = Column(String)
                subcategory1 = Column(String)
                query = Column(String)
                google = Column(ARRAY(String))
                perplexity = Column(ARRAY(String))
            return TableClass
        else:
            class TableClass(self.Base):
                __table__ = Table(self.table_name, self.metadata, autoload_with=self.engine)
            return TableClass

    def create_table(self):
        '''
        Creates a new table if it does not yet exist.
        '''
        if not self._table_exists():
            self.Base.metadata.create_all(self.engine)
            print(f'The following table has been created: {self.table_name}')

    def write_to_db(self, results):
        '''
        Takes a list of entries (dict) and writes it to the db.
        Each dict is passed as keyword arguments to the table class.
        '''
        with self.Session() as session:
            objects = [self.TableClass(**entry) for entry in results]
            session.bulk_save_objects(objects)
            session.commit()

    def read_from_db(self, entry_id=None):
        '''
        Reads from the database.
        If entry_id is specified, it returns a single entry (or None if there is no match).
        Otherwise it returns all entries as a DataFrame (empty if the table has no rows).
        :param int entry_id: The id to retrieve
        '''
        with self.Session() as session:
            # Compare against None so that an id of 0 is still treated as an id.
            if entry_id is not None:
                return session.query(self.TableClass).filter_by(id=entry_id).first()
            rows = session.query(self.TableClass).all()
            df = pd.DataFrame([x.__dict__ for x in rows])
            # errors='ignore': with zero rows the frame has no columns to drop.
            return df.drop(columns='_sa_instance_state', errors='ignore')

    def update_entry(self, entry_id, column, new_value):
        '''
        Updates a column for a given id.
        Returns True if the update was committed and False otherwise
        (unknown id, unknown column, or a database error, which is printed).
        :param int entry_id: The id of the row to update
        :param str column: The name of the column to update
        :param new_value: The new value
        '''
        with self.Session() as session:
            try:
                # Look the row up on this session; calling read_from_db here would close
                # the shared scoped session in the middle of the update.
                entry = session.query(self.TableClass).filter_by(id=entry_id).first()
                if not entry:
                    print('No entry found with this ID.')
                    return False
                if not hasattr(self.TableClass, column):
                    print('Column does not exist in this table.')
                    return False
                session.query(self.TableClass).filter_by(id=entry_id).update({column: new_value})
                session.commit()
                return True
            except Exception as e:
                # Leave the session clean for the next caller.
                session.rollback()
                print(e)
                return False

## II) Sourcing Search Queries

### Google Keyword Planner

In [7]:
# Start a browser in uc (anti-bot-detection) mode; the Keyword Planner cells below reuse this driver.
driver = create_selenium_driver('uc')

In [253]:
# Open Keyword Planner, then log in manually in the browser window before running the next cells.
driver.get('https://ads.google.com/aw/keywordplanner/')

In [254]:
# Navigate to the Keyword Planner home page (the same URL get_queries_keyword_planner loads).
# NOTE(review): the query string (ocid/euid/uscid/__c) looks account-specific - confirm before reuse.
driver.get('https://ads.google.com/aw/keywordplanner/home?ocid=96840841&euid=99592441&uscid=96840841&__c=7316379009&authuser=0&sf=kp&subid=de-de-awhp-g-aw-c-t-kwp-hero%21o2')

In [251]:
def get_queries_keyword_planner(driver, query, download_dir='./downloaded_files', download_timeout=120):
    '''
    Uses Google Keyword Planner to get keywords for a given query.
    Important: Manual login required before passing the driver to this function.
    Returns a list of keywords (empty if any UI step fails or no csv shows up in time).
    :param obj driver: The selenium driver (already logged in)
    :param str query: The query to get keyword ideas for
    :param str download_dir: Directory that is scanned for the exported csv
    :param int download_timeout: Seconds to keep polling for the csv after the initial wait
    '''
    result = []
    home_url = 'https://ads.google.com/aw/keywordplanner/home?ocid=96840841&euid=99592441&uscid=96840841&__c=7316379009&authuser=0&sf=kp&subid=de-de-awhp-g-aw-c-t-kwp-hero%21o2'
    pattern = re.compile(r'.*\.csv$')
    wait = WebDriverWait(driver, 60)

    driver.get(home_url)
    time.sleep(random.randint(0, 10))
    # Open the keyword-ideas card.
    try:
        wait.until(EC.element_to_be_clickable((By.XPATH, '//div[contains(@class, "card-frame")]'))).click()
        time.sleep(random.randint(0, 5))
    except Exception as e:
        print(e)
        return result
    # Type the query and submit it.
    try:
        wait.until(EC.visibility_of_element_located((By.XPATH, '//input[contains(@class, "search-input")]'))).send_keys(query)
    except Exception as e:
        print(e)
        return result
    driver.find_element(By.XPATH, '//material-button[contains(@class, "submit-button")]').click()
    # Wait for the result table before touching the download menu.
    try:
        wait.until(EC.visibility_of_element_located((By.XPATH, '//div[@aria-label="Keyword ideas Table"]')))
    except Exception as e:
        print(e)
        return result
    time.sleep(random.randint(2, 10))
    try:
        wait.until(EC.element_to_be_clickable((By.XPATH, '//material-menu[contains(@class, "download-menu")]'))).click()
    except Exception as e:
        print(e)
        return result
    try:
        wait.until(EC.element_to_be_clickable((By.XPATH, '//span[text()=".csv"]'))).click()
    except Exception as e:
        print(e)
        return result

    # Give the download a head start, then poll with a deadline instead of spinning
    # forever on the directory when the file never arrives.
    time.sleep(20)
    deadline = time.monotonic() + download_timeout
    current_csv = None
    while current_csv is None:
        if os.path.isdir(download_dir):
            with os.scandir(download_dir) as entries:
                for entry in entries:
                    if pattern.match(entry.name):
                        current_csv = entry.name
                        break
        if current_csv is None:
            if time.monotonic() >= deadline:
                print(f'No csv found in {download_dir} after waiting for the download.')
                return result
            time.sleep(1)

    csv_path = os.path.join(download_dir, current_csv)
    with open(csv_path, 'r', encoding='utf-16') as file:
        # The first three lines of the export are skipped; the keyword is the first tab-separated field.
        for line in file.readlines()[3:]:
            keyword = line.split('\t')[0].strip()
            if keyword:
                result.append(keyword)
    # Remove the export so the next call does not pick up this file again.
    os.remove(csv_path)
    driver.get(home_url)
    return result

In [256]:
# Run: collect the Keyword Planner suggestions for every base query
final = []
base_queries = pd.read_csv('initial_queries.csv')
for _, base_row in base_queries.iterrows():
    record = dict(
        category=base_row['category'],
        subcategory0=base_row['subcategory0'],
        subcategory1=base_row['subcategory1'],
        base_query=base_row['query'],
    )
    # Scrape after the row fields are read; the result goes in last, as before.
    record['similar_queries'] = get_queries_keyword_planner(driver, base_row['query'])
    final.append(record)

In [257]:
# df = pd.DataFrame(final)
# df.to_csv('keyword_planner_suggestions.csv')

### answerthepublic.com

In [1]:
def get_queries_answerthepublic(query):
    '''
    Uses answerthepublic.com to get long-tail queries for a given keyword.
    Returns a list of queries (empty if the search or the list view could not be opened).
    A new browser is started for every call and is always closed again, also on failure.
    :param string query: The keyword to get long-tail queries for
    '''
    result = []
    length = 0
    proxy_url = ''
    driver = create_selenium_driver('uc', proxy_url)
    # try/finally: the early returns below used to leave the browser running.
    try:
        driver.get('https://answerthepublic.com/de')
        try:
            WebDriverWait(driver, 60).until(EC.visibility_of_element_located((By.XPATH, '//input[@data-new-search-target="keyword"]'))).send_keys(query)
            driver.find_element(By.XPATH, '//input[@value="Search"]').click()
            time.sleep(20)
        except Exception as e:
            print(e)
            return result
        # Close the sign-in modal if one is shown.
        try:
            modal = driver.find_elements(By.XPATH, '//img[@data-action="click->sign-in-modal#close"]')
            if modal:
                modal[0].click()
        except Exception as e:
            print('no modal')
        # Switch to the list tab.
        try:
            WebDriverWait(driver, 60).until(EC.element_to_be_clickable((By.XPATH, '//a[@href="#tab-list"]'))).click()
            time.sleep(10)
        except Exception as e:
            print(e)
            return result
        # Keep scrolling to the bottom until no additional keywords are loaded.
        while True:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(10)
            # The first matching element is skipped, as in the original selection.
            keywords = [x.text for x in driver.find_elements(By.XPATH, '//div[@class="lt__keyword"]')[1:]]
            if len(keywords) <= length:
                return keywords
            length = len(keywords)
            time.sleep(10)
    finally:
        driver.quit()

In [99]:
# Run: collect the answerthepublic suggestions for every base query
final = []
base_queries = pd.read_csv('initial_queries.csv')
# The Keyword Planner cell reads the same file with a 'query' column while this cell
# used 'base_query'; accept either so both cells work on the same csv.
query_column = 'base_query' if 'base_query' in base_queries.columns else 'query'
for index, row in base_queries.iterrows():
    similar_queries = get_queries_answerthepublic(row[query_column])
    final.append(
        {
            'category': row['category'],
            'subcategory0': row['subcategory0'],
            'subcategory1': row['subcategory1'],
            'base_query': row[query_column],
            'similar_queries': similar_queries
        }
    )

In [9]:
# df = pd.DataFrame(final)
# df.to_csv('answerthepublic_suggestions.csv')

### Combine the suggestions from Keyword Planner (kwp) and answerthepublic (atp) into one file, then take a random choice of ten queries per base query while avoiding duplicates

In [186]:
# Load both suggestion files; the column produced by reset_index() is dropped again right away
kwp = (
    pd.read_csv('keyword_planner_suggestions.csv', index_col=0)
    .reset_index()
    .drop(columns=['index'])
)
atp = (
    pd.read_csv('answerthepublic_suggestions.csv', index_col=0)
    .reset_index()
    .drop(columns=['index'])
)

In [187]:
# Consolidate
def _parse_query_list(cell):
    '''
    Turns a stringified list from the csv back into a list.
    The right join leaves NaN where kwp has no row for a base query; such cells
    (anything that is neither a string nor a list) become an empty list.
    '''
    if isinstance(cell, list):
        return cell
    if isinstance(cell, str):
        return ast.literal_eval(cell)
    return []


suggestions = pd.merge(kwp, atp, on=['base_query', 'category', 'subcategory0', 'subcategory1'], how='right', suffixes=('_kwp', '_atp'))
for list_column in ('similar_queries_kwp', 'similar_queries_atp'):
    suggestions[list_column] = suggestions[list_column].map(_parse_query_list)

In [188]:
# Count the suggestions per source for each base query, then add them up
suggestions['num_kwp'] = suggestions['similar_queries_kwp'].map(len)
suggestions['num_atp'] = suggestions['similar_queries_atp'].map(len)
suggestions['num_total'] = suggestions['num_kwp'] + suggestions['num_atp']

In [189]:
# Select queries to use for analysis avoiding duplicates.
# Each selection starts with the base query and is filled up to ten entries by repeatedly
# picking a source (kwp or atp) at random and then a random suggestion from that source.
# NOTE: no random seed is set here, so the selection differs between runs.
selection = []
for i, row in suggestions.iterrows():
    choice = [row['base_query']]
    pools = (row['similar_queries_kwp'], row['similar_queries_atp'])
    # Cap the target at what is actually available; without this the loop never
    # terminates for a base query with fewer than nine distinct suggestions.
    distinct_candidates = (set(pools[0]) | set(pools[1])) - set(choice)
    target = min(10, len(choice) + len(distinct_candidates))
    if target < 10:
        print(f'Only {target} distinct queries available for base query: {row["base_query"]}')
    while len(choice) < target:
        pool = pools[random.randint(0, 1)]
        if not pool:
            # The chosen source has no suggestions; draw again.
            continue
        query = random.choice(pool)
        if query not in choice:
            choice.append(query)
    selection.append(choice)
suggestions['selection'] = selection

In [197]:
# Persist the consolidated suggestions including the 'selection' column; the next section reads this file back.
suggestions.to_csv('suggestions.csv')

### Prepare the txt file that is run by the script

In [203]:
# Load keywords and write to txt file: one line per selected query in the form
# category,subcategory0,subcategory1,query
df = pd.read_csv('suggestions.csv', index_col=0)
df['selection'] = df['selection'].apply(ast.literal_eval)
# Explicit utf-8 so that non-ASCII characters in the queries are written the same way
# on every platform instead of depending on the locale's default encoding.
# NOTE(review): fields are joined with bare commas, so a query that itself contains a
# comma yields more than four fields - confirm how the consuming script splits lines.
with open('./queries.txt', 'w', encoding='utf-8') as file:
    for index, row in df.iterrows():
        for query in row['selection']:
            file.write(f'{row["category"]},{row["subcategory0"]},{row["subcategory1"]},{query}\n')

## III) Collecting Sources
The following code has been refactored into a script that can be run as a docker container. See: ./perplexity-google-script

In [5]:
def get_sources_google(driver, query):
    '''
    Returns a list of sources for Google's first-page search results for a given query.
    Cite texts containing 'http' contribute the part before the first ' › ' separator;
    cite texts containing 'Aufrufe' or 'Follower' are recorded as 'youtube.com';
    all other cite texts are skipped.
    :param obj driver: The selenium driver
    :param str query: The query to use
    '''
    # Local import keeps this cell self-contained.
    from urllib.parse import quote_plus

    urls = []
    # Encode the query so characters such as '&', '#' or '+' do not alter the request.
    driver.get(f'https://www.google.de/search?q={quote_plus(query)}')
    cookie_consent = driver.find_elements(By.XPATH, '//button/div[text()="Alle ablehnen"]')
    if cookie_consent:
        cookie_consent[0].click()
    for cite in driver.find_elements(By.XPATH, '//cite[@role="text"]'):
        # Read the text once; every access is a separate call on the element.
        text = cite.text
        if not text:
            continue
        if 'http' in text:
            urls.append(text.split(' › ')[0])
        # Both substrings must be tested against the text: the former
        # `'Aufrufe' or 'Follower' in x.text` was always true.
        elif 'Aufrufe' in text or 'Follower' in text:
            urls.append('youtube.com')
    return urls

In [6]:
def get_sources_perplexity(driver, query):
    '''
    Submits the query on perplexity.ai and returns the href of every source link found.
    If the element matched by the sources-toggle XPath exists it is clicked and the links
    inside the "gap-md" container are returned; otherwise the links found directly under
    the "grid-flow-col" buttons are returned.
    :param obj driver: The selenium driver (needs to be seleniumbase with UC enabled)
    :param str query: The query to use
    '''
    toggle_xpath = '//div[contains(@class, "grid-flow-col")]/div[contains(@class, "flex")]'

    driver.get('https://perplexity.ai')
    prompt_box = driver.find_element(By.XPATH, '//textarea[@placeholder="Ask anything..."]')
    prompt_box.send_keys(query)
    time.sleep(random.randint(0, 3))
    submit_button = driver.find_element(By.XPATH, '//button[@aria-label="Submit"]')
    submit_button.click()
    # Fixed pause that gives the answer time to render before the sources are read.
    time.sleep(random.randint(5, 10))

    if not driver.find_elements(By.XPATH, toggle_xpath):
        inline_links = driver.find_elements(By.XPATH, '//div[contains(@class, "grid-flow-col")]/div/button/a')
        return [link.get_attribute('href') for link in inline_links]

    driver.find_element(By.XPATH, toggle_xpath).click()
    time.sleep(random.randint(0, 2))
    hrefs = []
    for link in driver.find_elements(By.XPATH, '//div[contains(@class, "gap-md")]//a'):
        hrefs.append(link.get_attribute('href'))
    return hrefs

In [212]:
def _extract_domains(urls):
    '''
    Maps each URL to "<domain>.<suffix>" as split by tldextract; empty/None entries are skipped.
    '''
    domains = []
    for url in urls:
        if not url:
            # e.g. an anchor without href; one such entry should not discard the whole list
            continue
        parts = tldextract.extract(url)
        domains.append(parts.domain + '.' + parts.suffix)
    return domains


def compare_searches(queries):
    '''
    Collects the Google and Perplexity sources for every query with one shared uc driver.
    Returns a list of dicts with the keys category, subcategory0, subcategory1, query,
    google and perplexity. If scraping one engine fails for a query, the error is printed
    and that engine's entry is [None].
    :param list queries: A list of lists with queries [category, subcategory0, subcategory1, query]
    '''
    results = []
    driver = create_selenium_driver('uc')
    # try/finally so the browser is closed even if the loop raises.
    try:
        for v in queries:
            try:
                google_sources = _extract_domains(get_sources_google(driver, v[3]))
            except Exception as e:
                google_sources = [None]
                print(e)
            try:
                perplexity_sources = _extract_domains(get_sources_perplexity(driver, v[3]))
            except Exception as e:
                perplexity_sources = [None]
                print(e)
            results.append(
                {
                    'category': v[0],
                    'subcategory0': v[1],
                    'subcategory1': v[2],
                    'query': v[3],
                    'google': google_sources,
                    'perplexity': perplexity_sources,
                }
            )
    finally:
        driver.quit()
    return results