In [None]:
import os
import time
import requests
import pandas as pd
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import numpy as np

# Configure logging to log to both console and file
LOG_FILE = "scraping_pipeline.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),  # Log to console
        logging.FileHandler(LOG_FILE, mode="w", encoding="utf-8"),  # Log to file (truncated on every run)
    ],
)

# Constants
# MAL_CLIENT_ID is the API client ID obtained from myanimelist.net. It must be
# set as an environment variable before this cell is run.
CLIENT_ID = os.getenv("MAL_CLIENT_ID")
if not CLIENT_ID:
    raise ValueError("MAL_CLIENT_ID environment variable not set!")
BASE_URL = 'https://api.myanimelist.net/v2'
HEADERS = {'X-MAL-CLIENT-ID': CLIENT_ID}
BASE_PATH = ''  # Directory for all output files; '' means the current working directory
CLUBS_FILE = os.path.join(BASE_PATH, 'clubs.txt')
USERS_FILE = os.path.join(BASE_PATH, 'users.csv')
ANIME_LIST_FILE = os.path.join(BASE_PATH, 'animelist.csv')
ANIME_DETAILS_FILE = os.path.join(BASE_PATH, 'anime_details.csv')
FAILED_ANIMES = os.path.join(BASE_PATH, 'failed_animes.csv')

# Ensure the data directory exists. os.makedirs('') raises FileNotFoundError,
# so the call is skipped while BASE_PATH is empty (current working directory).
if BASE_PATH:
    os.makedirs(BASE_PATH, exist_ok=True)

In [None]:
def fetch_data(url, params=None, retries=3):
    """Fetch JSON data from a URL, retrying transient errors.

    Args:
        url: Absolute URL to request.
        params: Optional dict of query-string parameters.
        retries: Maximum number of attempts.

    Returns:
        The decoded JSON body, or ``None`` when the request fails permanently
        (403, 404 or another non-retryable HTTP status) or when every attempt
        has failed.
    """
    # Only the MAL API needs the client ID; do not hand it to other hosts
    # (this helper is also used for the Jikan club-members endpoint).
    request_headers = HEADERS if url.startswith(BASE_URL) else None

    for attempt in range(1, retries + 1):
        is_last_attempt = attempt == retries
        try:
            time.sleep(1)  # politeness delay before every request
            logging.debug(f"Fetching data from {url} (Attempt {attempt}/{retries})")
            response = requests.get(url, headers=request_headers, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as http_err:
            failed_response = getattr(http_err, "response", None)
            status = failed_response.status_code if failed_response is not None else None
            if status == 404:
                # The resource does not exist; retrying cannot help.
                logging.error(f"404 Not Found: {url}")
                return None
            if status == 403:
                logging.warning(f"403 Forbidden: Access denied for {url}. Skipping retries.")
                return None
            if status == 429 or (status is not None and status >= 500):
                # Rate limiting and server-side failures are worth another try.
                logging.warning(f"{status} error on attempt {attempt}/{retries}: {http_err}")
                if not is_last_attempt:
                    time.sleep(60)  # Wait before retrying
                continue
            logging.error(f"HTTP error occurred: {http_err}")
            break
        except requests.exceptions.RequestException as req_err:
            logging.error(f"Request error occurred: {req_err}")
            if not is_last_attempt:
                time.sleep(60)  # No point in waiting when no attempt is left
    logging.error(f"Failed to fetch data after {retries} retries: {url}")
    return None

In [None]:
def scrape_club_ids(min_clubs=300):
    """Scrape club IDs from the MyAnimeList club listing and save them to CLUBS_FILE.

    Listing pages are requested one after another until at least ``min_clubs``
    distinct IDs have been collected, or until a page cannot be fetched or
    contains no club rows. Whatever was collected is written to CLUBS_FILE,
    one ID per line in ascending order.

    Args:
        min_clubs: Number of distinct club IDs to collect before stopping.
    """
    clubs = set()
    page = 1
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
    }
    logging.info("Starting to scrape club IDs...")

    while len(clubs) < min_clubs:
        logging.info(f"Scraping club page {page}...")
        url = f"https://myanimelist.net/clubs.php?sort=5&p={page}"
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as req_err:
            logging.error(f"Could not fetch club page {page}: {req_err}. Stopping early.")
            break

        soup = BeautifulSoup(response.text, "html.parser")
        rows = soup.find_all("tr", {"class": "table-data"})
        if not rows:
            # Past the last page, a block page, or a layout change: without
            # this guard the loop would request pages forever.
            logging.warning(f"No clubs found on page {page}. Stopping early.")
            break

        for row in rows:
            link = row.find("a", {"class": "fw-b"})
            try:
                clubs.add(int(link["href"].split("=")[-1]))
            except (TypeError, KeyError, ValueError):
                # Missing link, missing href, or a non-numeric ID.
                logging.warning(f"Skipping a club row without a parsable ID on page {page}")
        page += 1

    with open(CLUBS_FILE, "w") as file:
        for club in sorted(clubs):  # sorted so the output file is reproducible
            file.write(f"{club}\n")
    logging.info(f"Scraped {len(clubs)} clubs. Saved to {CLUBS_FILE}")

