***
# Data definitions
***

A Python dataclass that defines the fields we will collect for each movie in the IMDb dataset.

In [None]:
from dataclasses import dataclass
from decimal import Decimal
import datetime

# Define our data type
@dataclass
class ImdbMovie:
    """One scraped movie record, as assembled by gather_movie_info and stored in the database."""
    # IMDb identifier of the movie.
    # NOTE(review): annotated as int, but gather_movie_info passes movie.movieID
    # here unconverted - confirm the actual type.
    id: int
    # Movie title
    title: str
    # First entry of the movie's 'runtimes' list converted to an int, if any
    runtime: int | None
    # IMDb user rating
    user_rating: float | None
    # Number of user votes
    votes: int | None
    # Text following "United States:" in the movie's certificate list
    mpaa_rating: str | None
    # Earliest known release date, stored as str(date)
    release_date: str | None
    # Box office values, stored exactly as returned by the library
    budget: str | None
    opening_weekend: str | None
    gross_sales: str | None
    # The remaining fields are lists joined into a single ", "-separated string
    genres: str | None
    cast: str | None
    director: str | None
    producer: str | None
    company: str | None


***
# Cinemagoer Logic
***

A collection of utility functions that retrieve information from the Python Cinemagoer library.

In [None]:
import dateutil.parser as parser
import pprint

# Import the Cinemagoer class; instances are created inside the functions below
from imdb import Cinemagoer

# Avengers endgame: movie = ia.get('4154796')
# cast - top 20
# Runtime ?
# Box Office - Budget
# Box Office - Cumulative Worldwide Gross
# Box Office - Opening Weekend United States
# rating
# votes
# year
# kind = movie (ignore TV)
# directors - top 20
# producers - top 20
# production companies - top 20

def try_float(value: str | None) -> float | None:
    if value is None:
        return None
    try:
        return float(value)
    except:
        return None

def try_int(value: str | None) -> int | None:
    if value is None:
        return None
    try:
        return int(value)
    except:
        return None

def get_top(items: list, count: int) -> list[str]:
    """Return the 'name' value of the first *count* entries in *items*.

    Args:
        items: Objects that can be indexed with the key 'name'.
        count: Maximum number of names to return.

    Returns:
        Up to *count* names, in their original order. If *items* cannot be
        iterated, or ANY entry (even one past *count*) has no 'name', an
        empty list is returned.
    """
    try:
        # Every entry is read before slicing, so one malformed entry anywhere
        # in the list empties the whole result.
        return [item['name'] for item in items][0:count]
    except (KeyError, TypeError):
        return []

def get_box_office(movie: dict, box_office_type: str) -> Decimal | None:
    try:
        bo = movie['box office']
        if box_office_type in bo:
            return bo[box_office_type]
        else:
            return None
    except:
        return None

def earliest_release_date(movie: dict) -> datetime.date | None:
    """Find the earliest release date listed for a movie.

    Requests the movie's release info from Cinemagoer, parses every raw
    release date and keeps the earliest one.

    Args:
        movie: Movie object exposing a ``movieID`` attribute.

    Returns:
        The earliest release date, or None when the lookup or parsing fails,
        no dates are listed, or the earliest date is not in the past.
    """
    try:
        ia = Cinemagoer()
        release_info = ia.get_movie_release_info(movie.movieID)
        release_dates = [
            parser.parse(entry['date'])
            for entry in release_info['data']['raw release dates']
        ]
        if not release_dates:
            return None
        earliest = min(release_dates)
        # Movies whose first release is still in the future are not reported
        if earliest < datetime.datetime.now():
            return earliest.date()
        return None
    except Exception:
        # Deliberately best-effort: the remote lookup and the date parsing can
        # fail in many ways, and any failure means "no usable release date".
        # Unlike a bare except, this still lets Ctrl-C stop the scraper.
        return None

def get_mpaa_rating(movie: dict) -> str:
    """Return the United States certificate (rating) of a movie.

    Args:
        movie: Movie data supporting ``movie['certificates']``, a list of
            strings in the form "<country>:<rating>".

    Returns:
        The text following "United States:" in the first matching
        certificate, or an empty string when there are no certificates or
        none of them is for the United States.
    """
    us_prefix = "United States:"
    try:
        for cert in movie['certificates']:
            if cert.startswith(us_prefix):
                return cert[len(us_prefix):]
    except (KeyError, TypeError, AttributeError):
        # Missing or malformed certificate data means "no rating"
        return ""
    # No US certificate found: return "" (not an implicit None) so the result
    # always matches the declared return type
    return ""

def safe_fetch_from_movie(movie: dict, keys: list[str], count: int) -> list[str]:
    """Fetch up to *count* names from the first non-empty field among *keys*.

    Args:
        movie: Movie data supporting ``movie.get(key, default)``.
        keys: Field names to try, in order of preference.
        count: Maximum number of names to return.

    Returns:
        The names produced by get_top for the first field that has at least
        one non-blank entry. If no field qualifies, a list of five empty
        strings is returned.
    """
    for key in keys:
        # Build a filtered copy rather than removing blanks in place, so the
        # movie's own data is left untouched
        values = [value for value in movie.get(key, []) if value != ""]
        if values:
            return get_top(values, count)
    # NOTE(review): callers join this with ", ", so a missing field is stored
    # as ", , , , " - kept as-is so newly written rows match existing ones.
    return ["", "", "", "", ""]

