In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os
import time



In [None]:
# Shared browser configuration used by get_genre_list() and get_links().
chrome_options = ChromeOptions()
for _chrome_flag in (
    "--headless=new",  # headless mode
    "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
):
    chrome_options.add_argument(_chrome_flag)

# The driver path returned by ChromeDriverManager().install() is handed to the Service.
service = ChromeService(ChromeDriverManager().install())

In [None]:
def get_genre_list():
    """Scrape the genre names offered on IMDb's title search page.

    Opens https://www.imdb.com/search/title/ in a Chrome session built from
    the module-level ``service`` and ``chrome_options``, waits (up to 10 s)
    for the element with id ``accordion-item-genreAccordion`` and collects the
    text of every ``<button>`` found inside it.

    Returns:
        list[str] | None: The button texts, or ``None`` when loading the page
        or locating the genre section fails (the error is printed).
    """
    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        # Navigation happens inside the try block so that a failed page load
        # still reaches the finally clause and the browser is shut down.
        driver.get("https://www.imdb.com/search/title/")
        wait = WebDriverWait(driver, 10)

        # Fixed settle delay before the explicit wait below.
        time.sleep(5)

        genre_section = wait.until(
            EC.presence_of_element_located((By.ID, "accordion-item-genreAccordion"))
        )

        return [
            button.text
            for button in genre_section.find_elements(By.TAG_NAME, "button")
        ]
    except Exception as e:
        print(f"ko tim thấy: {e}")
        return None
    finally:
        driver.quit()

In [None]:
# get_genre_list() returns None when scraping fails; fall back to an empty
# list so this cell prints only the header instead of raising TypeError.
genres = get_genre_list() or []
print("Thể loại phim trên IMDb:")
for number, genre in enumerate(genres, start=1):
    print(f"{number}.{genre}")

In [None]:
# Presumably the intended number of titles to collect.
# NOTE(review): no cell visible here reads this constant (the link-collection
# cell passes target_movies=300 as a literal) — confirm whether it is still needed.
TARGET_MOVIES = 2000

In [None]:
def get_links(genre, target_movies):
    """Collect title-page URLs for one genre from IMDb's search results.

    Loads the feature/TV-series search page for ``genre`` and keeps clicking
    the "see more" button until at least ``target_movies`` distinct links
    have been gathered, the button can no longer be clicked, or several
    consecutive rounds add no new link.

    Args:
        genre: Genre name; it is lower-cased and placed in the ``genres``
            query parameter of the search URL.
        target_movies: Number of distinct URLs to aim for.

    Returns:
        list: Distinct URLs in arbitrary order (they are gathered in a set).
        The list may be longer than ``target_movies`` because every link on
        the page is added in each round, or shorter when the results run out.

    Raises:
        Any Selenium error raised while loading the page or locating the
        result list propagates to the caller; the browser is quit first.
    """
    # Give up after this many consecutive rounds without a new link, so a
    # "see more" button that stays clickable cannot keep the loop alive forever.
    max_stalled_rounds = 3

    driver = webdriver.Chrome(service=service, options=chrome_options)
    movie_urls = set()
    try:
        # https://www.imdb.com/search/title/?title_type=feature,tv_series&genres=animation
        url = f"https://www.imdb.com/search/title/?title_type=feature,tv_series&genres={genre.lower()}"
        driver.get(url)
        time.sleep(5)

        stalled_rounds = 0
        while len(movie_urls) < target_movies:
            count_before = len(movie_urls)

            ul_tag = driver.find_element(By.CSS_SELECTOR, "ul.ipc-metadata-list")
            for link in ul_tag.find_elements(By.CSS_SELECTOR, "a.ipc-title-link-wrapper"):
                href = link.get_attribute("href")
                # Anchors without an href would otherwise put None in the set.
                if href:
                    movie_urls.add(href)

            if len(movie_urls) >= target_movies:
                break

            if len(movie_urls) == count_before:
                stalled_rounds += 1
                if stalled_rounds >= max_stalled_rounds:
                    print("đã hết phim")
                    break
            else:
                stalled_rounds = 0

            try:
                more_button = driver.find_element(By.CSS_SELECTOR, "button.ipc-see-more__button")
                ActionChains(driver).move_to_element(more_button).click().perform()
                time.sleep(5)
            except Exception:
                # Best effort: any failure to find or click the button is
                # treated as "no more results". Exception (rather than a bare
                # except) keeps KeyboardInterrupt working.
                print("đã hết phim")
                break
    finally:
        # Always release the browser, including when an error propagates.
        driver.quit()

    return list(movie_urls)

In [None]:
movies_by_genre = {}

