In [None]:
# settings.py

# Every optional fetch step is switched off by default. Each key is reused
# as an attribute name on ``settings`` and as a ``--<key>`` command-line flag.
defaults = dict.fromkeys(
    (
        "fetch_likes_plays",
        "fetch_likers",
        "fetch_comments",
        "fetch_mentions",
        "fetch_hashtags",
        "fetch_details",
    ),
    False,
)


def apply_defaults(cls):
    """Class decorator that installs every ``defaults`` entry on ``cls``.

    Each key of the module-level ``defaults`` dict becomes a class attribute
    holding the corresponding default value. The same class object is
    returned so the function can be used with ``@`` syntax.
    """
    for option in defaults:
        setattr(cls, option, defaults[option])
    return cls


@apply_defaults
class settings(object):
    """Namespace of runtime options; its attributes are set by ``apply_defaults``."""


def override_settings(args):
    """Copy every known option from ``args`` onto the ``settings`` class.

    ``args`` must expose one attribute per key of ``defaults`` (for example
    the namespace returned by an argparse parser prepared with
    ``prepare_override_settings``); a missing attribute raises
    ``AttributeError``.
    """
    for option in defaults:
        chosen = getattr(args, option)
        setattr(settings, option, chosen)


def prepare_override_settings(parser):
    """Register one boolean ``--<option>`` flag on ``parser`` per ``defaults`` key."""
    for option in defaults:
        flag = "--" + option
        parser.add_argument(flag, action="store_true")


In [None]:
from .crawler import InsCrawler

# browser.py
import os

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.keys import Keys

from .utils import randmized_sleep


class Browser:
    """Small convenience wrapper around a Selenium Chrome WebDriver.

    The wrapped driver is available as ``self.driver``. Lookup helpers take
    CSS selectors and may be scoped to a parent element.
    """

    def __init__(self, has_screen):
        """Start Chrome with the chromedriver found in ``bin/`` next to this file.

        Args:
            has_screen: When falsy, Chrome is started with ``--headless``.
        """
        dir_path = os.path.dirname(os.path.realpath(__file__))
        service_args = ["--ignore-ssl-errors=true"]
        chrome_options = Options()
        if not has_screen:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument("--no-sandbox")
        self.driver = webdriver.Chrome(
            executable_path="%s/bin/chromedriver" % dir_path,
            service_args=service_args,
            chrome_options=chrome_options,
        )
        # Default implicit wait (seconds) applied to element lookups.
        self.driver.implicitly_wait(5)

    @property
    def page_height(self):
        """Return ``document.body.scrollHeight`` of the current page."""
        return self.driver.execute_script("return document.body.scrollHeight")

    def get(self, url):
        """Navigate the driver to ``url``."""
        self.driver.get(url)

    @property
    def current_url(self):
        """Return the URL the driver is currently on."""
        return self.driver.current_url

    def implicitly_wait(self, t):
        """Set the driver's implicit wait to ``t`` seconds."""
        self.driver.implicitly_wait(t)

    def find_one(self, css_selector, elem=None, waittime=0):
        """Return the first element matching ``css_selector`` or ``None``.

        Args:
            css_selector: CSS selector to look up.
            elem: Optional parent element to search in; the whole page is
                searched when it is falsy.
            waittime: When non-zero, first wait up to this many seconds for
                the element to be present.

        Raises:
            TimeoutException: If ``waittime`` is given and the element does
                not appear in time (unlike ``find``, this is not swallowed).
        """
        obj = elem or self.driver

        if waittime:
            WebDriverWait(obj, waittime).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, css_selector))
            )

        try:
            return obj.find_element(By.CSS_SELECTOR, css_selector)
        except NoSuchElementException:
            return None

    def find(self, css_selector, elem=None, waittime=0):
        """Return all elements matching ``css_selector``.

        Args:
            css_selector: CSS selector to look up.
            elem: Optional parent element to search in; the whole page is
                searched when it is falsy.
            waittime: When non-zero, first wait up to this many seconds for
                at least one match to be present.

        Returns:
            The list returned by ``find_elements``, or ``None`` when the wait
            times out.
        """
        obj = elem or self.driver

        try:
            if waittime:
                WebDriverWait(obj, waittime).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, css_selector))
                )
        except TimeoutException:
            return None

        try:
            return obj.find_elements(By.CSS_SELECTOR, css_selector)
        except NoSuchElementException:
            return None

    def scroll_down(self, wait=0.3):
        """Scroll to the bottom of the page, then pause via ``randmized_sleep(wait)``."""
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
        randmized_sleep(wait)

    def scroll_up(self, offset=-1, wait=2):
        """Scroll up, then pause via ``randmized_sleep(wait)``.

        Args:
            offset: ``-1`` jumps to the top of the page; any other value
                scrolls up by that many pixels.
            wait: Value handed to ``randmized_sleep``.
        """
        if offset == -1:
            self.driver.execute_script("window.scrollTo(0, 0)")
        else:
            self.driver.execute_script("window.scrollBy(0, -%s)" % offset)
        randmized_sleep(wait)

    def js_click(self, elem):
        """Click ``elem`` through JavaScript instead of a native click."""
        self.driver.execute_script("arguments[0].click();", elem)

    def open_new_tab(self, url):
        """Open ``url`` in a new tab and switch the driver to it."""
        # Hand the URL over as a script argument rather than formatting it
        # into the JavaScript source: a quote or backslash in the URL would
        # otherwise break the script or inject code into it.
        self.driver.execute_script("window.open(arguments[0]);", url)
        self.driver.switch_to.window(self.driver.window_handles[1])

    def close_current_tab(self):
        """Close the current tab and switch back to the first window handle."""
        self.driver.close()

        self.driver.switch_to.window(self.driver.window_handles[0])

    def __del__(self):
        # Best-effort shutdown: the driver may be missing (failed __init__)
        # or already gone at interpreter exit, so errors are ignored here.
        try:
            self.driver.quit()
        except Exception:
            pass


