# Instagram

In [1]:
import json
import logging
import time
from typing import Optional

from bs4 import BeautifulSoup
from selenium.webdriver import Chrome
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement

In [2]:
webdriver = Chrome()
webdriver.get("https://www.instagram.com/")

# need login first, so wait for user to login
# time.sleep(60)

In [3]:
class CustomFormatter(logging.Formatter):
    log_format = "[%(asctime)s] [%(levelname)s] %(message)s"

    def __init__(self):
        super().__init__(self.log_format, datefmt="%Y-%m-%d %H:%M:%S")


# remove any existing handlers to prevent double logging
if logging.getLogger().hasHandlers():
    logging.getLogger().handlers.clear()

handler = logging.StreamHandler()
handler.setFormatter(CustomFormatter())

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)


def log_func(func):
    def wrapper(*args, **kwargs):
        logger.info(f"[{func.__name__}]")
        return func(*args, **kwargs)

    return wrapper


logger.info("Logger setup complete")

[2024-10-14 02:27:12] [INFO] Logger setup complete


In [4]:
@log_func
def show_first_post(url: str):
    try:
        webdriver.get(url)
        time.sleep(2)
        soup = BeautifulSoup(webdriver.page_source, "html.parser")
        divs = soup.find_all(
            "div",
            class_="x1lliihq x1n2onr6 xh8yej3 x4gyw5p xfllauq xo2y696 x11i5rnm x2pgyrj",
        )
        list_urls = []

        for div in divs:
            a_tag = div.find("a", recursive=False)
            if a_tag and "href" in a_tag.attrs:
                list_urls.append(a_tag["href"])

        element = webdriver.find_element(By.XPATH, f'//a[@href="{list_urls[0]}"]')
        element.click()
    except Exception as e:
        logger.error(str(e).split("\n")[0])

In [5]:
@log_func
def get_caption() -> Optional[str]:
    try:
        soup = BeautifulSoup(webdriver.page_source, "html.parser")
        divs = soup.find_all("div", class_="_a9zs")
        for div in divs:
            h1_tag = div.find(
                "h1", class_="_ap3a _aaco _aacu _aacx _aad7 _aade", recursive=False
            )
            for br in h1_tag.find_all("br"):
                br.replace_with("\n")
            if h1_tag:
                return h1_tag.text
        return None
    except Exception as e:
        logger.error(str(e).split("\n")[0])
        return None

In [6]:
def _get_element_xpath(element: WebElement) -> Optional[str]:
    try:
        full_xpath = webdriver.execute_script(
            """
            function getElementXPath(element) {
                if (element.id !== '') {
                    return 'id("' + element.id + '")';
                }
                if (element === document.body) {
                    return element.tagName.toLowerCase();
                }

                let ix = 0;
                const siblings = element.parentNode.childNodes;
                let sameTagSiblings = 0;

                for (let i = 0; i < siblings.length; i++) {
                    if (siblings[i].nodeType === 1 && siblings[i].tagName === element.tagName) {
                        sameTagSiblings++;
                    }
                }
                
                for (let i = 0; i < siblings.length; i++) {
                    const sibling = siblings[i];
                    if (sibling === element) {
                        let text = "";
                        
                        if (sameTagSiblings > 1) {
                            text = '[' + (ix + 1) + ']';
                        }
                        
                        return getElementXPath(element.parentNode) + '/' + element.tagName.toLowerCase() + text;
                    }

                    if (sibling.nodeType === 1 && sibling.tagName === element.tagName) {
                        ix++;
                    }
                }
            }
            return getElementXPath(arguments[0]);

        """,
            element,
        )
        result = f"/html/{full_xpath}"
        return result
    except Exception as e:
        logger.error(str(e).split("\n")[0])
        return None

In [7]:
@log_func
def load_more_comments():
    try:
        title = webdriver.find_element(
            By.XPATH, "//*[contains(text(), 'Load more comments')]"
        )
        if title:
            is_found = True
            while is_found:
                try:
                    title = webdriver.find_element(
                        By.XPATH, "//*[contains(text(), 'Load more comments')]"
                    )
                    title_xpath = _get_element_xpath(title)
                    button_xpath = title_xpath[
                        : title_xpath.rfind("button") + len("button")
                    ]
                    try:
                        button_element = webdriver.find_element(By.XPATH, button_xpath)
                        button_element.click()
                    except Exception as e:
                        logger.error(str(e).split("\n")[0])
                        is_found = False
                except Exception as e:
                    logger.error(str(e).split("\n")[0])
                    is_found = False
    except Exception as e:
        logger.error(str(e).split("\n")[0])

In [8]:
@log_func
def show_replies():
    try:

        button = webdriver.find_elements(
            "xpath", "//button[contains(@class, '_acan _acao _acas _aj1- _ap30')]"
        )
        result_button = [
            b
            for b in button
            if (b.text.startswith("View replies") or b.text.startswith("View all"))
        ]
        total_button = len(result_button)
        if total_button > 0:
            for b in result_button:
                b.click()
            logger.info(f"Total button clicked: {total_button}")
        else:
            logger.warning("No replies found")
    except Exception as e:
        logger.error(str(e).split("\n")[0])

In [9]:
@log_func
def get_comments() -> list[str]:
    try:
        soup = BeautifulSoup(webdriver.page_source, "html.parser")
        comments = soup.find_all("div", class_="_a9zs")
        result = []
        for div in comments:
            span_tag = div.find(
                "span", class_="_ap3a _aaco _aacu _aacx _aad7 _aade", recursive=False
            )
            if span_tag:
                result.append(span_tag.text)
        logger.info(f"Total comments found: {len(result)}")
        return result
    except Exception as e:
        logger.error(str(e).split("\n")[0])
        return list()

