# Instagram Media Downloader

This notebook provides an agentic way to download images and videos from Instagram posts using your authenticated session.

## Features
- Automated login to Instagram
- Navigate through posts
- Intercept network requests to capture media URLs
- Download images and videos

## Requirements
Install the required packages first:
```bash
pip install selenium requests pillow webdriver-manager
```

In [1]:
# Install required packages
!pip install selenium requests pillow webdriver-manager --quiet

In [2]:
import json
import os
import time
import requests
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse

from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager

## Configuration

In [3]:
# Configuration
DOWNLOAD_FOLDER = "instagram_downloads"
MOBILE_VIEW = True  # Toggle Chrome's device toolbar (Ctrl+Shift+M) to keep Instagram in mobile layout

# Create download folder
Path(DOWNLOAD_FOLDER).mkdir(exist_ok=True)

print(f"Downloads will be saved to: {os.path.abspath(DOWNLOAD_FOLDER)}")

Downloads will be saved to: d:\OneDrive - Emory\Schweidel\PerceptionMap\code\instagram_downloads


## Instagram Media Downloader Class

In [4]:
class InstagramMediaDownloader:
    def __init__(self, mobile_view=True, headless=False):
        self.download_folder = DOWNLOAD_FOLDER
        self.driver = None
        self.mobile_view = mobile_view
        self.headless = headless
        self._mobile_toolbar_enabled = False

    def setup_driver(self):
        """Setup Chrome driver with appropriate options"""
        chrome_options = Options()

        if self.headless:
            chrome_options.add_argument('--headless=new')

        chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])

        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=chrome_options)
        self.driver.implicitly_wait(10)
        self.driver.set_window_size(1200, 900)

        print("✓ Driver setup complete")

    def _drain_performance_logs(self):
        """Clear accumulated performance logs to avoid stale entries"""
        if not self.driver:
            return
        try:
            self.driver.get_log('performance')
        except Exception:
            pass

    def _ensure_mobile_toolbar(self):
        """Toggle Chrome's device toolbar to emulate mobile view"""
        if not self.mobile_view or self._mobile_toolbar_enabled or not self.driver:
            return
        try:
            body = self.driver.find_element(By.TAG_NAME, "body")
        except NoSuchElementException:
            return
        ActionChains(self.driver).move_to_element(body).click(body).perform()
        time.sleep(0.5)
        ActionChains(self.driver).send_keys(Keys.F12).perform()
        time.sleep(0.5)
        ActionChains(self.driver).key_down(Keys.CONTROL).key_down(Keys.SHIFT).send_keys('m').key_up(Keys.SHIFT).key_up(Keys.CONTROL).perform()
        time.sleep(0.5)
        ActionChains(self.driver).send_keys(Keys.F12).perform()
        time.sleep(0.5)
        self._mobile_toolbar_enabled = True

    def _click_if_present(self, locator, timeout=5):
        """Click an element if it becomes clickable within timeout"""
        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.element_to_be_clickable(locator)
            )
            element.click()
            time.sleep(1)
            return True
        except Exception:
            return False

    def login(self, username, password):
        """Login to Instagram"""
        print("Logging in to Instagram...")
        self.driver.get("https://www.instagram.com/accounts/login/")
        time.sleep(2)
        self._ensure_mobile_toolbar()
        wait = WebDriverWait(self.driver, 20)
        try:
            self._click_if_present(
                (By.XPATH, "//button[contains(text(), 'Only Allow Essential') or contains(text(), 'Allow All Cookies') or contains(text(), 'Accept')]"),
                timeout=8,
            )
            username_input = wait.until(EC.presence_of_element_located((By.NAME, "username")))
            password_input = wait.until(EC.presence_of_element_located((By.NAME, "password")))
            username_input.clear()
            password_input.clear()
            username_input.send_keys(username)
            password_input.send_keys(password)
            password_input.send_keys(Keys.RETURN)
            time.sleep(5)
            prompts = [
                (By.XPATH, "//button[contains(text(), 'Not Now')"]),
                (By.XPATH, "//button[contains(text(), 'Save Info')"]),
                (By.XPATH, "//button[contains(text(), 'Turn On')"]),
            ]
            for locator in prompts:
                self._click_if_present(locator, timeout=5)
            print("✓ Login successful")
            return True
        except TimeoutException as exc:
            print(f"✗ Login failed: {str(exc)}")
        except Exception as exc:
            print(f"✗ Login failed: {str(exc)}")
        return False

    def navigate_to_post(self, post_url):
        """Navigate to a specific Instagram post"""
        print(f"Navigating to post: {post_url}")
        self.driver.get(post_url)
        time.sleep(3)
        self._ensure_mobile_toolbar()
        print("✓ Post loaded")

    def _wait_for_media_to_render(self, timeout=10):
        """Wait for post media elements to appear"""
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "article img, article video"))
            )
        except TimeoutException:
            pass

    def _is_desired_media(self, url, mime_type, response):
        if not url or ('image' not in mime_type and 'video' not in mime_type):
            return False
        if 'cdninstagram.com' not in url and 'fbcdn.net' not in url:
            return False
        path = urlparse(url).path.lower()
        excluded_tokens = ('profilepic', 'sprite', 'favicon', 'glyph', 'badge', 'logo', 'emoji')
        if any(token in path for token in excluded_tokens):
            return False
        headers = response.get('headers', {})
        content_length = headers.get('content-length') or headers.get('Content-Length')
        if content_length and 'video' not in mime_type:
            try:
                if int(content_length) < 35000:
                    return False
            except ValueError:
                pass
        return True

    def extract_media_urls_from_logs(self):
        """Extract media URLs from browser performance logs"""
        if not self.driver:
            return []
        try:
            logs = self.driver.get_log('performance')
        except Exception:
            return []
        media_urls = []
        for log_entry in logs:
            try:
                message = json.loads(log_entry['message'])['message']
            except (KeyError, json.JSONDecodeError):
                continue
            if message.get('method') != 'Network.responseReceived':
                continue
            response = message.get('params', {}).get('response', {})
            url = response.get('url', '')
            mime_type = response.get('mimeType', '')
            if self._is_desired_media(url, mime_type, response):
                media_urls.append({
                    'url': url,
                    'type': 'video' if 'video' in mime_type else 'image'
                })
        return media_urls

    def _harvest_media_from_logs(self, collected, seen):
        for media in self.extract_media_urls_from_logs():
            if media['url'] in seen:
                continue
            seen.add(media['url'])
            collected.append(media)

    def _click_next_slide(self):
        selectors = [
            (By.CSS_SELECTOR, "button[aria-label='Next']"),
            (By.XPATH, "//button[@aria-label='Next']"),
            (By.XPATH, "//button//*[name()='svg' and @aria-label='Next']/.."),
        ]
        for locator in selectors:
            try:
                button = WebDriverWait(self.driver, 3).until(
                    EC.element_to_be_clickable(locator)
                )
                button.click()
                time.sleep(1.5)
                return True
            except Exception:
                continue
        return False

    def _collect_carousel_media(self):
        collected = []
        seen = set()
        self._wait_for_media_to_render()
        time.sleep(1.5)
        self._harvest_media_from_logs(collected, seen)
        while self._click_next_slide():
            self._wait_for_media_to_render()
            self._harvest_media_from_logs(collected, seen)
        return collected

    def download_media(self, media_list, post_id=None):
        """Download media files"""
        if not media_list:
            print("No media found to download")
            return []
        downloaded_files = []
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        for idx, media in enumerate(media_list):
            time.sleep(2)
            try:
                url = media['url']
                media_type = media['type']
                extension = 'mp4' if media_type == 'video' else 'jpg'
                if post_id:
                    filename = f"{post_id}_{idx+1}_{timestamp}.{extension}"
                else:
                    filename = f"media_{idx+1}_{timestamp}.{extension}"
                filepath = os.path.join(self.download_folder, filename)
                print(f"Downloading {media_type}: {filename}")
                response = requests.get(url, stream=True)
                response.raise_for_status()
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                downloaded_files.append(filepath)
                print(f"✓ Downloaded: {filename}")
            except Exception as exc:
                print(f"✗ Failed to download {media.get('type', 'media')}: {str(exc)}")
        return downloaded_files

    def download_post(self, post_url):
        """Download all media from a single post"""
        self.navigate_to_post(post_url)
        self._drain_performance_logs()
        self.driver.refresh()
        time.sleep(3)
        self._ensure_mobile_toolbar()
        self._wait_for_media_to_render()
        media_items = self._collect_carousel_media()
        post_id = post_url.rstrip('/').split('/')[-1]
        print(f"Found {len(media_items)} media file(s)")
        downloaded = self.download_media(media_items, post_id)
        return downloaded

    def download_multiple_posts(self, post_urls):
        """Download media from multiple posts"""
        all_downloads = []
        for index, url in enumerate(post_urls, 1):
            print(f"\n{'='*60}")
            print(f"Processing post {index}/{len(post_urls)}")
            print(f"{'='*60}")
            try:
                self._drain_performance_logs()
                downloaded = self.download_post(url)
                all_downloads.extend(downloaded)
                time.sleep(2)
            except Exception as exc:
                print(f"✗ Error processing post: {str(exc)}")
        return all_downloads

    def close(self):
        """Close the browser"""
        if self.driver:
            self.driver.quit()
            print("\n✓ Browser closed")