In [None]:
def fetch_username_from_club(club_id):
    """Return the usernames of a club's members, following pagination.

    Pages are requested through ``fetch_data`` starting at page 1. Paging
    stops when a request yields nothing or the response does not announce a
    further page.
    """
    members_url = f"https://api.jikan.moe/v4/clubs/{club_id}/members"
    usernames = []
    current_page = 1
    has_more = True
    while has_more:
        logging.info(f"Scraping members of club {club_id} page : {current_page}...")
        payload = fetch_data(url=members_url, params={'page': current_page})
        if not payload:
            # Failed or empty response: keep what was gathered so far.
            break
        if "data" in payload:
            for member in payload["data"]:
                usernames.append(member["username"])
        has_more = "pagination" in payload and payload['pagination']['has_next_page'] == True
        current_page += 1
    logging.info(f"Finished scraping members of club {club_id}...")
    return usernames


def scrape_usernames_from_clubs():
    """Collect the usernames of all members of the clubs listed in CLUBS_FILE.

    Clubs are processed one after another (no threading is used). The distinct
    usernames are written to USERS_FILE as ``user_id,username`` rows, where
    ``user_id`` is the position of the username in sorted order so that the
    numbering is reproducible between runs.
    """
    logging.info("Starting to scrape usernames from clubs...")
    with open(CLUBS_FILE) as file:
        # Blank lines would otherwise be requested as a club with an empty ID.
        club_ids = [line.strip() for line in file if line.strip()]

    users = set()
    for club_id in club_ids:
        users.update(fetch_username_from_club(club_id))

    with open(USERS_FILE, "w") as file:
        file.write("user_id,username\n")
        # Iterating the bare set would number users differently on every run.
        for user_id, username in enumerate(sorted(users)):
            file.write(f"{user_id},{username}\n")
    logging.info(f"Scraped {len(users)} usernames. Saved to {USERS_FILE}")

In [None]:
def fetch_user_list(username, limit=500, paginate=False):
    """Fetch a single user's anime list from the MAL API.

    Args:
        username: MyAnimeList username.
        limit: Number of entries requested per API call.
            NOTE(review): the MAL API is understood to cap this at 1000 —
            confirm against the API reference before raising it further.
        paginate: When False (the default) only the first page of up to
            ``limit`` entries is fetched. When True, further pages are
            requested for as long as the response carries a ``paging.next``
            link.

    Returns:
        A list of ``[username, anime_id, status, score, num_episodes_watched]``
        rows, or ``None`` when the first request yields no usable data
        (e.g. a 403 for a private list). If a later page fails while
        paginating, the rows gathered so far are returned.
    """
    anime_list = []
    offset = 0
    url = f"{BASE_URL}/users/{username}/animelist"
    logging.info(f'Scraping animelist of user {username}...')

    while True:
        params = {'fields': 'list_status', 'offset': offset, 'limit': str(limit), 'sort': 'list_score'}
        data = fetch_data(url, params)

        # No data (e.g., 403 or empty response)
        if not data or "data" not in data:
            if offset == 0:
                return None
            break

        # Process anime list entries
        for anime in data["data"]:
            list_status = anime.get("list_status", {})
            anime_list.append([
                username,
                anime["node"]["id"],
                list_status.get("status", "unknown"),
                list_status.get("score", 0),
                list_status.get("num_episodes_watched", 0)
            ])

        # Check for additional pages
        if paginate and (data.get("paging") or {}).get("next"):
            offset += limit
        else:
            break

    return anime_list
    
    