In [10]:
@log_func
def next_post():
    try:
        button = webdriver.find_element(
            By.XPATH,
            f'//span[@style="display: inline-block; transform: rotate(90deg);"]',
        )
        button.click()
    except Exception as e:
        logger.error(str(e).split("\n")[0])


@log_func
def has_next_post() -> bool:
    try:
        webdriver.find_element(
            By.XPATH,
            f'//span[@style="display: inline-block; transform: rotate(90deg);"]',
        )
        return True
    except Exception as e:
        logger.error(str(e).split("\n")[0])
        return False

In [11]:
from typing import TypedDict, List, Dict


class PostResult(TypedDict):
    caption: str
    comments: List[str]


class Dataset(TypedDict):
    data: Dict[str, PostResult]

In [12]:
@log_func
def _get_single_post_data() -> PostResult:
    load_more_comments()
    show_replies()
    caption = get_caption()
    comments = get_comments()
    return PostResult(caption=caption, comments=comments)

In [13]:
def scraping_instagram(username: str, post: Optional[int] = -1) -> Dataset:
    try:
        if post == 0:
            return

        result = Dataset(data={})
        url = f"https://www.instagram.com/{username}/"
        show_first_post(url)

        # get data
        post_data = _get_single_post_data()
        # 'https://www.instagram.com/p/:POST_ID/?img_index=1'
        post_id = webdriver.current_url.split("/")[4]
        result.get("data").update({post_id: post_data})

        if post == -1:
            while has_next_post():
                next_post()
                time.sleep(2)
                post_data = _get_single_post_data()
                post_id = webdriver.current_url.split("/")[4]
                result.get("data").update({post_id: post_data})
        else:
            while post and has_next_post():
                next_post()
                post -= 1
                time.sleep(2)
                post_data = _get_single_post_data()
                post_id = webdriver.current_url.split("/")[4]
                result.get("data").update({post_id: post_data})
            if post:
                logger.warning("Total post less than expected")

        # stats
        total_post = len(result.get("data"))
        total_comments = sum(
            len(post.get("comments")) for post in result.get("data").values()
        )
        logger.info(f"Total post scraped: {total_post}")
        logger.info(f"Total comments scraped: {total_comments}")

        return result
    except Exception as e:
        logger.error(str(e).split("\n")[0])
        return Dataset(data={})

In [14]:
dataset = scraping_instagram("putu_waw")

[2024-10-14 02:27:19] [INFO] [show_first_post]
[2024-10-14 02:27:23] [INFO] [_get_single_post_data]
[2024-10-14 02:27:23] [INFO] [load_more_comments]
[2024-10-14 02:27:23] [ERROR] Message: no such element: Unable to locate element: {"method":"xpath","selector":"//*[contains(text(), 'Load more comments')]"}
[2024-10-14 02:27:23] [INFO] [show_replies]
[2024-10-14 02:27:23] [INFO] [get_caption]
[2024-10-14 02:27:23] [INFO] [get_comments]
[2024-10-14 02:27:23] [INFO] Total comments found: 0
[2024-10-14 02:27:23] [INFO] [has_next_post]
[2024-10-14 02:27:24] [INFO] [next_post]
[2024-10-14 02:27:26] [INFO] [_get_single_post_data]
[2024-10-14 02:27:26] [INFO] [load_more_comments]
[2024-10-14 02:27:26] [ERROR] Message: no such element: Unable to locate element: {"method":"xpath","selector":"//*[contains(text(), 'Load more comments')]"}
[2024-10-14 02:27:26] [INFO] [show_replies]
[2024-10-14 02:27:26] [INFO] Total button clicked: 1
[2024-10-14 02:27:26] [INFO] [get_caption]
[2024-10-14 02:27:26]

In [15]:
dataset

{'data': {'C3W-SslrQpp': {'caption': 'Halo seluruh mahasiswa Indonesia. Saya siap mengikuti Magang dan Studi Independen Bersertifikat Angkatan 6!',
   'comments': []},
  'Cv4iiXPLDl6': {'caption': 'Halo! Saya Putu Widyantara Artanta Wibawa dari Universitas Udayana siap mengikuti National Onboarding MSIB Angkatan 5!\n\n#BerprosesLebihBaik #KampusMerdeka #MSIB5 #MagangMerdeka #MagangBersertifikat #BukanMagangdanStudiBiasa #MSIB5',
   'comments': ['Mangaaat', '🔥', 'Great My son😍', 'Semangat frenn🔥']},
  'CZqhWPWlNYN': {'caption': '[SAYA SIAP MENGIKUTI MAHASISYA UPANAYANA XIX]\n\nOm Swastyastu 🙏\n"Om Ano Bhadrah Kratavo Yantu Visvatah" - (Yajur Veda XXV. 14)\n(Semoga pikiran yang baik datang dari segala penjuru)\n\nMahasisya Upanayana merupakan upacara penyucian diri dengan tujuan memohon doa restu secara niskala tatkala seorang mahasiswa akan menuntut ilmu dan berguru di Universitas Udayana.\n\nSaya Putu Widyantara Artanta Wibawa, Siap mengikuti Mahasisya Upanayana XIX tahun 2022. \n\n"Ta

In [16]:
def get_existing_dataset(file_path: str) -> Dataset:
    try:
        with open(file_path, "r") as f:
            data = json.load(f)
            return Dataset(**data)
    except Exception as e:
        logger.error(str(e))

In [17]:
# existing_dataset = get_existing_dataset("dataset.json")

In [18]:
# new_dataset = existing_dataset.copy()
# new_dataset.get("data").update(dataset.get("data"))

In [19]:
with open("dataset.json", "w") as f:
    json.dump(dataset, f)