# The output folder must exist before open(..., "w") is called on a file in it.
os.makedirs("./links", exist_ok=True)

# get_genre_list() returns None on failure; treat that as "no genres" so the
# loop below is skipped instead of raising TypeError.
genres = get_genre_list() or []
for index, genre in enumerate(genres):
    filename = f"./links/{genre}.txt"
    movies_by_genre[genre] = get_links(genre, target_movies=300)
    # One URL per line, one file per genre.
    with open(filename, "w", encoding="utf-8") as f:
        for link in movies_by_genre[genre]:
            f.write(link + "\n")

    print(f"{index}. Đã lưu {len(movies_by_genre[genre] )} phim vào {filename}")
    


In [None]:
import os


def read_links_from_files(directory="links"):
    """Load per-genre link files from a folder into a dictionary.

    Every regular ``*.txt`` file in ``directory`` is read as UTF-8. The file
    name without its extension becomes the genre key, and each non-blank line
    (with surrounding whitespace stripped) becomes one link.

    Args:
        directory: Folder to scan. Defaults to ``"links"``.

    Returns:
        dict[str, list[str]]: Genre name -> links in file order, with genres
        in sorted file-name order. Empty when ``directory`` is missing or is
        not a directory (a message is printed in that case).
    """
    links_by_genre = {}

    # isdir rather than exists: a regular file with that name would otherwise
    # pass the check and make os.listdir raise.
    if not os.path.isdir(directory):
        print(f"Thư mục '{directory}' không tồn tại!")
        return links_by_genre

    # Sorted so the resulting dict order does not depend on the filesystem.
    for filename in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, filename)
        if not filename.endswith(".txt") or not os.path.isfile(file_path):
            continue

        # splitext removes only the trailing extension, whereas
        # str.replace would strip every ".txt" occurrence in the name.
        genre, _ = os.path.splitext(filename)

        with open(file_path, "r", encoding="utf-8") as f:
            # Blank lines are skipped so no empty URL is returned.
            links_by_genre[genre] = [line.strip() for line in f if line.strip()]

    return links_by_genre


# Reload the saved links from disk (replacing any in-memory mapping) and
# report how many links each genre holds.
movies_by_genre = read_links_from_files()

for genre in movies_by_genre:
    links = movies_by_genre[genre]
    print(f"{genre}: {len(links)} phim")

In [None]:
def _text_or_na(driver, by, selector):
    """Return the text of the first element matching the locator, or "NA" if the lookup fails."""
    try:
        return driver.find_element(by, selector).text
    except Exception:
        return "NA"


def get_movie_details(url):
    """Scrape one IMDb title page and return its details as a dict.

    A new Chrome service/driver pair is created for every call, the page is
    loaded, and the fields below are read from the DOM. Optional fields fall
    back to the string ``"NA"`` when their element cannot be read.

    Args:
        url: Address of the title page to load.

    Returns:
        dict | None: A dict with the keys ``name``, ``genres``, ``type``,
        ``rating``, ``no_of_votes``, ``meta_score``, ``release_date``,
        ``gross``, ``budget``, ``countries`` and ``url``; ``None`` when the
        page cannot be loaded or the title name cannot be read (the error is
        printed). The browser is quit in both cases.
    """
    service = ChromeService(ChromeDriverManager().install())
    chrome_options = ChromeOptions()
    chrome_options.add_argument("--headless=new")  # headless mode
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
    )
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        driver.get(url)
        time.sleep(5)

        # The title is mandatory: a failure here aborts the whole record.
        name = driver.find_element(By.CSS_SELECTOR, "span.hero__primary-text").text

        genres = ", ".join(
            g.text
            for g in driver.find_elements(
                By.CSS_SELECTOR, "div.ipc-chip-list__scroller span.ipc-chip__text"
            )
        )

        rating = _text_or_na(
            driver,
            By.CSS_SELECTOR,
            'div[data-testid="hero-rating-bar__aggregate-rating__score"] span',
        )

        # NOTE(review): "sc-d541859f-3" looks like a build-generated class name
        # and may stop matching when the site changes — confirm, or switch to a
        # data-testid based selector.
        no_of_votes = _text_or_na(driver, By.CLASS_NAME, "sc-d541859f-3")

        release_date = _text_or_na(
            driver,
            By.CSS_SELECTOR,
            'li[data-testid="title-details-releasedate"] a.ipc-metadata-list-item__list-content-item--link',
        )

        gross = _text_or_na(
            driver,
            By.CSS_SELECTOR,
            'li[data-testid="title-boxoffice-cumulativeworldwidegross"] span.ipc-metadata-list-item__list-content-item',
        )

        try:
            # find_elements returns an empty list instead of raising when
            # nothing matches, so the "NA" fallback must also cover the
            # empty-join case.
            countries = ", ".join(
                country.text
                for country in driver.find_elements(
                    By.CSS_SELECTOR, 'li[data-testid="title-details-origin"] a'
                )
            ) or "NA"
        except Exception:
            countries = "NA"

        budget = _text_or_na(
            driver,
            By.CSS_SELECTOR,
            'li[data-testid="title-boxoffice-budget"] span.ipc-metadata-list-item__list-content-item',
        )

        meta_score = _text_or_na(driver, By.CSS_SELECTOR, "span.metacritic-score-box")

        # A page that has an episodes widget is classified as a TV series.
        try:
            driver.find_element(By.CSS_SELECTOR, 'section[data-testid="episodes-widget"]')
            title_type = "TV Series"
        except Exception:
            title_type = "Movie"

        return {
            "name": name,
            "genres": genres,
            "type": title_type,
            "rating": rating,
            "no_of_votes": no_of_votes,
            "meta_score": meta_score,
            "release_date": release_date,
            "gross": gross,
            "budget": budget,
            "countries": countries,
            "url": url,
        }
    except Exception as e:
        print(f" Lỗi khi lấy dữ liệu từ {url}: {e}")
        return None
    finally:
        # Runs on success and on failure, so no browser process is left behind.
        driver.quit()