def scrape_users_anime_lists(max_scored_entries=500000, check_every=10000):
    """Scrape the anime list of every user in USERS_FILE into ANIME_LIST_FILE.

    Args:
        max_scored_entries: Stop early once at least this many entries with a
            nonzero score have been collected.
        check_every: The early-stop condition is only evaluated for users
            whose ``user_id`` is a multiple of this value.

    The output column named ``user_id`` holds the username, because that is
    what ``fetch_user_list`` puts in the first position of each row; the
    column name is kept unchanged for existing consumers of the file.
    """
    logging.info("Starting to scrape user anime lists...")
    # Usernames such as "NA" or "null" must not be parsed as missing values,
    # and numeric-looking names (e.g. "007") must keep their exact text.
    users = pd.read_csv(USERS_FILE, dtype={"username": str}, keep_default_na=False)
    all_anime_list = []
    scored_entries = 0  # running count of entries with a nonzero score
    num_of_scrapped = 0  # always bound, even when the threshold is never reached

    for row in users.itertuples(index=False):
        data = fetch_user_list(username=row.username)
        num_of_scrapped += 1
        if data:
            all_anime_list.extend(data)
            scored_entries += sum(1 for entry in data if entry[3])
        if row.user_id % check_every == 0 and scored_entries >= max_scored_entries:
            break

    df = pd.DataFrame(all_anime_list, columns=["user_id", "anime_id", "status", "score", "num_episodes_watched"])
    df.to_csv(ANIME_LIST_FILE, index=False)
    logging.info(f"Scraped anime lists for {num_of_scrapped} users. Saved to {ANIME_LIST_FILE}")

In [None]:
def scrape_anime_details():
    """Fetch detailed information for every anime referenced in ANIME_LIST_FILE.

    Each distinct ``anime_id`` is requested once through ``fetch_data``.
    Successful responses are written to ANIME_DETAILS_FILE; the IDs that could
    not be fetched are written to FAILED_ANIMES (columns ``count,anime_id``)
    so they can be retried later.
    """
    logging.info("Starting to scrape detailed anime information...")
    anime_ids = pd.read_csv(ANIME_LIST_FILE)["anime_id"].unique()
    anime_data = []
    failed_animes = []
    len_all = len(anime_ids)
    start_time = time.time()  # Record the start time
    check_interval = 10  # Update speed log every 10 requests
    params = {'fields': 'id,title,alternative_titles,start_date,end_date,synopsis,mean,rank,popularity,'
        'num_list_users,num_scoring_users,nsfw,genres,media_type,status,num_episodes,start_season,'
        'source,studios,related_anime'}

    # `position` is 1-based so progress reads "1 of N" ... "N of N".
    for position, anime_id in enumerate(anime_ids, start=1):
        url = f"{BASE_URL}/anime/{anime_id}"
        try:
            data = fetch_data(url, params=params)
        except Exception as e:
            logging.error(f"Failed to fetch details for anime ID {anime_id}: {e}. {position} of {len_all}")
            failed_animes.append(anime_id)
        else:
            if data:
                anime_data.append(data)
                logging.info(f"Fetched details for anime ID {anime_id}. {position} of {len_all}")
            else:
                logging.warning(f"Failed to fetch details for anime ID {anime_id}. {position} of {len_all}")
                failed_animes.append(anime_id)

        if position % check_interval == 0:  # Update speed log every `check_interval` requests
            elapsed_minutes = (time.time() - start_time) / 60
            speed = position / elapsed_minutes if elapsed_minutes > 0 else 0  # Anime per minute
            logging.info(f"🔥 Speed: {speed:.2f} anime/min ({position} parsed in {elapsed_minutes:.2f} min)")

    df = pd.DataFrame(anime_data)
    df.to_csv(ANIME_DETAILS_FILE, index=False, encoding='utf-8')
    logging.info(f"Scraped details for {len(anime_data)} of {len_all} animes. Saved to {ANIME_DETAILS_FILE}")

    # Save the IDs that failed due to external factors for a later retry.
    with open(FAILED_ANIMES, "w") as file:
        file.write("count,anime_id\n")
        for count, anime_id in enumerate(failed_animes):
            file.write(f"{count},{anime_id}\n")

