# Assignment 3 — Queries


In [22]:
from DbConnector import DbConnector
from statistics import median
from itertools import combinations
from collections import Counter, defaultdict

import pandas as pd

conn = DbConnector()
db = conn.db
print("Connected to:", db.name)

def show_table(rows, limit=15):
    df = pd.DataFrame(rows)
    if len(df) > limit:
        display(df.head(limit))
        print(f"... {len(df) - limit} more rows not shown ...")
    else:
        display(df)
    return df


You are connected to the database: movies_db
-----------------------------------------------

Connected to: movies_db


## Q1 — Directors with 5 or more movies

This query looks at directors who have made at least five films and compares their box office performance and ratings.  
The idea is to identify who consistently directs successful movies both commercially and critically.  
It takes the crew data from `credits`, filters only those entries where the job is “Director”, and then joins them with the `movies` collection to get the revenue and average vote.  
Each director is grouped with all their movies, the number of films is counted, and the average TMDB rating is calculated.  
Finally, the median revenue is computed in Python since it is easier and avoids extra aggregation stages.


In [None]:
pipeline_q1 = [
    # 0) credits with crew (list)
    {"$match": {"crew": {"$type": "array"}}},

    # 1) explode crew
    {"$unwind": {"path": "$crew", "preserveNullAndEmptyArrays": False}},

    # 2) only directors (case-insensitive for robustness)
    {"$match": {"crew.job": {"$regex": "^director$", "$options": "i"}}},

    # 3) join key (TMDB id) from _id of credits
    {"$addFields": {"movie_id_int": {"$toInt": "$_id"}}},

    # 4) lookup a movies (coincide with _id in movies)
    {"$lookup": {
        "from": "movies",
        "localField": "movie_id_int",
        "foreignField": "_id",
        "as": "movie_info"
    }},
    {"$unwind": "$movie_info"},

    # 5) project necessary fields + valid revenue flag
    {"$project": {
        "_id": 0,
        "director": "$crew.name",
        "revenue": "$movie_info.revenue",
        "vote_average": "$movie_info.vote_average",
        "rev_valid": {
            "$and": [
                {"$ne": ["$movie_info.revenue", None]},
                {"$gt": ["$movie_info.revenue", 0]}
            ]
        }
    }},

    # 6) group by director
    {"$group": {
        "_id": "$director",
        "movies_total": {"$sum": 1},                               # all directed movies
        "revenues": {"$push": {"$cond": ["$rev_valid", "$revenue", None]}},
        "avg_vote": {"$avg": "$vote_average"}
    }},

    # 7) filter directors with ≥ 5 movies (regardless of valid revenue)
    {"$match": {"movies_total": {"$gte": 5}}}
]

data = list(db.credits.aggregate(pipeline_q1))
len(data), (data[0] if data else None)

(119,
 {'_id': 'Johnnie To',
  'movies_total': 6,
  'revenues': [None, None, None, None, None, None],
  'avg_vote': 5.1499999999999995})

In [None]:
# move to rows to calculate median
rows = []
for r in data:
    director = r["_id"]
    movies_total = r["movies_total"]
    avg_vote = r["avg_vote"]
    # filter None from the revenue list
    revs = [x for x in r["revenues"] if isinstance(x, (int, float))]
    if not revs:   # if there are no valid revenues, cannot contribute median
        continue
    for rev in revs:
        rows.append({"director": director, "revenue": rev,
                     "count_movies": movies_total, "mean_vote_average": avg_vote})

df = pd.DataFrame(rows)

summary = (
    df.groupby("director", as_index=False)
      .agg(median_revenue=("revenue", "median"),
           count_movies=("count_movies", "first"),
           mean_vote_average=("mean_vote_average", "first"))
      .sort_values("median_revenue", ascending=False)
      .head(10)
)
summary


Unnamed: 0,director,median_revenue,count_movies,mean_vote_average
20,Tyler Perry,60072596.0,8,5.8875
4,Francis Veber,29326868.0,6,6.316667
14,Peter Yates,12400000.0,5,6.24
2,Bruce Beresford,11950062.5,5,5.64
12,Paul Mazursky,9789900.0,5,4.28
10,Michael Apted,8453312.5,10,6.56
22,Çağan Irmak,6086224.0,5,7.14
18,Spike Lee,5731103.0,12,6.466667
21,Wong Jing,4762337.0,8,6.5
15,Pupi Avati,3006000.0,10,5.78


