# Database Unit Tests
This notebook contains tests for the database functionality of CineMind

In [2]:
# Install the required packages
%pip install pytest
%pip install pytest-notebook

Note: you may need to restart the kernel to use updated packages.
Collecting pytest-notebook
  Downloading pytest_notebook-0.10.0-py3-none-any.whl.metadata (7.2 kB)
Collecting attrs<23,>=19 (from pytest-notebook)
  Downloading attrs-22.2.0-py3-none-any.whl.metadata (13 kB)
Collecting nbclient~=0.5.10 (from pytest-notebook)
  Downloading nbclient-0.5.13-py3-none-any.whl.metadata (5.0 kB)
Collecting nbdime>=4 (from pytest-notebook)
  Downloading nbdime-4.0.2-py3-none-any.whl.metadata (9.5 kB)
Collecting nbformat (from pytest-notebook)
  Downloading nbformat-5.10.4-py3-none-any.whl.metadata (3.6 kB)
Collecting jsonschema (from pytest-notebook)
  Downloading jsonschema-4.23.0-py3-none-any.whl.metadata (7.9 kB)
Collecting jupyter-server (from nbdime>=4->pytest-notebook)
  Downloading jupyter_server-2.15.0-py3-none-any.whl.metadata (8.4 kB)
Collecting jupyter-server-mathjax>=0.2.2 (from nbdime>=4->pytest-notebook)
  Downloading jupyter_server_mathjax-0.2.6-py3-none-any.whl.metadata (2.1 kB)




## DB Connection Setup

In [126]:

import sys
import sqlite3
import os 
import pytest

# Add the parent directory to path for module imports 
sys.path.append(os.path.abspath('..'))

# Path to the SQLite database file
db_path = os.path.join('..', 'models', 'cinemind.db')

# Connect to the SQLite database
def get_db_connection():
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row # Enable access to columns by name
    return conn