In [None]:
def failed_anime_details():
    """Retry fetching details for the anime IDs listed in FAILED_ANIMES.

    Returns:
        A DataFrame with one row per anime that could be fetched this time.
        Nothing is written to ANIME_DETAILS_FILE here; merging is left to the
        caller.

    Side effects:
        FAILED_ANIMES is rewritten so that it only lists the IDs that are
        still failing, which makes a further retry skip the ones recovered
        now.
    """
    logging.info("Starting to scrape detailed anime information...")
    anime_ids = pd.read_csv(FAILED_ANIMES)["anime_id"].unique()
    anime_data = []
    failed_animes = []
    len_all = len(anime_ids)
    start_time = time.time()  # Record the start time
    check_interval = 10  # Update speed log every 10 requests
    params = {'fields': 'id,title,alternative_titles,start_date,end_date,synopsis,mean,rank,popularity,'
        'num_list_users,num_scoring_users,nsfw,genres,media_type,status,num_episodes,start_season,'
        'source,studios,related_anime'}

    # `position` is 1-based so progress reads "1 of N" ... "N of N".
    for position, anime_id in enumerate(anime_ids, start=1):
        url = f"{BASE_URL}/anime/{anime_id}"
        try:
            data = fetch_data(url, params=params)
        except Exception as e:
            logging.error(f"Failed to fetch details for anime ID {anime_id}: {e}. {position} of {len_all}")
            failed_animes.append(anime_id)
        else:
            if data:
                anime_data.append(data)
                logging.info(f"Fetched details for anime ID {anime_id}. {position} of {len_all}")
            else:
                logging.warning(f"Failed to fetch details for anime ID {anime_id}. {position} of {len_all}")
                failed_animes.append(anime_id)

        if position % check_interval == 0:  # Update speed log every `check_interval` requests
            elapsed_minutes = (time.time() - start_time) / 60
            speed = position / elapsed_minutes if elapsed_minutes > 0 else 0  # Anime per minute
            logging.info(f"🔥 Speed: {speed:.2f} anime/min ({position} parsed in {elapsed_minutes:.2f} min)")

    df = pd.DataFrame(anime_data)

    # This write used to sit after the `return` and therefore never ran.
    with open(FAILED_ANIMES, "w") as file:
        file.write("count,anime_id\n")
        for count, anime_id in enumerate(failed_animes):
            file.write(f"{count},{anime_id}\n")

    logging.info(
        f"Retried {len_all} failed animes: fetched {len(anime_data)}, "
        f"{len(failed_animes)} still failing. Remaining failures saved to {FAILED_ANIMES}"
    )
    return df


In [None]:

# Step 1: collect club IDs from the MyAnimeList club listing and write them to CLUBS_FILE.
logging.info("Starting the scraping pipeline...")
scrape_club_ids()


In [None]:
# Step 2: read the club IDs from CLUBS_FILE and write the members' usernames to USERS_FILE.
scrape_usernames_from_clubs()

In [None]:
# Step 3: read the usernames from USERS_FILE and write their anime list entries to ANIME_LIST_FILE.
scrape_users_anime_lists()

In [None]:
# Step 4: read the anime IDs from ANIME_LIST_FILE, write their details to ANIME_DETAILS_FILE
# and record the IDs that could not be fetched in FAILED_ANIMES.
scrape_anime_details()

In [None]:
# If there are failed animes, retry fetching their details and merge them
# into ANIME_DETAILS_FILE.
df = failed_anime_details()
df.to_csv(os.path.join(BASE_PATH, 'anime_details_f.csv'), index=False, encoding='utf-8')
df_old = pd.read_csv(ANIME_DETAILS_FILE)
frames = [df, df_old]
result = pd.concat(frames, ignore_index=True)
if "id" in result.columns:
    # Re-running this cell would otherwise append the same retried animes
    # again; the freshly fetched row (listed first) is the one that is kept.
    result = result.drop_duplicates(subset="id", keep="first")
result.to_csv(ANIME_DETAILS_FILE, index=False, encoding='utf-8')
logging.info("Failed anime details scraping completed successfully.")
logging.info("All tasks completed successfully.")

In [None]:
# To download your own anime list from MyAnimeList, set your username below and run this cell.
username = 'your_username_here'  # Replace with your MyAnimeList username
if username == 'your_username_here':
    # Placeholder left unchanged: skip the request so that "Run All" does not
    # query a dummy account and write a meaningless file.
    logging.warning("Set `username` to your MyAnimeList username to download your anime list.")
else:
    df = pd.DataFrame(fetch_user_list(username), columns=["user_id", "anime_id", "status", "score", "num_episodes_watched"])
    df.to_csv(os.path.join(BASE_PATH, 'your_animelist.csv'), index=False)