## Usage Example 1: Download from a Single Post

In [None]:
# Initialize the downloader
downloader = InstagramMediaDownloader(mobile_view=MOBILE_VIEW, headless=False)
downloader.setup_driver()

# Login (replace with your credentials)
USERNAME = "your_username"  # Replace with your Instagram username
PASSWORD = "your_password"  # Replace with your Instagram password

if downloader.login(USERNAME, PASSWORD):
    # Download from a single post (carousel requests are captured after each refresh/next click)
    POST_URL = "https://www.instagram.com/p/DOfLxX1j11j/?img_index=1" # "https://www.instagram.com/p/POST_ID/"  # Replace with actual post URL
    
    downloaded_files = downloader.download_post(POST_URL)
    
    print(f"\n{'='*60}")
    print(f"Download Complete!")
    print(f"{'='*60}")
    print(f"Total files downloaded: {len(downloaded_files)}")
    for file in downloaded_files:
        print(f"  - {file}")

# Close the browser
downloader.close()

✓ Driver setup complete
Logging in to Instagram...
✓ Login successful
Navigating to post: https://www.instagram.com/p/DOfLxX1j11j/?img_index=1
✓ Post loaded
Extracting media URLs from network logs...
Extracting media URLs from page source...
Found 42 media file(s)
Downloading image: ?img_index=1_1_20251026_164145.jpg
✓ Downloaded: ?img_index=1_1_20251026_164145.jpg
Downloading image: ?img_index=1_2_20251026_164145.jpg
✓ Downloaded: ?img_index=1_2_20251026_164145.jpg
Downloading image: ?img_index=1_3_20251026_164145.jpg
✓ Downloaded: ?img_index=1_3_20251026_164145.jpg
Downloading image: ?img_index=1_4_20251026_164145.jpg
✓ Downloaded: ?img_index=1_4_20251026_164145.jpg
Downloading image: ?img_index=1_5_20251026_164145.jpg
✓ Downloaded: ?img_index=1_5_20251026_164145.jpg
Downloading image: ?img_index=1_6_20251026_164145.jpg
✓ Downloaded: ?img_index=1_6_20251026_164145.jpg
Downloading image: ?img_index=1_7_20251026_164145.jpg
✓ Downloaded: ?img_index=1_7_20251026_164145.jpg
Downloading im

