diff --git a/server/python/main.py b/server/python/main.py index 9dbeeb4..61e6ca4 100644 --- a/server/python/main.py +++ b/server/python/main.py @@ -1,33 +1,28 @@ +from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from src.routers import movies from src.utils.errorHandler import register_error_handlers from src.database.mongo_client import db, get_collection -import traceback + import os from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() -app = FastAPI() -# Add CORS middleware -cors_origins = os.getenv("CORS_ORIGINS", "http://localhost:3000,http://localhost:3001").split(",") -app.add_middleware( - CORSMiddleware, - allow_origins=[origin.strip() for origin in cors_origins], # Load from environment variable - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -register_error_handlers(app) -app.include_router(movies.router, prefix="/api/movies", tags=["movies"]) +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup: Create search indexes + await ensure_search_index() + await vector_search_index() + yield + # Shutdown: Clean up resources if needed + # Add any cleanup code here -@app.on_event("startup") -async def initialize_database_indexes(): +async def ensure_search_index(): try: movies_collection = db.get_collection("movies") comments_collection = db.get_collection("comments") @@ -37,70 +32,43 @@ async def initialize_database_indexes(): indexes = [idx async for idx in result] index_names = [index["name"] for index in indexes] if "movieSearchIndex" in index_names: - print("MongoDB Search index already exists.") - else: - # Create a mapping if the movieSearchIndex does not exist - index_definition = { - "mappings": { - "dynamic": False, - "fields": { - "plot": {"type": "string", "analyzer": "lucene.standard"}, - "fullplot": {"type": "string", "analyzer": "lucene.standard"}, - "directors": {"type": "string", "analyzer": 
"lucene.standard"}, - "writers": {"type": "string", "analyzer": "lucene.standard"}, - "cast": {"type": "string", "analyzer": "lucene.standard"} - } + return + + # Create a mapping if the movieSearchIndex does not exist + index_definition = { + "mappings": { + "dynamic": False, + "fields": { + "plot": {"type": "string", "analyzer": "lucene.standard"}, + "fullplot": {"type": "string", "analyzer": "lucene.standard"}, + "directors": {"type": "string", "analyzer": "lucene.standard"}, + "writers": {"type": "string", "analyzer": "lucene.standard"}, + "cast": {"type": "string", "analyzer": "lucene.standard"} } } - # Creates movieSearchIndex on the movies collection - await db.command({ - "createSearchIndexes": "movies", - "indexes": [{ - "name": "movieSearchIndex", - "definition": index_definition - }] - }) - print("MongoDB Search index created.") - - # Check and create index on movie_id field in comments collection - # This index will significantly improve $lookup performance in aggregations - cursor = await comments_collection.list_indexes() - existing_indexes = await cursor.to_list(length=None) - movie_id_index_exists = any( - "movie_id" in index.get("key", {}) for index in existing_indexes - ) - - if not movie_id_index_exists: - # Create index on movie_id field for better aggregation performance - await comments_collection.create_index("movie_id") - print("Index on 'movie_id' field in comments collection created.") - else: - print("Index on 'movie_id' field in comments collection already exists.") - - # Also create a compound index on movie_id and date for even better performance - # when sorting comments by date within each movie - compound_index_exists = any( - index.get("key", {}).get("movie_id") == 1 and index.get("key", {}).get("date") == -1 - for index in existing_indexes - ) - - if not compound_index_exists: - await comments_collection.create_index([("movie_id", 1), ("date", -1)]) - print("Compound index on 'movie_id' and 'date' fields in comments collection 
created.") - else: - print("Compound index on 'movie_id' and 'date' fields already exists.") - + } + # Creates movieSearchIndex on the movies collection + await db.command({ + "createSearchIndexes": "movies", + "indexes": [{ + "name": "movieSearchIndex", + "definition": index_definition + }] + }) except Exception as e: - print(f"Error creating indexes: {e}") + raise RuntimeError( + f"Failed to create search index 'movieSearchIndex': {str(e)}. " + f"Search functionality may not work properly. " + f"Please check your MongoDB Atlas configuration and ensure the cluster supports search indexes." + ) + -@app.on_event("startup") async def vector_search_index(): """ Creates vector search index on application startup if it doesn't already exist. This ensures the index is ready before any vector search requests are made. """ try: - embedded_movies_collection = get_collection("embedded_movies") # Get list of existing indexes - convert AsyncCommandCursor to list @@ -128,10 +96,29 @@ async def vector_search_index(): } # Create the index - result = await embedded_movies_collection.create_search_index(index_definition) - print("Vector search index 'vector_index' ready to query.") + await embedded_movies_collection.create_search_index(index_definition) except Exception as e: - print(f"Error during vector search index setup: {str(e)}") - print(f"Error type: {type(e).__name__}") + raise RuntimeError( + f"Failed to create vector search index 'vector_index': {str(e)}. " + f"Vector search functionality will not be available. " + f"Please check your MongoDB Atlas configuration, ensure the cluster supports vector search, " + f"and verify the 'embedded_movies' collection exists with the required embedding field." 
+ ) + + +app = FastAPI(lifespan=lifespan) + +# Add CORS middleware +cors_origins = os.getenv("CORS_ORIGINS", "http://localhost:3000,http://localhost:3001").split(",") +app.add_middleware( + CORSMiddleware, + allow_origins=[origin.strip() for origin in cors_origins], # Load from environment variable + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +register_error_handlers(app) +app.include_router(movies.router, prefix="/api/movies", tags=["movies"]) diff --git a/server/python/src/routers/movies.py b/server/python/src/routers/movies.py index a1b3d3f..797f7d2 100644 --- a/server/python/src/routers/movies.py +++ b/server/python/src/routers/movies.py @@ -1,9 +1,8 @@ from fastapi import APIRouter, Query, Path, Body -from src.database.mongo_client import db, get_collection, voyage_ai_available -from src.models.models import VectorSearchResult, CreateMovieRequest, Movie, MovieFilter, SuccessResponse, UpdateMovieRequest, SearchMoviesResponse, BatchUpdateRequest, BatchDeleteRequest +from src.database.mongo_client import get_collection, voyage_ai_available +from src.models.models import VectorSearchResult, CreateMovieRequest, Movie, SuccessResponse, UpdateMovieRequest, SearchMoviesResponse -from typing import List -from datetime import datetime +from typing import Any, List from src.utils.errorHandler import create_success_response, create_error_response from bson import ObjectId import re @@ -15,6 +14,10 @@ This file contains all the business logic for movie operations. Each method demonstrates different MongoDB operations using the PyMongo driver. +The /search and /vector-search endpoints are at the top of the file because they must be +before the /{id} endpoint to avoid route conflicts where the /search and /vector-search +endpoints match the /{id} pattern rather than the intended paths. 
+ Implemented Endpoints: - GET /api/movies/search : @@ -25,15 +28,6 @@ Search movies using MongoDB Vector Search to enable semantic search capabilities over the plot field. -- GET /api/movies/aggregations/reportingByComments : - Aggregate movies with their most recent comments using MongoDB $lookup aggregation. - -- GET /api/movies/aggregations/reportingByYear : - Aggregate movies by year with average rating and movie count. - -- GET /api/movies/aggregations/reportingByDirectors : - Aggregate directors with the most movies and their statistics. - - GET /api/movies/{id} : Retrieve a single movie by its ID. @@ -62,6 +56,15 @@ - DELETE /api/movies/{id}/find-and-delete : Find and delete a movie in a single atomic operation. +- GET /api/movies/aggregations/reportingByComments : + Aggregate movies with their most recent comments using MongoDB $lookup aggregation. + +- GET /api/movies/aggregations/reportingByYear : + Aggregate movies by year with average rating and movie count. + +- GET /api/movies/aggregations/reportingByDirectors : + Aggregate directors with the most movies and their statistics. + Helper Functions: - execute_aggregation(pipeline): Executes a MongoDB aggregation pipeline and returns the results. @@ -246,9 +249,20 @@ async def search_movies( details=str(e) ) - # Extract total count and movies from facet results - facet_result = results[0] if results else {} - total_count = facet_result.get("totalCount", [{}])[0].get("count", 0) + # Extract total count and movies from facet results with proper bounds checking + if not results or len(results) == 0: + return create_success_response( + SearchMoviesResponse(movies=[], totalCount=0), + "No movies found matching the search criteria." 
+ ) + + facet_result = results[0] + + # Safely extract total count + total_count_array = facet_result.get("totalCount", []) + total_count = total_count_array[0].get("count", 0) if total_count_array else 0 + + # Safely extract movies data movies_data = facet_result.get("results", []) # Convert ObjectId to string for each movie in the results @@ -388,458 +402,78 @@ async def vector_search_movies( code="INTERNAL_SERVER_ERROR", details=str(e) ) - + +#------------------------------------ +# Place get_movie_by_id endpoint here +#------------------------------------ + """ - GET /api/movies/aggregations/reportingByComments - Aggregate movies with their most recent comments using MongoDB $lookup aggregation. - Joins movies with comments collection to show recent comment activity. - Query Parameters: - limit (int, optional): Number of results to return (default: 10, max: 50). - movie_id (str, optional): Filter by specific movie ObjectId. + GET /api/movies/{id} + Retrieve a single movie by its ID. + Path Parameters: + id (str): The ObjectId of the movie to retrieve. Returns: - SuccessResponse[List[dict]]: A response object containing movies with their most recent comments. + SuccessResponse[Movie]: A response object containing the movie data. """ -@router.get("/aggregations/reportingByComments", - response_model=SuccessResponse[List[dict]], +@router.get("/{id}", + response_model=SuccessResponse[Movie], status_code=200, - summary="Aggregate movies with their most recent comments.") -async def aggregate_movies_recent_commented( - limit: int = Query(default=10, ge=1, le=50), - movie_id: str = Query(default=None) -): - - # Add a multi-stage aggregation that: - # 1. Filters movies by valid year range - # 2. Joins with comments collection (like SQL JOIN) - # 3. Filters to only movies that have comments - # 4. Sorts comments by date and extracts most recent ones - # 5. Sorts movies by their most recent comment date - # 6. 
Shapes the final output with transformed comment structure - - pipeline = [ - # STAGE 1: $match - Initial Filter - # Filter movies to only those with valid year data - # Tip: Use $match early to reduce the initial dataset for better performance - { - "$match": { - "year": {"$type": "number", "$gte": 1800, "$lte": 2030} - } - } - ] + summary="Retrieve a single movie by its ID.") +async def get_movie_by_id(id: str): + # Validate ObjectId format + try: + object_id = ObjectId(id) + except InvalidId: + return create_error_response( + message="Invalid movie ID format", + code="INTERNAL_SERVER_ERROR", + details=f"The provided ID '{id}' is not a valid ObjectId" + ) - # Add movie_id filter if provided - if movie_id: - try: - object_id = ObjectId(movie_id) - pipeline[0]["$match"]["_id"] = object_id - except Exception: - return create_error_response( - message="Invalid movie ID format", - code="INTERNAL_SERVER_ERROR", - details="The provided movie_id is not a valid ObjectId" - ) - - # Add remaining pipeline stages - pipeline.extend([ - # STAGE 2: $lookup - Join with the 'comments' Collection - # This gives each movie document a 'comments' array containing all its comments - { - "$lookup": { - "from": "comments", - "localField": "_id", - "foreignField": "movie_id", - "as": "comments" - } - }, - # STAGE 3: $match - Filter Movies with at Least One Comment - # This helps reduces dataset to only movies with user engagement - { - "$match": { - "comments": {"$ne": []} - } - }, - # STAGE 4: $addFields - Add New Computed Fields - { - "$addFields": { - # Add computed field 'recentComments' that extracts only the N most recent comments (up to 'limit') - "recentComments": { - "$slice": [ - { - "$sortArray": { - "input": "$comments", - "sortBy": {"date": -1} # -1 = descending (newest first) - } - }, - limit # Number of comments to keep - ] - }, - # Add computed field 'mostRecentCommentDate' that gets the date of the most recent comment (to use in the next $sort stage) - 
"mostRecentCommentDate": { - "$max": "$comments.date" - } - } - }, - # STAGE 5: $sort - Sort Movies by Most Recent Comment Date - { - "$sort": {"mostRecentCommentDate": -1} - }, - # STAGE 6: $limit - Restrict Result Set Size - # - If querying single movie: return up to 50 results - # - If querying all movies: return up to 20 results - # Tip: This prevents overwhelming the client with too much data - { - "$limit": 20 if movie_id else 5 - }, - # STAGE 7: $project - Shape Final Response Output - { - "$project": { - # Include basic movie fields - "title": 1, - "year": 1, - "genres": 1, - "_id": 1, - # Extract nested field: imdb.rating -> imdbRating - "imdbRating": "$imdb.rating", - # Use $map to reshape computed 'recentComments' field with cleaner field names - "recentComments": { - "$map": { - "input": "$recentComments", - "as": "comment", - "in": { - "userName": "$$comment.name", # Rename: name -> userName - "userEmail": "$$comment.email", # Rename: email -> userEmail - "text": "$$comment.text", # Keep: text - "date": "$$comment.date" # Keep: date - } - } - }, - # Calculate the total number of comments into 'totalComments' (not just 'recentComments') - # Used in display (e.g., "Showing 5 of 127 comments") - "totalComments": {"$size": "$comments"} - } - } - ]) - # Execute the aggregation + movies_collection = get_collection("movies") try: - results = await execute_aggregation(pipeline) + movie = await movies_collection.find_one({"_id": object_id}) except Exception as e: return create_error_response( - message="Database error occurred during aggregation", + message="Database error occurred", code="INTERNAL_SERVER_ERROR", details=str(e) ) - # Convert ObjectId to string for response - for result in results: - if "_id" in result: - result["_id"] = str(result["_id"]) - - # Calculate total comments from all movies - total_comments = sum(result.get("totalComments", 0) for result in results) - - return create_success_response( - results, - f"Found {total_comments} comments 
from movie{'s' if len(results) != 1 else ''}" - ) + if movie is None: + return create_error_response( + message="Movie not found", + code="INTERNAL_SERVER_ERROR", + details=f"No movie found with ID: {id}" + ) + movie["_id"] = str(movie["_id"]) # Convert ObjectId to string + + return create_success_response(movie, "Movie retrieved successfully") """ - GET /api/movies/aggregations/reportingByYear - Aggregate movies by year with average rating and movie count. - Reports yearly statistics including average rating and total movies per year. + GET /api/movies/ + + Retrieve a list of movies with optional filtering, sorting, and pagination. + + Query Parameters: + q (str, optional): Text search query (searches title, plot, fullplot). + genre (str, optional): Filter by genre. + year (int, optional): Filter by year. + min_rating (float, optional): Minimum IMDB rating. + max_rating (float, optional): Maximum IMDB rating. + limitNum (int, optional): Number of results to return (default: 20, max: 100). + skipNum (int, optional): Number of documents to skip for pagination (default: 0). + sortBy (str, optional): Field to sort by (default: "title"). + sort_order (str, optional): Sort direction, "asc" or "desc" (default: "asc"). + Returns: - SuccessResponse[List[dict]]: A response object containing yearly movie statistics. + SuccessResponse[List[Movie]]: A response object containing the list of movies and metadata. """ -@router.get("/aggregations/reportingByYear", - response_model=SuccessResponse[List[dict]], - status_code=200, - summary="Aggregate movies by year with average rating and movie count.") -async def aggregate_movies_by_year(): - # Define aggregation pipeline to group movies by year with statistics - # This pipeline demonstrates grouping, statistical calculations, and data cleaning - - # Add a multi-stage aggregation that: - # 1. Filters movies by valid year range (data quality filter) - # 2. Groups movies by release year and calculates statistics per year - # 3. 
Shapes the final output with clean field names and rounded averages - # 4. Sorts results by year (newest first) for chronological presentation - - pipeline = [ - # STAGE 1: $match - Data Quality Filter - # Clean data: ensure year is an integer and within reasonable range - # Tip: Filter early to reduce dataset size and improve performance - { - "$match": { - "year": {"$type": "number", "$gte": 1800, "$lte": 2030} - } - }, - - # STAGE 2: $group - Aggregate Movies by Year - # Group all movies by their release year and calculate various statistics - { - "$group": { - "_id": "$year", # Group by year field - "movieCount": {"$sum": 1}, # Count total movies per year - - # Calculate average rating (only for valid numeric ratings) - "averageRating": { - "$avg": { - "$cond": [ - {"$and": [ - {"$ne": ["$imdb.rating", None]}, # Not null - {"$ne": ["$imdb.rating", ""]}, # Not empty string - {"$eq": [{"$type": "$imdb.rating"}, "double"]} # Is numeric - ]}, - "$imdb.rating", # Include valid IMDB ratings - "$$REMOVE" # Exclude invalid IMDB ratings - ] - } - }, - - # Find highest rating for the year (same validation as average rating) - "highestRating": { - "$max": { - "$cond": [ - {"$and": [ - {"$ne": ["$imdb.rating", None]}, - {"$ne": ["$imdb.rating", ""]}, - {"$eq": [{"$type": "$imdb.rating"}, "double"]} - ]}, - "$imdb.rating", - "$$REMOVE" - ] - } - }, - - # Find lowest rating for the year (same validation as average and highest rating) - "lowestRating": { - "$min": { - "$cond": [ - {"$and": [ - {"$ne": ["$imdb.rating", None]}, - {"$ne": ["$imdb.rating", ""]}, - {"$eq": [{"$type": "$imdb.rating"}, "double"]} - ]}, - "$imdb.rating", - "$$REMOVE" - ] - } - }, - - # Sum total votes across all movies in the year - "totalVotes": {"$sum": "$imdb.votes"} - } - }, - - # STAGE 3: $project - Shape Final Output - # Transform the grouped data into a clean, readable format - { - "$project": { - "year": "$_id", # Rename _id back to year because grouping was done by year but values were 
stored in _id - "movieCount": 1, - "averageRating": {"$round": ["$averageRating", 2]}, # Round to 2 decimal places - "highestRating": 1, - "lowestRating": 1, - "totalVotes": 1, - "_id": 0 # Exclude the _id field from output - } - }, - - # STAGE 4: $sort - Sort by Year (Newest First) - # Sort results in descending order to show most recent years first - {"$sort": {"year": -1}} # -1 = descending order - ] - - # Execute the aggregation - try: - results = await execute_aggregation(pipeline) - except Exception as e: - return create_error_response( - message="Database error occurred during aggregation", - code="INTERNAL_SERVER_ERROR", - details=str(e) - ) - - return create_success_response( - results, - f"Aggregated statistics for {len(results)} years" - ) - - -""" - GET /api/movies/aggregations/reportingByDirectors - Aggregate directors with the most movies and their statistics. - Reports directors sorted by number of movies directed. - Query Parameters: - limit (int, optional): Number of results to return (default: 20, max: 100). - Returns: - SuccessResponse[List[dict]]: A response object containing director statistics. -""" - -@router.get("/aggregations/reportingByDirectors", - response_model=SuccessResponse[List[dict]], - status_code=200, - summary="Aggregate directors with the most movies and their statistics.") -async def aggregate_directors_most_movies( - limit: int = Query(default=20, ge=1, le=100) -): - # Define aggregation pipeline to find directors with the most movies - # This pipeline demonstrates array unwinding, filtering, and ranking - - # Add a multi-stage aggregation that: - # 1. Filters movies with valid directors and year data (data quality filter) - # 2. Unwinds directors array to create separate documents per director - # 3. Cleans director names by filtering out null/empty names - # 4. Groups movies by individual director and calculates statistics per director - # 5. Sorts directors based on movie count - # 6. 
Limits results to top N directors - # 7. Shapes the final output with clean field names and rounded averages - - pipeline = [ - # STAGE 1: $match - Initial Data Quality Filter - # Filter movies that have director information and valid years - { - "$match": { - "directors": {"$exists": True, "$ne": None, "$ne": []}, # Has directors array - "year": {"$type": "number", "$gte": 1800, "$lte": 2030} # Valid year range - } - }, - - # STAGE 2: $unwind - Flatten Directors Array - # Convert each movie's directors array into separate documents - # Example: Movie with ["Director A", "Director B"] becomes 2 documents - { - "$unwind": "$directors" - }, - - # STAGE 3: $match - Clean Director Names - # Filter out any null or empty director names after unwinding - { - "$match": { - "directors": {"$ne": None, "$ne": ""} - } - }, - - # STAGE 4: $group - Aggregate by Director - # Group all movies by director name and calculate statistics - { - "$group": { - "_id": "$directors", # Group by individual director name - "movieCount": {"$sum": 1}, # Count movies per director - "averageRating": {"$avg": "$imdb.rating"} # Average rating of director's movies - } - }, - - # STAGE 5: $sort - Rank Directors by Movie Count - # Sort directors by number of movies (highest first) - {"$sort": {"movieCount": -1}}, # -1 = descending (most movies first) - - # STAGE 6: $limit - Restrict Results - # Limit to top N directors based on user input - {"$limit": limit}, - - # STAGE 7: $project - Shape Final Output - # Transform the grouped data into a clean, readable format - { - "$project": { - "director": "$_id", # Rename _id to director - "movieCount": 1, - "averageRating": {"$round": ["$averageRating", 2]}, # Round to 2 decimal places - "_id": 0 # Exclude the _id field from output - } - } - ] - - # Execute the aggregation - try: - results = await execute_aggregation(pipeline) - except Exception as e: - return create_error_response( - message="Database error occurred during aggregation", - 
code="INTERNAL_SERVER_ERROR", - details=str(e) - ) - - return create_success_response( - results, - f"Found {len(results)} directors with most movies" - ) - - - -#------------------------------------ -# Place get_movie_by_id endpoint here -#------------------------------------ - -""" - GET /api/movies/{id} - Retrieve a single movie by its ID. - Path Parameters: - id (str): The ObjectId of the movie to retrieve. - Returns: - SuccessResponse[Movie]: A response object containing the movie data. -""" - -@router.get("/{id}", - response_model=SuccessResponse[Movie], - status_code=200, - summary="Retrieve a single movie by its ID.") -async def get_movie_by_id(id: str): - # Validate ObjectId format - try: - object_id = ObjectId(id) - except InvalidId: - return create_error_response( - message="Invalid movie ID format", - code="INTERNAL_SERVER_ERROR", - details=f"The provided ID '{id}' is not a valid ObjectId" - ) - - movies_collection = get_collection("movies") - try: - movie = await movies_collection.find_one({"_id": object_id}) - except Exception as e: - return create_error_response( - message="Database error occurred", - code="INTERNAL_SERVER_ERROR", - details=str(e) - ) - - if movie is None: - return create_error_response( - message="Movie not found", - code="INTERNAL_SERVER_ERROR", - details=f"No movie found with ID: {id}" - ) - - movie["_id"] = str(movie["_id"]) # Convert ObjectId to string - - return create_success_response(movie, "Movie retrieved successfully") - -""" - GET /api/movies/ - - Retrieve a list of movies with optional filtering, sorting, and pagination. - - Query Parameters: - q (str, optional): Text search query (searches title, plot, fullplot). - genre (str, optional): Filter by genre. - year (int, optional): Filter by year. - min_rating (float, optional): Minimum IMDB rating. - max_rating (float, optional): Maximum IMDB rating. - limitNum (int, optional): Number of results to return (default: 20, max: 100). 
- skipNum (int, optional): Number of documents to skip for pagination (default: 0). - sortBy (str, optional): Field to sort by (default: "title"). - sort_order (str, optional): Sort direction, "asc" or "desc" (default: "asc"). - - Returns: - SuccessResponse[List[Movie]]: A response object containing the list of movies and metadata. -""" - -@router.get("/", - response_model=SuccessResponse[List[Movie]], +@router.get("/", + response_model=SuccessResponse[List[Movie]], status_code=200, summary="Retrieve a list of movies with optional filtering, sorting, and pagination.") # Validate the query parameters using FastAPI's Query functionality. @@ -1016,32 +650,25 @@ async def create_movies_batch(movies: List[CreateMovieRequest]) ->SuccessRespons movies_dicts = [] for movie in movies: - movies_dicts.append(movie.model_dump(exclude_unset=True, exclude_none=True)) + movie_dict = movie.model_dump(exclude_unset=True, exclude_none=True) + # Remove _id if it exists to let MongoDB generate it automatically + movie_dict.pop('_id', None) + movies_dicts.append(movie_dict) try: result = await movies_collection.insert_many(movies_dicts) + return create_success_response({ + "insertedCount": len(result.inserted_ids), + "insertedIds": [str(_id) for _id in result.inserted_ids] + }, + f"Successfully created {len(result.inserted_ids)} movies." + ) except Exception as e: return create_error_response( - message="An error occurred while creating movies.", - code="DATABASE_ERROR", - details=str(e) - ) - - try: - result = await movies_collection.insert_many(movies_dicts) - except Exception as e: - return create_error_response( - message="Database error occurred during batch creation", + message="Database error occurred", code="INTERNAL_SERVER_ERROR", details=str(e) ) - - return create_success_response({ - "insertedCount": len(result.inserted_ids), - "insertedIds": [str(_id) for _id in result.inserted_ids] - }, - f"Successfully created {len(result.inserted_ids)} movies." 
- ) #------------------------------------ # Place update_movie endpoint here @@ -1113,7 +740,6 @@ async def update_movie( ) updatedMovie = await movies_collection.find_one({"_id": movie_id}) - print(updatedMovie) updatedMovie["_id"] = str(updatedMovie["_id"]) return create_success_response(updatedMovie, f"Movie updated successfully. Modified {len(update_dict)} fields.") @@ -1296,8 +922,6 @@ async def delete_movies_batch(request_body: dict = Body(...)) -> SuccessResponse f'Delete operation completed. Removed {result.deleted_count} movies.' ) - - #------------------------------------ # Place find_and_delete_movie endpoint here #------------------------------------ @@ -1349,6 +973,381 @@ async def find_and_delete_movie(id: str): return create_success_response(deleted_movie, "Movie found and deleted successfully") +""" + GET /api/movies/aggregations/reportingByComments + Aggregate movies with their most recent comments using MongoDB $lookup aggregation. + Joins movies with comments collection to show recent comment activity. + Query Parameters: + limit (int, optional): Number of results to return (default: 10, max: 50). + movie_id (str, optional): Filter by specific movie ObjectId. + Returns: + SuccessResponse[List[dict]]: A response object containing movies with their most recent comments. +""" + +@router.get("/aggregations/reportingByComments", + response_model=SuccessResponse[List[dict]], + status_code=200, + summary="Aggregate movies with their most recent comments.") +async def aggregate_movies_recent_commented( + limit: int = Query(default=10, ge=1, le=50), + movie_id: str = Query(default=None) +): + + # Add a multi-stage aggregation that: + # 1. Filters movies by valid year range + # 2. Joins with comments collection (like SQL JOIN) + # 3. Filters to only movies that have comments + # 4. Sorts comments by date and extracts most recent ones + # 5. Sorts movies by their most recent comment date + # 6. 
Shapes the final output with transformed comment structure + + pipeline: list[dict[str, Any]] =[ + # STAGE 1: $match - Initial Filter + # Filter movies to only those with valid year data + { + "$match": { + "year": {"$type": "number", "$gte": 1800, "$lte": 2030} + } + } + ] + + # Add movie_id filter if provided + if movie_id: + try: + object_id = ObjectId(movie_id) + pipeline[0]["$match"]["_id"] = object_id + except Exception: + return create_error_response( + message="Invalid movie ID format", + code="INTERNAL_SERVER_ERROR", + details="The provided movie_id is not a valid ObjectId" + ) + + # Add remaining pipeline stages + pipeline.extend([ + # STAGE 2: $lookup - Join with the 'comments' Collection + # This gives each movie document a 'comments' array containing all its comments + { + "$lookup": { + "from": "comments", + "localField": "_id", + "foreignField": "movie_id", + "as": "comments" + } + }, + # STAGE 3: $match - Filter Movies with at Least One Comment + # This helps reduces dataset to only movies with user engagement + { + "$match": { + "comments": {"$ne": []} + } + }, + # STAGE 4: $addFields - Add New Computed Fields + { + "$addFields": { + # Add computed field 'recentComments' that extracts only the N most recent comments (up to 'limit') + "recentComments": { + "$slice": [ + { + "$sortArray": { + "input": "$comments", + "sortBy": {"date": -1} # -1 = descending (newest first) + } + }, + limit # Number of comments to keep + ] + }, + # Add computed field 'mostRecentCommentDate' that gets the date of the most recent comment (to use in the next $sort stage) + "mostRecentCommentDate": { + "$max": "$comments.date" + } + } + }, + # STAGE 5: $sort - Sort Movies by Most Recent Comment Date + { + "$sort": {"mostRecentCommentDate": -1} + }, + # STAGE 6: $limit - Restrict Result Set Size + # - If querying single movie: return up to 50 results + # - If querying all movies: return up to 20 results + # Tip: This prevents overwhelming the client with too much data + { + 
"$limit": 50 if movie_id else 20 + }, + # STAGE 7: $project - Shape Final Response Output + { + "$project": { + # Include basic movie fields + "title": 1, + "year": 1, + "genres": 1, + "_id": 1, + # Extract nested field: imdb.rating -> imdbRating + "imdbRating": "$imdb.rating", + # Use $map to reshape computed 'recentComments' field with cleaner field names + "recentComments": { + "$map": { + "input": "$recentComments", + "as": "comment", + "in": { + "userName": "$$comment.name", # Rename: name -> userName + "userEmail": "$$comment.email", # Rename: email -> userEmail + "text": "$$comment.text", # Keep: text + "date": "$$comment.date" # Keep: date + } + } + }, + # Calculate the total number of comments into 'totalComments' (not just 'recentComments') + # Used in display (e.g., "Showing 5 of 127 comments") + "totalComments": {"$size": "$comments"} + } + } + ]) + # Execute the aggregation + try: + results = await execute_aggregation(pipeline) + except Exception as e: + return create_error_response( + message="Database error occurred during aggregation", + code="INTERNAL_SERVER_ERROR", + details=str(e) + ) + + # Convert ObjectId to string for response + for result in results: + if "_id" in result: + result["_id"] = str(result["_id"]) + + # Calculate total comments from all movies + total_comments = sum(result.get("totalComments", 0) for result in results) + + return create_success_response( + results, + f"Found {total_comments} comments from movie{'s' if len(results) != 1 else ''}" + ) + +""" + GET /api/movies/aggregations/reportingByYear + Aggregate movies by year with average rating and movie count. + Reports yearly statistics including average rating and total movies per year. + Returns: + SuccessResponse[List[dict]]: A response object containing yearly movie statistics. 
@router.get("/aggregations/reportingByYear",
    response_model=SuccessResponse[List[dict]],
    status_code=200,
    summary="Aggregate movies by year with average rating and movie count.")
