In [1]:
import requests
from bs4 import BeautifulSoup, NavigableString, Tag
import time
import polars as pl
from tqdm import tqdm
# HTTP headers passed to the requests.get calls in this notebook.
# The User-Agent value is a desktop Firefox identifier string.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:125.0) "
        "Gecko/20100101 Firefox/125.0"
    ),
}


In [2]:
from collections import namedtuple
# Names (and order) of the attributes stored for every scraped movie.
field_names = (
    "title", "description", "slug", "url", "rating", "duration",
    "genres", "actors", "directors", "release_date", "image",
)

# Immutable movie record; every field is optional and defaults to None so a
# record can be created from partial data and completed later.
Movie = namedtuple("Movie", field_names, defaults=(None,) * len(field_names))

In [3]:
def get_movie_card_info(movie_card:Tag | NavigableString)->Movie:
    """Build a partial ``Movie`` from one listing card.

    Reads the title (``h2.th-title``), the comma-separated genres
    (``div.th-cat``), the image address (``data-src`` of the first ``img``)
    and the link (``href`` of the ``a`` whose class is ``th-in js-tip``).
    All other ``Movie`` fields keep their ``None`` default.

    Raises:
        AttributeError / TypeError / KeyError: if the card lacks one of the
            expected elements or attributes.
    """
    heading = movie_card.find('h2', class_='th-title')
    categories = movie_card.find('div', class_='th-cat')
    poster = movie_card.find('img')
    anchor = movie_card.find('a', class_='th-in js-tip')
    return Movie(
        title=heading.text.strip(),
        genres=categories.text.strip().split(", "),
        image=poster['data-src'],
        url=anchor['href'],
    )

In [4]:
def get_movie_detail_info(movie_url: str, timeout: float = 30.0)-> Tag | NavigableString | None:
    """Download a movie page and return its ``<div class="fright">`` element.

    Args:
        movie_url: Address of the movie page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the scrape forever.

    Returns:
        The first ``div`` with class ``fright`` found in the page, or ``None``
        when the request fails (the error is printed) or the page has no
        such element.
    """
    try:
        resp = requests.get(headers=headers, url=movie_url, timeout=timeout)
        resp.raise_for_status()
    # RequestException is the base class of HTTPError, ConnectionError,
    # Timeout, MissingSchema, ... so one unreachable or malformed URL no
    # longer aborts the caller.
    except requests.exceptions.RequestException as e:
        print(e)
        return None
    soup = BeautifulSoup(resp.content, "html.parser")
    return soup.find('div', class_='fright')

In [5]:
def update_movie_detail_info(movie: Movie, movie_info:Tag | NavigableString)->Movie:
    """Return a copy of ``movie`` completed with data from its detail page.

    The ``div.short-info`` elements inside ``movie_info`` are read by
    position: index 1 is the duration, 2 the release date, 3 the directors,
    4 the actors (split on ``", "``) and 5 the rating. The description is
    the text of ``div.fdesc.full-text.clearfix``.

    Args:
        movie: Record whose other fields (title, genres, image, url, slug)
            are kept as they are.
        movie_info: Parsed container holding the detail markup.

    Returns:
        A new ``Movie``; any detail that is absent from the page is ``None``
        instead of raising.
    """
    # Look the rows up once instead of re-scanning the tree for every field.
    short_infos = movie_info.find_all('div', class_='short-info')

    def trailing_text(index):
        """Stripped text of the last child of row ``index``, or None."""
        if index >= len(short_infos):
            return None
        children = short_infos[index].contents
        if not children:
            return None
        last = children[-1]
        # The last child is normally a text node (a str subclass); if the
        # value is wrapped in a tag, fall back to that tag's text.
        text = last if isinstance(last, str) else last.get_text()
        return text.strip()

    actors_text = trailing_text(4)
    description_tag = movie_info.find('div', class_='fdesc full-text clearfix')

    # _replace keeps every field that is not listed here.
    return movie._replace(
        duration=trailing_text(1),
        release_date=trailing_text(2),
        directors=trailing_text(3),
        actors=None if actors_text is None else actors_text.split(", "),
        rating=trailing_text(5),
        description=None if description_tag is None else description_tag.text.strip(),
    )