## Q2 — Actor pairs with at least three co-appearances

This query finds actor pairs that frequently appear together in movies.  
It unfolds the cast list twice for the same film to create all possible pairs and keeps only one order of each pair to avoid duplicates.  
The results are joined with `movies` to retrieve the rating of each film, and then the pairs are grouped by the two actor names.  
For every pair, the number of shared movies and their average rating are calculated, keeping only those who have worked together three or more times.


In [19]:
pipeline_q2 = [
    {"$match": {"cast": {"$type": "array"}}},
    {"$project": {
        "_id": 0,
        "movie_id": "$_id",
        "cast_names": {
            "$map": {
                "input": "$cast",
                "as": "c",
                "in": "$$c.name"
            }
        }
    }},
    {"$lookup": {
        "from": "movies",
        "localField": "movie_id",
        "foreignField": "_id",
        "as": "movie_info"
    }},
    {"$unwind": "$movie_info"},
    {"$project": {
        "movie_id": 1,
        "cast_names": 1,
        "vote_average": "$movie_info.vote_average"
    }}
]

movies_cast = list(db.credits.aggregate(pipeline_q2, allowDiskUse=True))
print("Movies retrieved:", len(movies_cast))
movies_cast[:2]


Movies retrieved: 45333


[{'movie_id': 2, 'cast_names': [], 'vote_average': 7.1},
 {'movie_id': 3, 'cast_names': [], 'vote_average': 7.1}]

In [20]:
pair_counts = Counter()
pair_votes = defaultdict(list)

for doc in movies_cast:
    cast = [c for c in (doc.get("cast_names") or []) if isinstance(c, str)]
    if len(cast) < 2:
        continue
    vote = doc.get("vote_average", None)
    # generar pares únicos sin orden
    for a1, a2 in combinations(sorted(set(cast)), 2):
        pair = (a1, a2)
        pair_counts[pair] += 1
        if isinstance(vote, (int, float)):
            pair_votes[pair].append(vote)

# Crear DataFrame
rows = []
for pair, count in pair_counts.items():
    if count >= 3:  # solo pares con ≥3 apariciones
        votes = pair_votes.get(pair, [])
        avg_vote = sum(votes) / len(votes) if votes else None
        rows.append({
            "actor_1": pair[0],
            "actor_2": pair[1],
            "co_appearances": count,
            "mean_vote_average": avg_vote
        })

df_q2 = pd.DataFrame(rows)
df_q2 = df_q2.sort_values(["co_appearances", "mean_vote_average"], ascending=[False, False]).head(20)
df_q2.reset_index(drop=True, inplace=True)
df_q2


Unnamed: 0,actor_1,actor_2,co_appearances,mean_vote_average
0,Barbara Hale,Raymond Burr,18,4.972222
1,Frank Welker,Grey Griffin,15,6.753333
2,Barbara Hale,William R. Moses,15,5.386667
3,Frank Welker,Mindy Cohn,13,6.707692
4,Grey Griffin,Mindy Cohn,13,6.707692
5,Pinto Colvig,Walt Disney,12,6.216667
6,Raymond Burr,William R. Moses,12,4.875
7,Clarence Nash,Walt Disney,10,6.21
8,Clarence Nash,Pinto Colvig,9,6.3
9,Frank Welker,Matthew Lillard,8,6.8


## Q3 — Actors with the widest genre variety

The goal of this query is to highlight actors who have worked across the most diverse set of genres.  
After connecting each cast member with the corresponding movie information, the list of genres for each movie is expanded.  
The query then groups by actor name and collects a set of distinct genre names to count how many different ones appear.  
Only actors with at least ten movies are included, and the final list is sorted by the number of distinct genres.


In [34]:
pipeline_q3 = [
    {"$match": {"cast": {"$type": "array"}}},
    {"$unwind": "$cast"},
    {"$lookup": {
        "from": "movies",
        "localField": "_id",
        "foreignField": "_id",
        "as": "movie_info"
    }},
    {"$unwind": "$movie_info"},
    {"$project": {
        "_id": 0,
        "movie_id": "$_id",
        "actor": "$cast.name",
        "genres": {
            "$map": {
                "input": "$movie_info.genres",
                "as": "g",
                "in": "$$g.name"
            }
        }
    }}
]

actor_genres = list(db.credits.aggregate(pipeline_q3, allowDiskUse=True))
print("Movies processed:", len(actor_genres))

