### **Data Wrangling and Visualization Assignment 1**

#### **Author**: Egor Chernobrovkin (B23-DS-02)
#### **Contact**: 
#### - Email: e.chernobrovkin@innopolis.university  
#### - Telegram: @lolyhop


### **Import necessary libraries**

In [11]:
# Standard libraries
import os
from dataclasses import dataclass
from typing import List, Union, Optional, Dict, Any, Tuple
from dotenv import load_dotenv
import requests
import json
import re

# 3rd party libraries
from bs4 import BeautifulSoup, Tag
import psycopg2

# Load environment variables
load_dotenv()

True

### **URLs and Film Model**

In [2]:
# Wikipedia endpoints: the site root (root-relative film hrefs are appended to it
# when individual film pages are requested) and the list page that gets scraped.
wikipedia_base_url: str = "https://en.wikipedia.org"
wikipedia_url: str = "https://en.wikipedia.org/wiki/List_of_highest-grossing_films"

# Film model
@dataclass
class Film:
    """
    Represents a film with the following attributes:
    - title: The title of the film
    - film_url: The URL of the film on Wikipedia
    - release_year: The release year of the film
    - director: The director(s) of the film (a single string or a list of names)
    - box_office: The box office revenue of the film
    - country: The country(ies) of the film (a single string or a list of names)

    Every attribute defaults to None, so a partially populated Film is valid.
    """
    title: Optional[str] = None
    film_url: Optional[str] = None
    release_year: Optional[int] = None
    director: Optional[Union[str, List[str]]] = None
    box_office: Optional[float] = None
    country: Optional[Union[str, List[str]]] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Converts the Film object to a dictionary keyed by attribute name.
        """
        return {
            'title': self.title,
            'film_url': self.film_url,
            'release_year': self.release_year,
            'director': self.director,
            'box_office': self.box_office,
            'country': self.country
        }

    @staticmethod
    def _join_values(value: Optional[Union[str, List[str]]]) -> Optional[str]:
        """
        Flattens a multi-valued attribute into one comma-separated string.

        None stays None and a plain string is returned unchanged; joining a
        string directly would put a comma between each of its characters.
        """
        if value is None or isinstance(value, str):
            return value
        return ','.join(value)

    def to_tuple(self) -> Tuple[Optional[str], Optional[str], Optional[int],
                                Optional[str], Optional[float], Optional[str]]:
        """
        Converts the Film object to a tuple ordered as
        (title, film_url, release_year, director, box_office, country).

        List-valued director/country are joined with ','; string values are
        passed through as-is and missing values stay None.
        """
        return (
            self.title,
            self.film_url,
            self.release_year,
            self._join_values(self.director),
            self.box_office,
            self._join_values(self.country),
        )


### **Define helper functions** to get the country and director of a film and to parse its box office

In [3]:
def get_film_country(film_url: str) -> List[str]:
    """
    Gets the country(ies) of the film using the film URL.

    Args:
        film_url: Path of the film page that is appended to `wikipedia_base_url`.

    Returns:
        The non-empty lines of the infobox row whose header contains "country"
        or "countries", with bracketed footnote markers removed. An empty list
        is returned when the page has no infobox, no such row, or no value cell.

    Raises:
        requests.RequestException: if the page cannot be fetched (including a timeout).
    """
    # A timeout keeps one stalled request from hanging the whole scraping run
    country_response = requests.get(wikipedia_base_url + film_url, timeout=30)
    country_soup = BeautifulSoup(country_response.content, 'html.parser')
    country_table = country_soup.find('table', {'class': 'infobox'})

    if not country_table:
        return []

    # Find the value cell of the first row whose th contains "Country" or "Countries"
    country_cell: Optional[Tag] = None
    for row in country_table.find_all('tr'):
        th = row.find('th')
        if th and ('country' in th.text.lower() or 'countries' in th.text.lower()):
            country_cell = row.find('td')
            break

    # Either no matching row was found or the matching row has no td
    if country_cell is None:
        return []

    # Drop footnote markers such as [1], [a] or [nb 2] before splitting into lines
    country_text: str = re.sub(r'\[[^\]]*\]', '', country_cell.text)
    return [name.strip() for name in country_text.split('\n') if name.strip()]

def get_film_director(film_url: str) -> List[str]:
    """
    Gets the director(s) of the film using the film URL.

    Args:
        film_url: Path of the film page that is appended to `wikipedia_base_url`.

    Returns:
        The non-empty lines of the infobox row whose header contains "directed",
        with bracketed footnote markers removed. An empty list is returned when
        the page has no infobox, no such row, or no value cell.

    Raises:
        requests.RequestException: if the page cannot be fetched (including a timeout).
    """
    # A timeout keeps one stalled request from hanging the whole scraping run
    director_response = requests.get(wikipedia_base_url + film_url, timeout=30)
    director_soup = BeautifulSoup(director_response.content, 'html.parser')
    director_table = director_soup.find('table', {'class': 'infobox'})

    if not director_table:
        return []

    # Find the value cell of the first row whose th contains "Directed by"
    director_cell: Optional[Tag] = None
    for row in director_table.find_all('tr'):
        th = row.find('th')
        if th and ('directed' in th.text.lower()):
            director_cell = row.find('td')
            break

    # Either no matching row was found or the matching row has no td
    if director_cell is None:
        return []

    # Drop footnote markers such as [1], [a] or [nb 2] before splitting into lines,
    # mirroring how the country row is cleaned
    director_text: str = re.sub(r'\[[^\]]*\]', '', director_cell.text)
    return [name.strip() for name in director_text.split('\n') if name.strip()]

def preprocess_box_office(box_office: Optional[str]) -> Optional[float]:
    """
    Returns the box office of the film as a float.

    Args:
        box_office: Raw cell text such as "$2,923,706,026" or
            "F8$1,238,764,765"; may be None.

    Returns:
        The first number that follows the first '$' (or the first number in
        the whole string when there is no '$'), with thousands separators
        removed. None is returned when the input is None or contains no number.
    """
    if box_office is None:
        return None

    # str.find returns -1 when '$' is absent, so the slice then keeps the whole string
    amount_text: str = box_office[box_office.find('$') + 1:]

    # Take only the first numeric token so digits in trailing footnotes
    # (e.g. "[# 1]") are not glued onto the amount
    amount_match = re.search(r'\d[\d,]*(?:\.\d+)?|\.\d+', amount_text)
    if amount_match is None:
        return None

    return float(amount_match.group().replace(',', ''))

### **Fetch data about tables on the Wikipedia page**

In [4]:
# Fetch the Wikipedia page. Fail fast on a stalled connection or an HTTP error
# rather than parsing an error page and ending up with no film tables.
response = requests.get(wikipedia_url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')

# Find all the tables on the wikipedia page
raw_tables: List[Tag] = soup.find_all('table')

print(f"Found {len(raw_tables)} tables on the wikipedia page!")

# Keep only the tables whose caption mentions "film" (case-insensitive)
tables_with_films_in_caption: List[Tag] = [
    table for table in raw_tables
    if table.caption and 'film' in table.caption.text.lower()
]

print(f"Found {len(tables_with_films_in_caption)} tables with films in the caption!")

Found 89 tables on the wikipedia page!
Found 5 tables with films in the caption!


### **Parse the first table into Film objects**

In [5]:
# List of Film objects
films: List[Film] = []

# Parse first table into Film objects
table: Tag = tables_with_films_in_caption[0]
header_row: Tag = table.find('tr')
column_names: List[str] = [col.text.strip().lower().replace(' ', '_') for col in header_row.find_all('th')]
rows: List[Tag] = table.find_all('tr')[1:]

# Column positions do not change between rows, so resolve them once.
# The title is read from each row's <th>, so it is missing from the <td> list;
# the "- 1" compensates for that (assumes the title column precedes these columns).
release_year_idx: Optional[int] = column_names.index('year') - 1 if 'year' in column_names else None
box_office_idx: Optional[int] = next(
    (i - 1 for i, col in enumerate(column_names) if 'gross' in col),
    None,
)

for row in rows:
    # Extract data from each row
    cells: List[Tag] = row.find_all('td')
    title_object: Optional[Tag] = row.find('th')
    title_link: Optional[Tag] = title_object.find('a') if title_object else None

    # Rows without a linked title cell cannot be resolved to a film page; skip them
    if title_link is None or not title_link.get('href'):
        continue

    title: str = title_object.text.replace('\n', '')
    film_url: str = title_link['href']

    # Get the release year of the film as an int (the Film model and the DB column are integers)
    release_year: Optional[int] = None
    if release_year_idx is not None and 0 <= release_year_idx < len(cells):
        year_match = re.search(r'\d{4}', cells[release_year_idx].text)
        if year_match:
            release_year = int(year_match.group())
    if release_year is None:
        print(f"No year found for {title} | {release_year_idx}")

    # Get the raw box office text of the film
    box_office_text: Optional[str] = None
    if box_office_idx is not None and 0 <= box_office_idx < len(cells):
        box_office_text = cells[box_office_idx].text.strip()

    # Preprocess main elements of the Film object
    country: List[str] = get_film_country(film_url)
    director: List[str] = get_film_director(film_url)
    box_office: Optional[float] = preprocess_box_office(box_office_text)

    # Create a Film object and append it to the list
    film: Film = Film(title=title,
                      film_url=wikipedia_base_url + film_url,
                      release_year=release_year,
                      box_office=box_office,
                      country=country,
                      director=director)
    films.append(film)

### **Transfer films to Postgres database**

#### We will create a table in a local Postgres database and transfer the films to it

In [None]:
# Collect the Postgres connection settings from the environment.
# Each psycopg2 keyword is read from the upper-cased variable of the same name;
# os.getenv yields None for any variable that is not set.
# NOTE(review): USER is commonly pre-set by the shell, and load_dotenv() is called
# without override=True, so a USER entry in .env may be ignored — confirm.
creds: Dict[str, str] = {
    param: os.getenv(param.upper())
    for param in ("host", "port", "database", "user", "password")
}

# Open the connection and the cursor that the following cells reuse
conn = psycopg2.connect(**creds)
cursor = conn.cursor()

In [14]:
# Create the table, replace its contents and commit as one transaction.
# If any statement fails, roll back so the shared connection is not left in an
# aborted transaction and this cell can simply be re-run after fixing the cause.
try:
    # Create a table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS films (
            id SERIAL PRIMARY KEY,
            title TEXT,
            film_url TEXT,
            release_year INTEGER,
            director TEXT,
            box_office FLOAT,
            country TEXT
        )
    """)

    # Clear the table if it's not empty
    cursor.execute("DELETE FROM films")

    # Insert the films into the table (values are passed as query parameters)
    cursor.executemany("""
        INSERT INTO films (title, film_url, release_year, director, box_office, country)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, [film.to_tuple() for film in films])

    # Commit the changes
    conn.commit()
except Exception:
    conn.rollback()
    raise

### **Read the data from the database and save it to a JSON file**

In [15]:
# Select the columns explicitly, in the order of the Film fields, so the mapping
# does not depend on the table's physical column layout; ORDER BY id keeps the
# exported order stable between runs.
cursor.execute("""
    SELECT title, film_url, release_year, director, box_office, country
    FROM films
    ORDER BY id
""")

# Use a separate name: the rows read back hold director/country as joined strings,
# so rebinding `films` would feed strings to the insert cell if it were re-run.
films_from_db: List[Film] = [Film(*row) for row in cursor.fetchall()]
films_dicts: List[Dict[str, Any]] = [film.to_dict() for film in films_from_db]

# Save the films to a JSON file; ensure_ascii=False writes non-ASCII characters
# verbatim, so the file encoding is pinned to UTF-8 instead of the platform default
with open('films.json', 'w', encoding='utf-8') as f:
    json.dump(films_dicts, f, indent=4, ensure_ascii=False)

In [16]:
# Report how many film records were serialized to films.json
print(f"Saved {len(films_dicts)} films to films.json")

Saved 50 films to films.json
