In [None]:
# Install dependencies (safe to run in Google Colab)
!pip install --quiet playwright nest_asyncio pandas
!playwright install-deps
!playwright install chromium


In [None]:
import asyncio
import random
import re
from decimal import Decimal
from typing import Dict, List, Optional

import nest_asyncio
import pandas as pd
from playwright.async_api import (
    Page,
    TimeoutError as PlaywrightTimeoutError,
    async_playwright,
)

# Allow nested event loops in environments like Google Colab.
# This must run before any coroutine is driven from the notebook, so it is
# applied once at import time.
nest_asyncio.apply()


def _parse_compact_count(raw: Optional[str]) -> int:
    """Convert counters like '1,234' or '2.5K' to integers.

    Thousands separators (commas) and surrounding whitespace are ignored and
    a trailing ``K``/``M``/``B`` suffix (either case) scales the value by
    10^3 / 10^6 / 10^9.

    Args:
        raw: Counter text as displayed on the page; may be ``None`` or empty.

    Returns:
        The parsed count truncated toward zero, or ``0`` when the text is
        missing, empty, non-numeric or not finite.
    """
    if not raw:
        return 0
    text = raw.replace(",", "").strip()
    if not text:
        return 0

    multipliers = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
    suffix = text[-1].upper()
    multiplier = multipliers.get(suffix, 1)
    if suffix in multipliers:
        text = text[:-1]

    try:
        # Decimal keeps the arithmetic exact; with binary floats a value such
        # as 1.001 * 1000 evaluates to 1000.9999999999999 and truncates to
        # 1000 instead of 1001.
        return int(Decimal(text) * multiplier)
    except (ArithmeticError, ValueError):
        # ArithmeticError covers decimal.InvalidOperation (unparsable text),
        # decimal overflow and OverflowError (infinity); ValueError covers NaN.
        return 0