async def aggregate_movies_by_year():
    """
    GET /api/movies/aggregations/reportingByYear

    Aggregate movies by year with average rating and movie count.

    The pipeline is handed to ``execute_aggregation`` and has four stages:
        1. $match   - keep movies whose ``year`` is a number between 1800 and 2030
        2. $group   - per year: movie count, average / highest / lowest IMDB
                      rating, and the sum of IMDB votes
        3. $project - rename ``_id`` back to ``year`` and round the average
                      rating to 2 decimal places
        4. $sort    - newest year first

    Returns:
        The result of ``create_success_response`` wrapping one dict per year with
        the keys ``year``, ``movieCount``, ``averageRating``, ``highestRating``,
        ``lowestRating`` and ``totalVotes``. If the aggregation raises, the
        result of ``create_error_response`` with code "INTERNAL_SERVER_ERROR"
        is returned instead.
    """
    # Yields the IMDB rating when it is numeric and nothing ($$REMOVE) otherwise,
    # so null, missing, and empty-string ratings never reach the accumulators.
    # $isNumber accepts every BSON numeric type (int, long, double, decimal);
    # comparing $type to "double" alone would silently drop ratings stored as
    # integers such as 8.
    numeric_rating = {
        "$cond": [
            {"$isNumber": "$imdb.rating"},
            "$imdb.rating",   # Include valid IMDB ratings
            "$$REMOVE",       # Exclude invalid IMDB ratings
        ]
    }

    pipeline = [
        # STAGE 1: $match - Data Quality Filter
        # Keep only movies whose year is numeric and within a reasonable range.
        # Tip: Filter early to reduce dataset size and improve performance
        {
            "$match": {
                "year": {"$type": "number", "$gte": 1800, "$lte": 2030}
            }
        },

        # STAGE 2: $group - Aggregate Movies by Year
        # Group all movies by their release year and calculate statistics.
        # All three rating accumulators share the same validation expression.
        {
            "$group": {
                "_id": "$year",                           # Group by year field
                "movieCount": {"$sum": 1},                # Count total movies per year
                "averageRating": {"$avg": numeric_rating},
                "highestRating": {"$max": numeric_rating},
                "lowestRating": {"$min": numeric_rating},
                # Sum total votes across all movies in the year
                "totalVotes": {"$sum": "$imdb.votes"}
            }
        },

        # STAGE 3: $project - Shape Final Output
        # Transform the grouped data into a clean, readable format
        {
            "$project": {
                "year": "$_id",  # Grouping stored the year in _id; expose it as 'year'
                "movieCount": 1,
                "averageRating": {"$round": ["$averageRating", 2]},  # Round to 2 decimal places
                "highestRating": 1,
                "lowestRating": 1,
                "totalVotes": 1,
                "_id": 0  # Exclude the _id field from output
            }
        },

        # STAGE 4: $sort - Sort by Year (Newest First)
        {"$sort": {"year": -1}}  # -1 = descending order
    ]

    # Execute the aggregation
    try:
        results = await execute_aggregation(pipeline)
    except Exception as e:
        return create_error_response(
            message="Database error occurred during aggregation",
            code="INTERNAL_SERVER_ERROR",
            details=str(e)
        )

    return create_success_response(
        results,
        f"Aggregated statistics for {len(results)} years"
    )