In [None]:
# crawler.py
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import argparse
import json
import sys
from io import open

from inscrawler import InsCrawler
from inscrawler.settings import override_settings
from inscrawler.settings import prepare_override_settings


def usage():
    """Return the example invocations used as the CLI usage text."""
    examples = """
        python crawler.py posts -u cal_foodie -n 100 -o ./output
        python crawler.py posts_full -u cal_foodie -n 100 -o ./output
        python crawler.py profile -u cal_foodie -o ./output
        python crawler.py profile_script -u cal_foodie -o ./output
        python crawler.py hashtag -t taiwan -o ./output

        The default number for fetching posts via hashtag is 100.
    """
    return examples


def get_posts_by_user(username, number, detail, debug):
    """Return the result of ``InsCrawler.get_user_posts`` for ``username``.

    ``number`` and ``detail`` are forwarded unchanged; ``debug`` is passed
    to the crawler as ``has_screen``.
    """
    crawler = InsCrawler(has_screen=debug)
    posts = crawler.get_user_posts(username, number, detail)
    return posts


def get_profile(username):
    """Return ``InsCrawler.get_user_profile(username)`` from a default crawler."""
    return InsCrawler().get_user_profile(username)


def get_profile_from_script(username):
    """Return the profile obtained via ``get_user_profile_from_script_shared_data``."""
    crawler = InsCrawler()
    profile = crawler.get_user_profile_from_script_shared_data(username)
    return profile


def get_posts_by_hashtag(tag, number, debug):
    """Return ``InsCrawler.get_latest_posts_by_tag(tag, number)``.

    ``debug`` is passed to the crawler as ``has_screen``.
    """
    crawler = InsCrawler(has_screen=debug)
    posts = crawler.get_latest_posts_by_tag(tag, number)
    return posts