# Connect to in-memory database
def get_test_db_connection():
    """Create an in-memory test database with the same schema as the production DB"""
    test_conn = sqlite3.connect(':memory:')
    test_conn.row_factory = sqlite3.Row 

    # Get schema from the real database
    prod_conn = sqlite3.connect(db_path)
    # Important: Set row_factory for the production connection as well
    prod_conn.row_factory = sqlite3.Row
    cursor = prod_conn.cursor()

    # Get all CREATE statements and data
    cursor.execute("SELECT sql FROM sqlite_master WHERE type='table';")
    create_statements = cursor.fetchall()

    # Create the same schema in the in-memory database
    test_cursor = test_conn.cursor() 
    for statement in create_statements:
        sql = statement['sql']  # Now accessing using string key works because we set row_factory
        if sql and 'sqlite_sequence' not in sql:
            test_cursor.execute(sql)
    
    # Import reference data (smaller lookup tables)
    reference_tables = ['Genres', 'Keywords', 'Production_Countries', 'Spoken_Languages']
    for table in reference_tables:
        try:
            cursor.execute(f"SELECT * FROM {table}")
            rows = cursor.fetchall() 

            if rows: 
                # Get column names 
                columns = [column[0] for column in cursor.description]
                placeholders = ', '.join(['?'] * len(columns))
                column_str = ', '.join(columns)

                # Insert all rows from this reference table
                for row in rows:
                    # Since we're using Row objects, we can access columns by name
                    values = [row[col] for col in columns]
                    test_cursor.execute(f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})", values)

                print(f"✅ Imported {len(rows)} records into {table}")
        except sqlite3.Error as e:
            print(f"⚠️ Error importing {table}: {e}")
    
    # Import a subset of movies (100 highest rated)
    cursor.execute("""
        SELECT * FROM Movies
        WHERE vote_count > 100
        ORDER BY vote_average DESC
        LIMIT 100
    """)
    movie_rows = cursor.fetchall()

    if movie_rows:
        # Get column names 
        columns = [column[0] for column in cursor.description]
        placeholders = ', '.join(['?'] * len(columns))
        column_str = ', '.join(columns)

        for row in movie_rows:
            # Since we're using Row objects, we can access columns by name
            values = [row[col] for col in columns]
            test_cursor.execute(f"INSERT INTO Movies ({column_str}) VALUES ({placeholders})", values)

    movie_ids = [row['id'] for row in movie_rows]
    movie_id_list = ', '.join(map(str, movie_ids))
    print(f"✅ Imported {len(movie_rows)} sample movies")

    # Import related data for these movies
    related_tables = { 
        'Movie_Genre': ('movie_id',),
        'Movies_Cast': ('movie_id',),
        'Movie_Keywords': ('movie_id',),
        'Movie_Production_Countries': ('movie_id',),
        'Movie_Spoken_Languages': ('movie_id',),
    }

    # Import actors related to our movies
    cursor.execute(f"""
        SELECT DISTINCT c.* 
        FROM Cast c
        JOIN Movies_Cast mc ON c.actor_id = mc.actor_id
        WHERE mc.movie_id IN ({movie_id_list})
    """)
    
    cast_rows = cursor.fetchall()
    if cast_rows:
        # Get column names 
        columns = [column[0] for column in cursor.description]
        placeholders = ', '.join(['?'] * len(columns))
        column_str = ', '.join(columns)
        
        for row in cast_rows:
            # Since we're using Row objects, we can access columns by name
            values = [row[col] for col in columns]
            test_cursor.execute(f"INSERT INTO Cast ({column_str}) VALUES ({placeholders})", values)
            
        print(f"✅ Imported {len(cast_rows)} cast members")
        
    # Import relationship data for our movies
    for table, key_columns in related_tables.items():
        try:
            where_clause = ' OR '.join([f"{col} IN ({movie_id_list})" for col in key_columns])
            cursor.execute(f"SELECT * FROM {table} WHERE {where_clause}")
            relation_rows = cursor.fetchall()
            
            if relation_rows:
                # Get column names 
                columns = [column[0] for column in cursor.description]
                placeholders = ', '.join(['?'] * len(columns))
                column_str = ', '.join(columns)
                
                for row in relation_rows:
                    # Since we're using Row objects, we can access columns by name
                    values = [row[col] for col in columns]
                    test_cursor.execute(f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})", values)
                    
                print(f"✅ Imported {len(relation_rows)} records into {table}")
        except sqlite3.Error as e:
            print(f"⚠️ Error importing {table}: {e}")
    
    prod_conn.close()
    test_conn.commit()
    
    # Verify the data
    test_cursor.execute("SELECT COUNT(*) AS count FROM Movies")
    movie_count = test_cursor.fetchone()['count']
    
    test_cursor.execute("SELECT COUNT(*) AS count FROM Cast")
    cast_count = test_cursor.fetchone()['count']
    
    print(f"Test database ready with {movie_count} movies and {cast_count} cast members")
    
    return test_conn


In [119]:
# Set this to 'memory' to use the in-memory test database or 'main' to use the real database
# This determines which database all tests will run against
USE_DATABASE = 'memory'  # Options: 'main' or 'memory'

# Get the appropriate database connection based on the setting
def get_connection_for_tests():
    """Returns the database connection to use for tests based on the USE_DATABASE setting"""
    if USE_DATABASE == 'memory':
        print("Using in-memory test database")
        return get_test_db_connection()
    else:
        print("Using main database")
        return get_db_connection()

## Test Function

In [105]:
def run_test(test_function):
    """ Run a single test function and display its results"""
    print(f"Running test: {test_function.__name__}")
    try:
        # Get the database connection based on the setting
        conn = get_connection_for_tests()
        test_function(conn)
        print("✅ Test passed!")
        # Close the connection after the test
        conn.close()
    except AssertionError as e:
        print(f"❌ Test failed: {str(e)}")
    except Exception as e:
        print(f"❌ Error during test: {str(e)}")
        # Print the full exception traceback for debugging
        import traceback
        traceback.print_exc()
    print("-" * 40)

## Database Tests

In [106]:
def test_database_connection(conn):
    """ Test if we can connect to the database """
    assert conn is not None, "Failed to connect to the database"