class TwitterPlaywrightScraper:
    """Scrape public tweets from multiple profiles using Playwright.

    A single Chromium browser, context and page are started lazily and shared
    by every profile. Tweets gathered by ``scrape_all`` accumulate in
    ``self.tweets`` and can be exported with ``to_dataframe`` or
    ``save_to_csv``.

    Attributes:
        usernames: Profile handles to visit, in order.
        headless: Whether Chromium is launched without a visible window.
        user_agent: User-agent string applied to the browser context.
        tweets: Tweet dictionaries collected so far.
    """

    # Consecutive scroll passes that may yield no new tweet before a profile
    # is treated as exhausted; without this bound a profile holding fewer
    # tweets than ``limit`` would be scrolled forever.
    MAX_IDLE_SCROLLS = 5

    # Matches the numeric id in permalinks such as ".../status/123/photo/1".
    _STATUS_ID_RE = re.compile(r"/status/(\d+)")

    def __init__(self, usernames: List[str], headless: bool = True) -> None:
        """Store the configuration; no browser is started here.

        Args:
            usernames: Profile handles to scrape.
            headless: Launch Chromium headless when True.
        """
        self.usernames = usernames
        self.headless = headless
        self.user_agent = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        self._playwright = None
        self._browser = None
        self._context = None
        self._page: Optional[Page] = None
        self.tweets: List[Dict] = []

    async def _start(self) -> None:
        """Launch Playwright, Chromium, a context and a page (idempotent).

        If any step fails, everything started so far is shut down again so a
        later call can retry from a clean state.
        """
        if self._page is not None:
            return
        try:
            self._playwright = await async_playwright().start()
            self._browser = await self._playwright.chromium.launch(
                headless=self.headless,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars",
                    "--no-sandbox",
                ],
            )
            self._context = await self._browser.new_context(
                user_agent=self.user_agent,
                viewport={"width": 1366, "height": 768},
            )
            self._page = await self._context.new_page()
        except BaseException:
            # Do not leave a half-initialised browser/driver behind.
            await self._stop()
            raise

    async def _stop(self) -> None:
        """Close the context, browser and Playwright driver, if present.

        Each resource is closed even when closing an earlier one raises.
        """
        context, browser, playwright = self._context, self._browser, self._playwright
        # Reset state first so the instance is reusable whatever happens below.
        self._context = self._browser = self._playwright = self._page = None
        try:
            if context:
                await context.close()
        finally:
            try:
                if browser:
                    await browser.close()
            finally:
                if playwright:
                    await playwright.stop()

    @classmethod
    def _status_id(cls, permalink: Optional[str]) -> Optional[str]:
        """Return the tweet id contained in *permalink*, or None if there is none."""
        if not permalink:
            return None
        match = cls._STATUS_ID_RE.search(permalink)
        if match:
            return match.group(1)
        # Not a /status/<id> URL: fall back to the last path segment.
        return permalink.split("?", 1)[0].rsplit("/", 1)[-1] or None

    async def _read_count(self, tweet_element, test_id: str) -> int:
        """Return the counter shown on the button tagged *test_id* (0 if absent)."""
        button = await tweet_element.query_selector(f'[data-testid="{test_id}"]')
        if button is None:
            return 0
        return _parse_compact_count(await button.inner_text())

    async def _extract_tweet(self, tweet_element) -> Optional[Dict]:
        """Build a tweet dictionary from one tweet element.

        Args:
            tweet_element: Element handle of a ``[data-testid="tweet"]`` node.

        Returns:
            A dict with ``text``, ``date``, ``replies``, ``retweets``,
            ``likes`` and ``permalink`` keys, or None for a falsy element.
        """
        if not tweet_element:
            return None

        content_el = await tweet_element.query_selector('[data-testid="tweetText"]')
        if content_el:
            text_content = await content_el.inner_text()
        else:
            # No dedicated text node: use the text of the whole tweet.
            text_content = await tweet_element.inner_text()
        text_content = " ".join(text_content.split())  # collapse whitespace

        timestamp = None
        permalink = None
        time_el = await tweet_element.query_selector('time')
        if time_el:
            timestamp = await time_el.get_attribute('datetime')
            # The <time> node sits inside the anchor that links to the tweet.
            permalink = await time_el.evaluate("el => el.closest('a')?.href || null")

        return {
            "text": text_content,
            "date": timestamp,
            "replies": await self._read_count(tweet_element, "reply"),
            "retweets": await self._read_count(tweet_element, "retweet"),
            "likes": await self._read_count(tweet_element, "like"),
            "permalink": permalink,
        }

    async def scrape_profile(self, username: str, limit: int = 20) -> List[Dict]:
        """Collect up to *limit* distinct tweets from one profile.

        The timeline is scrolled until enough tweets are gathered or until
        ``MAX_IDLE_SCROLLS`` consecutive passes bring nothing new, so fewer
        than *limit* tweets may be returned.

        Args:
            username: Profile handle (without ``@``).
            limit: Maximum number of tweets to return.

        Returns:
            Tweet dictionaries as produced by ``_extract_tweet`` plus a
            ``username`` key.

        Raises:
            RuntimeError: If no page is available after start-up.
        """
        await self._start()
        if self._page is None:
            raise RuntimeError("Browser page is not available")
        page = self._page
        # "networkidle" is discouraged by Playwright; readiness is established
        # by waiting for the first tweet instead.
        await page.goto(f"https://x.com/{username}", wait_until="domcontentloaded")
        await page.wait_for_selector('[data-testid="tweet"]', timeout=20000)

        collected: List[Dict] = []
        seen_keys = set()
        idle_scrolls = 0

        while len(collected) < limit:
            found_new = False
            for tweet in await page.query_selector_all('[data-testid="tweet"]'):
                data = await self._extract_tweet(tweet)
                if not data:
                    continue
                # Tweets without a permalink are keyed by content so they are
                # not appended again on every pass.
                key = self._status_id(data.get("permalink")) or (data["text"], data["date"])
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                data["username"] = username
                collected.append(data)
                found_new = True
                if len(collected) >= limit:
                    return collected

            idle_scrolls = 0 if found_new else idle_scrolls + 1
            if idle_scrolls >= self.MAX_IDLE_SCROLLS:
                break  # timeline exhausted (or nothing more is loading)

            # Playwright passes the argument to a function expression;
            # Selenium-style ``arguments[0]`` is not defined here.
            await page.evaluate("(dy) => window.scrollBy(0, dy)", random.randint(600, 1200))
            await page.wait_for_timeout(random.randint(2000, 4000))
            try:
                await page.wait_for_load_state("networkidle", timeout=5000)
            except PlaywrightTimeoutError:
                # Best effort only: the page may never go fully idle.
                pass

        return collected[:limit]

    async def scrape_all(self, limit: int = 20) -> pd.DataFrame:
        """Scrape every configured profile and return all collected tweets.

        The browser is always shut down afterwards, including when start-up
        or a profile fails.

        Args:
            limit: Maximum number of tweets per profile.

        Returns:
            DataFrame built from ``self.tweets``.
        """
        try:
            await self._start()
            for user in self.usernames:
                self.tweets.extend(await self.scrape_profile(user, limit=limit))
        finally:
            await self._stop()
        return self.to_dataframe()

    def to_dataframe(self) -> pd.DataFrame:
        """Return the collected tweets as a DataFrame (one row per tweet)."""
        return pd.DataFrame(self.tweets)

    def save_to_csv(self, filename: str) -> None:
        """Write the collected tweets to *filename* as CSV without the index."""
        self.to_dataframe().to_csv(filename, index=False)


In [None]:
# Example usage
usernames = ["SpaceX", "NASA"]
scraper = TwitterPlaywrightScraper(usernames=usernames, headless=True)

async def main() -> None:
    """Scrape up to 20 tweets per profile, print a preview and save tweets.csv.

    Uses the module-level ``scraper`` created above.
    """
    df = await scraper.scrape_all(limit=20)
    print(df.head())
    scraper.save_to_csv("tweets.csv")

# Use a running event loop if present (e.g., in Google Colab)
# NOTE(review): the top-level ``await`` below is notebook-style syntax; a plain
# Python script would presumably reject it before the ``else`` branch could
# ever run — confirm which environment this cell is meant to support.
loop = asyncio.get_event_loop()
if loop.is_running():
    # A loop is already running: schedule main() on it and wait for the task.
    task = loop.create_task(main())
    await task
else:
    # No loop is running: drive main() to completion on this loop.
    loop.run_until_complete(main())