def arg_required(args, fields=()):
    """Exit with the help text unless every named option on ``args`` is set.

    Args:
        args: Parsed command-line namespace (any object exposing the named
            attributes).
        fields: Iterable of attribute names that must hold a truthy value.
            Defaults to an immutable empty tuple so no state is shared
            between calls.

    Uses the module-level ``parser`` created in the ``__main__`` block to
    print the help text. Exits with status 1 so calling scripts can detect
    the missing argument.
    """
    for field in fields:
        if not getattr(args, field):
            parser.print_help()
            sys.exit(1)


def output(data, filepath):
    """Serialize ``data`` as JSON and write it to ``filepath`` or stdout.

    Non-ASCII characters are kept as-is (``ensure_ascii=False``). When
    ``filepath`` is falsy the JSON text is printed instead of written; files
    are written as UTF-8 and overwritten if they exist.
    """
    serialized = json.dumps(data, ensure_ascii=False)
    if not filepath:
        print(serialized)
        return
    with open(filepath, "w", encoding="utf8") as handle:
        handle.write(serialized)


if __name__ == "__main__":
    # Command-line entry point: parse the arguments, apply the --fetch_*
    # setting overrides, then dispatch on the positional ``mode``.
    parser = argparse.ArgumentParser(description="Instagram Crawler", usage=usage())
    parser.add_argument(
        "mode", help="options: [posts, posts_full, profile, profile_script, hashtag]"
    )
    parser.add_argument("-n", "--number", type=int, help="number of returned posts")
    parser.add_argument("-u", "--username", help="instagram's username")
    parser.add_argument("-t", "--tag", help="instagram's tag name")
    parser.add_argument("-o", "--output", help="output file name(json format)")
    parser.add_argument("--debug", action="store_true")

    prepare_override_settings(parser)

    args = parser.parse_args()

    override_settings(args)

    # arg_required takes the parsed namespace plus the list of option names
    # to check; passing only a string (as before) validated nothing.
    if args.mode in ["posts", "posts_full"]:
        arg_required(args, ["username"])
        output(
            get_posts_by_user(
                args.username, args.number, args.mode == "posts_full", args.debug
            ),
            args.output,
        )
    elif args.mode == "profile":
        arg_required(args, ["username"])
        output(get_profile(args.username), args.output)
    elif args.mode == "profile_script":
        arg_required(args, ["username"])
        output(get_profile_from_script(args.username), args.output)
    elif args.mode == "hashtag":
        arg_required(args, ["tag"])
        output(
            get_posts_by_hashtag(args.tag, args.number or 100, args.debug), args.output
        )
    else:
        # Unknown mode: calling usage() alone only built a string and threw
        # it away, so show the help text and report failure instead.
        parser.print_help()
        sys.exit(1)
        
class RetryException(Exception):
    """Raised to signal that the failed operation should be attempted again."""

In [None]:
# fetch.py

import re
from time import sleep

from .settings import settings


def get_parsed_mentions(raw_text):
    """Return every ``@mention`` found in ``raw_text``, without the ``@``.

    A mention is ``@`` followed by one or more word characters or dots.
    Matches are returned in order of appearance; duplicates are kept.
    """
    # Single scan; the earlier discarded findall() call did the work twice.
    return re.findall(r"@([\w\.]+)", raw_text)


def get_parsed_hashtags(raw_text):
    """Return every ``#hashtag`` found in ``raw_text``, without the ``#``.

    A hashtag is ``#`` followed by one or more word characters. Matches are
    returned in order of appearance; duplicates are kept.
    """
    # Single scan; the earlier discarded findall() call did the work twice.
    return re.findall(r"#(\w+)", raw_text)