def gather_movie_info(movie: dict) -> ImdbMovie | None:
    """Build an ImdbMovie record from a retrieved movie.

    Args:
        movie: Movie object supporting ``movie['title']``, ``movie.get(...)``
            and a ``movieID`` attribute.

    Returns:
        The populated ImdbMovie, or None when the movie is skipped: no usable
        release date, released before 1978, or tagged with the genre "Short".
    """
    title = movie['title']
    movie_id = movie.movieID

    # Check release date
    release_date = earliest_release_date(movie)
    if release_date is None:
        return None

    # Skip any movies released before 1978, when Richard Donner released Superman:
    if release_date.year < 1978:
        print(f"The movie {movie_id} {title} predates Superman (1978).  Skipping.")
        return None

    # Skip any movies tagged with the genre "short", which might be an animated clip.
    # Checked before the other fields are gathered so skipped movies cost no extra work.
    genres = movie.get('genres', [])
    if "Short" in genres:
        print(f"The movie {movie_id} {title} is a short.  Skipping.")
        return None

    # Gather other facts
    runtime_list = movie.get('runtimes', [])
    runtime = try_int(runtime_list[0]) if runtime_list else None
    rating = try_float(movie.get('rating'))
    votes = try_int(movie.get('votes'))
    mpaa = get_mpaa_rating(movie)
    directors = safe_fetch_from_movie(movie, ['directors', 'director'], 5)
    producers = safe_fetch_from_movie(movie, ['producers', 'producer'], 5)
    companies = safe_fetch_from_movie(movie, ['production companies', 'production company'], 5)
    cast = safe_fetch_from_movie(movie, ['cast'], 10)

    # Capture monetary numbers; the worldwide gross is looked up under two
    # different keys, the second only if the first is absent
    budget = get_box_office(movie, 'Budget')
    opening_weekend = get_box_office(movie, 'Opening Weekend United States')
    gross_sales = get_box_office(movie, 'Cumulative Worldwide Gross')
    if gross_sales is None:
        gross_sales = get_box_office(movie, 'Gross worldwide')

    return ImdbMovie(
        id=movie_id,
        title=title,
        runtime=runtime,
        user_rating=rating,
        votes=votes,
        mpaa_rating=mpaa,
        release_date=str(release_date),
        budget=budget,
        opening_weekend=opening_weekend,
        gross_sales=gross_sales,
        genres=", ".join(genres),
        cast=", ".join(cast),
        director=", ".join(directors),
        producer=", ".join(producers),
        company=", ".join(companies),
    )

def gather_movie_info_by_id(id: int) -> "ImdbMovie | None":
    """Retrieve a movie through the Cinemagoer library and build its record.

    Args:
        id: IMDb movie ID; it is converted to a string for the lookup.

    Returns:
        Whatever gather_movie_info returns for the retrieved movie: an
        ImdbMovie, or None when the movie is skipped.
    """
    ia = Cinemagoer()
    movie = ia.get_movie(str(id))
    return gather_movie_info(movie)


***
# Data storage code
***

A collection of functions that manage storage for movie data in MongoDB.

In [None]:
import json
import pymongo
import certifi
import dataclasses

# Retrieve the MongoDB connection string from our credentials file
# (a JSON document read here for its "mongodb" key)
with open('f:\\git\\credentials.json') as f:
    data = json.load(f)
    mongo_connection_string = data['mongodb']    

# Connect to MongoDB, using the certifi CA bundle for TLS
client = pymongo.MongoClient(mongo_connection_string, tlsCAFile=certifi.where())

# Fetch the database named "DA320"
da320_database = client['DA320']

# Create or open the collections we will use:
#  * imdb_new      - captured movie records
#  * imdb_id       - per-year lists of movie IDs
#  * imdb_badmovie - IDs of movies that could not be captured
imdb_data = da320_database['imdb_new']
imdb_id = da320_database['imdb_id']
imdb_badmovie = da320_database['imdb_badmovie']

def movie_already_captured(id: int) -> bool:
    """Report whether a movie ID has already been handled.

    A movie counts as handled when it is found either in the captured-movie
    collection or in the collection of un-capturable movies.
    """
    # The captured collection is consulted first; the bad-movie collection is
    # only queried when the movie was not found there
    for collection in (imdb_data, imdb_badmovie):
        if collection.find_one({"id": id}) is not None:
            return True
    return False

def record_bad_movie(id: int) -> None:
    """Store a movie ID in the collection of movies that could not be captured."""
    bad_movie_marker = {"id": id}
    imdb_badmovie.insert_one(bad_movie_marker)