def test_query_tables(conn):
    """Test if we can query the database schema."""
    cursor = conn.cursor() 
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    print("Tables in database:", tables) 
    assert len(tables) > 0, " No tables found in the database" 

## Table Structure Tests

In [107]:
def test_all_tables_exist(conn):
    """Test if all expected tables exist in the database."""
    cursor = conn.cursor() 

    # Critical tables that must always exist
    critical_tables = ['Movies', 'Cast', 'Genres']

    # Additional expected tables (can be updated as schema evolves)
    expected_tables = [ 
        'Keywords', 'Movie_Keywords', 'Movie_Genre', 'Production_Countries', 'Movie_Production_Countries', 'Spoken_Languages', 'Movie_Spoken_Languages', 'Movies_Cast'
    ]

    system_tables = 'sqlite_sequence'

    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall() 
    actual_tables = [table['name'] for table in tables]
    print(f"Actual tables in database: {actual_tables}")

    # Critical tables check - these MUST exist
    for table in critical_tables:
        assert table in actual_tables, f"Critical table {table} is missing from the database."

    # Warning for expected tables - these SHOULD exist but can be added later 
    missing_expected = [table for table in expected_tables if table not in actual_tables]
    if missing_expected:
        print(f"⚠️ WARNING: Expected tables missing: {missing_expected}.")

    # Report any additional tables that are not expected
    additional_tables = [table for table in actual_tables if table not in critical_tables and table not in expected_tables and table not in system_tables]
    if additional_tables:
        print(f"ℹ️ INFO: Additional tables found: {additional_tables}")
        print("     These tables are not in the critical or expected lists.")

def test_movies_table(conn):
    """Test the structure of the Movies table."""
    cursor = conn.cursor() 

    cursor.execute("PRAGMA table_info(Movies);")
    columns = cursor.fetchall() 
    column_names = [column['name'] for column in columns]

    essential_columns = ['id', 'title', 'overview', 'budget', 'revenue', 'release_date', 'runtime', 'status', 'tagline', 'popularity', 'vote_average', 'vote_count', 'original_language', 'poster_url', 'backdrop_url', 'video_url', 'reviews', 'keyposter_url', 'keyvideo_url']

    expected_columns = ['original_title', 'homepage']

    # Test essential columns
    for column in essential_columns:
        assert column in column_names, f"Essential column '{column}' is missing from the Movies table."
    
    # Report on expected columns - warns if missing
    missing_expected = [column for column in expected_columns if column not in column_names]
    if missing_expected:
        print(f"⚠️ WARNING: Expected columns missing from Movies table: {missing_expected}")

    # Report any additional columns that weren't in either list
    additional_columns = [column for column in column_names if column not in essential_columns and column not in expected_columns]
    if additional_columns:
        print(f"ℹ️ INFO: Additional columns found in Movies table: {additional_columns}")

def test_genres_table(conn):
    """Test the structure of the Genres table."""
    cursor = conn.cursor()

    cursor.execute("PRAGMA table_info(Genres);")
    columns = cursor.fetchall()
    column_names = [column['name'] for column in columns]

    essential_columns = ['genre_id', 'genre_name']
    expected_columns = []

    for column in essential_columns:
        assert column in column_names, f"Essential column '{column}' is missing from the Genres table."
    
    # Report on expected columns - just warns if missing
    missing_expected = [column for column in expected_columns if column not in column_names]
    if missing_expected:
        print(f"⚠️ WARNING: Some expected columns are missing from Genre table: {missing_expected}")

    # Report any additional columns that weren't in either list
    additional_columns = [column for column in column_names if column not in essential_columns and column not in expected_columns]
    if additional_columns:   
        print(f"ℹ️ INFO: Additional columns found in Genres table: {additional_columns} ")

