## Python Movie Data Collection Script

This script fetches movie data from TheMovieDB API, uses each movie's `imdb_id` to obtain additional metadata from the OMDB API, merges the two records, and stores the result in Parquet format. It also builds a separate dataset of credits from TheMovieDB API. Its functionality breaks down as follows:

### 1. **Importing Required Libraries**:
   - Essential libraries such as `requests`, `os`, `backoff`, and `pandas` are imported to handle HTTP requests, read environment variables, implement retry logic, and manipulate data, respectively. `dotenv` is used to load the `.env` file.

### 2. **Header Configuration**:
   - HTTP headers are configured for authorization using an API token retrieved from environment variables. Obtain the access token from [TheMovieDB API](https://developer.themoviedb.org/reference/intro/getting-started) and the OMDB API key from [OMDB API](https://www.omdbapi.com/apikey.aspx). Note: the free OMDB API key is limited to 1,000 calls per day.
   - After acquiring the keys, create a `.env` file in the root folder and populate it with the following values:

        ```plaintext
        API_KEY=
        API_TOKEN=
        OMDB_KEY=
        ```
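
A missing key tends to surface only later, as an authorization error from one of the APIs, so it can help to check the environment right after loading the `.env` file. The following is a minimal sketch and not part of the original script: the helper name `missing_env_vars` is hypothetical, and it accepts the environment as a mapping so it can be tried without real keys.

```python
import os
from typing import Iterable, List, Mapping

# Names taken from the .env template above
REQUIRED_VARS = ("API_KEY", "API_TOKEN", "OMDB_KEY")


def missing_env_vars(names: Iterable[str] = REQUIRED_VARS,
                     env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or empty in `env` (hypothetical helper)."""
    return [name for name in names if not env.get(name)]


# Example with a fake environment: API_KEY is absent and OMDB_KEY is empty
example_env = {"API_TOKEN": "abc123", "OMDB_KEY": ""}
print(missing_env_vars(env=example_env))
```

Calling `missing_env_vars()` with no arguments checks the real process environment, which would be done after `load_dotenv(...)` has run.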

### 3. **Retry Logic**:
   - Custom retry logic is defined using the `backoff` library: requests that fail with status code 429 (Too Many Requests) are retried with exponential backoff, up to 10 attempts, while any other HTTP error is raised immediately.
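
To make the retry behaviour concrete, the sketch below re-implements the idea in plain Python rather than calling `backoff` itself. `FakeHTTPError`, `retry_on_429`, and the injectable `sleep` argument are hypothetical names used only for illustration. The delays double on each attempt (1, 2, 4, ... seconds) and no jitter is applied, which is a simplification, since `backoff` randomizes its waits by default.

```python
import time
from typing import Callable, List


class FakeHTTPError(Exception):
    """Stand-in for requests.exceptions.HTTPError (hypothetical, for illustration)."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def retry_on_429(func: Callable[[], object], max_tries: int = 10,
                 sleep: Callable[[float], None] = time.sleep) -> object:
    """Call `func`, retrying with doubling delays only when it fails with 429."""
    for attempt in range(max_tries):
        try:
            return func()
        except FakeHTTPError as exc:
            # Give up on any other status, or when the attempts are used up
            if exc.status_code != 429 or attempt == max_tries - 1:
                raise
            sleep(2 ** attempt)  # 1, 2, 4, ... seconds


# Example: fail twice with 429, then succeed; record delays instead of sleeping
responses = [FakeHTTPError(429), FakeHTTPError(429), {"id": 1}]
delays: List[float] = []


def flaky_call() -> object:
    result = responses.pop(0)
    if isinstance(result, Exception):
        raise result
    return result


print(retry_on_429(flaky_call, sleep=delays.append), delays)
```

Passing `sleep` as a parameter is a design choice for the sketch only: it lets the example run instantly while still showing which delays would have been waited.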

### 4. **API Call Function**:
   - The function `call_get(url)` makes a GET request to the specified URL, raises an exception for an unsuccessful response, and otherwise returns the parsed JSON body.

### 5. **Movie Data Retrieval Functions**:
   - `get_latest_movie()`: Fetches the latest movie data from TheMovieDB API.
   - `get_movie_credits_by_id(id)`: Obtains movie credits based on the movie ID from TheMovieDB API, returning `None` when the movie ID does not exist.
   - `get_movie_by_id(id)`: Retrieves movie data by ID from TheMovieDB API, also returning `None` when the movie ID does not exist.
   - `get_movie_from_omdb(imdb_id, api_key)`: Fetches movie data from the OMDB API using the IMDb ID.

### 6. **Bulk Data Collection Functions**:
   - `fetch_all_movies(start_id, last_id)` and `fetch_all_credits(start_id, last_id)` loop through a range of movie IDs. The first collects movie data from TheMovieDB API, keeps only released movies that have an IMDb ID and non-zero budget and revenue, and merges each with its OMDB record. The second collects movie credits from TheMovieDB API.
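
The merge step relies on Python's dictionary unpacking, in which keys from the right-hand dictionary overwrite equal keys from the left-hand one. The sketch below uses made-up records. It assumes that OMDB field names are capitalized (`Title`, `Year`) while TheMovieDB's are lowercase, so the two sets of keys mostly coexist, and that OMDB reports a failed lookup with `"Response": "False"` in the JSON body rather than with an HTTP error status. The helper `merge_records` is hypothetical.

```python
from typing import Dict, Optional


def merge_records(tmdb: Dict, omdb: Dict) -> Optional[Dict]:
    """Merge one TMDB record with its OMDB record (hypothetical helper).

    Returns None when the OMDB payload reports a failed lookup.
    """
    # Assumption: OMDB reports "not found" inside the JSON body
    if omdb.get("Response") == "False":
        return None
    # On equal keys, the value from `omdb` (unpacked last) wins
    return {**tmdb, **omdb}


# Made-up records for illustration only
tmdb_record = {"id": 1, "imdb_id": "tt0000001", "title": "Example", "budget": 100}
omdb_record = {"Title": "Example", "Year": "1999", "Response": "True"}

merged = merge_records(tmdb_record, omdb_record)
print(sorted(merged))
print(merge_records(tmdb_record, {"Response": "False", "Error": "not found"}))
```

If that assumption about OMDB holds, a check like this would keep failed lookups out of the merged dataset, since `raise_for_status()` alone would not flag them.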

### 7. **Data Normalization and Storage**:
   - The script uses `pandas` to normalize the collected JSON data into a tabular format and then stores both the movies and the credits data in Parquet files with gzip compression.
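
As a rough illustration of what normalization does to a nested API response, the sketch below flattens nested dictionaries into dot-separated column names using only the standard library. It mimics the default naming that `pandas.json_normalize` applies to nested dictionaries, but it is not the script's code. `flatten` and the sample record are hypothetical, and lists are left untouched, so a list-valued field such as `genres` stays a single column.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into a single level with dot-separated keys."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Recurse into nested dictionaries, extending the key prefix
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            # Scalars and lists are stored unchanged
            flat[name] = value
    return flat


# Made-up record shaped loosely like a movie response
sample = {
    "id": 1,
    "belongs_to_collection": {"id": 10, "name": "Example Collection"},
    "genres": [{"id": 18, "name": "Drama"}],
}
print(flatten(sample))
```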

This script demonstrates a systematic approach to collecting, normalizing, and storing movie data from multiple online sources through their APIs. It handles non-existent movie IDs and retries rate-limited TheMovieDB requests, so that a long collection run is not interrupted by either.


In [4]:
import requests
import os
import backoff
import pandas as pd
from dotenv import load_dotenv
from typing import Optional, Union, Dict, List

load_dotenv('../.env')

headers = {
    "accept": "application/json",
    "Authorization": f"Bearer {os.getenv('API_TOKEN')}"
}


# Custom retry condition to handle HTTP 429 status code (Too Many Requests)
def giveup(exc: requests.exceptions.HTTPError) -> bool:
    """Determines whether to give up a retry attempt.
    
    Args:
        exc (requests.exceptions.HTTPError): The exception raised during the HTTP request.
        
    Returns:
        bool: True if the exception's status code is not 429 - Too Many Requests, otherwise False.
    """
    return exc.response.status_code != 429


@backoff.on_exception(
    backoff.expo,  # Exponential backoff strategy for Too Many Requests error
    requests.exceptions.HTTPError,  # Exception to look for
    max_tries=10,  # Maximum retry attempts
    giveup=giveup  # Function to determine if retry should be aborted
)
def call_get(url: str) -> Union[Dict, None]:
    """Makes a GET request to a specified URL and handles potential errors.
    
    Args:
        url (str): The URL to send a GET request to.
        
    Returns:
        dict: The JSON response from the GET request.
    """
    response = requests.get(url, headers=headers)
    response.raise_for_status() 
    return response.json()


def get_latest_movie() -> Dict:
    """Fetches the latest movie data from TheMovieDB API.
    
    Returns:
        dict: The latest movie data.
    """
    url = "https://api.themoviedb.org/3/movie/latest"
    return call_get(url)


def get_movie_credits_by_id(id: int) -> Optional[Dict]:
    """Obtains movie credits based on the movie ID from TheMovieDB API.
    
    Args:
        id (int): The ID of the movie.
        
    Returns:
        dict: The movie credits data, or None if not found.
    """

    url = f"https://api.themoviedb.org/3/movie/{id}/credits?language=en-US"
    try:
        return call_get(url)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            try:
                error_response = e.response.json()
                if error_response.get('status_code') == 34:
                    print(f"No credits found for ID: {id}")
                    return None
                
            except ValueError:
                print(f"Received unexpected response: {e.response.text}")
        else:
            print(f"An error occurred: {e}")
            raise e


def get_movie_by_id(id: int) -> Optional[Dict]:
    """Retrieves movie data by ID from TheMovieDB API.
    
    Args:
        id (int): The ID of the movie.
        
    Returns:
        dict: The movie data, or None if not found.
    """
    url = f"https://api.themoviedb.org/3/movie/{id}?language=en-US"
    try:
        return call_get(url)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            try:
                error_response = e.response.json()
                if error_response.get('status_code') == 34:
                    print(f"No movie found for ID: {id}")
                    return None
                
            except ValueError:
                print(f"Received unexpected response: {e.response.text}")
        else:
            print(f"An error occurred: {e}")
            raise e

def get_movie_from_omdb(imdb_id: str, api_key: str = os.getenv('OMDB_KEY')) -> Dict:
    """Fetches movie data from the OMDB API using the IMDb ID.
    
    Args:
        imdb_id (str): The IMDb ID of the movie.
        api_key (str): The API key for OMDB API.
        
    Returns:
        dict: The movie data from OMDB API.
    """
    url = f"https://www.omdbapi.com/?i={imdb_id}&apikey={api_key}"
    response = requests.get(url)
    response.raise_for_status() 
    return response.json()

def fetch_all_movies(start_id: int, last_id: int) -> List[Dict]:
    """Loops through a range of movie IDs, collecting and merging movie data from both APIs.
    
    Args:
        start_id (int): The starting movie ID.
        last_id (int): The ending movie ID.
        
    Returns:
        list: A list of merged movie data dictionaries.
    """
    all_movies = []
    for id in range(start_id, last_id + 1):
        movie = get_movie_by_id(id)
        # Keep only released movies with an IMDb ID and non-zero budget and revenue
        if movie is not None and movie["imdb_id"] and \
            movie['revenue'] != 0 and movie['status'] == 'Released' and \
            movie['budget'] != 0:
            # Look up the same movie in OMDB
            try:
                omdb_json = get_movie_from_omdb(movie["imdb_id"])
            except requests.exceptions.HTTPError as e:
                # Skip this movie: there is no OMDB record to merge
                print(f"OMDB Received response: {e.response.text}")
                continue
            # Merge the two records; OMDB values win on equal keys
            all_movies.append({**movie, **omdb_json})
    return all_movies

def fetch_all_credits(start_id: int, last_id: int) -> List[Dict]:
    """Loops through a range of movie IDs, collecting movie credits data from TheMovieDB API.
    
    Args:
        start_id (int): The starting movie ID.
        last_id (int): The ending movie ID.
        
    Returns:
        list: A list of movie credits data dictionaries.
    """
    all_credits = [] 
    for id in range(start_id, last_id + 1):  
        credit = get_movie_credits_by_id(id)
        if credit is not None:
            all_credits.append(credit)
    return all_credits


Get the latest available movie ID from the TMDB API

In [3]:
last_movie = get_latest_movie()['id']
last_movie

1193709

In [12]:
# Set the ID of the last movie to be fetched
last_movie = 43000

# Set the ID of the movie to start fetching
start_movie = 40001

# Fetch all movies metadata from ID start_movie to last_movie
all_movies = fetch_all_movies(start_movie, last_movie)

df = pd.json_normalize(all_movies)
df.to_parquet(f"../tmdb/movies_{last_movie}.parquet", compression='gzip')


No movie found for ID: 40050
No movie found for ID: 40051
No movie found for ID: 40063
No movie found for ID: 40068
No movie found for ID: 40086
No movie found for ID: 40098
No movie found for ID: 40101
No movie found for ID: 40116
No movie found for ID: 40117
No movie found for ID: 40128
No movie found for ID: 40133
No movie found for ID: 40138
No movie found for ID: 40143
No movie found for ID: 40148
No movie found for ID: 40152
No movie found for ID: 40174
No movie found for ID: 40176
No movie found for ID: 40178
No movie found for ID: 40182
No movie found for ID: 40183
No movie found for ID: 40184
No movie found for ID: 40189
No movie found for ID: 40193
No movie found for ID: 40245
No movie found for ID: 40259
No movie found for ID: 40265
No movie found for ID: 40267
No movie found for ID: 40335
No movie found for ID: 40379
No movie found for ID: 40381
No movie found for ID: 40384
No movie found for ID: 40385
No movie found for ID: 40395
No movie found for ID: 40396
No movie found

In [15]:
# Set the ID of the last movie to be fetched
last_movie = 43000

# Set the ID of the movie to start fetching
start_movie = 24500

# Fetch all credits from ID start_movie to last_movie
all_credits = fetch_all_credits(start_movie, last_movie)

df = pd.json_normalize(all_credits)
df.to_parquet(f"../credits/credits_{last_movie}.parquet", compression='gzip')


No credits found for ID: 24501
No credits found for ID: 24507
No credits found for ID: 24509
No credits found for ID: 24511
No credits found for ID: 24529
No credits found for ID: 24530
No credits found for ID: 24531
No credits found for ID: 24532
No credits found for ID: 24533
No credits found for ID: 24534
No credits found for ID: 24535
No credits found for ID: 24536
No credits found for ID: 24537
No credits found for ID: 24538
No credits found for ID: 24539
No credits found for ID: 24540
No credits found for ID: 24541
No credits found for ID: 24542
No credits found for ID: 24543
No credits found for ID: 24544


In [13]:
import pandas as pd
import pyarrow.parquet as pq
import os

def read_parquet_files_to_dataframe(directory_path: str) -> pd.DataFrame:
    """Reads all parquet files from a specified directory and merges them into a single DataFrame.
    
    Args:
        directory_path (str): The path to the directory containing the parquet files.
        
    Returns:
        pd.DataFrame: A DataFrame containing the merged data from all parquet files.
    """
    # List all files in the directory
    files = [f for f in os.listdir(directory_path) if f.endswith('.parquet')]
    
    dataframes = []
    
    # Loop through the files and read each one into a DataFrame
    for file in files:
        file_path = os.path.join(directory_path, file)
        parquet_file = pq.ParquetFile(file_path)
        df = parquet_file.read().to_pandas()
        dataframes.append(df)
    
    # Concatenate all the DataFrames into a single DataFrame
    merged_dataframe = pd.concat(dataframes, ignore_index=True)
    
    return merged_dataframe



In [14]:
# MOVIES
# Call the function to read and merge all parquet files in the specified directory into a DataFrame
movies_dataframe = read_parquet_files_to_dataframe('../tmdb/')
# Remove duplicate rows based on the 'id' column of the DataFrame
movies_dataframe = movies_dataframe.drop_duplicates(subset=['id'])
# TODO: remove this line and tmdb and credits folder
movies_dataframe = movies_dataframe[(movies_dataframe['revenue'] != 0) & (movies_dataframe['budget'] != 0)
                                    & (movies_dataframe['status'] == 'Released')]
# Save the deduplicated DataFrame to a new Parquet file with gzip compression in the specified directory
movies_dataframe.to_parquet('../data/movies.parquet', compression='gzip')

In [None]:
# CREDITS
# Call the function to read and merge all parquet files in the specified directory into a DataFrame
credits_dataframe = read_parquet_files_to_dataframe('../credits/')
# Remove duplicate rows based on the 'id' column of the DataFrame
credits_dataframe = credits_dataframe.drop_duplicates(subset=['id'])
# Save the deduplicated DataFrame to a new Parquet file with gzip compression in the specified directory
credits_dataframe.to_parquet('../data/credits.parquet', compression='gzip')