@router.get("/aggregations/reportingByDirectors",
    response_model=SuccessResponse[List[dict]],
    status_code=200,
    summary="Aggregate directors with the most movies and their statistics.")
async def aggregate_directors_most_movies(
    limit: int = Query(default=20, ge=1, le=100)
):
    """
    GET /api/movies/aggregations/reportingByDirectors

    Aggregate directors with the most movies and their statistics.
    Reports directors sorted by number of movies directed.

    The pipeline is handed to ``execute_aggregation`` and has seven stages:
        1. $match   - keep movies with a non-null, non-empty ``directors``
                      value and a numeric ``year`` between 1800 and 2030
        2. $unwind  - one document per (movie, director) pair
        3. $match   - drop null / empty-string director names
        4. $group   - per director: movie count and average IMDB rating
        5. $sort    - most movies first
        6. $limit   - keep the top ``limit`` directors
        7. $project - rename ``_id`` to ``director`` and round the average
                      rating to 2 decimal places

    Query Parameters:
        limit (int, optional): Number of results to return (default: 20, min: 1, max: 100).

    Returns:
        The result of ``create_success_response`` wrapping one dict per director
        with the keys ``director``, ``movieCount`` and ``averageRating``. If the
        aggregation raises, the result of ``create_error_response`` with code
        "INTERNAL_SERVER_ERROR" is returned instead.
    """
    pipeline = [
        # STAGE 1: $match - Initial Data Quality Filter
        # Filter movies that have director information and valid years.
        # NOTE: the exclusions are expressed with a single $nin. Writing "$ne"
        # twice in one Python dict literal keeps only the last entry, so the
        # null check would never be sent to the server.
        {
            "$match": {
                "directors": {"$exists": True, "$nin": [None, []]},      # Has directors array
                "year": {"$type": "number", "$gte": 1800, "$lte": 2030}  # Valid year range
            }
        },

        # STAGE 2: $unwind - Flatten Directors Array
        # Convert each movie's directors array into separate documents
        # Example: Movie with ["Director A", "Director B"] becomes 2 documents
        {
            "$unwind": "$directors"
        },

        # STAGE 3: $match - Clean Director Names
        # Filter out any null or empty director names after unwinding
        # ($nin for the same duplicate-key reason as in stage 1).
        {
            "$match": {
                "directors": {"$nin": [None, ""]}
            }
        },

        # STAGE 4: $group - Aggregate by Director
        # Group all movies by director name and calculate statistics
        {
            "$group": {
                "_id": "$directors",                        # Group by individual director name
                "movieCount": {"$sum": 1},                  # Count movies per director
                "averageRating": {"$avg": "$imdb.rating"}   # Average rating of director's movies
            }
        },

        # STAGE 5: $sort - Rank Directors by Movie Count
        {"$sort": {"movieCount": -1}},  # -1 = descending (most movies first)

        # STAGE 6: $limit - Restrict Results
        # Limit to top N directors based on user input
        {"$limit": limit},

        # STAGE 7: $project - Shape Final Output
        # Transform the grouped data into a clean, readable format
        {
            "$project": {
                "director": "$_id",  # Rename _id to director
                "movieCount": 1,
                "averageRating": {"$round": ["$averageRating", 2]},  # Round to 2 decimal places
                "_id": 0  # Exclude the _id field from output
            }
        }
    ]

    # Execute the aggregation
    try:
        results = await execute_aggregation(pipeline)
    except Exception as e:
        return create_error_response(
            message="Database error occurred during aggregation",
            code="INTERNAL_SERVER_ERROR",
            details=str(e)
        )

    return create_success_response(
        results,
        f"Found {len(results)} directors with most movies"
    )

#------------------------------------
#Helper Functions
#------------------------------------

async def execute_aggregation(pipeline: list) -> list:
    """Run ``pipeline`` against the "movies" collection and return all results.

    Args:
        pipeline: Aggregation stages, passed through unchanged.

    Returns:
        A list holding every document produced by the aggregation.
    """
    # Delegate to the collection-agnostic helper so the cursor handling
    # lives in exactly one place.
    return await execute_aggregation_on_collection(get_collection("movies"), pipeline)


async def execute_aggregation_on_collection(collection, pipeline: list) -> list:
    """Run ``pipeline`` against ``collection`` and return all results.

    Args:
        collection: Object exposing an awaitable ``aggregate(pipeline)`` whose
            result has an awaitable ``to_list(length=None)``.
        pipeline: Aggregation stages, passed through unchanged.

    Returns:
        A list holding every document produced by the aggregation.
    """
    # For the async Pymongo driver, the aggregate call itself must be awaited
    cursor = await collection.aggregate(pipeline)
    # length=None collects all data at once rather than processing per document
    return await cursor.to_list(length=None)