def test_cast_table(conn):
    """Test the structure of the Cast table."""
    cursor = conn.cursor() 

    cursor.execute("PRAGMA table_info(Cast);")
    columns = cursor.fetchall()
    column_names = [column['name'] for column in columns]
    
    essential_columns = ['actor_id', 'name']
    expected_columns = ['characters', 'gender']

    # Report on expected columns
    missing_expected = [column for column in expected_columns if column not in column_names]
    if missing_expected:
        print(f"⚠️ WARNING: Some expected columns are missing from Cast table: {missing_expected}")

    # Report on any additional columns that weren't in either list
    additional_columns = [column for column in column_names if column not in essential_columns and column not in expected_columns]
    if additional_columns:
        print(f"ℹ️ INFO: Additional columns found in Cast table: {additional_columns}")

    for column in essential_columns:
        assert column in column_names, f"Essential column '{column} is missing from the Cast table.'"

## Data Integrity Tests

In [108]:
def test_data_integrity(conn):
    """Test basic data integrity in the database."""
    cursor = conn.cursor()

    # Check if Movies table has data
    cursor.execute("SELECT COUNT(*) as count FROM Movies;")
    movie_count = cursor.fetchone()['count']
    assert movie_count > 0, "No movies found in the database"

    # Check if Genres table has data
    cursor.execute("SELECT COUNT(*) as count FROM Genres;")
    genre_count = cursor.fetchone()['count']
    assert genre_count > 0, "No genres found in the database"

    # Check if Cast table has data
    cursor.execute("SELECT COUNT(*) as count FROM Cast;")
    cast_count = cursor.fetchone()['count']
    assert cast_count > 0, "No cast members found in the database"

    print(f"Data counts: {movie_count} movies, {genre_count} genres, {cast_count} cast members")

def test_relationship_integrity(conn):
    """Test that relationships between tables are valid."""
    cursor = conn.cursor()

    # Test Movie-Genre relationship integrity
    cursor.execute("""
    SELECT mg.movie_id, mg.genre_id
    FROM Movie_Genre mg
    LEFT JOIN Movies m ON mg.movie_id = m.id
    LEFT JOIN Genres g ON mg.genre_id = g.genre_id
    WHERE m.id IS NULL OR g.genre_id IS NULL
    LIMIT 5;
    """)
    orphaned_genres = cursor.fetchall()
    assert len(orphaned_genres) == 0, f"Found {len(orphaned_genres)} orphaned movie-genre relationships"

    # Test Movie-Cast relationship integrity
    cursor.execute("""
    SELECT mc.movie_id, mc.actor_id
    FROM Movies_Cast mc
    LEFT JOIN Movies m ON mc.movie_id = m.id
    LEFT JOIN Cast c ON mc.actor_id = c.actor_id 
    WHERE m.id IS NULL OR c.actor_id IS NULL
    LIMIT 5;
    """)
    orphaned_cast = cursor.fetchall()
    assert len(orphaned_cast) == 0, f"Found {len(orphaned_cast)} orphaned movie-cast relationships"

    # Test Movie-Keywords relationship integrity
    cursor.execute("""
    SELECT mk.movie_id, mk.keyword_id
    FROM Movie_Keywords mk
    LEFT JOIN Movies m ON mk.movie_id = m.id
    LEFT JOIN Keywords k ON mk.keyword_id = k.keyword_id
    WHERE m.id IS NULL OR k.keyword_id IS NULL
    LIMIT 5;
    """)
    orphaned_keywords = cursor.fetchall()
    assert len(orphaned_keywords) == 0, f"Found {len(orphaned_keywords)} orphaned movie-keywords relationships"

    # Test Movie-Production Countries relationship integrity
    cursor.execute("""
    SELECT mpc.movie_id, mpc.country_id
    FROM Movie_Production_Countries mpc
    LEFT JOIN Movies m ON mpc.movie_id = m.id
    LEFT JOIN Production_Countries pc ON mpc.country_id = pc.country_id
    WHERE m.id IS NULL OR pc.country_id IS NULL
    LIMIT 5;
    """)
    orphaned_countries = cursor.fetchall()
    assert len(orphaned_countries) == 0, f"Found {len(orphaned_countries)} orphaned movie-countries relationships"
    
    # Test Movie-Spoken Languages relationship integrity
    cursor.execute("""
    SELECT msl.movie_id, msl.language_id
    FROM Movie_Spoken_Languages msl
    LEFT JOIN Movies m ON msl.movie_id = m.id
    LEFT JOIN Spoken_Languages sl ON msl.language_id = sl.language_id
    WHERE m.id IS NULL OR sl.language_id IS NULL
    LIMIT 5;
    """)
    orphaned_languages = cursor.fetchall()
    assert len(orphaned_languages) == 0, f"Found {len(orphaned_languages)} orphaned movie-languages relationships"
    
    print("✅ All relationship tables have valid foreign keys")