def fetch_mentions(raw_test, dict_obj):
    """Store the mentions parsed from ``raw_test`` under ``dict_obj["mentions"]``.

    Does nothing when ``settings.fetch_mentions`` is off, and leaves
    ``dict_obj`` untouched when the text contains no mention.
    """
    if settings.fetch_mentions:
        found = get_parsed_mentions(raw_test)
        if found:
            dict_obj["mentions"] = found

def fetch_hashtags(raw_test, dict_obj):
    """Store the hashtags parsed from ``raw_test`` under ``dict_obj["hashtags"]``.

    Does nothing when ``settings.fetch_hashtags`` is off, and leaves
    ``dict_obj`` untouched when the text contains no hashtag.
    """
    if settings.fetch_hashtags:
        found = get_parsed_hashtags(raw_test)
        if found:
            dict_obj["hashtags"] = found


def fetch_datetime(browser, dict_post):
    """Copy the ``datetime`` attribute of the post's time element into ``dict_post``.

    The element is looked up with ``browser.find_one``; if it is missing
    (``None``) the attribute access raises ``AttributeError``.
    """
    time_elem = browser.find_one(".eo2As .c-Yi7 ._1o9PC")
    dict_post["datetime"] = time_elem.get_attribute("datetime")


def fetch_imgs(browser, dict_post):
    """Collect the distinct image ``src`` URLs of a post into ``dict_post["img_urls"]``.

    Repeatedly reads the images currently matched by ``"._97aPb img"`` and
    clicks the right-chevron button until either the image lookup does not
    return a list or no chevron is found. The stored list has no guaranteed
    order because URLs are de-duplicated through a set.
    """
    collected = set()
    while True:
        images = browser.find("._97aPb img", waittime=10)
        if not isinstance(images, list):
            # Lookup timed out (find returned None): nothing more to read.
            break
        collected.update(image.get_attribute("src") for image in images)

        chevron = browser.find_one("._6CZji .coreSpriteRightChevron")
        if not chevron:
            break
        chevron.click()
        # Short pause after advancing before reading the images again.
        sleep(0.3)

    dict_post["img_urls"] = list(collected)

def fetch_likes_plays(browser, dict_post):
    """Record the like count (and, when available, the view count) of a post.

    Skipped entirely when ``settings.fetch_likes_plays`` is off. Always sets
    ``dict_post["likes"]`` (0 when no like element is found); sets
    ``dict_post["views"]`` only when the ``.vcOH2`` element exists
    (presumably the view counter shown on video posts -- verify against the
    page markup).
    """
    if not settings.fetch_likes_plays:
        return

    def to_count(text):
        # Drop "," and "." separators before converting to int.
        return int(text.replace(",", "").replace(".", ""))

    likes_elem = browser.find_one(".Nm9Fw > * > span")
    views_btn = browser.find_one(".vcOH2")
    likes_text = None

    if views_btn is not None:
        plays_elem = browser.find_one(".vcOH2 > span")
        dict_post["views"] = to_count(plays_elem.text)
        # Clicking the counter reveals the like count; read it, then click
        # ".QhbhU" (presumably the overlay that dismisses it).
        views_btn.click()
        likes_elem = browser.find_one(".vJRqr > span")
        likes_text = likes_elem.text
        browser.find_one(".QhbhU").click()
    elif likes_elem is not None:
        likes_text = likes_elem.text

    dict_post["likes"] = 0 if likes_text is None else to_count(likes_text)