Movies processed: 62935


In [35]:
actor_movies = defaultdict(set)
actor_genres_all = defaultdict(set)

for doc in actor_genres:
    actor = doc["actor"]
    movie_id = doc["movie_id"]
    genres = [g for g in (doc.get("genres") or []) if isinstance(g, str)]
    actor_movies[actor].add(movie_id)
    actor_genres_all[actor].update(genres)

rows = []
for actor in actor_movies:
    num_movies = len(actor_movies[actor])
    if num_movies < 10:
        continue
    genres = actor_genres_all[actor]
    rows.append({
        "actor": actor,
        "movie_count": num_movies,
        "distinct_genres": len(genres),
        "example_genres": list(genres)[:5]
    })

df_q3 = (
    pd.DataFrame(rows)
    .sort_values("distinct_genres", ascending=False)
    .head(10)
    .reset_index(drop=True)
)

df_q3

Unnamed: 0,actor,movie_count,distinct_genres,example_genres
0,Christopher Lloyd,20,18,"[Western, Science Fiction, History, Action, Ho..."
1,Donald Sutherland,26,18,"[Science Fiction, Action, Horror, War, Mystery]"
2,Billy Zane,15,18,"[Western, Science Fiction, History, Action, Ho..."
3,Alec Baldwin,24,18,"[Western, Science Fiction, Action, War, Mystery]"
4,Michael Gambon,13,17,"[Action, Horror, War, Mystery, Thriller]"
5,Michael McKean,11,17,"[Western, Science Fiction, History, Action, War]"
6,Charlton Heston,20,16,"[Science Fiction, History, Action, Horror, War]"
7,John Goodman,25,16,"[Western, Science Fiction, Action, Mystery, Th..."
8,Jon Voight,11,16,"[Science Fiction, Action, Horror, War, Thriller]"
9,Martin Sheen,20,16,"[Western, Science Fiction, Action, Horror, War]"


## Q4 — Top movie collections by total revenue

This query focuses on film collections or sagas and measures their financial success.  
It selects movies that belong to a collection, groups them by the collection name, and sums their total revenue.  
The aggregation also counts how many films belong to each collection and gathers all their ratings.  
The median of these ratings is calculated afterwards in Python, and the final result displays the ten most profitable collections.


In [None]:
pipeline_q4 = [
    # 1. Show only movies that belong to a collection
    {"$match": {
        "belongs_to_collection": {"$ne": None},
        "belongs_to_collection.name": {"$exists": True, "$ne": ""}
    }},

    # 2. Group by collection name
    {"$group": {
        "_id": "$belongs_to_collection.name",
        "movie_count": {"$sum": 1},
        "revenues": {"$push": "$revenue"},
        "votes": {"$push": "$vote_average"},
        "release_dates": {"$push": "$release_date"}
    }},

    # 3. Filter collections with ≥ 3 movies
    {"$match": {"movie_count": {"$gte": 3}}}
]

rows_q4 = list(db.movies.aggregate(pipeline_q4, allowDiskUse=True))
print("Collections found:", len(rows_q4))

Collections found: 474


In [41]:
rows_clean = []
for r in rows_q4:
    name = r["_id"]
    revs = [x for x in r["revenues"] if isinstance(x, (int, float)) and x > 0]
    votes = [x for x in r["votes"] if isinstance(x, (int, float))]
    dates = [d for d in r.get("release_dates", []) if d]

    if not revs:
        continue

    row = {
        "collection": name,
        "movie_count": r["movie_count"],
        "total_revenue": sum(revs),
        "median_vote_average": round(median(votes), 2) if votes else None,
        "earliest_release_date": min(dates) if dates else None,
        "latest_release_date": max(dates) if dates else None
    }
    rows_clean.append(row)

df_q4 = (
    pd.DataFrame(rows_clean)
      .sort_values("total_revenue", ascending=False)
      .head(10)
      .reset_index(drop=True)
)

df_q4