def test_cardinality(conn):
    """Test the cardinality of many-to-many relationships."""
    cursor = conn.cursor() 

    # Check actors who play multiple characters
    cursor.execute("""
        SELECT c.name, COUNT(DISTINCT mc.movie_id) as movie_count
        FROM Cast c
        JOIN Movies_Cast mc ON c.actor_id = mc.actor_id 
        GROUP BY c.name
        HAVING movie_count > 1
        ORDER BY movie_count DESC
        LIMIT 5;
    """)

    actors_multiple_movies = cursor.fetchall() 
    if actors_multiple_movies:
        print("Actors in multiple movies (many-to-many cardinality):")
        for actor in actors_multiple_movies:
            print(f"- {actor['name']} appears in {actor['movie_count']} movies")
    
    # Check movies with multiple genres
    cursor.execute("""
        SELECT m.title, COUNT(mg.genre_id) as genre_count 
        FROM Movies m 
        JOIN Movie_Genre mg ON m.id = mg.movie_id 
        GROUP BY m.title 
        HAVING genre_count > 1 
        ORDER BY genre_count DESC 
        LIMIT 5;
    """)

    movies_multiple_genres = cursor.fetchall() 
    if movies_multiple_genres:
        print("\nMovies with multiple genres (many-to-many cardinality):")
        for movie in movies_multiple_genres: 
            print(f"- {movie['title']} has {movie['genre_count']} genres")

    # Test passes if we have valid many-to-many relationships
    assert len(actors_multiple_movies) > 0, "No actors appear in multiple movies"
    assert len(movies_multiple_genres) > 0, "No movies have multiple genres"

# Functional Tests


In [124]:
def test_basic_search_functionality(conn):
    """Test basic search functionality across different fields."""
    cursor = conn.cursor()
    
    # Check if we're using the in-memory database or the main database
    is_memory_db = USE_DATABASE == 'memory'
    
    # Define test cases with expected minimum results, adjusted for in-memory DB
    search_test_cases = [
        # For memory DB, require fewer results since it only contains a sample of movies
        {"field": "title", "term": "star", "min_expected": 1 if is_memory_db else 3},
        {"field": "overview", "term": "adventure", "min_expected": 2 if is_memory_db else 5},
        {"field": "tagline", "term": "life", "min_expected": 1 if is_memory_db else 2}
    ]
    
    print("Testing basic search functionality:")
    print(f"Database mode: {'In-memory sample' if is_memory_db else 'Full database'}")
    
    for test_case in search_test_cases:
        field = test_case["field"]
        term = test_case["term"]
        min_expected = test_case["min_expected"]
        
        # Execute search query
        cursor.execute(f"""
            SELECT id, title 
            FROM Movies 
            WHERE LOWER({field}) LIKE ? 
            LIMIT 10
        """, (f'%{term.lower()}%',))
        
        results = cursor.fetchall()
        result_count = len(results)
        
        print(f"- Search for '{term}' in {field}: found {result_count} results")
        if result_count > 0:
            print(f"  Sample results: {', '.join([r['title'] for r in results[:3]])}")
        
        # For in-memory DB, just check that we found any results at all if the minimum is very low
        if is_memory_db and min_expected <= 1:
            # Still show a warning if no results to help with debugging
            if result_count == 0:
                print(f"⚠️ WARNING: No results found for '{term}' in {field} (expected at least {min_expected})")
        else:
            # Do the normal assertion for full DB or higher minimums
            assert result_count >= min_expected, f"Expected at least {min_expected} results for '{term}' in {field}, but found {result_count}"
    
    """Test that we can filter movies by genre."""
    cursor = conn.cursor()
    
    # First, get a popular genre
    cursor.execute("""
        SELECT g.genre_id, g.genre_name, COUNT(mg.movie_id) as movie_count
        FROM Genres g
        JOIN Movie_Genre mg ON g.genre_id = mg.genre_id
        GROUP BY g.genre_id
        ORDER BY movie_count DESC
        LIMIT 1;
    """)
    
    genre = cursor.fetchone()
    assert genre is not None, "No genres with movies found"
    
    genre_id = genre['genre_id']
    genre_name = genre['genre_name']
    
    # Now fetch movies in this genre
    cursor.execute("""
        SELECT m.id, m.title, m.release_date
        FROM Movies m
        JOIN Movie_Genre mg ON m.id = mg.movie_id
        WHERE mg.genre_id = ?
        ORDER BY m.popularity DESC
        LIMIT 5;
    """, (genre_id,))
    
    movies = cursor.fetchall()
    
    print(f"Testing genre filtering for '{genre_name}' (ID: {genre_id}):")
    print(f"Found {len(movies)} movies in the '{genre_name}' genre:")
    for movie in movies:
        print(f"- {movie['title']}")
    
    assert len(movies) > 0, f"No movies found in the '{genre_name}' genre"

