In [6]:
%load_ext dotenv
%dotenv

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [84]:
import json
import os, time, pickle
from urllib.parse import quote

from bs4 import BeautifulSoup
from selenium import webdriver

In [38]:
class Twitter:
    """
    A Firefox (geckodriver) browser session that tries to log in to Twitter.

    Creating an instance opens the browser and immediately attempts the login;
    the outcome is stored in ``init_twitter_state`` and, on failure, the
    reason is stored in ``login_error``.
    """

    def __init__(self, username, password):
        """
        :params username the value typed into the login form's username field
        :params password the value typed into the login form's password field
        """
        self.username = username
        self.password = password
        self.login_error = None
        self.driver = webdriver.Firefox()
        self.url = "https://twitter.com/"

        # Element look-ups poll for up to 1 second before giving up.
        self.driver.implicitly_wait(1)
        self.init_twitter_state = self.login()

    def get_driver(self):
        """
        Returns the geckodriver instance for this class.
        """
        return self.driver

    def login(self):
        """
        Login to Twitter
        :returns True when the login form was filled in and submitted, False
                 when a credential is missing or driving the form failed.
                 True does not prove that Twitter accepted the credentials:
                 nothing is checked after the form is submitted.
        """
        self.login_error = None

        # A missing environment variable yields None; fail with a clear
        # reason instead of an opaque error from send_keys().
        if not self.username or not self.password:
            self.login_error = ValueError("missing Twitter username or password")
            print(f"Twitter login failed: {self.login_error}")
            return False

        try:
            self.driver.get(self.url + "login")
            # The plain "name" locator string is what selenium's By.NAME
            # holds; find_element(by, value) exists in Selenium 3 and 4,
            # whereas find_element_by_name was removed in Selenium 4.
            username_field = self.driver.find_element("name", "session[username_or_email]")
            password_field = self.driver.find_element("name", "session[password]")
            # No implicitly_wait() calls between the keystrokes: that method
            # only sets the element look-up timeout, it does not pause.
            username_field.send_keys(self.username)
            password_field.send_keys(self.password)
            password_field.submit()
            return True
        except Exception as exc:
            # Best effort: report the reason and let callers test the flag.
            # Unlike a bare ``except`` this lets KeyboardInterrupt through.
            self.login_error = exc
            print(f"Twitter login failed: {exc!r}")
            return False