Unnamed: 0,collection,movie_count,total_revenue,median_vote_average,earliest_release_date,latest_release_date
0,Harry Potter Collection,8,7707367000.0,7.5,2001-11-16,2011-07-07
1,Star Wars Collection,8,7434495000.0,7.45,1977-05-25,2016-12-14
2,James Bond Collection,26,7106970000.0,6.3,1962-10-04,2015-10-26
3,The Fast and the Furious Collection,8,5125099000.0,6.65,2001-06-22,2017-04-12
4,Pirates of the Caribbean Collection,5,4521577000.0,6.9,2003-07-09,2017-05-23
5,Transformers Collection,5,4366101000.0,6.1,2007-06-27,2017-06-21
6,Despicable Me Collection,6,3691070000.0,6.9,2010-07-08,2017-06-15
7,The Twilight Collection,5,3342107000.0,5.8,2008-11-20,2012-11-13
8,Ice Age Collection,5,3216709000.0,6.5,2002-03-10,2016-06-23
9,Jurassic Park Collection,4,3031484000.0,6.35,1993-06-11,2015-06-09


## Q5 — Median runtime per decade and primary genre

This query explores how movie runtimes vary by decade and genre.  
The release year of each film is converted into a decade label such as “1980s” or “2000s”.  
The first genre listed in the movie data is treated as the primary genre for grouping purposes.  
Movies are grouped by both decade and genre, and for each group the total number of films is recorded.  
The median runtime is then computed in Python to give a clean summary of the central tendency for each category.


In [None]:
pipeline_q5 = [
    # Only movies with valid runtime, release_date and genres
    {"$match": {
        "runtime": {"$ne": None, "$gt": 0},
        "release_date": {"$ne": None},
        "genres": {"$type": "array", "$ne": []}
    }},

    # First genre as "primary_genre"
    {"$addFields": {
        "primary_genre": {"$arrayElemAt": ["$genres.name", 0]}
    }},

    # Year and decade (e.g. "1980s")
    {"$addFields": {"year": {"$year": "$release_date"}}},
    {"$addFields": {
        "decade": {
            "$concat": [
                {"$toString": {"$multiply": [{"$floor": {"$divide": ["$year", 10]}}, 10]}},
                "s"
            ]
        }
    }},

    # Group by decade + genre
    {"$group": {
        "_id": {"decade": "$decade", "genre": "$primary_genre"},
        "runtimes": {"$push": "$runtime"},
        "movie_count": {"$sum": 1}
    }},

    # Remove empty genres
    {"$match": {"_id.genre": {"$ne": None, "$ne": ""}}}
]

data = list(db.movies.aggregate(pipeline_q5, allowDiskUse=True))
print("Groups:", len(data))

Groups: 230


In [None]:
# Calculate median_runtime
rows = []
for r in data:
    decade = r["_id"]["decade"]
    genre = r["_id"]["genre"]
    runtimes = [x for x in r["runtimes"] if isinstance(x, (int, float)) and x > 0]
    if not runtimes:
        continue
    rows.append({
        "decade": decade,
        "genre": genre,
        "movie_count": r["movie_count"],
        "median_runtime": round(median(runtimes), 1)
    })

df = pd.DataFrame(rows)

# Identify top genre per decade
top_genres = (
    df.loc[df.groupby("decade")["movie_count"].idxmax()]
    .sort_values("decade")
    .reset_index(drop=True)
)

top_genres

Unnamed: 0,decade,genre,movie_count,median_runtime
0,1880s,Documentary,2,1.0
1,1890s,Documentary,27,1.0
2,1900s,Comedy,17,3.0
3,1910s,Comedy,53,27.0
4,1920s,Drama,164,90.0
5,1930s,Drama,382,85.0
6,1940s,Drama,466,98.0
7,1950s,Drama,627,98.0
8,1960s,Drama,714,102.0
9,1970s,Drama,860,102.0


## Q6 — Share of women in top-5 billed cast by decade

This query measures how the presence of women among the top-billed actors has changed over time.  
It keeps only cast entries where the order is less than five, which corresponds to the top of the cast list.  
For each movie, it counts how many of these actors are women using the `gender` value equal to 1.  
The ratio of female actors within the top five is then averaged by decade based on each movie’s release date.