In [6]:
def get_movie(movie_card)->Movie:
    """Scrape one movie: card data first, then its detail page if available.

    Returns the card-only record when ``get_movie_detail_info`` yields
    nothing for the movie's URL; otherwise the record completed by
    ``update_movie_detail_info``.
    """
    card_movie = get_movie_card_info(movie_card)
    details = get_movie_detail_info(card_movie.url)
    if not details:
        return card_movie
    return update_movie_detail_info(card_movie, details)

In [7]:
def first_test():
    """Smoke test: scrape the first ``div.th-item`` card of the home page.

    Prints the raw card markup and returns the ``Movie`` built from it.
    Raises ``StopIteration`` if the page contains no card.
    """
    home_page = requests.get(url="https://filmoflix.to/", headers=headers)
    parsed = BeautifulSoup(home_page.content, "html.parser")
    first_card = next(iter(parsed.find_all('div', class_="th-item")))
    print(first_card)
    return get_movie(first_card)

In [8]:
# Uncomment to smoke-test the scraper on the first home-page card (makes live HTTP requests).
# first_test()

In [9]:
def test_download():
    """Scrape every ``div.th-item`` card of the home page into a parquet file.

    The result is written to ``movies-first.parquet`` in the working
    directory.
    """
    home_page = requests.get(url="https://filmoflix.to/", headers=headers)
    parsed = BeautifulSoup(home_page.content, "html.parser")
    cards = parsed.find_all('div', class_="th-item")
    movies = (get_movie(card) for card in cards)
    pl.LazyFrame(movies).sink_parquet("movies-first.parquet")

In [10]:
# Uncomment to scrape only the home page into movies-first.parquet (makes live HTTP requests).
# test_download()

In [11]:
def get_movies(first_page: int = 2, last_page: int = 604, timeout: float = 30.0, delay: float = 0.1):
    """Yield a ``Movie`` for every card on the ``/film/page/<n>`` listing pages.

    Args:
        first_page: First listing page to fetch (inclusive). The default of 2
            is kept from the original hard-coded range.
            NOTE(review): page 1 is never requested here - confirm that is
            intended.
        last_page: Last listing page to fetch (inclusive).
        timeout: Seconds to wait for each listing page before giving up.
        delay: Pause in seconds after every listing request, successful or not.

    Yields:
        One ``Movie`` per ``th-item`` element, in page order.

    A page that cannot be downloaded is reported and skipped, so one network
    error does not abort the whole run.
    """
    # tqdm derives the total from len(range(...)), so no explicit total is needed.
    for page in tqdm(range(first_page, last_page + 1)):
        try:
            resp = requests.get(
                headers=headers,
                url=f"https://filmoflix.to/film/page/{page}",
                timeout=timeout,
            )
            resp.raise_for_status()
        # Base class of HTTPError, ConnectionError, Timeout, ...
        except requests.exceptions.RequestException as e:
            print(f"Skipping page {page}: {e}")
            continue
        finally:
            # Throttle after failures too, so a failing server is not hammered.
            time.sleep(delay)
        soup = BeautifulSoup(resp.content, "html.parser")
        for movie_card in soup.find_all(class_="th-item"):
            yield get_movie(movie_card)

In [12]:
# Column types of the output file, in the same order as the Movie fields.
# Everything is stored as text; genres and actors are lists of text.
schema = dict(
    title=pl.Utf8,
    description=pl.Utf8,
    slug=pl.Utf8,
    url=pl.Utf8,
    rating=pl.Utf8,
    duration=pl.Utf8,
    genres=pl.List(pl.Utf8),
    actors=pl.List(pl.Utf8),
    directors=pl.Utf8,
    release_date=pl.Utf8,
    image=pl.Utf8,
)

# Run the full scrape and write the result to movies-full.parquet.
# The captured progress bar shows this took about 49 minutes.
pl.LazyFrame(get_movies(), schema=schema).sink_parquet("movies-full.parquet")

  0%|          | 0/603 [00:00<?, ?it/s]

100%|██████████| 603/603 [49:10<00:00,  4.89s/it]