def test_advanced_search_capabilities(conn):
    """Test more complex search capabilities with multiple criteria."""
    cursor = conn.cursor()
    
    print("Testing advanced search functionality:")
    
    # Test 1: Search for high-rated movies in a specific year range
    min_rating = 7.5
    start_year = "2010"
    end_year = "2020"
    
    cursor.execute("""
        SELECT id, title, vote_average, release_date
        FROM Movies
        WHERE vote_average >= ?
          AND release_date >= ?
          AND release_date <= ?
        ORDER BY vote_average DESC
        LIMIT 5
    """, (min_rating, f"{start_year}-01-01", f"{end_year}-12-31"))
    
    results = cursor.fetchall()
    print(f"- High-rated movies ({min_rating}+) from {start_year}-{end_year}:")
    for movie in results:
        release_year = movie['release_date'][:4] if movie['release_date'] else "Unknown"
        print(f"  • {movie['title']} ({release_year}) - Rating: {movie['vote_average']}")
    
    assert len(results) > 0, "No high-rated movies found in the specified date range"
    
    # Test 2: Search for movies within budget range
    min_budget = 100000000  # 100M
    max_budget = 200000000  # 200M
    
    cursor.execute("""
        SELECT id, title, budget
        FROM Movies
        WHERE budget BETWEEN ? AND ?
        ORDER BY budget DESC
        LIMIT 10
    """, (min_budget, max_budget))
    
    results = cursor.fetchall()
    print(f"\n- Movies with budget between ${min_budget/1000000}M and ${max_budget/1000000}M:")
    for movie in results:
        print(f"  • {movie['title']} - Budget: ${movie['budget']:,}")
    
    assert len(results) > 0, "No movies found in the specified budget range"