In [None]:
pipeline_q6 = [
    # 1) Secure cast as array and unwind
    {"$match": {"cast": {"$type": "array"}}},
    {"$unwind": "$cast"},

    # 2) Top-5 per 'order' (0..4).
    {"$match": {"cast.order": {"$gte": 0, "$lte": 4}}},

    # 3) Mark known genre and collect by movie
    {"$project": {
        "_id": 1,
        "gender": "$cast.gender",
        "is_known": {"$in": ["$cast.gender", [1, 2]]}  # 1=female, 2=male, others/None = unknown
    }},

    {"$group": {
        "_id": "$_id",  # movie_id (TMDB)
        "known_count": {"$sum": {"$cond": ["$is_known", 1, 0]}},
        "female_count": {"$sum": {"$cond": [{"$eq": ["$gender", 1]}, 1, 0]}}
    }},

    # 4) Ignore movies whose top 5 has no known genre
    {"$match": {"known_count": {"$gt": 0}}},

    # 5) Female proportion by movie
    {"$addFields": {"female_prop": {"$divide": ["$female_count", "$known_count"]}}},

    # 6) Join with movies to get release_date and decade
    {"$lookup": {
        "from": "movies",
        "localField": "_id",
        "foreignField": "_id",
        "as": "mv"
    }},
    {"$unwind": "$mv"},
    {"$match": {"mv.release_date": {"$ne": None}}},

    {"$addFields": {"year": {"$year": "$mv.release_date"}}},
    {"$addFields": {
        "decade": {
            "$concat": [
                {"$toString": {"$multiply": [{"$floor": {"$divide": ["$year", 10]}}, 10]}},
                "s"
            ]
        }
    }},

    # 7) Added by decade
    {"$group": {
        "_id": "$decade",
        "avg_female_prop": {"$avg": "$female_prop"},
        "movie_count": {"$sum": 1}
    }},

    # 8) Sort by highest female proportion
    {"$sort": {"avg_female_prop": -1}}
]

rows_q6 = list(db.credits.aggregate(pipeline_q6, allowDiskUse=True))
df_q6 = (
    pd.DataFrame(rows_q6)
      .rename(columns={"_id": "decade"})
      .assign(avg_female_prop=lambda d: d["avg_female_prop"].round(3))
      .sort_values(["avg_female_prop", "decade"], ascending=[False, True])
      .reset_index(drop=True)
)

df_q6

Unnamed: 0,decade,avg_female_prop,movie_count
0,2020s,0.8,1
1,1910s,0.533,7
2,2010s,0.361,2639
3,2000s,0.36,2384
4,1930s,0.353,121
5,1940s,0.349,202
6,1990s,0.344,1091
7,1920s,0.34,17
8,1950s,0.317,326
9,1960s,0.304,331


## Q7 — Noir and neo-noir movies (regex search)

The goal of this query is to find movies described as noir or neo-noir in their overview or tagline.  
Since there is no text index in the database, regular expressions are used to match both words in a case-insensitive way.  
To avoid irrelevant titles, only movies with at least fifty votes are included.  
The output lists the top results sorted by rating.

In [62]:
# Regex: "neo-noir" o "neo noir" o "noir", case-insensitive, with word boundaries where applicable
neo_noir = {"$regex": r"(?i)\bneo[- ]?noir\b"}
noir     = {"$regex": r"(?i)\bnoir\b"}

pipeline_q7 = [
    {"$match": {
        "vote_count": {"$gte": 50},
        "$or": [
            {"overview": neo_noir},
            {"tagline":  neo_noir},
            {"overview": noir},
            {"tagline":  noir},
        ]
    }},
    {"$project": {
        "_id": 0,
        "title": 1,
        "year": {"$cond": [
            {"$ne": ["$release_date", None]},
            {"$year": "$release_date"},
            None
        ]},
        "vote_average": 1,
        "vote_count": 1
    }},
    {"$sort": {"vote_average": -1, "vote_count": -1, "title": 1}},
    {"$limit": 20}
]

rows_q7 = list(db.movies.aggregate(pipeline_q7, allowDiskUse=True))
df_q7 = pd.DataFrame(rows_q7)
df_q7

Unnamed: 0,title,vote_average,vote_count,year
0,The Bad Sleep Well,7.7,57,1960
1,Drunken Angel,7.7,54,1948
2,Elevator to the Gallows,7.6,85,1958
3,Synchronicity,5.7,114,2015
4,Frank & Lola,5.7,51,2016


## Q8 — Best director–actor collaborations

This query highlights pairs of directors and actors who have frequently worked together and achieved strong average ratings.  
It first extracts each movie’s director, joins the same movie’s cast to create all possible director–actor pairs,  
and then connects to the `movies` collection to include vote counts.  
Only movies with at least one hundred votes are considered to keep the sample meaningful,  
and only pairs with at least three collaborations are kept in the final ranking.


