### Hybrid HTML scraping: browse with Selenium, parse with Beautiful Soup

In [1]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException
from selenium.webdriver.common.keys import Keys
# from selenium.webdriver.common.action_chains import ActionChains
from bs4 import BeautifulSoup
import pickle
from itertools import chain
from os.path import exists
from datetime import datetime, timezone, timedelta
import time
import random as rnd
# import types
import twitter_config as cfg

# Path prefix (relative, so resolved against the working directory) under which
# the per-account cookie and browsing-state files are stored.
DATA_ROOT = 'twitter_data/'

class Webbroswer(object):
  def __init__(self, **params):
    """Record account credentials and browser settings; no browser is started here.

    Keyword params read:
      username, password, verify -- required (asserted to be not None)
      driver -- one of 'Chrome', 'Firefox', 'Safari', 'IE' (default 'Chrome')
      mobile -- default False
      proxy  -- default False
    """
    read = params.get
    self.user = read('username', None)
    self.passwd = read('password', None)
    self.verify = read('verify', None)
    self.driver = read('driver', 'Chrome')
    self.mobile = read('mobile', False)
    self.proxy = read('proxy', False)

    supported_drivers = ['Chrome', 'Firefox', 'Safari', 'IE']
    assert self.user is not None, '"username" param required'
    assert self.passwd is not None, '"password" param required'
    assert self.verify is not None, '"verify" param required'
    assert self.driver in supported_drivers, 'unsupported browser type'

    # state files are named after the verify name and the driver type
    self.cookies_file = DATA_ROOT + self.verify + '_' + self.driver + '.cookies'
    self.browsing_file = self.cookies_file.replace('.cookies', '.json')

    # runtime state, filled in once a browser is opened
    self.cookies_cache = []
    self.browser = None
    self.url = None

  def clear_browsing(self):
    """Reset the remembered URL to ``cfg.BASE_URL``."""
    self.url = cfg.BASE_URL

  def save_browsing(self):
    with open(self.browsing_file, "w") as file:
      file.write(self.url)

  def load_browsing(self):
    if not exists(self.browsing_file): return
    with open(self.browsing_file, "r") as file:
      self.url = file.read()

  def save_cookies(self):
    self.cookies_cache = self.browser.get_cookies()
    with open(self.cookies_file, "wb") as file:
      pickle.dump(self.cookies_cache, file)

  def load_cookies(self):
    if not exists(self.cookies_file): return
    with open(self.cookies_file, "rb") as file:
      self.cookies_cache = pickle.load(file)

  def apply_cookies(self):
    for cookie in self.cookies_cache:
      self.browser.add_cookie(cookie)

  def open_browser(self, url=None):
    """Start the configured webdriver, open the start URL and replay cached cookies.

    url: page to open. When None, the URL saved by a previous session is used
    (see load_browsing); if there is none either, ``cfg.BASE_URL`` is used.
    The mobile-emulation and proxy settings are applied to Chrome only.
    """
    # --- create the driver object
    if self.driver == 'Chrome':
      chrome_opts = webdriver.ChromeOptions()
      if self.mobile:
        chrome_opts.add_experimental_option('mobileEmulation', {'deviceName':'Nexus 7'})
      if self.proxy:
        chrome_opts.add_argument(f'--proxy-server={self.proxy}')
      self.browser = webdriver.Chrome(options=chrome_opts)
    else:
      # remaining drivers take no options: map our type name to the webdriver class name
      factory_name = {'Firefox': 'Firefox', 'Safari': 'Safari', 'IE': 'Ie'}.get(self.driver)
      if factory_name is not None:
        self.browser = getattr(webdriver, factory_name)()
    self.browser.implicitly_wait(10)

    # --- cached cookies from disk
    self.load_cookies()

    # --- decide which URL to open
    if url is not None:
      self.url = url
    else:
      self.load_browsing()
    if self.url is None:
      self.url = cfg.BASE_URL

    # --- the page is opened first, then cookies are added and the page reloaded
    self.browser.get(self.url)
    self.apply_cookies()
    self.browser.refresh()

  def close_browser(self, clear_browsing=False):
    if self.browser:
      self.save_cookies()
      if clear_browsing: self.clear_browsing()
      self.save_browsing()
      self.browser.quit()
      self.url = None

  def open_page(self, url=cfg.BASE_URL, forceRefresh=False):
    """Navigate to ``url``, starting a browser first if none is open.

    The page is only requested when ``url`` differs from the remembered
    ``self.url``. It is refreshed when ``forceRefresh`` is set or when the
    browser had to be started by this call.
    """
    started_here = self.browser is None
    if started_here:
      self.open_browser(url='about:blank')

    if url != self.url:
      self.browser.get(url)
      self.url = url

    if forceRefresh or started_here:
      self.browser.refresh()

  def logged_in(self):
    """Return True when the page shows no element with data-testid 'login'.

    The lookup runs under a short randomized implicit wait. Only a missing
    element counts as "logged in"; any other error now propagates. The former
    ``return`` inside ``finally`` swallowed every exception, KeyboardInterrupt
    included, and reported False.
    """
    self._random_timeout()
    try:
      self.browser.find_element(By.XPATH, "//*[@data-testid='login']")
    except NoSuchElementException:
      return True
    return False

  def login_username(self, username):
    """Type ``username`` into the username field and click 'Next'.

    Returns True when both the field and the button were found and used.
    Returns False (after printing a note) when either is missing, which is
    taken to mean no login is required. Other errors propagate instead of
    being swallowed by a ``return`` inside ``finally``.
    """
    try:
      username_field = self.browser.find_element(By.XPATH, "//*[@autocomplete='username']")
      username_field.send_keys(username)
      next_button = self.browser.find_element(By.XPATH, "//*[text()='Next']/../..")
      next_button.click()
    except NoSuchElementException:
      print('seems already logged in')
      return False
    return True

  def login_verify(self, verify):
    """Fill in the extra verification text field, if the page shows one, and click 'Next'.

    Runs under a short randomized implicit wait; a missing field or button
    only prints a note.
    """
    self._random_timeout()
    field_xpath = "//*[@data-testid='ocfEnterTextTextInput']"
    button_xpath = "//*[text()='Next']/../.."
    try:
      self.browser.find_element(By.XPATH, field_xpath).send_keys(verify)
      self.browser.find_element(By.XPATH, button_xpath).click()
    except NoSuchElementException:
      print('verify name skipped')

  def login_password(self, password):
    """Type ``password`` into the password field and click 'Log in'.

    A missing field or button only prints a note.
    """
    field_xpath = "//*[@autocomplete='current-password']"
    button_xpath = "//*[text()='Log in']/../.."
    try:
      self.browser.find_element(By.XPATH, field_xpath).send_keys(password)
      self.browser.find_element(By.XPATH, button_xpath).click()
    except NoSuchElementException:
      print('password skipped')

  def login(self):
    """Open ``cfg.LOGIN_URL`` and walk through username -> verify -> password.

    The verify and password steps run only when login_username() reports that
    a login form was present. Randomized pauses separate the steps.
    """
    # open_page() starts the browser when none is open yet, so it must run
    # before anything that dereferences self.browser
    self.open_page(cfg.LOGIN_URL)
    self.browser.implicitly_wait(10)
    self._random_sleep(2)
    if self.login_username(self.user):
      self._random_sleep(2)
      self.login_verify(self.verify)
      self._random_sleep(2)
      self.login_password(self.passwd)

  def click_following_on_homepage(self):
    """Make sure the home page is open, then click its 'Following' tab.

    If the browser is not already at ``cfg.HOME_URL`` the page is opened with a
    forced refresh. The implicit wait is lowered to 0.3 s for the tab lookup
    (and stays at that value afterwards); a missing tab only prints a note.
    """
    if self.browser.current_url != cfg.HOME_URL:
      # was `cfg.HOME_UR`, which raised AttributeError on this path
      self.open_page(cfg.HOME_URL, True)
    self.browser.implicitly_wait(0.3)
    try:
      following_tab = self.browser.find_element(By.XPATH, "//*[@data-testid='ScrollSnap-List']//span[text()='Following']")
      following_tab.click()
    except NoSuchElementException:
      print('following tab not found')

  def scroll_to_bottom(self, scroll_round=1):
    SCROLL_PAUSE_TIME = 2
    # scroll to bottom until no more content loaded
    last_height = self.browser.execute_script("return document.body.scrollHeight")
    for i in range(scroll_round):
      self.browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
      time.sleep(SCROLL_PAUSE_TIME)
      new_height = self.browser.execute_script("return document.body.scrollHeight")
      if new_height == last_height:
        break
      last_height = new_height

  def _random_timeout(self, least=0.5):
    self.browser.implicitly_wait(rnd.random() + least)

  def _random_sleep(self, least=1, dimension=1):
    time.sleep(rnd.random()*dimension + least)

  def _pick_tweet_text(self, func):
    text_element = func(attrs={"data-testid":"tweetText"})
    return text_element.text if text_element else None

  def _pick_tweet_media_items(self, func, attrs, sel):
    divs = func(attrs=attrs)
    elements = []
    for div in divs:
      _elments = [e['src'] for e in div.select(sel)]
      elements = chain(elements, _elments)
    # return list(elements) # element duplicated
    return list(dict.fromkeys(elements))

  def _pick_tweet_imgs(self, func):
    return self._pick_tweet_media_items(func, {"data-testid":"tweetPhoto"}, 'img[src]')

  def _pick_tweet_videos(self, func):
    return self._pick_tweet_media_items(func, {"data-testid":"videoComponent"}, 'video[src]')

  def _pick_tweet_stats(self, func):
    elements = func(attrs={"data-testid":"app-text-transition-container"})
    if len(elements) == 0: return None
    tweet_stats = [x.text for x in elements] # [ reply, repost, like, view]
    stats_len = len(tweet_stats)
    return {
      'reply':  tweet_stats[0] if stats_len > 0 else None,
      'repost': tweet_stats[1] if stats_len > 1 else None,
      'like':   tweet_stats[2] if stats_len > 2 else None,
      'view':   tweet_stats[3] if stats_len > 3 else None
    }

  def _pick_tweek_link(self, func):
    te = func('time')
    if te is None: return None
    element = te.find_parent('a')
    return element['href'] if element else None

  def _pick_tweet_timestamp(self, func):
    element = func('time')
    if element is None: return None
    timestamp = element['datetime']
    return datetime.fromisoformat(timestamp.rstrip('Z')).replace(tzinfo=timezone.utc)

  def _pick_quote_owner(self, func):
    user_element = func(attrs={"data-testid":"User-Name"})
    ne = user_element.find(lambda tag: tag.string and tag.string.startswith('@'))
    return ne.text[1:] if ne else None

  def _pick_quote_link(self, func):
    text_element = func(attrs={"data-testid":"tweetText"})
    element = text_element.find('a')
    return element['href'] if element else None

  def _gen_res(self, res_map):
    if not bool(res_map): return None
    res = {}
    for res_type in res_map:
      f = res_map[res_type]['f']
      arg = res_map[res_type]['arg']
      res[res_type] = f(arg)
    return res

  def _parse_tweet(self, tweet):
    tweet_anchor = tweet
    tweet_find_name = 'find_next'
    tweet_find_all_name = 'find_all_next'

    quote_res_map = None
    quote = tweet.find('span', string='Quote')
    if quote:
      tweet_anchor = quote
      tweet_find_name = 'find_previous'
      tweet_find_all_name = 'find_all_previous'

      quote_find = getattr(quote, 'find_next')
      quote_find_all = getattr(quote, 'find_all_next')
      quote_res_map = {
        'owner': {'f': self._pick_quote_owner, 'arg': quote_find},
        'link': {'f': self._pick_quote_link, 'arg': quote_find},
        'text': {'f': self._pick_tweet_text, 'arg': quote_find},
        'timestamp': {'f': self._pick_tweet_timestamp, 'arg':quote_find},
        'imgs': {'f': self._pick_tweet_imgs, 'arg': quote_find_all},
        'videos': {'f': self._pick_tweet_videos, 'arg':quote_find_all},
      }

    tweet_find = getattr(tweet_anchor, tweet_find_name)
    tweet_find_all = getattr(tweet_anchor, tweet_find_all_name)
    tweet_res_map = {
      'text': {'f': self._pick_tweet_text, 'arg': tweet_find},
      'timestamp': {'f': self._pick_tweet_timestamp, 'arg':tweet_find},
      'link': {'f': self._pick_tweek_link, 'arg': tweet_find},
      'imgs': {'f': self._pick_tweet_imgs, 'arg': tweet_find_all},
      'videos': {'f': self._pick_tweet_videos, 'arg':tweet_find_all},
      'stats': {'f': self._pick_tweet_stats, 'arg': getattr(tweet, 'find_all')}
    }

    return {
      'tweet': self._gen_res(tweet_res_map),
      'quote': self._gen_res(quote_res_map)
    }

  def tweets_from_html(self, html):
    """Parse ``html`` with the lxml parser and return one parsed dict per element with data-testid 'tweet'.

    Each tweet element is extracted (detached) from the soup before it is parsed.
    """
    soup = BeautifulSoup(html, 'lxml')
    parsed = []
    for tweet_tag in soup.find_all(attrs={"data-testid":"tweet"}):
      detached = tweet_tag.extract()
      parsed.append(self._parse_tweet(detached))
    return parsed

  def html_of_browser_element(self, element):
    return self.browser.execute_script("return arguments[0].outerHTML;", element) 

  def first_load_html(self):
    """Return the HTML of the element with data-testid 'primaryColumn'.

    The whole column is handed to Beautiful Soup in one piece rather than
    locating individual tweet or tweet-text elements through the browser.
    """
    primary_column = self.browser.find_element(By.XPATH, "//*[@data-testid='primaryColumn']")
    column_html = self.html_of_browser_element(primary_column)
    return column_html

  def _find(self, node, method, pattern, timeout=2):
    """Call ``node.<method>(By.XPATH, pattern)`` repeatedly until it yields a non-None result.

    node:    object offering the lookup method (the browser or an element).
    method:  name of the lookup method, e.g. 'find_element' or 'find_elements'.
    pattern: XPath expression.
    timeout: budget in seconds; only time spent inside the lookup calls counts.

    A randomized implicit wait (0.1 s plus up to 1 s) is set first. Returns
    None when the budget runs out, or at once when the node has gone stale.
    """
    self._random_timeout(0.1)
    lookup = getattr(node, method)
    spent = 0
    result = None
    while result is None and spent < timeout:
      started = time.time()
      try:
        result = lookup(By.XPATH, pattern)
      except StaleElementReferenceException:
        # the node is no longer attached to the page; retrying cannot succeed
        return None
      except NoSuchElementException:
        result = None
      spent += time.time() - started
    return result

  def find_element(self, node, pattern, timeout=2):
    return self._find(node=node, method='find_element', pattern=pattern, timeout=timeout)

  def find_elements(self, node, pattern, timeout=2):
    return self._find(node=node, method='find_elements', pattern=pattern, timeout=timeout)

  def merge_tweets(self, tweets, more_tweets):
    for tweet in more_tweets:
      if all(t['tweet']['link'] != tweet['tweet']['link'] for t in tweets):
        tweets.append(tweet)
    return tweets

  def load_more_tweets(self, node, pgdn_count=1, timeout=2):
    """Page down, then parse the tweet cells that follow ``node``.

    node:       a 'cellInnerDiv' element; only its following siblings are read.
    pgdn_count: how many PageDown key presses to send to the active element.
    timeout:    lookup budget passed to find_elements().

    Returns ``(tweets, divs)``: the parsed tweet dicts and the sibling cell
    elements as returned by find_elements() (possibly None).
    """
    # press PgDn to make the page load more tweets, pausing briefly after each press
    for _ in range(pgdn_count):
      self.browser.switch_to.active_element.send_keys(Keys.PAGE_DOWN)
      self._random_sleep(0.1, 0.9)

    sibling_cells = self.find_elements(
      node=node,
      pattern="following-sibling::*[@data-testid='cellInnerDiv']",
      timeout=timeout,
    )

    parsed = []
    if sibling_cells is not None:
      for cell in sibling_cells:
        cell_html = self.html_of_browser_element(cell)
        parsed.extend(self.tweets_from_html(cell_html))

    return parsed, sibling_cells
  
  def _parse_tracetime(self, traceback):
    time_unit_map = { 's': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days', 'w': 'weeks' }
    value = int(traceback[:-1])
    unit = traceback[-1]
    if unit not in time_unit_map:
      raise ValueError(f"Unknown time unit: {unit}")
    return timedelta(**{time_unit_map[unit]: value})

  def _tweets_after_time(self, tweets, targettime, timestamps={}, stat={'miss':0,'meet':0, 'stable':0}):
    miss = 0
    for tweet in tweets:
      link = tweet['tweet']['link']
      if link is None: continue # skip non tweet
      split = link.split('/')
      account = split[1]
      timestamp = tweet['tweet']['timestamp']
      timestamps[account] = timestamp
      if timestamp < targettime: stat['meet'] += 1
      else: miss += 1
    # stat['weight'] = stat['meet'] / (miss + stat['miss'])
    stat['miss'] += miss
    if miss > 0: stat['stable'] = 0
    else: stat['stable'] += 1
    # stat['weight'] = stat['miss'] / (miss + stat['miss'])
    return stat, timestamps

  def load_tweets(self, traceback=None, stable=5, more_round=0, timeout=10, max_idle=30):
    """Parse the tweets on the current page, optionally paging down for more.

    traceback:  span such as '1h' (see _parse_tracetime). When given, more
                tweets are loaded until ``stable`` consecutive batches contain
                no tweet newer than now minus that span.
    stable:     number of consecutive "all old" batches that ends the traceback loop.
    more_round: without ``traceback``, the number of extra page-down rounds.
    timeout:    lookup budget passed to find_elements().
    max_idle:   traceback mode only -- give up after this many consecutive
                rounds that produced no new cell, so the loop ends when the
                feed stops loading.

    Returns the list of parsed tweet dicts (empty when the page has no tweet cell).
    """
    # on page loaded, find tweet cell divs
    tweet_divs = self.find_elements(node=self.browser, pattern="//*[@data-testid='cellInnerDiv']", timeout=timeout)
    if not tweet_divs: return []

    # parse existing tweets from html of browser
    tweets = self.tweets_from_html(self.first_load_html())

    # both branches continue from the last cell seen so far; this used to be
    # set only in the traceback branch, so more_round > 0 raised an error
    last_div = tweet_divs[-1]

    if traceback:
      # check tweets' time back to time of traceback point
      targettime = datetime.now(timezone.utc) - self._parse_tracetime(traceback)
      # pass fresh containers explicitly so every run starts its counters at zero
      stat, timestamps = self._tweets_after_time(
        tweets, targettime, {}, {'miss': 0, 'meet': 0, 'stable': 0})

      idle_rounds = 0
      while stat['stable'] < stable and idle_rounds < max_idle:
        print(f"stable: {stat['stable']:.2f}, meet: {stat['meet']}, miss: {stat['miss']}")
        more_tweets, more_divs = self.load_more_tweets(node=last_div, pgdn_count=1, timeout=timeout)
        tweets = self.merge_tweets(tweets, more_tweets)
        if more_divs:
          stat, timestamps = self._tweets_after_time(more_tweets, targettime, timestamps, stat)
          last_div = more_divs[-1]
          idle_rounds = 0
        else:
          idle_rounds += 1
          time.sleep(0.1)
    else:
      # load more if 'more_round' given
      for _ in range(more_round):
        more_tweets, more_divs = self.load_more_tweets(node=last_div, pgdn_count=1, timeout=timeout)
        tweets = self.merge_tweets(tweets, more_tweets)
        if more_divs:
          last_div = more_divs[-1]
        else:
          time.sleep(0.1)

    return tweets

  def load_url_tweets(self, url, traceback=None, stable=5, more_round=0, timeout=10):
    self.open_page(url, True)
    return self.load_tweets(traceback, stable, more_round, timeout)

  def wait_page_loading(self, timeout=30):
    tweet_div = self.find_element(node=self.browser, pattern="//*[@data-testid='cellInnerDiv']", timeout=timeout)
    return True if tweet_div else False

  def follow_accounts(self, accounts):
    """Open each account's page and click its 'Follow' button when one is found."""
    follow_xpath = "//*[@data-testid='placementTracking']//span[text()='Follow']"
    for handle in accounts:
      self.open_page('/'.join((cfg.BASE_URL, handle)))
      button = self.find_element(self.browser, follow_xpath)
      if button:
        button.click()
      time.sleep(0.2)

  def unfollow_accounts(self, accounts):
    """Open each account's page and unfollow it.

    Two page layouts are handled: a 'Following' button that opens a
    confirmation sheet, or, when that button is absent, an element labelled
    'Unfollow @<account>' that opens a dropdown.
    """
    following_xpath = "//*[@data-testid='placementTracking']//span[text()='Following']"
    confirm_xpath = "//*[@data-testid='confirmationSheetConfirm']//span[text()='Unfollow']"
    for handle in accounts:
      self.open_page('/'.join((cfg.BASE_URL, handle)))
      following_button = self.find_element(self.browser, following_xpath)
      if following_button:
        # 'Following' opens a popup whose confirm button does the unfollow
        following_button.click()
        time.sleep(0.2)
        confirm = self.find_element(self.browser, confirm_xpath)
        if confirm:
          confirm.click()
      else:
        # fall back to the unfollow icon and its dropdown entry
        unfollow_icon = self.find_element(self.browser, f"//*[@aria-label='Unfollow @{handle}']")
        if unfollow_icon:
          unfollow_icon.click()
          menu_entry = self.find_element(self.browser, f"//*[@data-testid='Dropdown']//span[text()='Unfollow @{handle}']")
          if menu_entry:
            menu_entry.click()
      time.sleep(0.2)

  def print(self, tweets):
    print(f' num of tweets: {len(tweets)} '.center(50, '-'))
    LB_PAD = 6
    def print_l(label, lines, label_pad=LB_PAD):
      if isinstance(lines, list):
        if len(lines) > 0:
          print(label.ljust(label_pad, ' '), lines[0])
          for l in lines[1:]: print(' '*LB_PAD, l)
        else:
          print(label.ljust(label_pad, ' '), None)
      else:
        print(label.ljust(label_pad, ' '), lines)
  
    for i in range(0, len(tweets)):
      t = tweets[i]
      print(f' tweet<{i}> '.center(50, '-'))
      print_l('link:', t['tweet']['link'])
      print_l('text:', t['tweet']['text'].split('\n') if  t['tweet']['text'] else None)
      print_l('imgs:', t['tweet']['imgs'])
      print_l('vids:', t['tweet']['videos'])
      print_l('stats:',t['tweet']['stats'])
      print_l('date:', t['tweet']['timestamp'].astimezone() if t['tweet']['timestamp'] else None)
      if t['quote']:
        print(' quote '.center(30, '~'))
        print_l('owner:',t['quote']['owner'])
        print_l('text:',  t['quote']['text'].split('\n') if t['quote']['text'] else None)
        print_l('link:',  t['quote']['link'] if t['quote']['link'] else None)
        print_l('imgs:',  t['quote']['imgs'])
        print_l('vids:',  t['quote']['videos'])
        print_l('date:',  t['quote']['timestamp'].astimezone() if t['quote']['timestamp'] else None)

### test

In [3]:
if __name__ == '__main__':
  # Manual smoke tests against the live site. Each test starts its own browser
  # and closes it again, which also saves cookies and the last URL.

  def launch_browser(login=False, homepage=True):
    """Start a desktop browser for the first configured account."""
    # copy the account so the shared cfg.ACCOUNTS entry is not modified
    account = dict(cfg.ACCOUNTS[0])
    account['mobile'] = False
    mb = Webbroswer(**account)
    mb.open_browser()
    if login: mb.login()
    if homepage: mb.click_following_on_homepage()
    return mb

  def print_tweets(mb, url):
    """Load the tweets of ``url`` and print them with the elapsed time."""
    print('')
    print(f' {url} '.center(80, '='))
    tn = time.time()
    # load_tweets(url) would have passed the URL as the `traceback` argument
    ts = mb.load_url_tweets(url)
    print(f'------ elapsed: {(time.time() - tn):.2f}s' )
    mb.print(ts)

  def print_tweets_on_following_page(mb: Webbroswer):
    """Load tweets of the current page going back one hour and print them."""
    print('')
    print(f' {cfg.HOME_URL} '.center(80, '='))
    tn = time.time()
    ts = mb.load_tweets(traceback='1h')
    print(f'------ elapsed: {(time.time() - tn):.2f}s' )
    mb.print(ts)

  def test_url_tweets():
    URL1 = 'https://twitter.com/elonmusk'
    URL2 = 'https://twitter.com/VitalikButerin'
    mb = launch_browser()
    try:
      time.sleep(5)
      print_tweets(mb, URL1)
      time.sleep(5)
      print_tweets(mb, URL2)
    finally:
      mb.close_browser(clear_browsing=True)

  def test_following_tweets():
    mb = launch_browser()
    try:
      time.sleep(2)
      print_tweets_on_following_page(mb)
    finally:
      mb.close_browser()

  def test_accounts_follow():
    accounts = ['elonmusk', 'VitalikButerin']
    mb = launch_browser(homepage=False)
    try:
      time.sleep(4)
      mb.follow_accounts(accounts)
    finally:
      mb.close_browser()

  def test_accounts_unfollow():
    accounts = ['elonmusk', 'VitalikButerin']
    mb = launch_browser(homepage=False)
    try:
      time.sleep(4)
      mb.unfollow_accounts(accounts)
    finally:
      mb.close_browser()

  test_following_tweets()


stable: 0.00, meet: 3, miss: 7
stable: 1.00, meet: 9, miss: 7
stable: 2.00, meet: 12, miss: 7
stable: 3.00, meet: 15, miss: 7
stable: 4.00, meet: 17, miss: 7
------ elapsed: 5.33s
--------------- num of tweets: 25 ----------------
-------------------- tweet<0> --------------------
link:  /Law360/status/1734203905616347381
text:  Welcome to Monday and a quick roundup of some of Law360's biggest news.
imgs:  https://pbs.twimg.com/media/GBEhQvsbMAAGHjZ?format=jpg&name=small
vids:  None
stats: {'reply': '1', 'repost': '', 'like': '', 'view': '245'}
date:  2023-12-11 21:30:05+08:00
-------------------- tweet<1> --------------------
link:  /Law360/status/1734205167724319054
text:  The D.C. Circuit issued a narrowed gag order restraining Donald Trump's public statements amid his criminal election-interference case after finding a lower court restricted "more protected speech than is necessary."
imgs:  None
vids:  None
stats: {'reply': '', 'repost': '', 'like': '', 'view': '58'}
date:  2023-1