In [77]:
class User:
    """
    Scrapes the Followers / Following lists of one Twitter account through an
    already logged-in ``Twitter`` session.

    Every scraped profile link is recorded in ``self.followers`` regardless of
    which list it came from (cells further down the notebook read
    ``followers`` after calling ``scrape_following()``). Links scraped from
    the Following page are additionally recorded in ``self.following``.
    """

    def __init__(self, tag, twitter):
        """
        :params tag the account handle, used to build the profile URL
        :params twitter a Twitter instance whose driver and login state are reused
        """
        self.tag = tag
        self.followers = set()
        self.following = set()
        self.twitter = twitter
        self.driver = twitter.get_driver()
        self.init_twitter_state = twitter.init_twitter_state

    def scrape_following(self):
        """
        Scrape the accounts listed on this user's Following page.
        """
        self.scrape_followx("Following")

    def scrape_followers(self):
        """
        Scrape the accounts listed on this user's Followers page.
        """
        self.scrape_followx("Followers")

    def scrape_followx(self, fx):
        """
        Scrape the followers from a given user.
        :params fx "Followers" or "Following"; selects the page that is
                   opened and the timeline element that is read
        """
        if(self.tag is None or self.init_twitter_state is False):
            return

        url = f"https://twitter.com/{self.tag}/{fx.lower()}"
        self.driver.get(url)
        # implicitly_wait() sets the element look-up timeout; it is not a pause.
        self.driver.implicitly_wait(2)

        SCROLL_PAUSE_TIME = 1

        # Get scroll height
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        self.driver.implicitly_wait(1)

        while True:
            # Harvest on every pass, before scrolling further down.
            followx_els = self.get_followx_html(fx)
            self.serialize_followx(followx_els, fx)
            # Scroll down to bottom
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Wait to load page
            time.sleep(SCROLL_PAUSE_TIME)

            # Calculate new scroll height and compare with last scroll height
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height
            if(len(self.followers) % 200) == 0:
                print(f"We have {len(self.followers)} followers.")

    def get_followx_html(self, fx):
        """
        Ask the driver to get an update view of the HTML
        :params fx "Followers" or "Following", matched against the timeline's aria-label
        :returns followx_els a list of soup items, they are the followers;
                 an empty list when the timeline element is not on the page
        """
        # The plain "xpath" locator string is what selenium's By.XPATH holds;
        # find_elements(by, value) exists in Selenium 3 and 4, whereas
        # find_elements_by_xpath was removed in Selenium 4.
        timelines = self.driver.find_elements("xpath", f"//div[@aria-label='Timeline: {fx}']")
        if not timelines:
            return []

        followx_html = timelines[0].get_attribute('innerHTML') or ""
        soup = BeautifulSoup(followx_html, "html.parser")
        if soup.div is None:
            return []
        return soup.div.findChildren("div" , recursive=False)

    def serialize_followx(self, followx_els, fx="Followers"):
        """
        Converts the soup html into a dictionary and records the profile links
        :params followx_els Twitter div components of the followers
        :params fx "Followers" or "Following"; with "Following" the links are
                   also added to self.following
        :returns followx a dict keyed by profile link, one entry per element
                 that could be parsed completely
        """
        is_following = str(fx).lower() == "following"
        followx = {}
        for el in followx_els:
            # Rows without a profile link are skipped.
            try:
                href = el.a['href']
            except (AttributeError, KeyError, TypeError):
                continue

            self.followers.add(str(href))
            if is_following:
                self.following.add(str(href))

            # The link is kept even when the remaining details are missing.
            try:
                followx[href] = {
                    'url_tag': href,
                    'tag': el.span.text,
                    'profile_img': el.a.img['src'],
                    'profile_text': el.findAll('span')[-1].text
                }
            except (AttributeError, KeyError, TypeError, IndexError):
                continue
        return followx

In [71]:
class TwitterSearch:
    """
    Runs a live Twitter search through an already logged-in ``Twitter``
    session and collects the tweets it finds in ``self.tweets`` as JSON
    strings (a set, so repeated harvests of the same tweet collapse).
    """

    def __init__(self, terms, twitter):
        """
        :terms ['term1', 'term2'] A list of search terms
        :twitter a Twitter instance whose driver and login state are reused
        """
        self.terms = terms
        self.twitter = twitter
        self.driver = twitter.get_driver()
        self.init_twitter_state = twitter.init_twitter_state
        self.tweets = set()

    def get_search_html(self):
        """
        Ask the driver to get an update view of the HTML
        :returns a list of soup items, the direct div children of the search
                 timeline; an empty list when the timeline is not on the page
        """
        # The plain "xpath" locator string is what selenium's By.XPATH holds;
        # find_elements(by, value) exists in Selenium 3 and 4, whereas
        # find_elements_by_xpath was removed in Selenium 4.
        timelines = self.driver.find_elements("xpath", "//div[@aria-label='Timeline: Search timeline']")
        if not timelines:
            return []

        timeline_html = timelines[0].get_attribute('innerHTML') or ""
        soup = BeautifulSoup(timeline_html, "html.parser")
        if soup.div is None:
            return []
        return soup.div.findChildren("div" , recursive=False)

    def serialize_tweets(self, search_els):
        """
        Converts the soup html into dictionaries and stores each one in
        self.tweets as a JSON string
        :params search_els soup items as returned by get_search_html
        """
        for el in search_els:
            try:
                article = el.find('article')
                tweet_text = article.text
                # The article text is split at the first '@' and then at the
                # first '·' that follows it.
                before_at, after_at = tweet_text.split('@', 1)
                before_dot, after_dot = after_at.split('·', 1)
                status_links = [
                    a.get('href', '')
                    for a in article.findAll('a')
                    if "/status/" in a.get('href', '')
                ]
                # NOTE(review): "user_tag" holds the text before the '@' and
                # "user_name" the text after it; the names look swapped but
                # are kept because consumers may rely on these keys - confirm.
                tweet = {
                    "user_tag": before_at,
                    "user_name": before_dot,
                    "tweet_link": status_links[0],
                    "tweet": after_dot
                }
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                # Not a complete tweet row (no article, '@', '·' or status link).
                continue
            self.tweets.add(json.dumps(tweet))

    def get_search_url(self):
        """
        Creates the search URL from the search terms and twitter URL.
        :returns the live-search URL with the terms joined by spaces and
                 percent-encoded
        """
        # A bare string would otherwise be joined character by character.
        terms = [self.terms] if isinstance(self.terms, str) else self.terms
        search_terms = quote(" ".join(terms), safe="")
        # twitter.url ends with '/'; strip it to avoid "//search".
        base_url = self.twitter.url.rstrip("/")
        return f"{base_url}/search?q={search_terms}&src=typed_query&f=live"

    def search(self, max_tweets=20):
        """
        Open the search page and harvest tweets while scrolling down.
        :params max_tweets stop scrolling once more than this many tweets
                           have been collected
        """
        if(not self.terms or self.init_twitter_state is False):
            return

        search_url = self.get_search_url()
        self.driver.get(search_url)
        time.sleep(2)
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        # implicitly_wait() sets the element look-up timeout; it is not a pause.
        self.driver.implicitly_wait(1)

        SCROLL_PAUSE_TIME = 1

        # Get scroll height
        last_height = self.driver.execute_script("return document.body.scrollHeight")

        while True:
            search_els = self.get_search_html()
            self.serialize_tweets(search_els)
            # Scroll down to bottom
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Wait to load page
            time.sleep(SCROLL_PAUSE_TIME)

            # Calculate new scroll height and compare with last scroll height
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height
            if(len(self.tweets) > max_tweets):
                break