In [None]:
pipeline_q8 = [
    {"$match": {"crew": {"$type": "array"}, "cast": {"$type": "array"}}},

    # director → actor
    {"$unwind": "$crew"},
    {"$match": {"crew.job": {"$regex": "^director$", "$options": "i"}}},
    {"$unwind": "$cast"},

    # keys and cleaning
    {"$project": {
        "movie_id": "$_id",
        "director": "$crew.name",
        "actor": "$cast.name"
    }},
    {"$match": {
        "director": {"$ne": None, "$ne": ""},
        "actor": {"$ne": None, "$ne": ""}
    }},

    # Lookup by id with cast to int explicit on both sides
    {"$lookup": {
        "from": "movies",
        "let": {"cid": {"$toInt": "$movie_id"}},
        "pipeline": [
            {"$addFields": {"mid": {"$toInt": "$_id"}}},
            {"$match": {"$expr": {"$eq": ["$mid", "$$cid"]}}},
            {"$project": {"vote_average": 1, "vote_count": 1, "revenue": 1}}
        ],
        "as": "mv"
    }},
    {"$unwind": "$mv"},

    # Votes (force type before filtering)
    {"$addFields": {"vote_count_num": {"$toInt": "$mv.vote_count"}}},
    {"$match": {"vote_count_num": {"$gte": 100}}},

    # Group ONLY to feature unique collaborations
    {"$group": {
        "_id": {"director": "$director", "actor": "$actor"},
        "movie_ids": {"$addToSet": "$movie_id"},
        "avg_vote": {"$avg": "$mv.vote_average"},
        "avg_revenue": {"$avg": {
            "$cond": [
                {"$and": [
                    {"$ne": ["$mv.revenue", None]},
                    {"$gt": ["$mv.revenue", 0]}
                ]},
                "$mv.revenue",
                None
            ]
        }}
    }},

    # >= 3 collaborations
    {"$addFields": {"movie_count": {"$size": "$movie_ids"}}},
    {"$match": {"movie_count": {"$gte": 3}}},

    # output
    {"$project": {
        "_id": 0,
        "director": "$_id.director",
        "actor": "$_id.actor",
        "movie_count": 1,
        "mean_vote_average": {"$round": ["$avg_vote", 2]},
        "mean_revenue": {
            "$cond": [
                {"$eq": ["$avg_revenue", None]},
                None,
                {"$round": ["$avg_revenue", 0]}
            ]
        }
    }},
    {"$sort": {"mean_vote_average": -1, "movie_count": -1, "director": 1, "actor": 1}},
    {"$limit": 20}
]
rows_q8 = list(db.credits.aggregate(pipeline_q8, allowDiskUse=True))
print("Results:", len(rows_q8))
rows_q8[:5]


Results: 0


[]

In [None]:
probe_counts = [
  {"$match": {"crew": {"$type": "array"}, "cast": {"$type": "array"}}},
  {"$unwind": "$crew"},
  {"$match": {"crew.job": {"$regex": "^director$", "$options": "i"}}},
  {"$unwind": "$cast"},
  {"$project": {"movie_id": "$_id", "director": "$crew.name", "actor": "$cast.name"}},
  {"$match": {"director": {"$ne": ""}, "actor": {"$ne": ""}}},
  {"$lookup": {
      "from": "movies",
      "let": {"cid": {"$toInt": "$movie_id"}},
      "pipeline": [
        {"$addFields": {"mid": {"$toInt": "$_id"}}},
        {"$match": {"$expr": {"$eq": ["$mid", "$$cid"]}}},
        {"$project": {"vote_count": 1, "vote_average": 1, "revenue": 1}}
      ],
      "as": "mv"
  }},
  {"$unwind": "$mv"},
  {"$addFields": {"vote_count_num": {"$toInt": "$mv.vote_count"}}},
  {"$match": {"vote_count_num": {"$gte": 100}}},

  # Group ONLY to feature unique collaborations
  {"$group": {
      "_id": {"director": "$director", "actor": "$actor"},
      "movie_ids": {"$addToSet": "$movie_id"}
  }},
  {"$project": {"_id": 0, "director": "$_id.director", "actor": "$_id.actor",
                "movie_count": {"$size": "$movie_ids"}}},
  {"$sort": {"movie_count": -1}},
  {"$limit": 20}
]

top_counts = list(db.credits.aggregate(probe_counts, allowDiskUse=True))
top_counts


