In [2]:
# Import packages
import pandas as pd
import numpy as np
from getpass import getpass
from time import sleep
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager
import csv


from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.common import exceptions

### Get webdriver, and then get username, password and search term as input to use in scraper

In [3]:
# Create webdriver instance
def create_webdriver_instance():
    driver = webdriver.Chrome()
    return driver


# Get username as input
def get_username():
    print("Enter your username or email and press ENTER:")
    myusername = input()
    return myusername
 
    
# Get password as input    
def get_password():
    print("Enter your password and press ENTER:")
    mypassword = getpass()
    return mypassword


# Get search term as input
def get_search_term():
    print("Enter your desired search term and press ENTER:")
    mysearchterm = input()
    return mysearchterm

# Name output file
def get_filename():
    print("Enter your desired filename and press ENTER:")
    myfilename = input()
    return myfilename


### These functions log in to twitter and enter the search terms

In [4]:
# Function to log in to Twitter using username and password via specified webdriver
def login_to_twitter(myusername, mypassword, driver):
    url = 'https://www.twitter.com/login'
    try:
        driver.get(url)
        xpath_username = '//input[@name="session[username_or_email]"]'
        WebDriverWait(driver, 10).until(expected_conditions.presence_of_element_located((By.XPATH, xpath_username)))
        uid_input = driver.find_element_by_xpath(xpath_username)
        uid_input.send_keys(myusername)
    except exceptions.TimeoutException:
        print("Timeout while waiting for Login screen")
        return False

    pwd_input = driver.find_element_by_xpath('//input[@name="session[password]"]')
    pwd_input.send_keys(mypassword)
    try:
        pwd_input.send_keys(Keys.RETURN)
        url = "https://www.twitter.com/home"
        WebDriverWait(driver, 10).until(expected_conditions.url_to_be(url))
    except exceptions.TimeoutException:
        print("Timeout while waiting for home screen")
    return True


# Function to enter search term
def enter_search_term(mysearchterm, driver):
    mysearch = mysearchterm
    driver.find_element_by_xpath('//a[@data-testid="AppTabBar_Explore_Link"]').click()
    search_input = driver.find_element_by_xpath('//input[@aria-label="Search query"]')
    search_input.send_keys(mysearch)
    search_input.send_keys(Keys.RETURN)


### The following functions collect the data from Tweets and save them to a csv as they are collected

In [5]:
def generate_tweet_id(tweet):
    return ''.join(tweet)


def scroll_down_page(driver, last_position, num_seconds_to_load=0.5, scroll_attempt=0, max_attempts=5):
    end_of_scroll_region = False
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    sleep(num_seconds_to_load)
    curr_position = driver.execute_script("return window.pageYOffset;")
    if curr_position == last_position:
        if scroll_attempt < max_attempts:
            end_of_scroll_region = True
        else:
            scroll_down_page(last_position, curr_position, scroll_attempt + 1)
    last_position = curr_position
    return last_position, end_of_scroll_region


def save_tweet_data_to_csv(records, filepath, mode='a+'):
    header = ['User', 'Handle', 'PostDate', 'TweetText', 'ReplyCount', 'RetweetCount', 'LikeCount']
    with open(filepath, mode=mode, newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if mode == 'w':
            writer.writerow(header)
        if records:
            writer.writerow(records)


def collect_all_tweets_from_current_view(driver, lookback_limit=25):
    page_cards = driver.find_elements_by_xpath('//div[@data-testid="tweet"]')
    if len(page_cards) <= lookback_limit:
        return page_cards
    else:
        return page_cards[-lookback_limit:]


def extract_data_from_current_tweet_card(card):
    try:
        user = card.find_element_by_xpath('.//span').text
    except exceptions.NoSuchElementException:
        user = ""
    except exceptions.StaleElementReferenceException:
        return
    try:
        handle = card.find_element_by_xpath('.//span[contains(text(), "@")]').text
    except exceptions.NoSuchElementException:
        handle = ""
    try:
        postdate = card.find_element_by_xpath('.//time').get_attribute('datetime')
    except exceptions.NoSuchElementException:
        return
    try:
        _comment = card.find_element_by_xpath('.//div[2]/div[2]/div[1]').text
    except exceptions.NoSuchElementException:
        _comment = ""
    try:
        _responding = card.find_element_by_xpath('.//div[2]/div[2]/div[2]').text
    except exceptions.NoSuchElementException:
        _responding = ""
    tweet_text = _comment + _responding
    try:
        reply_count = card.find_element_by_xpath('.//div[@data-testid="reply"]').text
    except exceptions.NoSuchElementException:
        reply_count = ""
    try:
        retweet_count = card.find_element_by_xpath('.//div[@data-testid="retweet"]').text
    except exceptions.NoSuchElementException:
        retweet_count = ""
    try:
        like_count = card.find_element_by_xpath('.//div[@data-testid="like"]').text
    except exceptions.NoSuchElementException:
        like_count = ""

    tweet = (user, handle, postdate, tweet_text, reply_count, retweet_count, like_count)
    return tweet


### Main function to call all other functions

In [6]:
def main(myusername, mypassword, mysearchterm, filepath, page_sort='Latest'):
    try:
        # create empty .csv file to save tweet data to
        save_tweet_data_to_csv(None, filepath, 'w')  
        last_position = None
        end_of_scroll_region = False
        unique_tweets = set()

        # create new webdriver instance
        driver = create_webdriver_instance()
        logged_in = login_to_twitter(myusername, mypassword, driver)
        if not logged_in:
            return

        # enter search term on explore page
        enter_search_term(mysearchterm, driver)

        #collect Tweets and scroll infinitely, save Tweets as they are collected
        while not end_of_scroll_region:
            cards = collect_all_tweets_from_current_view(driver)
            for card in cards:
                try:
                    tweet = extract_data_from_current_tweet_card(card)
                except exceptions.StaleElementReferenceException:
                    continue
                if not tweet:
                    continue
                tweet_id = generate_tweet_id(tweet)
                if tweet_id not in unique_tweets:
                    unique_tweets.add(tweet_id)
                    save_tweet_data_to_csv(tweet, filepath)
            last_position, end_of_scroll_region = scroll_down_page(driver, last_position)
        driver.quiralt()
    except:
        print("Scrape terminated")


### Call main function

In [7]:
if __name__ == '__main__':
    myusername = get_username()
    mypassword = get_password()
    filepath = get_filename()
    mysearchterm = get_search_term()

    
main(myusername, mypassword, mysearchterm, filepath, page_sort='Latest')

Enter your username or email and press ENTER:
0478632991
Enter your password and press ENTER:
········
Enter your desired filename and press ENTER:
harleydavidson.csv
Enter your desired search term and press ENTER:
harley davidson australia
Timeout while waiting for home screen
Scrape terminated