def test_genre_based_queries(conn):
    """Test queries related to movie genres."""
    cursor = conn.cursor() 

    print("Testing genre-based queries:")

    # Test 1: Find most popular genres by movie count
    cursor.execute("""
        SELECT g.genre_name, COUNT(mg.movie_id) as movie_count 
        FROM Genres g 
        JOIN Movie_Genre mg ON g.genre_id = mg.genre_id 
        GROUP BY g.genre_name  
        ORDER BY movie_count DESC 
        LIMIT 5 
    """)
    
    top_genres = cursor.fetchall() 
    print("- Top 5 genres by movie count:")
    for genre in top_genres:
        print(f" • {genre['genre_name']}: {genre['movie_count']} movies")
    
    assert len(top_genres) > 0, "No genre distribution data found"

    # Test 2: Find highest-rated movies in a specific genre
    # Use the first genre from the previous result
    target_genre = top_genres[0]['genre_name']

    cursor.execute("""
    SELECT m.title, m.vote_average
    FROM Movies m 
    JOIN Movie_Genre mg ON m.id = mg.movie_id 
    JOIN Genres g ON mg.genre_id = g.genre_id 
    WHERE g.genre_name = ? 
        AND m.vote_count > 100 
    ORDER BY m.vote_average DESC 
    LIMIT 5
    """, (target_genre,))

    top_rated = cursor.fetchall() 
    print(f"\n- Top 5 highest-rated {target_genre} movies:")
    for movie in top_rated: 
        print(f" • {movie['title']} - Rating: {movie['vote_average']}")

    assert len(top_rated) > 0, f"No highly rated movies found in {target_genre} genre"
 
    # Test 3: Find genres with highest average ratings
    cursor.execute("""
    SELECT g.genre_name, 
           AVG(m.vote_average) as avg_rating, 
           COUNT(m.id) as movie_count 
    FROM Genres g 
    JOIN Movie_Genre mg ON g.genre_id = mg.genre_id 
    JOIN Movies m ON mg.movie_id = m.id  
    WHERE m.vote_count > 50 
    GROUP BY g.genre_name 
    HAVING movie_count > 10
    ORDER BY avg_rating DESC 
    LIMIT 5
    """)

    genres_by_rating = cursor.fetchall() 
    print("\n Genres with highest average ratings")
    for genre in genres_by_rating: 
        print(f" • {genre['genre_name']}: {genre['avg_rating']:.2f} from {genre['movie_count']} movies")
    
    assert len(genres_by_rating) > 0, "No genre rating data found"

def test_actor_and_cast_queries(conn): 
    """Test queries related to actors and movie casts."""
    cursor = conn.cursor() 

    print("Testing actor and cast queries:")

    # Test 1: Find actors in most movies 
    cursor.execute("""
        SELECT c.name, COUNT(DISTINCT mc.movie_id) as movie_count
        FROM Cast c
        JOIN Movies_Cast mc ON c.actor_id = mc.actor_id 
        GROUP BY c.name 
        ORDER BY movie_count DESC 
        LIMIT 5
    """)

    top_actors = cursor.fetchall() 
    print("- Actors appearing in most movies:")
    for actor in top_actors:
        print(f" • {actor['name']} appears in {actor['movie_count']} movies")

    assert len(top_actors) > 0, "No actor appearance data found"

    # Test 2: Find a specific actor's filmography
    # Use the first actor from previous result
    if len(top_actors) > 0:
        target_actor = top_actors[0]['name']

        cursor.execute("""
            SELECT m.title, m.release_date 
            FROM Movies m
            JOIN Movies_Cast mc ON m.id = mc.movie_id 
            JOIN Cast c ON mc.actor_id = c.actor_id
            WHERE c.name = ? 
            ORDER BY m.release_date DESC
            LIMIT 10
        """, (target_actor,))

        filmography = cursor.fetchall() 
        print(f"\n- Recent filmography for {target_actor:}")
        for movie in filmography:
            release_date = movie['release_date'] if movie['release_date'] else "Unknown date"
            print(f" • {movie['title']} ({release_date})")

        assert len(filmography) > 0, f"No filmography found for {target_actor}"