## Usage Example 2: Download from Multiple Posts

In [None]:
# Initialize the downloader
downloader = InstagramMediaDownloader(mobile_view=MOBILE_VIEW, headless=False)
downloader.setup_driver()

# Login
USERNAME = "your_username"
PASSWORD = "your_password"

if downloader.login(USERNAME, PASSWORD):
    # List of post URLs to download
    POST_URLS = [
        "https://www.instagram.com/p/POST_ID_1/",
        "https://www.instagram.com/p/POST_ID_2/",
        "https://www.instagram.com/p/POST_ID_3/",
    ]
    
    downloaded_files = downloader.download_multiple_posts(POST_URLS)
    
    print(f"\n{'='*60}")
    print(f"All Downloads Complete!")
    print(f"{'='*60}")
    print(f"Total files downloaded: {len(downloaded_files)}")

# Close the browser
downloader.close()

## Advanced: Interactive Mode

Use this to manually navigate and download media interactively.

In [None]:
# Interactive mode - browser stays open for manual navigation
downloader = InstagramMediaDownloader(mobile_view=MOBILE_VIEW, headless=False)
downloader.setup_driver()

# Login
USERNAME = "your_username"
PASSWORD = "your_password"

downloader.login(USERNAME, PASSWORD)

print("\nBrowser is open. Navigate to posts manually.")
print("When you're on a post you want to download, run the next cell.")

In [None]:
# Download media from current page
current_url = downloader.driver.current_url
print(f"Current URL: {current_url}")

if '/p/' in current_url or '/reel/' in current_url:
    downloaded = downloader.download_post(current_url)
    print(f"\nDownloaded {len(downloaded)} file(s)")
else:
    print("Please navigate to a post first!")

In [None]:
# Close browser when done
downloader.close()

## Tips and Notes

1. **Mobile View**: Mobile emulation is enabled by default as it often makes media extraction easier
2. **Network Logs**: The script captures network traffic to find media URLs automatically
3. **Rate Limiting**: Add delays between downloads to avoid being rate-limited by Instagram
4. **Login Sessions**: Your session is preserved during the browser lifetime
5. **Headless Mode**: Set `headless=True` if you don't need to see the browser
6. **File Naming**: Files are named with post ID and timestamp for easy organization

## Troubleshooting

- If login fails, check your credentials
- If media isn't found, try refreshing the page or using interactive mode
- Instagram may require 2FA - use interactive mode to handle this manually
- Some posts may have anti-scraping measures - this is normal

In [None]:
# ⚠️ EDUCATIONAL EXAMPLE ONLY
# This script collects visible <img> sources from the current page.
# Do NOT use this to scrape or download Instagram content without permission.

from playwright.sync_api import sync_playwright
import os
import requests

URL = "https://www.instagram.com/p/DOfLxX1j11j/"
SAVE_DIR = "downloads"

os.makedirs(SAVE_DIR, exist_ok=True)

with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    page = browser.new_page()
    page.goto(URL, timeout=60000)

    # Wait for images to load
    page.wait_for_selector("img", timeout=10000)
    images = page.query_selector_all("img")

    print(f"Found {len(images)} images on {URL}")
    for i, img in enumerate(images, 1):
        src = img.get_attribute("src")
        if not src or "data:" in src:
            continue
        print(f"{i}: {src}")
        try:
            filename = os.path.join(SAVE_DIR, f"image_{i}.jpg")
            with open(filename, "wb") as f:
                f.write(requests.get(src).content)
        except Exception as e:
            print(f"Error saving {src}: {e}")

    browser.close()