def fetch_likers(browser, dict_post):
    """Collect the titles of the accounts listed in the post's likers dialog.

    Skipped when ``settings.fetch_likers`` is off. Clicks the like-info
    button, scrolls through the list until its last entry stops changing,
    stores the collected titles in ``dict_post["likers"]`` and clicks the
    close button.
    """
    if not settings.fetch_likers:
        return
    like_info_btn = browser.find_one(".EDfFK ._0mzm-.sqdOP")
    like_info_btn.click()

    # Keyed by href so an entry seen on several passes is stored only once.
    likers = {}
    liker_elems_css_selector = ".Igw0E ._7UhW9.xLCgt a"
    likers_elems = list(browser.find(liker_elems_css_selector))
    last_liker = None
    while likers_elems:
        for ele in likers_elems:
            likers[ele.get_attribute("href")] = ele.get_attribute("title")

        # Same last entry as on the previous pass: stop scrolling.
        if last_liker == likers_elems[-1]:
            break

        last_liker = likers_elems[-1]
        # Selenium property whose access scrolls the element into view;
        # presumably this makes the dialog load further entries -- confirm.
        last_liker.location_once_scrolled_into_view
        sleep(0.6)
        likers_elems = list(browser.find(liker_elems_css_selector))

    dict_post["likers"] = list(likers.values())
    close_btn = browser.find_one(".WaOAr button")
    close_btn.click()


def fetch_caption(browser, dict_post):
    """Store the post caption plus its mentions and hashtags in ``dict_post``.

    The caption is the text of the first ``span`` inside the first
    ``".eo2As .gElp9"`` element that is neither empty nor ``"Verified"``.
    An existing ``"caption"`` key is never overwritten. Nothing happens when
    no such container element is found.
    """
    containers = browser.find(".eo2As .gElp9")
    if len(containers) == 0:
        return

    for span in browser.find("span", containers[0]):
        span_text = span.text
        if "caption" in dict_post or span_text in ("Verified", ""):
            continue
        dict_post["caption"] = span_text

    caption = dict_post.get("caption", "")
    fetch_mentions(caption, dict_post)
    fetch_hashtags(caption, dict_post)


def fetch_comments(browser, dict_post):
    """Collect the comments of a post into ``dict_post["comments"]``.

    Skipped when ``settings.fetch_comments`` is off. First clicks the
    "show more" button until it is gone and every ``.EizgU`` button once
    (presumably the reply expanders -- verify against the page markup), then
    reads each ``".eo2As .gElp9"`` element after the first one. Each comment
    becomes ``{"author": ..., "comment": ...}`` plus optional ``mentions``
    and ``hashtags``. The key is only set when at least one comment exists.
    """
    if not settings.fetch_comments:
        return

    show_more_selector = "button .glyphsSpriteCircle_add__outline__24__grey_9"
    show_more = browser.find_one(show_more_selector)
    while show_more:
        # Property access scrolls the button into view before clicking.
        show_more.location_once_scrolled_into_view
        show_more.click()
        sleep(0.3)
        show_more = browser.find_one(show_more_selector)

    show_comment_btns = browser.find(".EizgU")
    for show_comment_btn in show_comment_btns:
        show_comment_btn.location_once_scrolled_into_view
        show_comment_btn.click()
        sleep(0.3)

    ele_comments = browser.find(".eo2As .gElp9")
    comments = []
    # The first element is skipped; fetch_caption reads it as the caption.
    for els_comment in ele_comments[1:]:
        author = browser.find_one(".FPmhX", els_comment).text

        # Reset for every comment: previously the name was unbound when the
        # first comment had no usable span, and otherwise silently reused
        # the text of the preceding comment.
        comment = ""
        for element in browser.find("span", els_comment):
            text = element.text
            if text not in ("Verified", ""):
                # The last non-empty span wins, as before.
                comment = text

        comment_obj = {"author": author, "comment": comment}

        fetch_mentions(comment, comment_obj)
        fetch_hashtags(comment, comment_obj)

        comments.append(comment_obj)

    if comments:
        dict_post["comments"] = comments


def fetch_initial_comment(browser, dict_post):
    """Store the text of the first comment entry as ``dict_post["description"]``.

    Looks up ``ul.XQXOT``, then ``.ZyFrc`` inside it, then the first ``span``
    inside that. ``dict_post`` is left untouched when no span is found.
    """
    comment_list = browser.find_one("ul.XQXOT")
    first_entry = browser.find_one(".ZyFrc", comment_list)
    text_span = browser.find_one("span", first_entry)

    if not text_span:
        return
    dict_post["description"] = text_span.text