def write_to_database(movie: ImdbMovie) -> None:
    """Insert a movie record into the captured-movie collection as a plain dict."""
    document = dataclasses.asdict(movie)
    imdb_data.insert_one(document)

def get_id_list_from_year(year: int) -> list[str] | None:
    """Return the stored list of movie IDs for *year*, or None if none is stored."""
    stored_entry = imdb_id.find_one({"year": year})
    return None if stored_entry is None else stored_entry['id_list']

def write_id_list_to_database(year: int, id_list: list[str]) -> None:
    """Store the list of movie IDs found for *year* in the ID collection."""
    year_entry = {"year": year, "id_list": id_list}
    imdb_id.insert_one(year_entry)



***
# Fetch a list of IMDB movie IDs by year
***

A function that collects the list of IMDb movie IDs for a given year by reading the IMDb search pages directly, since this option did not exist in Cinemagoer when this script was developed.

In [None]:
import urllib3
import certifi
import re

# Construct a shared HTTP connection pool, configured with the certifi CA
# bundle, for all page requests made below
http = urllib3.PoolManager(ca_certs=certifi.where())

def fetch_movie_list_by_year(year: int) -> list[str]:
    """Collect the IMDb IDs of movies released in *year* with at least 1000 votes.

    A list already stored for the year is returned without any web request.
    Otherwise the IMDb search result pages are requested one after another
    until a page yields no IDs, and the combined list is stored before it is
    returned.

    Args:
        year: The release year to search for.

    Returns:
        The numeric part of each movie's "tt" identifier, as strings.
    """
    # Have we already retrieved the ID list for this year?  If so, save some time
    existing_id_list = get_id_list_from_year(year)
    if existing_id_list is not None:
        return existing_id_list

    # Raw string so that \d reaches the regex engine instead of being treated
    # as an (invalid) string escape; compiled once for all pages
    id_pattern = re.compile(r'div class="lister-item-image ribbonize" data-tconst="tt(\d+)"')

    # Fetch a list of movies using the filter criteria:
    #  * Type = Movie
    #  * Votes > 1000
    ids = []
    url = f"https://www.imdb.com/search/keyword/?ref_=kw_ref_yr&mode=detail&title_type=movie&num_votes=1000%2C&release_date={year}%2C{year}"
    page = 1
    while True:
        if page % 10 == 0:
            print(f"Fetching page {page} ({len(ids)} found so far)")
        r = http.request('GET', f"{url}&page={page}", headers={'User-Agent': 'Mozilla/5.0'})
        datastring = str(r.data, "utf-8")
        # NOTE(review): the HTTP status is never checked, so an error page
        # looks like "no more results" and a partial or empty list would be
        # stored for the year - confirm whether that needs guarding.
        page_ids = id_pattern.findall(datastring)
        if not page_ids:
            break
        ids.extend(page_ids)
        page += 1
    print(f"Found {len(ids)} movies for year {year}.")
    write_id_list_to_database(year, ids)
    return ids


***
# Scraping logic
***

Using the above code, we will collect information about IMDB movies with more than 1000 votes for the years 1978 through 2021.  

The algorithm is restartable: before scraping a movie, it checks whether that movie has already been stored in MongoDB (or recorded there as un-capturable).

In [5]:
import dateutil.parser as parser
import datetime

# Iterate through the years for this research project
# NOTE(review): range() excludes its end value, so the last year processed is
# 2020 - confirm whether 2021 was meant to be included.
allmovies = []
for year in range(1978, 2021, 1):
    # Per-year counters, reported in the summary line below
    movies_checked = 0
    movies_captured = 0
    movies_written = 0
    bad_movies = 0
    print(f"Fetching IDs of movies with > 1000 votes for year {year}...")
    ids = fetch_movie_list_by_year(year)
    print(f"Retrieving {len(ids)} movies for year {year}...")

    # Gather movie information
    for id in ids:
        movies_checked += 1

        # Avoid recapturing a movie if it's already in the DB; this makes the code restartable
        if not movie_already_captured(id):
            info = gather_movie_info_by_id(id)
            # Counts every capture attempt, whether or not it produced a record
            movies_captured += 1
            if info is None:
                # Remember the failure so the movie is not retried on the next run
                record_bad_movie(id)
                bad_movies += 1
            else:
                write_to_database(info)
                movies_written += 1

    # Summary for the year: (checked / capture attempts / written / bad)
    print(f"Captured data for {len(ids)} movies for year {year}. ({movies_checked}/{movies_captured}/{movies_written}/{bad_movies})")


***
# Tests
***

This code block includes some tests for known bad situations.

In [None]:

# Manual checks: each one retrieves a known movie and pretty-prints the result.
# pprint() does the printing itself and returns None, so it is not wrapped in
# print() (that would emit an extra "None" line).
from pprint import pprint

# Test superman (1978)
superman = gather_movie_info_by_id(78346)
pprint(superman)


# Test Devilman (2004) - this should not have a budget amount since it is in ¥ rather than $,
# but it should have a gross sales amount
devilman = gather_movie_info_by_id(373786)
pprint(devilman)