def test_financial_analytics(conn): 
    """Test queries related to financial metrics."""
    cursor = conn.cursor() 

    print("Testing financial analytics queries:")
    cursor.execute("""
        SELECT title, budget, revenue, (revenue - budget) as profit, 
            CASE
                WHEN budget > 0 THEN (revenue - budget) * 100 / budget
                ELSE NULL
            END as roi 
        FROM Movies
        WHERE budget > 1000000 
            AND revenue > budget 
        ORDER BY roi DESC
        LIMIT 5
    """)

    top_roi_movies = cursor.fetchall() 
    print("- Movies with highest ROI (Return on Investment):")
    for movie in top_roi_movies:
        print(f" • {movie['title']}: ROI {movie['roi']:.1f}% (Budget: ${movie['budget']:,}, Revenue: ${movie['revenue']:,})")

    assert len(top_roi_movies) > 0, "No ROI data found"

    # Test 2: Find average budget and revenue by decade
    # Takes the first 3 characters from the date '2019-05-24' -> '201' 
    # Adds '0s' to the string '201' + '0s' -> '2010s'
    cursor.execute("""
        SELECT
            SUBSTR(release_date, 1, 3) || '0s' as decade,
            AVG(budget) as avg_budget,
            AVG(revenue) as avg_revenue,
            COUNT(*) as movie_count
        FROM Movies
        WHERE release_date IS NOT NULL
            AND budget > 0
            AND revenue > 0
        GROUP BY decade
        ORDER BY decade DESC
    """)


    decades = cursor.fetchall()
    print("\n- Average budget and revenue by decade:")
    for decade in decades:
        print(f" • {decade['decade']}: {decade['movie_count']} movies")
        print(f"   Avg Budget: ${decade['avg_budget']:,.0f}, Avg Revenue: ${decade['avg_revenue']:,.0f}")

    assert len(decades) > 0, "No decade financial data found"

    # Test 3: Find most expensive movies
    cursor.execute("""
        SELECT title, budget, revenue, release_date 
        FROM Movies
        WHERE budget > 0
        ORDER BY budget DESC
        LIMIT 5
    """)

    expensive_movies = cursor.fetchall()
    print("\n- Most expensive movies by budget:")
    for movie in expensive_movies: 
        release_year = movie['release_date'][:4] if movie['release_date'] else "Unknown"
        print(f" • {movie['title']} ({release_year}): ${movie['budget']:,}")

    assert len(expensive_movies) > 0, "No budget data found"

# Run tests

### Run Functionality Tests

In [127]:
run_test(test_basic_search_functionality)
run_test(test_advanced_search_capabilities)
run_test(test_genre_based_queries)
run_test(test_actor_and_cast_queries)
run_test(test_financial_analytics)

Running test: test_basic_search_functionality
Using in-memory test database
✅ Imported 20 records into Genres
✅ Imported 11005 records into Keywords
✅ Imported 89 records into Production_Countries
✅ Imported 75 records into Spoken_Languages
✅ Imported 100 sample movies
✅ Imported 2217 cast members
✅ Imported 269 records into Movie_Genre
✅ Imported 2467 records into Movies_Cast
✅ Imported 2064 records into Movie_Keywords
✅ Imported 136 records into Movie_Production_Countries
✅ Imported 170 records into Movie_Spoken_Languages
Test database ready with 100 movies and 2217 cast members
Testing basic search functionality:
Database mode: In-memory sample
- Search for 'star' in title: found 1 results
  Sample results: Star Wars
- Search for 'adventure' in overview: found 4 results
  Sample results: Pulp Fiction, WALL·E, Interstellar
- Search for 'life' in tagline: found 8 results
  Sample results: Gladiator, Once Upon a Time in the West, 12 Angry Men
Testing genre filtering for 'Drama' (ID: 6)

### Run Database Structure and Integrity Tests

In [128]:
run_test(test_database_connection)
run_test(test_query_tables)
run_test(test_all_tables_exist)
run_test(test_movies_table)
run_test(test_genres_table)
run_test(test_cast_table)
run_test(test_data_integrity)
run_test(test_relationship_integrity)
run_test(test_cardinality)

Running test: test_database_connection
Using in-memory test database
✅ Imported 20 records into Genres
✅ Imported 11005 records into Keywords
✅ Imported 89 records into Production_Countries
✅ Imported 75 records into Spoken_Languages
✅ Imported 100 sample movies
✅ Imported 2217 cast members
✅ Imported 269 records into Movie_Genre
✅ Imported 2467 records into Movies_Cast
✅ Imported 2064 records into Movie_Keywords
✅ Imported 136 records into Movie_Production_Countries
✅ Imported 170 records into Movie_Spoken_Languages
Test database ready with 100 movies and 2217 cast members
✅ Test passed!
----------------------------------------
Running test: test_query_tables
Using in-memory test database
✅ Imported 20 records into Genres
✅ Imported 11005 records into Keywords
✅ Imported 89 records into Production_Countries
✅ Imported 75 records into Spoken_Languages
✅ Imported 100 sample movies
✅ Imported 2217 cast members
✅ Imported 269 records into Movie_Genre
✅ Imported 2467 records into Movies_Ca