In [48]:
# Credentials are read from the environment; python-dotenv (loaded at the top
# of the notebook) fills it from a local .env file, so create one holding
# TWITTER_USERNAME and TWITTER_PASSWORD.
twitter_username, twitter_password = (
    os.environ.get(variable)
    for variable in ("TWITTER_USERNAME", "TWITTER_PASSWORD")
)
twitter = Twitter(twitter_username, twitter_password)

In [78]:
# One scraper per account of interest; all of them share the logged-in session.
ktn, idealdn, elabs, cx = (
    User(handle, twitter)
    for handle in ("KTNUK", "IDEALondon", "eagle_labs", "conceptionxtech")
)

In [90]:
# Scrape the Following page of the "eagle_labs" account. The scraped profile
# links accumulate in `elabs.followers`, which is also why the progress
# messages below say "followers".
elabs.scrape_following()

We have 200 followers.
We have 400 followers.
We have 600 followers.
We have 800 followers.
We have 1000 followers.
We have 1200 followers.
We have 1400 followers.
We have 1600 followers.
We have 2000 followers.
We have 2200 followers.
We have 2600 followers.
We have 2800 followers.
We have 3000 followers.
We have 3200 followers.


In [91]:
# Persist the scraped profile links. The file holds a binary pickle of a set
# despite its .txt name, so read it back with pickle.load in 'rb' mode.
with open('elabs_followings.txt', mode='wb') as followings_file:
    pickle.dump(elabs.followers, followings_file)

In [95]:
# Profile links present in the scraped sets of all three accounts.
elabs.followers & idealdn.followers & ktn.followers

{'/CJBS_EC',
 '/DigiCatapult',
 '/Digicatbrighton',
 '/FabLabLondon',
 '/LMarks',
 '/MarijaButkovic',
 '/NestaChallenges',
 '/Plexalcity',
 '/RAEng_Hub',
 '/SamanthaRose83',
 '/TechNation',
 '/UKBAngels',
 '/angelacademe',
 '/beisgovuk',
 '/bev_vincent',
 '/capenterprise',
 '/cgledhill',
 '/e_nation',
 '/iamstartacus',
 '/innobham',
 '/innovateuk',
 '/mialomo',
 '/naomitimperley',
 '/nesta_uk',
 '/paolacuneo',
 '/pitchatpalace',
 '/stbaasch',
 '/techUK',
 '/techcityinsider',
 '/techdotlondon',
 '/timothy_barnes',
 '/tradegovuk'}