In [1]:
import re
import json
import sqlite3
import requests

from typing import Dict, List
from bs4 import BeautifulSoup

In [2]:
# Constant URLs for the Wikipedia base and the main page to scrape.

BASE_URL = "https://en.wikipedia.org"
MAIN_URL = "https://en.wikipedia.org/wiki/List_of_highest-grossing_films"

The `parse_main_page` function fetches and parses the main Wikipedia page to extract basic film data (title, release year, box office, and URL).
- It targets the `wikitable` class table, skips the header row, and processes each data row.
- Uses regex to extract a 4-digit year from the release year text.
- Returns a list of dictionaries, each representing a film with initial data and placeholders for additional details.

In [3]:
def parse_main_page(): 
    response = requests.get(MAIN_URL)
    
    if response.status_code != 200:
        raise Exception(f"Failed to load page {MAIN_URL}")
    
    main_soup = BeautifulSoup(response.content, "html.parser")
    table = main_soup.find("table", class_="wikitable")
    films = []

    # Loop through each table row (skip header)
    for row in table.find_all("tr")[1:]:
        cells = row.find_all(["th", "td"])
        if len(cells) < 5:
            continue
        # Extract title and film URL from the third cell.
        title_cell = cells[2]
        title_link = title_cell.find("a")
        if title_link:
            title = title_link.get_text(strip=True)
            relative_link = title_link.get("href")
            film_url = BASE_URL + relative_link
        else:
            title = title_cell.get_text(strip=True)
            film_url = None

        # Extract Box Office from the fourth cell.
        box_office = cells[3].get_text(strip=True)
        # Extract Release Year from the fifth cell.
        release_year_text = cells[4].get_text(strip=True)
        try:
            release_year = int(re.search(r'\d{4}', release_year_text).group())
        except Exception:
            release_year = None

        film_record = {
            "title": title,
            "release_year": release_year,
            "box_office": box_office,
            "film_url": film_url,
            "directors": None,
            "country": None,
            "production_companies": None,
            "image_url": None,
        }
        films.append(film_record)
    return films

In [4]:
async def fetch(session, url):
    """Fetches URL text asynchronously."""
    async with session.get(url) as response:
        if response.status != 200:
            print(f"Failed to load {url}: {response.status}")
            return None
        return await response.text()

The `scrape_film_page` scrapes additional details (directors, country, production companies, image URL) from each film’s Wikipedia page. This was necessary because the `parse_main_page` fn above doesn't complete retrieve all the relevant details.
- Uses the `infobox` table, handling edge cases like missing data or malformed URLs.
- Updates the input film dictionary directly and returns it, with error handling for failed requests.

In [5]:
def scrape_film_page(film):
    """
    Synchronously scrape additional film details from its URL and update the film dict.
    Args:
        film (dict): Dictionary containing at least 'film_url' and 'title'
    Returns:
        dict: Updated film dictionary with scraped details
    """
    url = film["film_url"]
    if not url:
        return film  # Return unchanged if no URL
    
    try:
        # Synchronous HTTP request
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise exception for bad status codes
        
        # Parse the HTML content
        soup = BeautifulSoup(response.text, 'html.parser')
        
        infobox = soup.find("table", class_="infobox")
        if not infobox:
            return film

        # Extract the image URL from the infobox.
        image_url = None
        image_cell = infobox.find("td", class_="infobox-image")
        if image_cell:
            img = image_cell.find("img")
            if img and img.has_attr("src"):
                src = img["src"]
                if src.startswith("//"):
                    image_url = "https:" + src
                elif src.startswith("http"):
                    image_url = src
                else:
                    image_url = BASE_URL + src

        directors = None
        country = None
        production_companies = None

        # Loop through each row in the infobox for additional details.
        for row in infobox.find_all("tr"):
            header = row.find("th")
            data_cell = row.find("td")
            if not header or not data_cell:
                continue
            header_text = header.get_text(strip=True)
            # Extract Directors
            if "Directed by" in header_text:
                directors = data_cell.get_text(separator=", ", strip=True)
            # Extract Country (only the first country)
            if header_text.lower() in ["country", "countries"]:
                raw_country = data_cell.get_text(separator=", ", strip=True)
                country = raw_country.split(",")[0].strip()
            # Extract Production Companies (optional)
            if "Production" in header_text and "company" in header_text.lower():
                production_companies = data_cell.get_text(separator=", ", strip=True)

        film["directors"] = directors
        film["country"] = country
        film["production_companies"] = production_companies
        film["image_url"] = image_url
        return film
    except requests.RequestException as e:
        print(f"Failed to scrape {film['title']}: {e}")
    
    return film