In [None]:
# NOTE(review): no cell visible here reads NUM_THREADS; presumably it was meant
# as the worker count for the parallel scraper — confirm or remove.
NUM_THREADS = 2

In [None]:
def scrape_movies(fileout='movies_data.csv'):
    """Scrape every stored link one after another and append the rows to a CSV.

    Walks the module-level ``movies_by_genre`` mapping, calls
    ``get_movie_details`` for each URL and keeps every truthy result.

    Args:
        fileout: Path of the CSV file to append to.

    The header row is written only when ``fileout`` does not exist yet or is
    empty, so repeated runs do not insert header lines in the middle of the
    data. Nothing is written when no title was scraped.
    """
    movies_data = []
    for movie_links in movies_by_genre.values():
        for index, url in enumerate(movie_links):
            data = get_movie_details(url)
            if data:
                movies_data.append(data)
                print(f"{index}. Đã lấy thông tin phim: {data['name']}")

    if not movies_data:
        return

    # In append mode pandas would otherwise write the header again on each call.
    write_header = not os.path.exists(fileout) or os.path.getsize(fileout) == 0
    df = pd.DataFrame(movies_data)
    df.to_csv(fileout, mode="a", index=False, header=write_header)

In [None]:
def scrape_movie_parallel(fileout="movies_data.csv", max_workers=4):
    """Scrape every stored link with a thread pool and append the rows to a CSV.

    Submits one ``get_movie_details`` call per URL in the module-level
    ``movies_by_genre`` mapping and gathers results as the futures complete,
    so rows are stored in completion order rather than submission order.

    Args:
        fileout: Path of the CSV file to append to.
        max_workers: Number of worker threads. Defaults to 4.

    The header row is written only when ``fileout`` does not exist yet or is
    empty, so repeated runs do not insert header lines in the middle of the
    data. Nothing is written when no title was scraped.
    """
    movies_data = []

    # Run the per-title scrapes concurrently with a ThreadPoolExecutor.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(get_movie_details, url): (genre, index, url)
            for genre, movie_links in movies_by_genre.items()
            for index, url in enumerate(movie_links)
        }

        for future in as_completed(future_to_url):
            genre, index, url = future_to_url[future]
            try:
                data = future.result()
                if data:
                    movies_data.append(data)
                    print(
                        f"{index}. Đã lấy thông tin phim: {data['name']} (Thể loại: {genre})"
                    )
            except Exception as e:
                # One failed title must not abort the remaining futures.
                print(f" Lỗi khi lấy phim {url}: {e}")

    # Save the results to CSV.
    if movies_data:
        # In append mode pandas would otherwise write the header again on each call.
        write_header = not os.path.exists(fileout) or os.path.getsize(fileout) == 0
        df = pd.DataFrame(movies_data)
        df.to_csv(fileout, mode="a", index=False, header=write_header)

    print(f"Hoàn thành! Đã lưu {len(movies_data)} phim vào {fileout}")

In [None]:
# Sequential scrape of every link in movies_by_genre; rows are appended to the
# default movies_data.csv.
scrape_movies()

In [None]:
# Thread-pool scrape of the same links in movies_by_genre.
# NOTE(review): this appends to the same default movies_data.csv as the
# scrape_movies() cell, so running both cells stores every title twice —
# confirm that only one of them is meant to run.
scrape_movie_parallel()