def fetch_details(browser, dict_post):
    """Open the post in a new tab and record username, location and description.

    Skipped when ``settings.fetch_details`` is off. ``dict_post["key"]`` is
    used as the URL to open. ``username`` and ``location`` are only set when
    their elements are found; the description is filled in by
    ``fetch_initial_comment``.
    """
    if not settings.fetch_details:
        return

    browser.open_new_tab(dict_post["key"])

    # Always close the extra tab, even if a lookup raises; otherwise the
    # driver would stay on the detail tab for every later post.
    try:
        username = browser.find_one("a.ZIAjV")
        location = browser.find_one("a.O4GlU")

        if username:
            dict_post["username"] = username.text
        if location:
            dict_post["location"] = location.text

        fetch_initial_comment(browser, dict_post)
    finally:
        browser.close_current_tab()


In [None]:
# liker.py

import argparse

from inscrawler import InsCrawler


def usage():
    """Return the usage text for the liker command-line script."""
    # This script is liker.py; the text previously named crawler.py, whose
    # command line takes different arguments.
    return """
        python liker.py [tag]
    """


if __name__ == "__main__":
    # Entry point: like up to --number posts found under the given hashtag,
    # always with a visible browser window (has_screen=True).
    parser = argparse.ArgumentParser(description="Instagram Liker", usage=usage())
    parser.add_argument("hashtag", help="hashtag name")
    parser.add_argument(
        "-n",
        "--number",
        type=int,
        default=1000,
        help="number of posts to like",
    )
    args = parser.parse_args()
    ins_crawler = InsCrawler(has_screen=True)
    ins_crawler.auto_like(maximum=args.number, tag=args.hashtag)


In [None]:
# utils.py

import random
from functools import wraps
from time import sleep

from .exceptions import RetryException


def instagram_int(string):
    """Convert a comma-grouped number string such as ``"1,234"`` to an int."""
    digits = "".join(string.split(","))
    return int(digits)


def retry(attempt=10, wait=0.3):
    """Decorator factory that re-runs a function while it raises RetryException.

    Args:
        attempt: Maximum number of calls. Values below 1 still call the
            function once.
        wait: Seconds to sleep between two consecutive calls.

    The decorated function returns the first successful result. When every
    call fails, a fresh ``RetryException`` carrying the arguments of the
    last failure is raised with the chained context suppressed. Any other
    exception propagates immediately.
    """

    def wrap(func):
        @wraps(func)
        def wrapped_f(*args, **kwargs):
            # Loop instead of re-decorating recursively: the stack depth no
            # longer grows with the number of attempts.
            remaining = attempt
            while True:
                try:
                    return func(*args, **kwargs)
                except RetryException as err:
                    if remaining > 1:
                        remaining -= 1
                        sleep(wait)
                        continue
                    # Keep the last failure's message on the final error.
                    exc = RetryException(*err.args)
                    # Assigning __cause__ also suppresses the implicit
                    # exception context in the traceback.
                    exc.__cause__ = None
                    raise exc

        return wrapped_f

    return wrap


def randmized_sleep(average=1):
    """Sleep for a random time drawn uniformly between 0.5x and 1.5x ``average`` seconds."""
    shortest = average * 1 / 2
    longest = average * 3 / 2
    duration = random.uniform(shortest, longest)
    sleep(duration)


def validate_posts(dict_posts):
    """Report whether the fetched posts look sane.

    Posts are considered correct when all their ``"datetime"`` values are
    distinct (no content got mixed up or duplicated). In that case a
    confirmation is printed; duplicates are not reported and nothing is
    raised. Every post must contain a ``"datetime"`` key.
    """
    stamps = [post["datetime"] for post in dict_posts.values()]
    has_duplicates = len(stamps) != len(set(stamps))
    if not has_duplicates:
        print("These post data should be correct.")