## SQLite database setup
`create_films_table`: Sets up an SQLite table with columns matching the film data structure and clears it for a fresh run.
- `push_to_sqlite`: Inserts film records into the table, using `.get()` to handle missing keys with defaults.
- Both functions manage database connections properly with `commit` and `close`.

In [6]:
# Define the SQLite database file
DB_FILE = "films.db"

def create_films_table():
    """Create the films table in SQLite if it doesn't exist."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    
    # Create table with appropriate columns
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS films (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL,
            release_year TEXT,
            box_office TEXT,
            directors TEXT,
            country TEXT,
            production_companies TEXT,
            image_url TEXT
        )
    ''')
    
    # Clear the table for a fresh run (similar to delete_many in MongoDB)
    cursor.execute('DELETE FROM films')
    
    conn.commit()
    conn.close()

In [7]:
def push_to_sqlite(films: List[Dict]):
    """Push the film records into SQLite."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    
    # Prepare and insert film records
    for idx, film in enumerate(films, start=1):
        cursor.execute('''
            INSERT INTO films (id, title, release_year, box_office, directors, country, production_companies, image_url)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            idx,
            film.get('title', ''),
            film.get('release_year', ''),
            film.get('box_office', ''),
            film.get('directors', 'Not found'),
            film.get('country', 'Not found'),
            film.get('production_companies', 'Not found'),
            film.get('image_url', 'Not found')
        ))
    
    conn.commit()
    print(f"Inserted {len(films)} records into SQLite.")
    conn.close()

The `export_to_json` fn exports the SQLite data to a JSON file by fetching and converting the entries dictionaries, and writing them with proper formatting. UTF-8 encoding to ensure compatibility with special characters.

In [8]:
def export_to_json(output_file: str = "films.json"):
    """Read all data from SQLite and save to JSON."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    
    # Fetch all records
    cursor.execute('SELECT * FROM films')
    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()
    
    # Convert to list of dictionaries
    films_data = [dict(zip(columns, row)) for row in rows]
    
    # Save to JSON
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(films_data, f, indent=4)
    
    print(f"Exported {len(films_data)} records to {output_file}")
    conn.close()

In [9]:
films = parse_main_page() 
# Synchronous scraping loop
updated_films = []
for film in films:
    if film["film_url"]:
        # Pass the entire film dict, assuming scrape_film_page updates it
        updated_film = scrape_film_page(film)  # Adjusted to pass film dict
        updated_films.append(updated_film)

# Update the film records
films_dict = {film["title"]: film for film in films}
for film in updated_films:
    if film:  # Ensure film is not None
        films_dict[film["title"]] = film
films = list(films_dict.values())

# Print the films
for film in films:
    print("--------------------------------------------------")
    print(f"Title: {film['title']}")
    print(f"Release Year: {film['release_year']}")
    print(f"Box Office: {film['box_office']}")
    print(f"Directed by: {film.get('directors', 'Not found')}")
    print(f"Country: {film.get('country', 'Not found')}")
    print(f"Production Companies: {film.get('production_companies', 'Not found')}")
    print(f"Image URL: {film.get('image_url', 'Not found')}")
    print("--------------------------------------------------\n")

# Create table and push data to SQLite
create_films_table()
push_to_sqlite(films)

# Export to JSON
export_to_json()

--------------------------------------------------
Title: Avatar
Release Year: 2009
Box Office: $2,923,706,026
Directed by: James Cameron
Country: United Kingdom
Production Companies: None
Image URL: https://upload.wikimedia.org/wikipedia/en/thumb/d/d6/Avatar_%282009_film%29_poster.jpg/220px-Avatar_%282009_film%29_poster.jpg
--------------------------------------------------

--------------------------------------------------
Title: Avengers: Endgame
Release Year: 2019
Box Office: $2,797,501,328
Directed by: Anthony Russo, Joe Russo
Country: United States
Production Companies: Marvel Studios
Image URL: https://upload.wikimedia.org/wikipedia/en/0/0d/Avengers_Endgame_poster.jpg
--------------------------------------------------

--------------------------------------------------
Title: Avatar: The Way of Water
Release Year: 2022
Box Office: $2,320,250,281
Directed by: James Cameron
Country: United States
Production Companies: None
Image URL: https://upload.wikimedia.org/wikipedia/en/thum