### Grab Tweets from User



In [2]:
import pandas as pd
from ntscraper import Nitter

# Module-level scraper shared by get_user_tweets() below.
# NOTE(review): the captured cell output shows a "Testing instances" progress bar,
# so construction appears to probe Nitter instances over the network — confirm.
scraper = Nitter()

Testing instances: 100%|██████████| 9/9 [00:04<00:00,  2.23it/s]


In [3]:
# Function to scrape tweets from a specific user's timeline
def get_user_tweets(username, tweet_count, from_date, to_date):
    """Fetch tweets from one user's timeline via the module-level ``scraper``.

    Args:
        username: Account whose timeline is scraped.
        tweet_count: Forwarded as ``number`` to ``scraper.get_tweets``.
        from_date: Forwarded as ``since`` (the input prompt asks for YYYY-MM-DD).
        to_date: Forwarded as ``until`` (the input prompt asks for YYYY-MM-DD).

    Returns:
        Whatever ``scraper.get_tweets`` returns, or ``None`` if the call raised
        (the error is printed rather than propagated).
    """
    query = dict(number=tweet_count, mode='user', since=from_date, until=to_date)
    try:
        result = scraper.get_tweets(username, **query)
    except Exception as err:
        # Best-effort: report the failure and let the caller handle None.
        print("Can't fetch tweets:", err)
        result = None
    return result

# Function to process the tweet dictionary and save it to a CSV file
def save_tweets_to_csv(tweets, filename='data.csv'):
    """Flatten a scraper result into rows and write them to a CSV file.

    Args:
        tweets: Mapping with a ``'tweets'`` key holding a list of tweet dicts
            (as returned by ``get_user_tweets``), or ``None`` when fetching
            failed. A missing or empty ``'tweets'`` entry is treated as
            "zero tweets" and produces a header-only CSV.
        filename: Destination CSV path.

    Returns:
        None. Prints "No tweets to save." and writes nothing when ``tweets``
        is ``None``; otherwise writes ``filename`` and prints a completion line.
    """
    if tweets is None:
        print("No tweets to save.")
        return

    columns = ['link', 'text', 'to_name', 'to_username', 'date', 'likes', 'retweets', 'comments']

    rows = []
    # `.get(...) or []` tolerates a result dict without a 'tweets' key (or with None).
    for tweet in tweets.get('tweets') or []:
        # `or {}` also covers keys that are present but explicitly None, which
        # `tweet.get('user', {})` would pass through and then fail on `.get`.
        user = tweet.get('user') or {}
        stats = tweet.get('stats') or {}
        rows.append([
            tweet.get('link'),
            tweet.get('text'),
            user.get('name'),
            user.get('username'),
            tweet.get('date'),
            stats.get('likes'),
            stats.get('retweets'),
            stats.get('comments'),
        ])

    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(filename, index=False)
    print("Completed... 100%")

In [4]:
# Prompt for the scrape parameters (prompts appear in the order listed),
# then fetch the timeline and persist it to the default CSV.
username = input("Username: ")
tweet_count = int(input("No. of tweets: "))
from_date, to_date = input("From date (YYYY-MM-DD): "), input("To date (YYYY-MM-DD): ")

tweets = get_user_tweets(
    username=username,
    tweet_count=tweet_count,
    from_date=from_date,
    to_date=to_date,
)
save_tweets_to_csv(tweets)

13-Feb-25 22:34:36 - No instance specified, using random instance https://nitter.privacydev.net
13-Feb-25 22:34:41 - Current stats for nyushanghai: 10 tweets, 0 threads...
Completed... 100%


### Download Media Files

In [5]:
# Print the Node.js and npm versions; the media-metadata cell further down
# shells out to `node test.js`, so both tools must be available.
!node -v
!npm -v