[{'director': 'Eric Lavaine', 'actor': 'Jérôme Commandeur', 'movie_count': 2},
 {'director': 'Eric Lavaine', 'actor': 'Stéphane De Groodt', 'movie_count': 1},
 {'director': 'Michael Winterbottom',
  'actor': 'Jessica Alba',
  'movie_count': 1},
 {'director': 'Eric Toledano', 'actor': 'Julie Fournier', 'movie_count': 1},
 {'director': 'Michael Winterbottom',
  'actor': 'Elias Koteas',
  'movie_count': 1},
 {'director': 'Bernard Campan', 'actor': 'Sofia Lesaffre', 'movie_count': 1},
 {'director': 'François Ozon', 'actor': 'Isild Le Besco', 'movie_count': 1},
 {'director': 'François Ozon', 'actor': 'François Ozon', 'movie_count': 1},
 {'director': 'Derrick Borte', 'actor': 'Amber Heard', 'movie_count': 1},
 {'director': 'Cameron Crowe',
  'actor': 'Crystal the Monkey',
  'movie_count': 1},
 {'director': 'Dave McKean', 'actor': 'Gina McKee', 'movie_count': 1},
 {'director': 'Francis Veber', 'actor': 'Dany Boon', 'movie_count': 1},
 {'director': 'Uwe Boll', 'actor': 'Kristanna Loken', 'movi

## Q9 — Non-English languages connected to the United States

This query checks which non-English original languages appear most often in movies that have a connection to the United States.  
A connection is counted if the movie lists the United States either as a production country or as the origin of one of its production companies.  
The aggregation counts how many movies match each non-English language and returns the ten most common ones.


In [None]:
pipeline_q9 = [
    # 1. Movies with non-English original language and valid production data
    {"$match": {
        "original_language": {"$ne": "en"},
        "production_companies": {"$type": "array"},
        "production_countries": {"$type": "array"}
    }},

    # 2. Extract country names and codes
    {"$addFields": {
        "prod_country_names": {
            "$map": {
                "input": "$production_countries",
                "as": "c",
                "in": {"$ifNull": ["$$c.name", ""]}
            }
        },
        "prod_country_codes": {
            "$map": {
                "input": "$production_countries",
                "as": "c",
                "in": {"$ifNull": ["$$c.iso_3166_1", ""]}
            }
        },
        "prod_company_countries": {
            "$map": {
                "input": "$production_companies",
                "as": "p",
                "in": {"$ifNull": ["$$p.origin_country", ""]}
            }
        }
    }},

    # 3. Filter movies with US participation
    {"$match": {
        "$or": [
            {"prod_country_names": {"$in": ["United States of America", "USA"]}},
            {"prod_country_codes": {"$in": ["US"]}},
            {"prod_company_countries": {"$in": ["US"]}}
        ]
    }},

    # 4. Group by original language
    {"$group": {
        "_id": "$original_language",
        "movie_count": {"$sum": 1},
        "example_title": {"$first": "$title"}
    }},

    # 5. Sort and limit
    {"$sort": {"movie_count": -1}},
    {"$limit": 10},

    # 6. Final projection
    {"$project": {
        "_id": 0,
        "original_language": "$_id",
        "movie_count": 1,
        "example_title": 1
    }}
]

rows_q9 = list(db.movies.aggregate(pipeline_q9, allowDiskUse=True))
df_q9 = pd.DataFrame(rows_q9)
df_q9

Unnamed: 0,movie_count,example_title,original_language
0,112,Wings of Courage,fr
1,72,Bitter Sugar,es
2,56,Frankie Starlight,it
3,51,Cold Fever,de
4,30,Godzilla 1985,ja
5,15,Senseless,pt
6,13,Quest for Fire,xx
7,12,"Come On, Rangers",nl
8,11,Dark Eyes,ru
9,10,Eat Drink Man Woman,zh


## Q10 — Per-user statistics: number of ratings, variance, and genre diversity

The last query calculates a few basic statistics for each user.  
It joins `ratings` with `links` and then `movies` to bring the genre information of every rated movie.  
Each user’s set of distinct genres is collected to count how varied their watched content is.  
The population standard deviation of the ratings is used to compute the variance.  
To keep the results stable, only users with at least twenty ratings are included.


In [None]:
pipeline_user_stats = [
    {"$addFields": {
        "userId": {"$toInt": "$userId"},
        "movieId": {"$toInt": "$movieId"},
        "rating": {"$toDouble": "$rating"}
    }},
    {"$group": {
        "_id": "$userId",                 # _id = userId 
        "n_ratings": {"$sum": 1},
        "std_pop": {"$stdDevPop": "$rating"},
        "movie_ids": {"$addToSet": "$movieId"}
    }},
    {"$match": {"n_ratings": {"$gte": 20}}},
    {"$addFields": {"variance": {"$multiply": ["$std_pop", "$std_pop"]}}},
    {"$merge": {
        "into": "user_stats_tmp",
        "whenMatched": "replace",
        "whenNotMatched": "insert"
    }}
]

list(db.ratings.aggregate(pipeline_user_stats, allowDiskUse=True))
print(db.user_stats_tmp.estimated_document_count())
print(db.user_stats_tmp.find_one())


166444
{'_id': 1, 'n_ratings': 27, 'std_pop': 1.1732617823301597, 'movie_ids': [2918, 99114, 69844, 92439, 81834, 96821, 112552, 4878, 1968, 110, 858, 91500, 2959, 68358, 147, 98809, 1246, 1221, 54503, 59315, 58559, 4226, 91542, 2762, 73017, 5577, 33794], 'variance': 1.3765432098765429}


In [None]:
pipeline_genres = [
    # NO delete _id; we will use it as merge key
    {"$lookup": {
        "from": "movies",
        "let": {"ids": "$movie_ids"},
        "pipeline": [
            {"$match": {"$expr": {"$in": ["$_id", "$$ids"]}}},
            {"$unwind": "$genres"},
            {"$group": {"_id": None, "genres_set": {"$addToSet": "$genres.name"}}},
            {"$project": {
                "_id": 0,
                "genre_count": {"$size": "$genres_set"},
                "example_genres": {"$slice": ["$genres_set", 5]}
            }}
        ],
        "as": "g"
    }},
    {"$addFields": {
        "genre_count": {"$ifNull": [{"$first": "$g.genre_count"}, 0]},
        "example_genres": {"$ifNull": [{"$first": "$g.example_genres"}, []]}
    }},
    {"$project": {
        # keep _id for merge
        "n_ratings": 1,
        "variance": 1,
        "genre_count": 1,
        "example_genres": 1
    }},
    {"$merge": {
        "into": "user_stats_tmp",
        "on": "_id",                   # stable key (userId)
        "whenMatched": "merge",
        "whenNotMatched": "discard"
    }}
]

list(db.user_stats_tmp.aggregate(pipeline_genres, allowDiskUse=True))


[]

In [None]:
# Top 10 users with most diverse tastes
rows_diverse = list(db.user_stats_tmp.aggregate([
    {"$project": {
        "_id": 0,
        "userId": "$_id",
        "n_ratings": 1,
        "variance": 1,
        "genre_count": 1,
        "example_genres": 1
    }},
    {"$sort": {"genre_count": -1, "n_ratings": -1, "userId": 1}},
    {"$limit": 10}
]))

# Top 10 users with highest variance
rows_var = list(db.user_stats_tmp.aggregate([
    {"$project": {
        "_id": 0,
        "userId": "$_id",
        "n_ratings": 1,
        "variance": 1,
        "genre_count": 1,
        "example_genres": 1
    }},
    {"$sort": {"variance": -1, "n_ratings": -1, "userId": 1}},
    {"$limit": 10}
]))


In [None]:
df_var = pd.DataFrame(rows_var)
display(df_var)

Unnamed: 0,n_ratings,variance,example_genres,genre_count,userId
0,34,5.0625,"[Thriller, Science Fiction, Romance, Crime, TV...",13,167241
1,26,5.0625,"[Family, Crime, Romance, Music, Animation]",13,185889
2,20,5.0625,"[Thriller, Horror, Romance, Crime, Science Fic...",14,18703
3,20,5.0625,"[Adventure, Action, Mystery, Drama, Comedy]",10,57842
4,20,5.0625,"[Thriller, Science Fiction, Romance, Crime, We...",13,83102
5,20,5.0625,"[Science Fiction, Fantasy, Thriller, Crime, An...",11,153882
6,83,5.061765,"[Adventure, Action, Comedy, Drama, Horror]",13,15395
7,27,5.055556,"[Drama, Romance, Comedy, Thriller, Science Fic...",5,71009
8,21,5.05102,"[Fantasy, Romance, Action, History, Animation]",12,179759
9,120,5.04,"[Fantasy, Crime, War, History, Western]",17,221535
