# MongoDB Queries & Aggregation Pipelines

**Datasets:** sample_mflix (movies) & sample_airbnb (listings)

### Objectives
- Perform basic and advanced queries
- Build aggregation pipelines
- Use common aggregation operators
- Apply pipelines to practical use cases

---

In [None]:
!pip install pymongo python-dotenv

In [None]:
import os
from dotenv import load_dotenv

load_dotenv()

if not os.environ.get("MONGO_CONNECTION_STRING"):
    print("Connection string for MONGO is not set. Please check your .env file.")
else:
    print("MONGO_CONNECTION_STRING loaded successfully.")

In [None]:
import pymongo
from pprint import pprint

MONGO_CONNECTION_STRING = os.environ.get("MONGO_CONNECTION_STRING")
mongo_client = pymongo.MongoClient(MONGO_CONNECTION_STRING)

try:
    mongo_client.admin.command('ping')
    print("✅ Connected successfully!")
except Exception as e:
    print("❌ Connection failed:", e)

airbnb_db = mongo_client["sample_airbnb"]
listings = airbnb_db.listingsAndReviews

mflix_db = mongo_client["sample_mflix"]
movies = mflix_db.movies


In [None]:
print(f"Movies count: {movies.count_documents({})}")
print(f"Listings count: {listings.count_documents({})}")

---
## Basic Queries

### 1 Simple Find Operations

In [None]:
# Find one movie
one_movie = movies.find_one()
pprint(one_movie)

In [None]:
# Find movies from a specific year
movies_2010 = movies.find({"year": 2010}).limit(3)

for movie in movies_2010:
    print(f"{movie['title']} ({movie['year']})")

### 2 Query Operators


In [None]:
# Find movies with rating greater than 8.5
high_rated = movies.find(
    {"imdb.rating": {"$gt": 8.5}},
    {"title": 1, "imdb.rating": 1, "year": 1}
).limit(5)

print("High-rated movies:")
for movie in high_rated:
    print(f"{movie['title']} - Rating: {movie.get('imdb', {}).get('rating', 'N/A')}")

In [None]:
# Find movies in specific genres (using $in operator)
action_drama = movies.find(
    {"genres": {"$in": ["Action", "Drama"]}},
    {"title": 1, "genres": 1}
).limit(5)

print("Action or Drama movies:")
for movie in action_drama:
    print(f"{movie['title']} - {movie.get('genres', [])}")

In [None]:
# Find movies released between 2000 and 2010
movies_range = movies.find(
    {"year": {"$gte": 2000, "$lte": 2010}},
    {"title": 1, "year": 1}
).limit(5)

print("Movies from 2000-2010:")
for movie in movies_range:
    print(f"{movie['title']} ({movie.get('year', 'N/A')})")

In [None]:
# Find movies that *do not* have an 'awards.text' field
movies_no_awards_text = movies.find(
    # Query filter using $exists: false
    {"awards.text": {"$exists": False}},
    {"title": 1, "year": 1, "awards.text": 1}
).limit(5)

print("Movies without an 'awards.text' field:")
for movie in movies_no_awards_text:
    awards_text = movie.get('awards', {}).get('text', 'N/A (Field Missing)')
    print(f"{movie['title']} ({movie.get('year', 'N/A')}) | Awards Text: {awards_text}")

In [None]:
# Find movies that *do not* have a usable plot
# (sample_mflix movies store their summary in 'plot'; there is no 'description' field)

query = {
    "$or": [
        # 1. 'plot' field is completely missing
        {"plot": {"$exists": False}},

        # 2. 'plot' field exists but its value is an empty string
        {"plot": ""},

        # 3. 'plot' field exists but its value is null
        # In PyMongo, use None for BSON null (note: {"plot": None} also matches missing fields)
        {"plot": None}
    ]
}

movies_no_plot = movies.find(query).limit(5)
for movie in movies_no_plot:
    print(f"{movie['title']} ({movie.get('plot', 'N/A')})")

In [None]:
# Find movies with exactly 3 cast members ($size only matches an exact array length)

query = {
    "cast": {
        "$exists": True,
        "$size": 3
    }
}

movies_with_three_actors = movies.find(query).limit(5)
for movie in movies_with_three_actors:
    print(f"{movie['title']} ({movie.get('cast', [])})")

In [None]:
# Find beach houses

query = {
    "description": {
        "$regex": "beach house",
        # Options: 'i' for case-insensitive matching
        "$options": "i"
    }
}

listing_beach_houses = listings.find(query).limit(5)
for listing in listing_beach_houses:
    print(f"{listing['name']} - {listing.get('description')}")

### Exercise 1

Write queries to:
1. Find all movies directed by "Christopher Nolan"
2. Find listings in sample_airbnb with more than 3 bedrooms
3. Find movies with "Comedy" genre AND year after 2015

In [None]:
# Exercise 1.1: Christopher Nolan movies
nolan_movies = movies.find(
    # YOUR CODE HERE
)

for movie in nolan_movies:
    print(movie['title'])

In [None]:
# Exercise 1.2: Listings with more than 3 bedrooms
large_listings = listings.find(
    # YOUR CODE HERE
).limit(5)

for listing in large_listings:
    print(f"{listing['name']} - {listing['bedrooms']} bedrooms")

In [None]:
# Exercise 1.3: Recent comedy movies
recent_comedies = movies.find(
    # YOUR CODE HERE
).limit(5)

for movie in recent_comedies:
    print(f"{movie['title']} ({movie['year']})")

---
## Introduction to Aggregation Pipelines

An aggregation pipeline processes documents in stages, similar to Unix pipes: each stage transforms the documents it receives and passes its output to the next stage.
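To make the stage-by-stage idea concrete, here is a toy, in-memory model in plain Python (no MongoDB needed). Each stage is a function that takes a list of documents and returns a new list, and `run_pipeline` feeds each stage's output into the next. The helper names (`match`, `group_count`, `sort_by`, `limit`, `run_pipeline`) and the sample data are hypothetical, and the server executes pipelines very differently; treat this only as a mental model of the count-by-year pipeline in the next cell.

```python
from functools import reduce

def match(predicate):
    # Like $match: keep only documents for which the predicate is true
    return lambda docs: [d for d in docs if predicate(d)]

def group_count(key):
    # Like {"$group": {"_id": "$<key>", "count": {"$sum": 1}}}
    def stage(docs):
        counts = {}
        for d in docs:
            counts[d[key]] = counts.get(d[key], 0) + 1
        return [{"_id": k, "count": v} for k, v in counts.items()]
    return stage

def sort_by(field, descending=False):
    # Like {"$sort": {field: 1}} or {"$sort": {field: -1}}
    return lambda docs: sorted(docs, key=lambda d: d[field], reverse=descending)

def limit(n):
    # Like {"$limit": n}
    return lambda docs: docs[:n]

def run_pipeline(docs, stages):
    # Each stage's output becomes the next stage's input, like a Unix pipe
    return reduce(lambda acc, stage: stage(acc), stages, docs)

sample_docs = [
    {"title": "A", "year": 2009},
    {"title": "B", "year": 2010},
    {"title": "C", "year": 2012},
    {"title": "D", "year": 2012},
    {"title": "E", "year": 2011},
]

year_counts = run_pipeline(sample_docs, [
    match(lambda d: d["year"] >= 2010),
    group_count("year"),
    sort_by("_id", descending=True),
    limit(2),
])
print(year_counts)  # [{'_id': 2012, 'count': 2}, {'_id': 2011, 'count': 1}]
```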

### 1 Basic Aggregation Structure

In [None]:
# Count movies by year
pipeline = [
    {"$match": {"year": {"$gte": 2010}}},  # Filter stage
    {"$group": {                            # Group stage
        "_id": "$year",
        "count": {"$sum": 1}
    }},
    {"$sort": {"_id": -1}},                 # Sort stage
    {"$limit": 5}                           # Limit results
]

result = movies.aggregate(pipeline)

print("Movies per year (2010+):")
for doc in result:
    print(f"Year {doc['_id']}: {doc['count']} movies")

### 2 ```$match``` and ```$project``` Stages

In [None]:
# Get movie titles and ratings
pipeline = [
    {"$match": {"imdb.rating": {"$exists": True,"$gt": 0}}},
    {"$project": {
        "_id": 0,
        "title": 1,
        "rating": "$imdb.rating",
        "year": 1
    }},
    {"$sort": {"rating": -1}},
    {"$limit": 5}
]

result = movies.aggregate(pipeline)

print("Top rated movies:")
for movie in result:
    pprint(movie)

### 3 ```$group``` Stage - Aggregation Functions

In [None]:
# Preview what $unwind does: one output document per genre of each movie
pipeline = [
    {"$match": {"genres": {"$exists": True}, "imdb.rating": {"$exists": True}}},
    {"$unwind": "$genres"},  # Separate array elements into individual documents
    {"$limit": 10}
]

result = movies.aggregate(pipeline)


for doc in result:
    print(f"{doc['_id']} - {doc['title']} - {doc['genres']} - {doc['imdb']['rating']}")
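`$unwind` emits one copy of the document per array element, with the array field replaced by that single element. A plain-Python sketch of that behavior, assuming the default options: documents whose field is missing, `null`, or an empty array are dropped (unless you pass `preserveNullAndEmptyArrays: true`), and a non-array value is treated as a one-element array. The `unwind` helper and the sample documents are hypothetical and only handle top-level fields.

```python
def unwind(docs, field):
    """Approximate {"$unwind": "$<field>"} with default options, for a top-level field."""
    out = []
    for doc in docs:
        value = doc.get(field)
        if value is None or value == []:
            continue  # missing, null, or empty array: dropped by default
        if not isinstance(value, list):
            out.append(doc)  # a non-array value behaves like a one-element array
            continue
        for element in value:
            # One output document per element, with the array replaced by that element
            out.append({**doc, field: element})
    return out

sample_docs = [
    {"title": "Heat", "genres": ["Action", "Crime", "Drama"]},
    {"title": "Up", "genres": ["Animation"]},
    {"title": "Untitled", "genres": []},
    {"title": "No Genres"},
]

for doc in unwind(sample_docs, "genres"):
    print(doc["title"], "->", doc["genres"])
```

This is why a `$group` placed after `$unwind` counts a movie once for every genre it belongs to.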

In [None]:
# Average rating by genre
pipeline = [
    {"$match": {"genres": {"$exists": True}, "imdb.rating": {"$exists": True}}},
    {"$unwind": "$genres"},  # Separate array elements into individual documents
    {"$group": {
        "_id": "$genres",
        "avg_rating": {"$avg": "$imdb.rating"},
        "count": {"$sum": 1},
        "max_rating": {"$max": "$imdb.rating"}
    }},
    {"$sort": {"avg_rating": -1}},
    {"$limit": 10}
]

result = movies.aggregate(pipeline)

print("Average ratings by genre:")
for genre in result:
    print(f"{genre['_id']}: {genre['avg_rating']:.2f} (from {genre['count']} movies)")

### 4 Working with Arrays - $unwind

In [None]:
# Count movies per director
pipeline = [
    {"$match": {"directors": {"$exists": True}}},
    {"$unwind": "$directors"},
    {"$group": {
        "_id": "$directors",
        "movie_count": {"$sum": 1},
        "movies": {"$push": "$title"}  # Collect movie titles
    }},
    {"$sort": {"movie_count": -1}},
    {"$limit": 5}
]

result = movies.aggregate(pipeline)

print("Most prolific directors:")
for director in result:
    print(f"\n{director['_id']}: {director['movie_count']} movies")
    print(f"  Sample titles: {', '.join(director['movies'][:3])}...")

### Exercise 2

Create aggregation pipelines to:
1. Find how many properties are listed in each country
2. Find the average price per night by property type in the Airbnb dataset
3. Sort the property types by average price (descending)
4. Show only the top 10 property types

In [None]:
# Exercise 2.1: Number of properties per country
pipeline = [
    # YOUR CODE HERE
]

result = listings.aggregate(pipeline)

for doc in result:
    pprint(doc)

In [None]:
# Exercise 2.2: Average Airbnb price by property type
pipeline = [
    # YOUR CODE HERE
    # Hint: Use $match to filter where price exists
    # Use $group to group by property_type
    # Calculate average using $avg
]

result = listings.aggregate(pipeline)

for doc in result:
    pprint(doc)

---
## Advanced Aggregation

### 1 ```$lookup``` - Joining Collections

In [None]:
# Join movies with their comments
pipeline = [
    {"$match": {"title": "The Matrix"}},
    {"$lookup": {
        "from": "comments",
        "localField": "_id",
        "foreignField": "movie_id",
        "as": "movie_comments"
    }},
    {"$project": {
        "title": 1,
        "year": 1,
        "comment_count": {"$size": "$movie_comments"}
    }}
]

result = movies.aggregate(pipeline)

for doc in result:
    pprint(doc)
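A `$lookup` with `localField`/`foreignField` behaves like a left outer join: every input document is kept, and the `as` field receives an array of all matching documents from the other collection (an empty array when nothing matches). A plain-Python sketch of that equality match, using hypothetical sample data in place of the `movies` and `comments` collections; it only handles scalar, top-level join keys.

```python
from collections import defaultdict

def lookup(local_docs, foreign_docs, local_field, foreign_field, as_field):
    """Approximate an equality $lookup (localField/foreignField) on top-level scalar fields."""
    # Index the foreign documents by their join key once
    by_key = defaultdict(list)
    for foreign in foreign_docs:
        # A missing foreignField is treated as null (None) for matching
        by_key[foreign.get(foreign_field)].append(foreign)
    # Left outer join: every local document is kept; unmatched ones get an empty array
    return [
        {**doc, as_field: list(by_key.get(doc.get(local_field), []))}
        for doc in local_docs
    ]

sample_movies = [
    {"_id": 1, "title": "The Matrix"},
    {"_id": 2, "title": "Obscure Film"},
]
sample_comments = [
    {"movie_id": 1, "text": "Great"},
    {"movie_id": 1, "text": "Classic"},
    {"movie_id": 3, "text": "About another movie"},
]

joined = lookup(sample_movies, sample_comments, "_id", "movie_id", "movie_comments")
for doc in joined:
    # len() plays the role of {"$size": "$movie_comments"}
    print(doc["title"], len(doc["movie_comments"]))
```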

### 2 ```$bucket``` - Grouping Values into Ranges

In [None]:
# Categorize movies by rating ranges
pipeline = [
    {"$match": {"imdb.rating": {"$exists": True}}},
    {"$bucket": {
        "groupBy": "$imdb.rating",
        "boundaries": [0, 5, 7, 8, 9, 10],  # each bucket is [lower, upper): lower inclusive, upper exclusive
        "default": "Other",  # ratings outside every range (e.g. exactly 10, or non-numeric) go into the "Other" bucket
        "output": {
            "count": {"$sum": 1},
            "avg": {"$avg": "$imdb.rating"},
            "titles": {"$push": "$title"}  # list of all movies in the bucket
        }
    }}
]

result = movies.aggregate(pipeline)

print("Movies by rating range:")
for bucket in result:
    print(f"Rating {bucket['_id']}: {bucket['count']} movies, {(bucket['avg'] or 0):.2f} avg rating")
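Each bucket's `_id` is its inclusive lower boundary and the next boundary is exclusive, so with `boundaries: [0, 5, 7, 8, 9, 10]` a perfect 10 lands in `"Other"`, as would any non-numeric rating. A small sketch of that assignment rule using `bisect` (the `bucket_for` helper is hypothetical and only handles numeric values):

```python
from bisect import bisect_right

BOUNDARIES = [0, 5, 7, 8, 9, 10]

def bucket_for(value, boundaries=BOUNDARIES, default="Other"):
    """Return the $bucket _id a numeric value would be grouped under."""
    # Values outside [first boundary, last boundary) go to the default bucket
    if not (boundaries[0] <= value < boundaries[-1]):
        return default
    # bisect_right finds the first boundary strictly greater than value;
    # the boundary just before it is the bucket's inclusive lower bound
    return boundaries[bisect_right(boundaries, value) - 1]

for rating in [3.2, 5.0, 7.9, 9.3, 10.0]:
    print(rating, "->", bucket_for(rating))
```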

### 3 ```$facet``` - Multiple Aggregations in One

In [None]:
# Get multiple statistics at once
pipeline = [
    {"$match": {"year": {"$gte": 2010}}},
    {"$facet": {
        "by_year": [
            {"$group": {"_id": "$year", "count": {"$sum": 1}}},
            {"$sort": {"_id": -1}},
            {"$limit": 5}
        ],
        "by_genre": [
            {"$unwind": "$genres"},
            {"$group": {"_id": "$genres", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
            {"$limit": 5}
        ],
        "stats": [
            {"$group": {
                "_id": None,
                "total": {"$sum": 1},
                "avg_rating": {"$avg": "$imdb.rating"}
            }}
        ]
    }}
]

result = list(movies.aggregate(pipeline))[0]

print("Movies by Year:")
pprint(result['by_year'])
print("\nMovies by Genre:")
pprint(result['by_genre'])
print("\nOverall Stats:")
pprint(result['stats'])

### 4 Computed Fields with ```$addFields```

In [None]:
# Calculate a value score for each listing (review rating per unit of price)
pipeline = [
    {"$match": {
        "price": {"$exists": True, "$gt": 0},
        "review_scores.review_scores_rating": {"$exists": True}
    }},
    {"$addFields": {
        "value_score": {
            "$divide": [
                "$review_scores.review_scores_rating",
                "$price"
            ]
        }
    }},
    {"$sort": {"value_score": -1}},
    {"$limit": 5},
    {"$project": {
        "name": 1,
        "price": 1,
        "rating": "$review_scores.review_scores_rating",
        "value_score": {"$round": ["$value_score", 2]}
    }}
]

result = listings.aggregate(pipeline)

print("Best value Airbnb listings:")
for listing in result:
    pprint(listing)

### Exercise 3

Build a pipeline to analyze movie trends:
1. Filter movies from 2000-2020
2. Group by year and calculate:
   - Average rating
   - Total number of movies
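   - Most common genre (hint: $unwind the genres, count per year and genre, sort by that count, then regroup by year and take $first; compute the average rating and movie count before unwinding, because $unwind repeats each movie once per genre)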
3. Sort by year

In [None]:
# Exercise 3: Movie trends analysis
pipeline = [
    # YOUR CODE HERE
]

result = movies.aggregate(pipeline)

for doc in result:
    pprint(doc)

---
## Use cases

### 1 Geospatial Queries

In [None]:
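# Find listings near a specific location (e.g., near a point in Barcelona)
# $near is not allowed inside an aggregation $match; use $geoNear as the FIRST stage instead
pipeline = [
    {"$geoNear": {
        "near": {
            "type": "Point",
            "coordinates": [2.1734, 41.3851]  # Barcelona: [longitude, latitude]
        },
        "distanceField": "distance_m",  # output field holding the computed distance
        "maxDistance": 5000,            # 5km radius (meters, because 'near' is GeoJSON)
        "key": "address.location",      # which geospatial index to use
        "spherical": True
    }},
    {"$limit": 5},
    {"$project": {
        "name": 1,
        "price": 1,
        "address.street": 1,
        "distance_m": {"$round": ["$distance_m", 0]}
    }}
]

# Note: $geoNear requires a 2dsphere index on the queried field
listings.create_index([("address.location", "2dsphere")])

try:
    result = listings.aggregate(pipeline)
    print("Nearby listings:")
    for listing in result:
        pprint(listing)
except Exception as e:
    print(f"Geospatial query failed (it requires a 2dsphere index on address.location): {e}")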

### 2 Time-Based Analysis

In [None]:
# Analyze movies by decade
pipeline = [
    {"$match": {"year": {"$gte": 1950}}},
    {"$addFields": {
        "decade": {
            "$multiply": [
                {"$floor": {"$divide": ["$year", 10]}},
                10
            ]
        }
    }},
    {"$group": {
        "_id": "$decade",
        "count": {"$sum": 1},
        "avg_rating": {"$avg": "$imdb.rating"},
        "total_votes": {"$sum": "$imdb.votes"}
    }},
    {"$sort": {"_id": 1}}
]

result = movies.aggregate(pipeline)

print("Movies by decade:")
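for decade in result:
    avg_rating = decade.get('avg_rating') or 0  # $avg yields null when no numeric ratings exist
    # $divide returns a double, so the decade _id arrives as a float (e.g. 1990.0)
    print(f"{int(decade['_id'])}s: {decade['count']} movies, "
          f"Avg rating: {avg_rating:.2f}")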

### 3 Multi-Collection Analysis

In [None]:
# Find most commented movies with their details
pipeline = [
    {"$group": {
        "_id": "$movie_id",
        "comment_count": {"$sum": 1}
    }},
    {"$sort": {"comment_count": -1}},
    {"$limit": 5},
    {"$lookup": {
        "from": "movies",
        "localField": "_id",
        "foreignField": "_id",
        "as": "movie_details"
    }},
    {"$unwind": "$movie_details"},
    {"$project": {
        "title": "$movie_details.title",
        "year": "$movie_details.year",
        "rating": "$movie_details.imdb.rating",
        "comments": "$comment_count"
    }}
]

comments = mflix_db.comments
result = comments.aggregate(pipeline)

print("Most discussed movies:")
for movie in result:
    pprint(movie)

### Exercise 4

Create a report for Airbnb hosts showing:
1. Average price by number of bedrooms
2. Most popular amenities (hint: $unwind amenities)
3. Correlation between number of reviews and rating
4. Best performing property types

---
## Performance & Best Practices

### 1 Using Indexes

In [None]:
# Check existing indexes
print("Indexes on movies collection:")
for index in movies.list_indexes():
    pprint(index)

In [None]:
# Create a compound index for better query performance
movies.create_index([("year", 1), ("imdb.rating", -1)])
print("Index created for year (ascending) and rating (descending)")

### 2 Using explain() to Analyze Queries

In [None]:
# Analyze query performance
pipeline = [
    {"$match": {"year": 1900, "imdb.rating": {"$gt": 7}}},
    {"$sort": {"imdb.rating": -1}},
    {"$limit": 10}
]

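# Get the query plan (the aggregate command's explain option returns "queryPlanner" information;
# for execution statistics, run the explain command with verbosity "executionStats")
explain_result = mflix_db.command(
    'aggregate',
    'movies',
    pipeline=pipeline,
    explain=True
)

print("Query Execution Plan:")
pprint(explain_result)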

### MongoDB Pipeline Optimizations

### 1. Pipeline Reordering
MongoDB automatically reorders pipeline stages for better performance:

**Example:** If you write:
```javascript
[
  { $project: { title: 1, year: 1, rating: "$imdb.rating" } },
  { $match: { year: { $gte: 2010 } } }
]
```

MongoDB will **reorder** it to:
```javascript
[
  { $match: { year: { $gte: 2010 } } },  // Moved BEFORE $project
  { $project: { title: 1, year: 1, rating: "$imdb.rating" } }
]
```

**Why?** Filtering first reduces the number of documents that need projection.
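The reordering is only safe because the `$match` filters on `year`, a field the `$project` passes through unchanged. A plain-Python sketch (hypothetical helpers and made-up data) showing that both orders return the same documents, while the reordered version projects far fewer of them:

```python
def project(docs, counter):
    # Mirrors { $project: { title: 1, year: 1, rating: "$imdb.rating" } } (ignoring _id)
    out = []
    for d in docs:
        counter["projected"] += 1
        out.append({"title": d["title"], "year": d["year"], "rating": d["imdb"]["rating"]})
    return out

def match_recent(docs):
    # Mirrors { $match: { year: { $gte: 2010 } } }
    return [d for d in docs if d["year"] >= 2010]

sample_docs = [
    {"title": f"Movie {i}", "year": 1990 + i, "imdb": {"rating": 5 + i % 5}}
    for i in range(30)
]

as_written = {"projected": 0}
project_then_match = match_recent(project(sample_docs, as_written))

reordered = {"projected": 0}
match_then_project = project(match_recent(sample_docs), reordered)

print(project_then_match == match_then_project)          # True
print(as_written["projected"], reordered["projected"])    # 30 10
```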

### 2. Stage Coalescence (Merging)
MongoDB merges adjacent stages when possible:

**Multiple $match stages** get combined with `$and`:
```javascript
{ $match: { year: 2010 } }
{ $match: { genres: "Action" } }
// Becomes: { $match: { $and: [ { year: 2010 }, { genres: "Action" } ] } }
```

**Multiple $project stages** can be merged when the result is equivalent:
```javascript
{ $project: { title: 1, year: 1 } }
{ $project: { title: 1 } }
// Becomes: { $project: { title: 1 } }
```
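When two `$match` filters are combined, wrapping them in `$and` is the safe form: merging them as one plain object silently drops a condition whenever both stages constrain the same field. A sketch of the coalescing rule on pipelines written as Python lists (the `coalesce_matches` helper is hypothetical, not a MongoDB or PyMongo API):

```python
def coalesce_matches(stages):
    """Merge runs of adjacent $match stages into one $match using $and."""
    merged = []
    for stage in stages:
        if "$match" in stage and merged and "$match" in merged[-1]:
            previous = merged[-1]["$match"]
            # Reuse an existing $and list so repeated merges stay one level deep
            clauses = previous["$and"] if list(previous) == ["$and"] else [previous]
            merged[-1] = {"$match": {"$and": clauses + [stage["$match"]]}}
        else:
            merged.append(stage)
    return merged

stages = [
    {"$match": {"year": {"$gte": 2000}}},
    {"$match": {"year": {"$lte": 2010}}},
    {"$limit": 5},
]
print(coalesce_matches(stages))
# [{'$match': {'$and': [{'year': {'$gte': 2000}}, {'year': {'$lte': 2010}}]}}, {'$limit': 5}]

# A naive dict merge keeps only the last 'year' condition:
print({**stages[0]["$match"], **stages[1]["$match"]})  # {'year': {'$lte': 2010}}
```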

### 3. Index Utilization
MongoDB pushes operations to the index when possible:

- **```$match```**: Uses indexes for filtering (if available)
- **```$sort```**: Can use indexes to avoid in-memory sorting
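- **Combined ```$match``` + ```$sort```**: Most efficient when supported by a compound index whose leading fields are matched by equality

```javascript
// With index on { year: 1, "imdb.rating": -1 }
[
  { $match: { year: 2010 } },
  { $sort: { "imdb.rating": -1 } }
]
// Both stages use the index: within year 2010, index entries are already ordered by rating.
// A range match such as { year: { $gte: 2010 } } spans several years, so sorting
// on "imdb.rating" would need an in-memory (blocking) sort instead.
```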
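A compound index keeps its entries sorted by the first key, then by the second key within each value of the first. That is why a sort can be read straight from the index only when the fields before the sort key are matched by equality. A plain-Python model of an index on `{ year: 1, "imdb.rating": -1 }`, using made-up documents with a flattened `rating` field:

```python
# A compound index is conceptually a list of entries kept in key order.
# Here { year: 1, "imdb.rating": -1 } is modeled by sorting on (year, -rating).
sample_docs = [
    {"title": "A", "year": 2010, "rating": 7.1},
    {"title": "B", "year": 2010, "rating": 8.4},
    {"title": "C", "year": 2011, "rating": 9.0},
    {"title": "D", "year": 2011, "rating": 6.5},
    {"title": "E", "year": 2012, "rating": 8.0},
]
index = sorted(sample_docs, key=lambda d: (d["year"], -d["rating"]))

def is_sorted_by_rating_desc(docs):
    ratings = [d["rating"] for d in docs]
    return ratings == sorted(ratings, reverse=True)

# Equality on the prefix: the matching slice of the index is already ordered by rating
equality_scan = [d for d in index if d["year"] == 2010]
# Range on the prefix: the slice spans several years, so ratings are not globally ordered
range_scan = [d for d in index if d["year"] >= 2010]

print(is_sorted_by_rating_desc(equality_scan))  # True  -> no separate sort needed
print(is_sorted_by_rating_desc(range_scan))     # False -> a blocking sort is required
```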

### 4. Projection Optimization
MongoDB checks which fields the pipeline actually needs and, when it can, reads only those fields instead of whole documents:

- Reduces memory usage
- Speeds up data transfer between pipeline stages
- Particularly important for large documents
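The core of this optimization is dependency analysis: walk the pipeline, collect the fields each stage reads, and stop at the first stage that reshapes documents (such as `$group` or `$project`), because later stages only see that stage's output. A simplified sketch in plain Python; the helper names are hypothetical, it understands only a few stage types, and the real optimizer handles many more cases:

```python
def field_refs(expr):
    """Collect "$field.path" references inside an aggregation expression."""
    if isinstance(expr, str):
        # "$$" marks a variable such as $$ROOT, not a field path
        if expr.startswith("$") and not expr.startswith("$$"):
            return {expr[1:]}
        return set()
    if isinstance(expr, dict):
        return set().union(*(field_refs(v) for v in expr.values()))
    if isinstance(expr, list):
        return set().union(*(field_refs(v) for v in expr))
    return set()

def filter_fields(query):
    """Collect field names used as keys in a $match filter."""
    fields = set()
    for key, value in query.items():
        if not key.startswith("$"):
            fields.add(key)
        elif isinstance(value, list):
            # $and / $or / $nor hold a list of sub-filters
            for sub_filter in value:
                fields |= filter_fields(sub_filter)
    return fields

def required_source_fields(stages):
    """Source fields a pipeline reads, or None if whole documents reach the output."""
    needed = set()
    for stage in stages:
        (name, spec), = stage.items()
        if name == "$match":
            needed |= filter_fields(spec)
        elif name == "$sort":
            needed |= set(spec)
        elif name in ("$group", "$project"):
            if name == "$project":
                # Inclusion fields ({"title": 1}) are read as-is; _id handling is omitted
                needed |= {k for k, v in spec.items() if v is True or v == 1}
            # Later stages only see this stage's output, so the analysis stops here
            return needed | field_refs(spec)
        else:
            # e.g. {"$unwind": "$genres"}
            needed |= field_refs(spec)
    return None

genre_pipeline = [
    {"$match": {"genres": {"$exists": True}, "imdb.rating": {"$exists": True}}},
    {"$unwind": "$genres"},
    {"$group": {"_id": "$genres", "avg_rating": {"$avg": "$imdb.rating"}, "count": {"$sum": 1}}},
    {"$sort": {"avg_rating": -1}},
    {"$limit": 10},
]
print(sorted(required_source_fields(genre_pipeline)))  # ['genres', 'imdb.rating']
```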

### 5. Early Pipeline Termination
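**`$limit` optimization**: When `$limit` appears early, MongoDB:
- Stops requesting documents from earlier streaming stages once the limit is reached
- Folds a `$limit` that directly follows `$sort` into the sort, so only the top *n* documents are kept in memory
- Moves a `$limit` ahead of a preceding `$skip`, increasing it by the skip amount (the `$skip` + `$limit` pattern)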

```javascript
[
  { $sort: { rating: -1 } },
  { $limit: 10 }
]
// MongoDB only needs to track top 10, not sort everything!
```
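Why `$sort` followed by `$limit` is cheap: a top-*n* sort only has to remember the best *n* documents seen so far instead of ordering everything. Python's `heapq.nlargest` uses the same idea with a size-*n* heap. This is an analogy for the memory behavior, not MongoDB's implementation, and the sample data is randomly generated:

```python
import heapq
import random

random.seed(0)
sample_docs = [
    {"title": f"Movie {i}", "rating": round(random.uniform(1, 10), 1)}
    for i in range(10_000)
]

# Full sort: materializes and orders all 10,000 documents, then keeps 10
full_sort_top = sorted(sample_docs, key=lambda d: d["rating"], reverse=True)[:10]

# Top-n selection: keeps at most 10 candidates in a heap while scanning once
heap_top = heapq.nlargest(10, sample_docs, key=lambda d: d["rating"])

print([d["rating"] for d in heap_top] == [d["rating"] for d in full_sort_top])  # True
```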

### 6. Blocking Stages
Some stages are blocking: they must consume all of their input before they can emit any output, which limits what later optimizations can do:

- `$group` - Needs every input document before it can emit a group
- `$sort` (without a supporting index) - Buffers and sorts all of its input in memory
- `$facet` - Feeds the entire input set to each of its sub-pipelines

`$lookup` is not blocking, but it runs a query against the joined collection for every input document, so it is still expensive.

**Strategy**: Place filtering ($match) BEFORE these expensive operations!
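The difference between streaming and blocking stages can be modeled with Python generators: a streaming stage hands each document downstream as soon as it is ready, so a `$limit` after it stops reading early, while a blocking stage has to read its entire input first. The helper names and sample data are hypothetical; this is a mental model, not how the server is implemented.

```python
from itertools import islice

def read_source(docs, counter):
    """Yield documents one at a time, counting how many were read."""
    for doc in docs:
        counter["read"] += 1
        yield doc

def stream_match(docs, predicate):
    # Streaming, like $match: each matching document flows on immediately
    return (d for d in docs if predicate(d))

def stream_limit(docs, n):
    # Streaming, like $limit: stops pulling from upstream after n documents
    return islice(docs, n)

def blocking_group_count(docs, key):
    # Blocking, like $group: must consume ALL input before emitting any group
    counts = {}
    for d in docs:
        counts[d[key]] = counts.get(d[key], 0) + 1
    return iter([{"_id": k, "count": v} for k, v in counts.items()])

def is_recent(doc):
    return doc["year"] >= 2010

sample_docs = [{"title": f"Movie {i}", "year": 2000 + i % 20} for i in range(1000)]

streaming = {"read": 0}
first_three = list(stream_limit(stream_match(read_source(sample_docs, streaming), is_recent), 3))

blocking = {"read": 0}
first_groups = list(stream_limit(
    blocking_group_count(stream_match(read_source(sample_docs, blocking), is_recent), "year"), 3))

print(streaming["read"], blocking["read"])  # 13 1000
```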

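### 7. Memory Management
- Default: 100MB of RAM per blocking stage (such as `$sort` and `$group`)
- Before MongoDB 6.0, exceeding this limit causes an error; from 6.0 on, the `allowDiskUseByDefault` server parameter defaults to `true`, so such stages spill to temporary files on disk instead
- Use `allowDiskUse: true` to let a specific aggregation spill to disk when it exceeds memory: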

```python
collection.aggregate(pipeline, allowDiskUse=True)
```

**Note**: Disk usage is slower but allows processing larger datasets.

### 8. Sharded Cluster Optimization
In sharded environments, MongoDB intelligently routes pipeline stages:

- **Shard stage**: Operations run on each shard in parallel
- **Merge stage**: Results combined on a single shard/mongos

**Example**:
```javascript
[
  { $match: {...} },     // Runs on each shard
  { $group: {...} },     // Partial groups on each shard, finalized on the merging node
  { $sort: {...} },      // Runs on the merging node (mongos or a chosen shard)
  { $limit: 100 }        // Runs on the merging node
]
```
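Conceptually, a split `$group` works because each shard returns partial results that the merging node can combine. For an average, that means carrying a sum and a count rather than a finished average. A plain-Python illustration with made-up numbers (it mirrors the idea, not MongoDB's internal format):

```python
# Hypothetical ratings held by two shards for the same $group key
shard_ratings = {
    "shard_a": [8.0, 6.0, 7.0],
    "shard_b": [9.0],
}

# Shard part: each shard emits a partial (sum, count) instead of a finished average
partials = [(sum(r), len(r)) for r in shard_ratings.values()]

# Merge part: combine the partials, then divide once
total, count = map(sum, zip(*partials))
merged_avg = total / count

# Wrong approach: averaging the per-shard averages ignores how many documents each shard had
naive_avg = sum(s / c for s, c in partials) / len(partials)

print(merged_avg, naive_avg)  # 7.5 8.0
```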

### Practical Optimizations

1. **Put `$match` first**: Always filter as early as possible
2. **Use `$project` early**: Reduce document size before expensive operations
3. **Add `$limit` early**: Stop processing unnecessary documents, as long as it cannot change the result (for example, directly after the `$sort` it depends on)
4. **Create appropriate indexes**: Especially for `$match` and `$sort`
5. **Avoid `$lookup` when possible**: It's expensive; consider denormalization
6. **Use explain()**: Check if your pipeline uses indexes
7. **Monitor memory**: Use `allowDiskUse` for large datasets
8. **Batch `$group` operations**: Group by fewer fields first, then refine


---
### Cleanup

In [None]:
# Close the connection
mongo_client.close()
print("Connection closed. Workshop complete!")