v23.2.0
10.9.0
[1G[0K

In [6]:
# Install happy-dl, the npm package that test.js loads with require("happy-dl").
# NOTE(review): no version is pinned, so a re-run may install a different release.
!npm install happy-dl

Support for loading ES Module in require() is an experimental feature and might change at any time
[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K
added 31 packages in 2s
[1G[0K⠏[1G[0K
[1G[0K⠏[1G[0K16 packages are looking for funding
[1G[0K⠏[1G[0K  run `npm fund` for details
[1G[0K⠏[1G[0K[1mnpm[22m [96mnotice[39m
[1mnpm[22m [96mnotice[39m New [31mmajor[39m version of npm available! [31m10.9.0[39m -> [34m11.1.0[39m
[1mnpm[22m [96mnotice[39m Changelog: [34mhttps://github.com/npm/cli/releases/tag/v11.1.0[39m
[1mnpm[22m [96mnotice[39m To update run: [4mnpm install -g npm@11.1.0[24m
[1mnpm[22m [96mnotice[39m
[1G[0K⠏[1G[0K

In [7]:
%%bash
# Write the Node.js helper script test.js used by the media-metadata cell.
# The quoted 'EOF' delimiter keeps the shell from expanding anything in the script body.
cat > test.js <<'EOF'
const happyDL = require("happy-dl");

/**
 * Look up media details for one tweet URL and print exactly one JSON document
 * to stdout. The notebook parses that stdout as JSON and treats an object with
 * an "error" key as a failed lookup, so every failure path must emit that shape.
 *
 * @param {string} url - Tweet URL passed through to happyDL.twitterDownloader.
 * @returns {Promise<void>} Resolves after the JSON line has been printed; never rejects.
 */
async function fetchTwitterData(url) {
  try {
    const result = await happyDL.twitterDownloader(url);
    // `== null` matches both undefined and null. A null result would otherwise be
    // printed as the JSON literal `null`, which is not the error object the
    // notebook expects.
    if (result == null) {
      console.log(JSON.stringify({ error: "No media found or result undefined" }));
    } else {
      console.log(JSON.stringify(result));
    }
  } catch (error) {
    // Print the error to stderr and output a JSON object with the error message.
    console.error("Error fetching Twitter media details:", error);
    // JSON.stringify drops keys whose value is undefined, so a thrown non-Error
    // value (no .message) would print `{}` and hide the failure. Fall back to
    // the stringified value so the "error" key is always present.
    const message = error && error.message ? error.message : String(error);
    console.log(JSON.stringify({ error: message }));
  }
}

// Entry point: the tweet URL is the first CLI argument after the script path.
const [, , twitterUrl] = process.argv;

if (twitterUrl) {
  fetchTwitterData(twitterUrl);
} else {
  // Report the usage error on both streams (stdout carries the JSON form)
  // and exit with a non-zero status.
  console.error("No twitter URL provided");
  console.log(JSON.stringify({ error: "No twitter URL provided" }));
  process.exit(1);
}
EOF

In [8]:
import pandas as pd
import subprocess
import json

# Load the scraped tweets; each row must carry the tweet URL in a "link" column.
df = pd.read_csv('data.csv')

if 'link' not in df.columns:
    raise ValueError("The CSV file must contain a 'link' column.")


def _extract_media(data, twitter_link):
    """Pick one URL per media item from the parsed test.js output.

    Args:
        data: List of media entries; each usable entry is a dict with a "type"
            and a non-empty "variants" list of dicts carrying a "url".
        twitter_link: Tweet URL, used only in log messages.

    Returns:
        ``(types, urls)`` — two equal-length lists. Photos map to "image",
        videos to "video", anything else to "text" (with "" when no URL).
    """
    media_types = []
    media_urls = []
    for media in data:
        if not isinstance(media, dict):
            print(f"Skipping malformed media entry for {twitter_link}: {media!r}")
            continue

        mtype = media.get("type")
        raw_variants = media.get("variants")
        # Keep only dict variants so the .get() calls below cannot fail.
        variants = [v for v in raw_variants if isinstance(v, dict)] if isinstance(raw_variants, list) else []
        if not variants:
            continue

        if mtype == "photo":
            # For photos, use the first variant URL.
            url = variants[0].get("url")
            if url:
                media_types.append("image")
                media_urls.append(url)
        elif mtype == "video":
            # Prefer the variant labelled "1280p"; otherwise fall back to the last one.
            chosen_variant = next(
                (v for v in variants if v.get("quality") == "1280p"),
                variants[-1],
            )
            url = chosen_variant.get("url")
            if url:
                media_types.append("video")
                media_urls.append(url)
        else:
            print(f"Unrecognized media type {mtype} for {twitter_link}. Defaulting to 'text'.")
            # Record the entry as "text" with the first variant's URL, or "" if it has none.
            media_types.append("text")
            media_urls.append(variants[0].get("url") or "")
    return media_types, media_urls


def _fetch_media(twitter_link):
    """Run test.js for one tweet URL and return its ``(types, urls)`` lists.

    Every recoverable failure (bad link value, non-zero exit, empty or invalid
    JSON output, error object, unexpected shape) is logged and yields
    ``([], [])`` so the caller always gets exactly one result per row.
    """
    if not isinstance(twitter_link, str) or not twitter_link.strip():
        # e.g. NaN from an empty CSV cell; subprocess arguments must be strings.
        print(f"Skipping missing or invalid link: {twitter_link!r}")
        return [], []

    try:
        completed = subprocess.run(
            ['node', 'test.js', twitter_link],
            capture_output=True,
            text=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        print("Subprocess error:", e)
        return [], []

    if completed.stderr:
        print("Stderr:", completed.stderr)

    output = completed.stdout.strip()
    if not output:
        # Previously this case appended nothing, leaving the result lists
        # shorter than the DataFrame and breaking the column assignment below.
        print(f"No output from test.js for {twitter_link}")
        return [], []

    try:
        data = json.loads(output)
    except json.JSONDecodeError as e:
        print("JSON decode error:", e, "Output was:", output)
        return [], []

    # Only a dict can be an error object; checking the type first avoids a
    # TypeError when the JSON document is null or a number.
    if isinstance(data, dict) and "error" in data:
        print(f"Error in result for {twitter_link}: {data['error']}")
        return [], []

    # Accept either a bare list or a dict wrapping the list under "results".
    if not isinstance(data, list):
        if isinstance(data, dict) and isinstance(data.get("results"), list):
            data = data["results"]
        else:
            print(f"Unexpected data type for {twitter_link}")
            return [], []

    current_media_types, current_media_files = _extract_media(data, twitter_link)
    print(f"Result for {twitter_link}: types: {current_media_types}, urls: {current_media_files}")
    return current_media_types, current_media_files


# Prepare lists to hold our new column values (one JSON string per row)
media_types_all = []
media_files_all = []

# Process each Twitter URL in the CSV
for twitter_link in df['link']:
    print(f"Processing: {twitter_link}")
    row_types, row_files = _fetch_media(twitter_link)
    # Save the results as JSON strings so we can easily parse them later
    media_types_all.append(json.dumps(row_types))
    media_files_all.append(json.dumps(row_files))

# Add the new columns to the DataFrame
df["media type"] = media_types_all
df["media file"] = media_files_all

# Save the updated DataFrame back to data.csv (or choose a different filename)
df.to_csv("data.csv", index=False)

Processing: https://twitter.com/nyushanghai/status/1874031782359949539#m
Result for https://twitter.com/nyushanghai/status/1874031782359949539#m: types: ['video'], urls: ['https://video.twimg.com/ext_tw_video/1874030891418464256/pu/vid/avc1/720x1280/9sjw-W3Sb6Xrxchu.mp4?tag=12']
Processing: https://twitter.com/nyushanghai/status/1869898005895131398#m
Unexpected data type for https://twitter.com/nyushanghai/status/1869898005895131398#m
Processing: https://twitter.com/nyushanghai/status/1868852614563266670#m
Result for https://twitter.com/nyushanghai/status/1868852614563266670#m: types: ['image', 'image', 'image'], urls: ['https://pbs.twimg.com/media/Ge9_F6xbwAAmLcy.jpg', 'https://pbs.twimg.com/media/Ge9_HlfbwAAJaM7.jpg', 'https://pbs.twimg.com/media/Ge9_H3SasAA_LFF.jpg']
Processing: https://twitter.com/nyushanghai/status/1868808330791035310#m
Result for https://twitter.com/nyushanghai/status/1868808330791035310#m: types: ['image', 'image', 'image', 'image'], urls: ['https://pbs.twimg.co

In [9]:
import os
import pandas as pd
import json
import requests
from urllib.parse import urlparse

# Create directories for downloads if they don't exist
os.makedirs("images", exist_ok=True)
os.makedirs("videos", exist_ok=True)

# Seconds to wait on the server before giving up; without a timeout a stalled
# connection would block the cell indefinitely.
REQUEST_TIMEOUT = 30

# Read the updated data.csv (expects the JSON-encoded "media type" / "media file" columns)
df = pd.read_csv("data.csv")

downloaded_images = 0
downloaded_videos = 0

# Process each row in the CSV
for idx, row in df.iterrows():
    try:
        media_types = json.loads(row["media type"])
        media_files = json.loads(row["media file"])
    except Exception as e:
        print(f"Error parsing JSON in row {idx}: {e}")
        continue

    if not isinstance(media_types, list) or not isinstance(media_files, list):
        print(f"Row {idx} media columns are not lists.")
        continue

    # zip() pairs each URL with its media type and ignores URLs that have no
    # corresponding type entry.
    for j, (mtype, url) in enumerate(zip(media_types, media_files)):
        if not url:
            continue

        # Decide the destination before touching the network, so entries that
        # are not downloadable never open a connection.
        if mtype == "image":
            prefix, default_ext = "images/image", ".jpg"
        elif mtype == "video":
            prefix, default_ext = "videos/video", ".mp4"
        else:
            print(f"Row {idx}, item {j}: Unrecognized media type: {mtype}")
            continue

        try:
            # Take the extension from the URL path (query string excluded).
            ext = os.path.splitext(urlparse(url).path)[1] or default_ext
            filename = f"{prefix}_{idx}_{j}{ext}"
            # The context manager closes the streamed response on every path.
            with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(filename, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
        except Exception as e:
            # Best-effort: one failed download must not stop the remaining ones.
            print(f"Error downloading from {url}: {e}")
            continue

        print(f"Downloaded {mtype}: {filename}")
        if mtype == "image":
            downloaded_images += 1
        else:
            downloaded_videos += 1

print(f"Downloaded {downloaded_images} images and {downloaded_videos} videos.")

Downloaded video: videos/video_0_0.mp4
Downloaded image: images/image_2_0.jpg
Downloaded image: images/image_2_1.jpg
Downloaded image: images/image_2_2.jpg
Downloaded image: images/image_3_0.jpg
Downloaded image: images/image_3_1.jpg
Downloaded image: images/image_3_2.jpg
Downloaded image: images/image_3_3.jpg
Downloaded image: images/image_4_0.jpg
Downloaded image: images/image_5_0.jpg
Downloaded image: images/image_6_0.jpg
Downloaded image: images/image_6_1.jpg
Downloaded image: images/image_6_2.jpg
Downloaded image: images/image_6_3.jpg
Downloaded image: images/image_7_0.jpg
Downloaded image: images/image_8_0.jpg
Downloaded image: images/image_8_1.jpg
Downloaded image: images/image_9_0.jpg
Downloaded 17 images and 1 videos.
