In [1]:
# Import libraries
from pymongo import MongoClient
from pprint import pprint
from collections import OrderedDict
from itertools import groupby
from operator import itemgetter # Used in a sort options

In [2]:
# Client connects to "localhost" by default
client = MongoClient()

# Connect to "nobel" database on the fly
db = client["nobel"]

# 04. Aggregation Pipelines: Let the Server Do It For You

You've used projection, sorting, indexing, and limits to speed up data fetching, but your analysis pipelines can still hit performance bottlenecks. You may still need to fetch a large volume of data, so network bandwidth, downstream processing, and client memory capacity all limit performance. This chapter is about having MongoDB perform aggregations for you on the server.

## 04.01 Intro to Aggregation

1. Intro to Aggregation: From Query Components to Aggregation Stages
>There are cases where you may want to avoid having to fetch and iterate over lots of data client-side. In this chapter, we'll learn how MongoDB can do a good chunk of your data analysis and aggregation for you. In this first lesson, we'll reproduce what we already know how to do with the "find" method of a collection. By doing so, we'll see how the implicit stages of a query can map to explicit stages of an aggregation pipeline.

2. Queries have implicit stages
>Here, we iterate over a cursor to yield prize-year information for a few USA-born laureates. I used indentation in this code to demarcate implicit stages. Also, I passed the arguments to "find" as keyword arguments to name these stages. The first stage filters for documents that match an expression. The second stage projects out fields I need downstream for output. Finally, the last stage limits the number of documents retrieved. With an aggregation pipeline, I make these stages explicit. An aggregation pipeline is a list, a sequence of stages, and it looks like this. Each stage involves a stage operator. Here's an aggregation that produces the same result as our call to "find" on the left. To filter for documents matching an expression, I use the match operator. To project fields, I use project. And to limit results, I use limit. This pipeline, in particular, has three stages. It matches documents for USA-born laureates. It strips the documents of all but prize years. And it yields only the first three.

3. Adding sort and skip stages
>Sorting and skipping are also available as pipeline stages. Here, we project prize years for USA-born laureates. We yield them in chronological order. Furthermore, we skip the first year and collect only the second, third, and fourth. One quirk of the sort stage in pymongo is that it requires a dictionary-like object. We can use the OrderedDict class in Python's included collections module. This class yields field-direction pairs in the order they are input. In the case of sorting by only one key, we can of course use a plain dictionary. I use the more general form here so that you know how to guarantee key order when sorting on more than one field.

4. But can I count?
>Finally, we can use a "count" stage to count the number of documents passed in from the previous stage. This count gets assigned to a field of your choosing. Here, I count the number of USA-born laureates. This aggregation, of course, is the same as the "count_documents" method of a collection. The other convenience method we know about for aggregation is "distinct". This method has a counterpart aggregation stage as well, which we'll cover in the next lesson.

5. Let's practice!
>You can now translate collection and cursor methods to aggregation pipeline stages. You've seen how to do this for all but the "distinct" method, which we'll cover later. Let's practice doing these translations before we learn about more-advanced aggregation capabilities.
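
To make the idea of explicit, ordered stages concrete, here is a minimal pure-Python sketch that runs a list of stages over in-memory documents. It is an illustration only: `run_pipeline` and the sample `docs` are hypothetical, the sketch supports just equality `$match`, single-key `$sort`, `$skip`, `$limit`, and `$count`, and it makes no claim about how the MongoDB server executes a pipeline internally.

```python
def run_pipeline(docs, pipeline):
    """Apply each stage, in list order, to the output of the previous stage."""
    docs = list(docs)
    for stage in pipeline:
        op, spec = next(iter(stage.items()))  # a stage holds exactly one operator
        if op == "$match":
            # Equality-only filter: keep documents whose fields equal the given values.
            docs = [d for d in docs if all(d.get(k) == v for k, v in spec.items())]
        elif op == "$sort":
            # Single sort key only in this sketch; 1 is ascending, -1 is descending.
            field, direction = next(iter(spec.items()))
            docs = sorted(docs, key=lambda d: d[field], reverse=(direction == -1))
        elif op == "$skip":
            docs = docs[spec:]
        elif op == "$limit":
            docs = docs[:spec]
        elif op == "$count":
            # One output document holding the count; nothing for empty input.
            docs = [{spec: len(docs)}] if docs else []
        else:
            raise ValueError(f"stage {op} is not supported in this sketch")
    return docs


# Made-up sample documents (not taken from the nobel database).
docs = [
    {"name": "A", "bornCountry": "USA", "year": "1975"},
    {"name": "B", "bornCountry": "France", "year": "1903"},
    {"name": "C", "bornCountry": "USA", "year": "1912"},
    {"name": "D", "bornCountry": "USA", "year": "1972"},
]

result = run_pipeline(docs, [
    {"$match": {"bornCountry": "USA"}},
    {"$sort": {"year": 1}},
    {"$skip": 1},
    {"$limit": 2},
])
count = run_pipeline(docs, [
    {"$match": {"bornCountry": "USA"}},
    {"$count": "n_usa"},
])
print([d["name"] for d in result])
print(count)
```

Because every stage only sees what the previous stage emitted, reordering the list changes the result, which is the main practical difference from passing keyword arguments to `find`.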

In [3]:
# Exploring the data
print('One document from "laureates" collection...')
pprint(db.laureates.find_one({}))

One document from "laureates" collection...
{'_id': ObjectId('6035cd48354dd8e354623266'),
 'born': '1853-07-18',
 'bornCity': 'Arnhem',
 'bornCountry': 'the Netherlands',
 'bornCountryCode': 'NL',
 'died': '1928-02-04',
 'diedCountry': 'the Netherlands',
 'diedCountryCode': 'NL',
 'firstname': 'Hendrik Antoon',
 'gender': 'male',
 'id': '2',
 'prizes': [{'affiliations': [{'city': 'Leiden',
                               'country': 'the Netherlands',
                               'name': 'Leiden University'}],
             'category': 'physics',
             'motivation': '"in recognition of the extraordinary service they '
                           'rendered by their researches into the influence of '
                           'magnetism upon radiation phenomena"',
             'share': '2',
             'year': '1902'}],
 'surname': 'Lorentz'}


In [4]:
# Queries have implicit stages
cursor = db.laureates.find(
            filter={"bornCountry": "USA"},
            projection={"prizes.year": 1},
            limit=3
         )
for doc in cursor:
    print(doc["prizes"])

[{'year': '1972'}]
[{'year': '1972'}]
[{'year': '1975'}]


In [5]:
# Queries have implicit stages - aggregation
cursor = db.laureates.aggregate([
            {"$match": {"bornCountry": "USA"}},
            {"$project": {"prizes.year": 1}},
            {"$limit": 3}
         ])
for doc in cursor:
    print(doc["prizes"])

[{'year': '1972'}]
[{'year': '1972'}]
[{'year': '1975'}]


In [6]:
# Adding sort and skip stages
cursor = list(db.laureates.aggregate([
                {"$match": {"bornCountry": "USA"}},
                {"$project": {"prizes.year": 1, "_id": 0}},
                {"$sort": OrderedDict([("prizes.year", 1)])},
                {"$skip": 1},
                {"$limit": 3}
         ]))
pprint(cursor)

[{'prizes': [{'year': '1912'}]},
 {'prizes': [{'year': '1914'}]},
 {'prizes': [{'year': '1919'}]}]


In [7]:
# But can I count? - aggregation
cursor = list(db.laureates.aggregate([
                {"$match": {"bornCountry": "USA"}},
                {"$count": "n_USA-born-laureates"}
         ]))
print(cursor)

# But can I count? - count_documents
print(db.laureates.count_documents({"bornCountry": "USA"}))

[{'n_USA-born-laureates': 269}]
269


## 04.02 Sequencing stages

Here is a cursor, followed by four aggregation pipeline stages:

<code>
cursor = (db.laureates.find(
                    projection={"firstname": 1, "prizes.year": 1, "_id": 0},
                    filter={"gender": "org"})
            .limit(3).sort("prizes.year", -1))

project_stage = {"$project": {"firstname": 1, "prizes.year": 1, "_id": 0}}
match_stage = {"$match": {"gender": "org"}}
limit_stage = {"$limit": 3}
sort_stage = {"$sort": {"prizes.year": -1}}
</code>

**Instructions**

Which ordering of the above four stages, saved as `pipeline`, makes `db.laureates.aggregate(pipeline)` produce a cursor equivalent to `cursor` above?

**Possible Answers**
1. [project_stage, match_stage, limit_stage, sort_stage]
2. [project_stage, match_stage, sort_stage, limit_stage]
3. [match_stage, project_stage, limit_stage, sort_stage]
4. __[match_stage, project_stage, sort_stage, limit_stage]__

**Results**

<font color=darkgreen>Yay! Note that you may need to put a \$sort stage before a \$project stage if you are sorting on a field left out in that projection (not the case in this exercise).</font>
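
The sort stage has to come before the limit stage because a `find` cursor applies its sort before its limit regardless of the order in which `.sort()` and `.limit()` are chained, whereas pipeline stages run strictly in list order. The small pure-Python sketch below shows why the two orderings differ. The `years` list is made-up data standing in for the order in which documents happen to arrive.

```python
# Made-up prize years, in the order the documents happen to arrive.
years = ["1917", "2017", "1963", "2015", "2013"]

# Pipeline order [sort, limit]: sort everything, then keep the top three.
sort_then_limit = sorted(years, reverse=True)[:3]

# Pipeline order [limit, sort]: keep the first three arrivals, then sort only those.
limit_then_sort = sorted(years[:3], reverse=True)

print(sort_then_limit)
print(limit_then_sort)
```

Only the first ordering returns the three most recent years, which is what the chained cursor in this exercise returns.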

In [8]:
cursor = (db.laureates.find(
    projection={"firstname": 1, "prizes.year": 1, "_id": 0},
    filter={"gender": "org"})
 .limit(3).sort("prizes.year", -1))

project_stage = {"$project": {"firstname": 1, "prizes.year": 1, "_id": 0}}
match_stage = {"$match": {"gender": "org"}}
limit_stage = {"$limit": 3}
sort_stage = {"$sort": {"prizes.year": -1}}

print('Using db.laureates.find...')
pprint(list(cursor))

cursor = list(db.laureates.aggregate([match_stage, project_stage, sort_stage, limit_stage]))
print('\nUsing db.laureates.aggregate...')
pprint(cursor)

Using db.laureates.find...
[{'firstname': 'International Campaign to Abolish Nuclear Weapons (ICAN)',
  'prizes': [{'year': '2017'}]},
 {'firstname': 'National Dialogue Quartet', 'prizes': [{'year': '2015'}]},
 {'firstname': 'Organisation for the Prohibition of Chemical Weapons (OPCW)',
  'prizes': [{'year': '2013'}]}]

Using db.laureates.aggregate...
[{'firstname': 'International Campaign to Abolish Nuclear Weapons (ICAN)',
  'prizes': [{'year': '2017'}]},
 {'firstname': 'National Dialogue Quartet', 'prizes': [{'year': '2015'}]},
 {'firstname': 'Organisation for the Prohibition of Chemical Weapons (OPCW)',
  'prizes': [{'year': '2013'}]}]


## 04.03 Aggregating a few individuals' country data

The following query cursor yields birth-country and prize-affiliation-country information for three non-organization laureates:

<code>
cursor = (db.laureates.find(
    {"gender": {"$ne": "org"}},
    ["bornCountry", "prizes.affiliations.country"]
).limit(3))
</code>

**Instructions**

1. Translate the above cursor `cursor` to an equivalent aggregation cursor, saving the pipeline stages to `pipeline`. Recall that the `find` collection method's "filter" parameter maps to the "\$match" aggregation stage, its "projection" parameter maps to the "\$project" stage, and the "limit" parameter (or cursor method) maps to the "\$limit" stage.

**Results**

<font color=darkgreen>Terrific translation! Note that the specification(s) of a '$project' stage must be in the form of a dictionary, whereas they can be passed as a list to <collection>.find.</font>
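
The mapping described above can be sketched as a small helper that turns `find`-style arguments into a list of stages. The function `find_to_pipeline` is hypothetical (it is not part of pymongo), it only builds plain Python dictionaries, and it assumes that any sort key is kept by the projection, since it places the project stage before the sort stage as in the previous exercise.

```python
def find_to_pipeline(filter=None, projection=None, sort=None, skip=0, limit=0):
    """Build aggregation stages for simple find() arguments (hypothetical helper).

    Parameter names mirror the keyword arguments of find() used in this chapter.
    """
    pipeline = []
    if filter:
        pipeline.append({"$match": filter})
    if projection:
        # find() accepts a list of field names, but $project needs a dictionary.
        if not isinstance(projection, dict):
            projection = {field: 1 for field in projection}
        pipeline.append({"$project": projection})
    if sort:
        # sort is a list of (field, direction) pairs; dict() keeps their order.
        # Assumption: every sort field survives the projection above.
        pipeline.append({"$sort": dict(sort)})
    if skip:
        pipeline.append({"$skip": skip})
    if limit:
        pipeline.append({"$limit": limit})
    return pipeline


pipeline = find_to_pipeline(
    filter={"gender": {"$ne": "org"}},
    projection=["bornCountry", "prizes.affiliations.country"],
    limit=3,
)
print(pipeline)
```

The resulting list has the same three stages as the exercise solution and could be passed to `db.laureates.aggregate(pipeline)`.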

In [9]:
print('Using find method...')
cursor = (db.laureates.find(
    {"gender": {"$ne": "org"}},
    ["bornCountry", "prizes.affiliations.country"]
).limit(3))
pprint(list(cursor))

# Translate cursor to aggregation pipeline
print('\nUsing aggregate method...')
pipeline = [
    {"$match": {"gender": {"$ne": "org"}}},
    {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1}},
    {"$limit": 3}
]

for doc in db.laureates.aggregate(pipeline):
    print("{bornCountry}: {prizes}".format(**doc))

Using find method...
[{'_id': ObjectId('6035cd48354dd8e354623266'),
  'bornCountry': 'the Netherlands',
  'prizes': [{'affiliations': [{'country': 'the Netherlands'}]}]},
 {'_id': ObjectId('6035cd48354dd8e354623267'),
  'bornCountry': 'USA',
  'prizes': [{'affiliations': [{'country': 'USA'}]}]},
 {'_id': ObjectId('6035cd48354dd8e354623268'),
  'bornCountry': 'USA',
  'prizes': [{'affiliations': [{'country': 'USA'}]}]}]

Using aggregate method...
the Netherlands: [{'affiliations': [{'country': 'the Netherlands'}]}]
USA: [{'affiliations': [{'country': 'USA'}]}]
USA: [{'affiliations': [{'country': 'USA'}]}]


## 04.04 Passing the aggregation baton to Python

Construct an aggregation pipeline to collect, in reverse chronological order (i.e., descending year), prize documents for all original categories (that is, __$in__ categories awarded in 1901). Project only the prize year and category (including document _id is fine).

The aggregation cursor will be fed to Python's __itertools.groupby__ function to group prizes by year. For each year that at least one of the original prize categories was missing, a line with all missing categories for that year will be printed.

**Instructions**

1. Save to pipeline an aggregation pipeline to collect prize documents as detailed above. Use Python's collections.OrderedDict to specify any sorting.

**Results**

<font color=darkgreen>Perfect pipelining! We will learn in the next lesson how Mongo can do grouping for us in a $group stage.</font>
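
The sort stage matters here because `itertools.groupby` only groups consecutive items with the same key. The sketch below illustrates this with made-up prize documents and a toy set of three "original" categories; `missing_by_year` is a hypothetical helper, not part of the exercise solution.

```python
from itertools import groupby
from operator import itemgetter

# Toy stand-ins for the real category set and prize documents.
original = {"chemistry", "peace", "physics"}
prizes = [
    {"year": "1916", "category": "physics"},
    {"year": "1915", "category": "chemistry"},
    {"year": "1916", "category": "peace"},
    {"year": "1915", "category": "physics"},
    {"year": "1915", "category": "peace"},
]


def missing_by_year(docs):
    """Return (year, missing categories) pairs, one per run of equal years."""
    report = []
    for year, group in groupby(docs, key=itemgetter("year")):
        missing = original - {doc["category"] for doc in group}
        if missing:
            report.append((year, sorted(missing)))
    return report


# Unsorted input: each year is split into several runs, so categories look missing.
unsorted_report = missing_by_year(prizes)

# Sorted input (what the sort stage provides): one run per year.
sorted_report = missing_by_year(sorted(prizes, key=itemgetter("year"), reverse=True))

print(unsorted_report)
print(sorted_report)
```

With unsorted input the same year is reported more than once with spurious gaps, while the sorted input yields a single, correct line per year.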

In [10]:
# Exploring the data
print('One document from "prizes" collection...')
pprint(db.prizes.find_one({}))

One document from "prizes" collection...
{'_id': ObjectId('6035cd48354dd8e354623018'),
 'category': 'physics',
 'laureates': [{'firstname': 'Arthur',
                'id': '960',
                'motivation': '"for the optical tweezers and their application '
                              'to biological systems"',
                'share': '2',
                'surname': 'Ashkin'},
               {'firstname': 'Gérard',
                'id': '961',
                'motivation': '"for their method of generating high-intensity, '
                              'ultra-short optical pulses"',
                'share': '4',
                'surname': 'Mourou'},
               {'firstname': 'Donna',
                'id': '962',
                'motivation': '"for their method of generating high-intensity, '
                              'ultra-short optical pulses"',
                'share': '4',
                'surname': 'Strickland'}],
 'overallMotivation': '“for groundbreaking inventions in

In [11]:
original_categories = set(db.prizes.distinct("category", {"year": "1901"}))

# Save a pipeline to collect original-category prizes
pipeline = [
    {'$match': {'category': {'$in': list(original_categories)}}},
    {'$project': {'year': 1, 'category': 1}},
    {'$sort': OrderedDict([('year', -1)])}
]

cursor = db.prizes.aggregate(pipeline)
for key, group in groupby(cursor, key=itemgetter("year")):
    missing = original_categories - {doc["category"] for doc in group}
    if missing:
        print("{year}: {missing}".format(year=key, missing=", ".join(sorted(missing))))

2018: literature
1972: peace
1967: peace
1966: peace
1956: peace
1955: peace
1948: peace
1943: literature, peace
1939: peace
1935: literature
1934: physics
1933: chemistry
1932: peace
1931: physics
1928: peace
1925: medicine
1924: chemistry, peace
1923: peace
1921: medicine
1919: chemistry
1918: literature, medicine, peace
1917: chemistry, medicine
1916: chemistry, medicine, peace, physics
1915: medicine, peace
1914: literature, peace


## 04.05 Aggregation Operators and Grouping

1. Back to Counting:
>In the last lesson, we learned how to translate the implicit stages of a query to aggregation stages. Now, let's dip our toes into more-advanced aggregation capabilities.

2. Field paths
>Aggregation stages can use expressions that contain field paths. To see this in action, first let's clarify some terminology. An expression object has the form "field1, expression1, dot dot dot". It's what you pass to an aggregation stage. Here, we pass an expression object to a "project" stage. The object has one key, "prizes-dot-share", with a corresponding expression value of 1. In contrast, here we project a field that we call "n_prizes". The field takes the value of the expression "dollar-size maps to dollar-prizes". The string dollar-prizes is a field path. It takes the value of the prizes field for each document processed at that stage by the pipeline. Note that you can create new fields, or overwrite old ones, during aggregation.

3. Operator expressions
>The other new concept here is the operator expression, which treats an operator as a function. The expression applies the operator to one or more arguments and returns a value. Here, the size operator takes the field path dollar-prizes as an argument. Thus, the expression object assigns the field n-prizes to the size of the prizes array. We could also write the operator expression as taking a list of one element, and we get the same result. For convenience, when an operator only has one parameter, we can omit the brackets as above.

4. One more example: a multi-parameter operator
>Many operators available in query filters have counterparts for aggregation. For example, here I use the dollar-in operator, which takes two parameters. To get the array of prize shares for a laureate, I use a field path. I then project a new field, "solo winner", which is true if and only if the array of prize shares contains the string value "1".

5. Implementing .distinct()
>Now we know a bit about expressions and field paths. Let's translate the "distinct" collection method to an aggregation. Here I use a new stage, dollar-group. A group stage takes an expression object that must map the underscore-id field. As for any MongoDB document, the underscore-id field must be unique. In this case, each output document will have as its id a distinct value of the bornCountry field. All bornCountry values get captured because no match stage precedes the group stage. Thus, our list comprehension collecting id values collects all distinct bornCountry values. This includes the value None, which happens when a field is not present in a document.

6. How many prizes have been awarded in total?
>Let's combine a group stage with a project stage. How many prizes has the Nobel committee awarded? The project stage is familiar to us from a few slides back, but what about this group stage? The underscore-id gets mapped to None for every document. This means one and only one document will emerge from the group stage. This one document maps a new field, n-prizes-total, to an operator expression. Some operators, like dollar-sum here, act as accumulators in a group stage. This means they don't operate only on one document. Rather, they have state and will accumulate a value as one document after another of a group gets passed to it. Here, we compute the sum of lengths of all prizes arrays across all laureates. We do this without sending a single laureate document down the wire. Aggregations like this can save a lot of time and bandwidth for very large collections.

7. Let's practice!
>Okay, time to practice using field paths, operator expressions, and group stages for aggregation.
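
As a rough mental model of field paths and operator expressions, the pure-Python sketch below resolves a field path against a nested document and then mimics the size and in operators with `len` and the `in` keyword. The `resolve` function and the sample `laureate` document are hypothetical, and the path handling is simplified (it assumes array elements are dictionaries).

```python
def resolve(doc, field_path):
    """Resolve a "$a.b" style field path against a nested document (simplified)."""
    value = doc
    for key in field_path.lstrip("$").split("."):
        if isinstance(value, list):
            # Descending into an array collects the key from every element.
            value = [item[key] for item in value if key in item]
        elif isinstance(value, dict):
            value = value.get(key)
        else:
            return None
    return value


# Made-up laureate document with two prizes.
laureate = {
    "firstname": "Example",
    "prizes": [
        {"year": "1903", "category": "physics", "share": "4"},
        {"year": "1911", "category": "chemistry", "share": "1"},
    ],
}

# Field path "$prizes.share" yields the array of share values.
shares = resolve(laureate, "$prizes.share")

# {"$size": "$prizes"} corresponds to the length of the prizes array.
n_prizes = len(resolve(laureate, "$prizes"))

# {"$in": ["1", "$prizes.share"]} asks whether "1" is in the shares array.
solo_winner = "1" in resolve(laureate, "$prizes.share")

print(shares, n_prizes, solo_winner)
```

Note that the shares are strings, so the literal compared against them must be the string `"1"` rather than the number `1`.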

In [12]:
# Field paths
print('FIELDS PATH...')
print(list(db.laureates.aggregate([{"$project": {"prizes.share": 1, '_id': 0}}]))[:2])
print(db.laureates.aggregate([{"$project": {"prizes.share": 1, '_id': 0}}]).next())

print(db.laureates.aggregate([{"$project": {"n_prizes": {"$size": "$prizes"}, '_id': 0}}]).next())

# Operator expressions
# We could also write the operator expression as taking a list of one element, and we get the same result.
# For convenience, when an operator only has one parameter, we can omit the brackets as above.
print('\nOPERATOR EXPRESSIONS...')
print(db.laureates.aggregate([{"$project": {"n_prizes": {"$size": ["$prizes"]}, '_id': 0}}]).next())

# One more example: a multi-parameter operator
# Here I use the dollar-in operator, which takes two parameters. I then project a new field, "solo winner", 
# which is true if and only if the array of prize shares contains the string value "1".
print('\nMULTI PARAMETER OPERATOR EXPRESSIONS...')
print(db.laureates.aggregate([{"$project": {"solo_winner": {"$in": ["1", "$prizes.share"]}, '_id': 0}}]).next())

# Implementing .distinct()
list_1 = list(db.laureates.distinct("bornCountry", 
                                    {"prizes.share": "4"}))
pprint(list_1)
# A group stage takes an expression object that must map the underscore-id field. 
# In this case, each output document will have as its id a distinct value of the bornCountry field.
# This includes the value None, which happens when a field is not present in a document.
list_2 = list(db.laureates.aggregate([
            {"$match": {"prizes.share": "4"}},
            {"$group": {"_id": "$bornCountry"}},
         ]))
pprint(list_2)

list_2 = [doc["_id"] for doc in list_2]
print(set(list_2) - {None} == set(list_1))

# How many prizes have been awarded in total?
# Note: the $match below restricts the sum to laureates holding at least one quarter-share prize.
print("\nHow many prizes have been awarded in total?")
data = list(db.laureates.aggregate([
                {"$match"  : {"prizes.share": "4"}},
                {"$project": {"n_prizes": {"$size": "$prizes"}}},
                {"$group": {"_id": None, "n_prizes_total": {"$sum": "$n_prizes"}}}
        ]))
print(data)
# How many prizes have been awarded per born country?
# Note: again restricted by the $match to laureates holding at least one quarter-share prize.
print("\nHow many prizes have been awarded per born country?")
data = list(db.laureates.aggregate([
                {"$match"  : {"prizes.share": "4"}},
                {"$project": {'bornCountry': 1, 
                              "n_prizes": {"$size": "$prizes"}}},
                {"$group"  : {"_id": '$bornCountry', 
                              "n_prizes_total": {"$sum": "$n_prizes"}}}
        ]))
pprint(data)

FIELDS PATH...
[{'prizes': [{'share': '2'}]}, {'prizes': [{'share': '3'}]}]
{'prizes': [{'share': '2'}]}
{'n_prizes': 1}

OPERATOR EXPRESSIONS...
{'n_prizes': 1}

MULTI PARAMETER OPERATOR EXPRESSIONS...
{'solo_winner': False}
['Australia',
 'Austria-Hungary (now Czech Republic)',
 'Canada',
 'France',
 'Germany',
 'Germany (now Poland)',
 'Ireland',
 'Japan',
 'Luxembourg',
 'Norway',
 'Poland (now Lithuania)',
 'Prussia (now Germany)',
 'Russian Empire (now Poland)',
 'Sweden',
 'Switzerland',
 'USA',
 'USSR (now Belarus)',
 'USSR (now Russia)',
 'United Kingdom',
 'West Germany (now Germany)',
 'the Netherlands']
[{'_id': 'Russian Empire (now Poland)'},
 {'_id': 'Norway'},
 {'_id': 'Prussia (now Germany)'},
 {'_id': 'Germany (now Poland)'},
 {'_id': 'USSR (now Belarus)'},
 {'_id': 'Sweden'},
 {'_id': 'Ireland'},
 {'_id': 'Australia'},
 {'_id': 'Canada'},
 {'_id': 'Luxembourg'},
 {'_id': 'Germany'},
 {'_id': 'USA'},
 {'_id': 'West Germany (now Germany)'},
 {'_id': 'Austria-Hungary (no

## 04.06 Field Paths and Sets

Previously, we confirmed -- via a Python loop -- that for each prize, either all laureates have a 1/3 share, or none do. Now, let's do this via an aggregation (result should be an empty list):

<code>list(db.prizes.aggregate([
    {"$project": {"allThree": {"$setEquals": [____, ____]},
                  "noneThree": {"$not": {"$setIsSubset": [____, ____]}}}},
    {"$match": {"$nor": [{"allThree": True}, {"noneThree": True}]}}]))
</code>

**Instructions**

1. Which values fill the blanks?

**Possible Answers**

1. __<code>"\$laureates.share", ["3"], ["3"], "$laureates.share"</code> Correct!__

2. <code>"laureates.share"  , ["3"], ["3"], "laureates.share" </code>

3. <code>"laureates.share"  , {"3"}, {"3"}, "laureates.share" </code>

4. <code>"\$laureates.share" , {"3"}, {"3"}, "$laureates.share"</code>

**Results**

<font color=darkgreen>Correct! Field paths in operator expressions are prepended by "$" to distinguish them from literal string values, and JSON/MongoDB "sets" are delimited by square brackets, just like lists.</font>
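
The logic of the two set operators can be checked with ordinary Python sets. The sketch below is a client-side analogy only, with hypothetical helper names and made-up share lists; the comments show the aggregation expression each function stands in for.

```python
def all_three(shares):
    # Stands in for {"$setEquals": ["$laureates.share", ["3"]]}
    return set(shares) == {"3"}


def none_three(shares):
    # Stands in for {"$not": {"$setIsSubset": [["3"], "$laureates.share"]}}
    return not {"3"}.issubset(set(shares))


def violates(shares):
    # Stands in for {"$match": {"$nor": [{"allThree": True}, {"noneThree": True}]}}
    return not (all_three(shares) or none_three(shares))


# Made-up share arrays for three kinds of prize.
thirds = ["3", "3", "3"]
half_and_quarters = ["2", "4", "4"]
mixed = ["3", "2"]  # hypothetical case the exercise expects never to occur

for shares in (thirds, half_and_quarters, mixed):
    print(shares, all_three(shares), none_three(shares), violates(shares))

# Shares are stored as strings, so an integer literal never matches them.
integer_is_subset = {3}.issubset(set(thirds))
print(integer_is_subset)
```

Only the mixed case would survive the final match stage, so an empty result means every prize is either all thirds or has no thirds at all.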

In [13]:
# Limiting our exploration
for doc in db.prizes.find({}, ["laureates.share"]):
    share_is_three = [laureate["share"] == "3" for laureate in doc["laureates"]]
    
    assert all(share_is_three) or not any(share_is_three)
print(share_is_three)

print(list(db.prizes.aggregate([
        # Shares are stored as strings, so compare against ["3"], not [3].
        {"$project": {"allThree": {"$setEquals": [["3"], '$laureates.share']},
                      "noneThree": {"$not": {"$setIsSubset": [["3"], '$laureates.share']}}}},
        {"$match": {"$nor": [{"allThree": True}, {"noneThree": True}]}}])))

[False, False]
[]


## 04.07 Organizing prizes

In the slides at the beginning of this lesson, we saw a two-stage aggregation pipeline to determine the number of prizes awarded in total. How many prizes were awarded (at least partly) to organizations?

**Instructions**

1. Fill out pipeline to determine the number of prizes awarded (at least partly) to organizations. To do this, you'll first need to $\$$match on the "gender" that designates organizations.
2. Then, use a field path to project the number of prizes for each organization as the "$\$$size" of the "prizes" array. Recall that to specify the value of a field "<my_field>", you use the field path "$\$$\<my_field>".
3. Finally, use a single group {"_id": None} to sum over the values of all organizations' prize counts.

**Results**

<font color=darkgreen>Well done! I hope you can envision iteratively building and rebuilding an aggregation pipeline to drill down into and ask related questions about MongoDB data collections.</font>
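
To see how a sum accumulator behaves inside a group stage, here is a pure-Python analogy. `group_sum` is a hypothetical helper and `projected` is made-up data standing in for documents leaving the project stage; passing `None` as the grouping field puts every document into one group, as `{"_id": None}` does.

```python
from collections import defaultdict


def group_sum(docs, id_field, sum_field, out_field):
    """Mimic {"$group": {"_id": "$<id_field>", <out_field>: {"$sum": "$<sum_field>"}}}."""
    totals = defaultdict(int)
    for doc in docs:
        # A missing grouping field falls into the None group, like a missing field path.
        key = None if id_field is None else doc.get(id_field)
        totals[key] += doc[sum_field]  # the accumulator keeps state across documents
    return [{"_id": key, out_field: total} for key, total in totals.items()]


# Made-up documents as they might look after a $project stage.
projected = [
    {"bornCountry": "USA", "n_prizes": 1},
    {"bornCountry": "France", "n_prizes": 2},
    {"bornCountry": "USA", "n_prizes": 1},
    {"n_prizes": 3},  # no bornCountry, e.g. an organization
]

overall = group_sum(projected, None, "n_prizes", "n_prizes_total")
per_country = group_sum(projected, "bornCountry", "n_prizes", "n_prizes_total")
print(overall)
print(per_country)
```

The single-group call returns exactly one document, while grouping on a field returns one document per distinct value, including `None` for documents that lack the field.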

In [14]:
# Count prizes awarded (at least partly) to organizations as a sum over sizes of "prizes" arrays.
pipeline = [
    {'$match': {'gender': "org"}},
    {"$project": {"n_prizes": {"$size": "$prizes"}}},
    {"$group": {"_id": None, "n_prizes_total": {"$sum": '$n_prizes'}}}
]

print(list(db.laureates.aggregate(pipeline)))

[{'_id': None, 'n_prizes_total': 27}]


## 04.08 Gap years, aggregated

In a previous exercise, you collected instances of prize categories not being awarded in particular years. You implemented this using a for loop in Python. You will now implement this as an aggregation pipeline that:

1. Filters for original prize categories (i.e. sans economics),
2. Projects category and year,
3. Groups distinct prize categories awarded by year,
4. Projects prize categories not awarded by year,
5. Filters for years with missing prize categories, and
6. Returns a cursor of documents in reverse chronological order, one per year, each with a list of missing prize categories for that year.

Remember to use field paths (precede field names with "$") to extract field values in expressions.

**Instructions**

1. Make the $\$$group stage output a document for each prize year (set "_id" to the field path for year) with the set of categories awarded that year.
2. Given your intermediate collection of year-keyed documents, $\$$project a field named "missing" with the (original) categories not awarded that year. Again, mind your field paths!
3. Use a $\$$match stage to only pass through documents with at least one missing prize category.
4. Finally, add a $\$$sort stage to order the documents by year, descending.

**Results**

<font color=darkgreen>Beautiful! MongoDB has a rich library of aggregation operators, so it is possible to refactor a variety of client-side analyses into server-side aggregations depending on your performance requirements.</font>
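
For comparison, the stages listed above can be mirrored step by step in plain Python. This is a sketch over made-up prize documents with a toy set of three original categories, not the real data; each comment names the stage the step stands in for.

```python
from collections import defaultdict

# Toy stand-ins for the real category list and prize documents.
original_categories = ["chemistry", "literature", "peace"]
prizes = [
    {"year": "1915", "category": "chemistry"},
    {"year": "1914", "category": "chemistry"},
    {"year": "1915", "category": "literature"},
    {"year": "1914", "category": "peace"},
    {"year": "1913", "category": "chemistry"},
    {"year": "1913", "category": "literature"},
    {"year": "1913", "category": "peace"},
]

# $group with $addToSet: one set of awarded categories per year.
awarded = defaultdict(set)
for doc in prizes:
    awarded[doc["year"]].add(doc["category"])

# $project with $setDifference: original categories not awarded that year.
missing = [
    {"_id": year, "missing": sorted(set(original_categories) - categories)}
    for year, categories in awarded.items()
]

# $match: keep only years with at least one missing category.
missing = [doc for doc in missing if doc["missing"]]

# $sort: reverse chronological order on the year stored in "_id".
missing.sort(key=lambda doc: doc["_id"], reverse=True)

for doc in missing:
    print("{}: {}".format(doc["_id"], ", ".join(doc["missing"])))
```

Unlike the earlier `groupby` approach, grouping into a dictionary of sets does not need sorted input. In both approaches a year with no prize documents at all produces no group, so it cannot appear in the output.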

In [15]:
original_categories = sorted(set(db.prizes.distinct("category", {"year": "1901"})))
pipeline = [
    {"$match": {"category": {"$in": original_categories}}},
    {"$project": {"category": 1, "year": 1}},
    
    # Collect the set of category values for each prize year.
    {"$group": {"_id": '$year', "categories": {"$addToSet": "$category"}}},
    
    # Project categories *not* awarded (i.e., that are missing this year).
    {"$project": {"missing": {"$setDifference": [original_categories, '$categories']}}},
    
    # Only include years with at least one missing category
    {"$match": {"missing.0": {"$exists": True}}},
    
    # Sort in reverse chronological order. Note that "_id" is a distinct year at this stage.
    {"$sort": OrderedDict([("_id", -1)])},
]

for doc in db.prizes.aggregate(pipeline):
    print("{year}: {missing}".format(year=doc["_id"],missing=", ".join(sorted(doc["missing"]))))

2018: literature
1972: peace
1967: peace
1966: peace
1956: peace
1955: peace
1948: peace
1943: literature, peace
1939: peace
1935: literature
1934: physics
1933: chemistry
1932: peace
1931: physics
1928: peace
1925: medicine
1924: chemistry, peace
1923: peace
1921: medicine
1919: chemistry
1918: literature, medicine, peace
1917: chemistry, medicine
1916: chemistry, medicine, peace, physics
1915: medicine, peace
1914: literature, peace


## 04.09 Zoom into Array Fields

1. Zoom into Array Fields with \$unwind
>Documents can have array-valued fields, and aggregation stages can introduce them. In this lesson, we'll learn a tool to access array elements during aggregation.

2. Sizing and summing
>Let's say we want the number of laureates for each prize. One way to do this is to project a field using the dollar-size operator. We can then add a stage to group by prize category, producing a count of laureates per category. I remove the projection of year in this second pipeline, as there is no need for it. Then, I reset the n_laureates field to be the sum of n_laureates values over each category. Finally, I sort by descending count.

3. How to \$unwind
>How might we use individual elements of the laureates array? One powerful option is the dollar-unwind stage. This outputs one pipeline document per array element. Here, we unwind the laureates field across three documents.

4. Renormalization, anyone?
>We can use stages following an unwind to recompress data. What if we want to normalize our data and track only laureate ids for each prize? After all, we can fetch more information from the laureates collection. Here, we get a list of laureate ids for each prize. After unwinding the laureates array, we project year, category, and laureate id. Year and category together identify a prize. So, we can group by a concatenation of those values. I use the addToSet operator in the group stage to collect laureate ids for each prize, and there you have it. I could also have grouped by underscore-id. But, the category-year combo is more readable, and I introduced you to a new operator!

5. \$unwind and count 'em, one by one
>Here's another way to understand the unwind operator. Before, we used the size operator to project the number of laureates per prize. This projection fed into a group stage to output counts by category. Instead of projecting sizes and summing over them, we can unwind and count documents. The group stage here counts the documents per category fed to it by the unwind stage. The two pipelines shown produce the same result.

6. \$lookup
>Finally, let's see a stage that often accompanies unwinding: dollar-lookup. This stage pulls in documents from another collection via what's termed a left outer join. Let's collect countries of birth for economics laureates. From the prizes collection, we first unwind the laureates array. Each pipeline document now has a single laureates-dot-id. Then, we query the laureates collection for documents with the same value for id. For each one we find, we push it into an array we name "laureate bios". Next, we collect the distinct laureate bornCountry values. We want to feed single bornCountry values, not arrays, to the \$addToSet operator. Hence, we unwind before the group stage. Is there an easier way to do this? Sure! MongoDB doesn't enforce a normalized schema. Thus, you can tailor a collection's schema to support query simplicity and efficiency. We know the laureates collection stores info on prize categories as well. So, this one-liner produces the same result as the five-stage aggregation pipeline above. Even so, it's good to know that you can perform server-side joins in a pinch.

7. Time to unwind... with exercises!
>Sometimes, it feels good to unwind. Let's practice.
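
The following pure-Python sketch mimics what unwinding and a lookup join do to documents. Everything in it is illustrative: `unwind` is a hypothetical generator, the documents and country names are made up, and arrays are assumed to hold dictionaries.

```python
from collections import Counter


def unwind(docs, field):
    """Yield one copy of each document per element of its array field."""
    for doc in docs:
        for element in doc.get(field, []):  # missing or empty arrays yield nothing
            yield {**doc, field: element}


# Made-up documents shaped like the prizes and laureates collections.
prizes = [
    {"year": "1901", "category": "peace", "laureates": [{"id": "462"}, {"id": "463"}]},
    {"year": "1901", "category": "physics", "laureates": [{"id": "1"}]},
    {"year": "1902", "category": "physics", "laureates": [{"id": "2"}, {"id": "3"}]},
]
laureates = [
    {"id": "1", "bornCountry": "Country X"},
    {"id": "462", "bornCountry": "Country Y"},
]

# Approach 1: project the array size, then sum the sizes per category.
by_size = Counter()
for doc in prizes:
    by_size[doc["category"]] += len(doc["laureates"])

# Approach 2: unwind, then count the resulting documents per category.
by_unwind = Counter(doc["category"] for doc in unwind(prizes, "laureates"))

# Lookup as a left outer join: every unwound document is kept, and the
# matching laureate documents (possibly none) go into a new array field.
bios_by_id = {}
for bio in laureates:
    bios_by_id.setdefault(bio["id"], []).append(bio)

joined = [
    {**doc, "laureate_bios": bios_by_id.get(doc["laureates"]["id"], [])}
    for doc in unwind(prizes, "laureates")
]

# Unwinding the joined array and collecting a set gives distinct countries.
countries = {bio["bornCountry"] for doc in joined for bio in doc["laureate_bios"]}

print(by_size == by_unwind, len(joined), sorted(countries))
```

Both counting approaches agree, and the join keeps all five unwound documents even though only two of them find a matching laureate.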

In [16]:
# Sizing and summing
print('** SIZING AND SUMMING...')
print('Laureates per year/category before 1903:')
data = list(db.prizes.aggregate([
                {"$project": {"year"       : 1, 
                              "category"   : 1, 
                              "n_laureates": {"$size": "$laureates"},
                              "_id"        : 0}},
                {"$match"  : {'year'       : {"$lt"  : '1903'}}},
                {"$sort"   : {'category'   : 1}}
       ]))
pprint(data)

print('\nLaureates per category before 1903:')
data = list(db.prizes.aggregate([
                {"$match"  : {'year'       : {"$lt"  : '1903'}}},
                {"$project": {"category"   : 1,
                              "n_laureates": {"$size": "$laureates"}}},
                {"$group"  : {"_id"        : "$category", 
                              "n_laureates": {"$sum" : "$n_laureates"}}},
                {"$sort"   : {"n_laureates": -1}},
       ]))
pprint(data)

** SIZING AND SUMMING...
Laureates per year/category before 1903:
[{'category': 'chemistry', 'n_laureates': 1, 'year': '1902'},
 {'category': 'chemistry', 'n_laureates': 1, 'year': '1901'},
 {'category': 'literature', 'n_laureates': 1, 'year': '1902'},
 {'category': 'literature', 'n_laureates': 1, 'year': '1901'},
 {'category': 'medicine', 'n_laureates': 1, 'year': '1902'},
 {'category': 'medicine', 'n_laureates': 1, 'year': '1901'},
 {'category': 'peace', 'n_laureates': 2, 'year': '1902'},
 {'category': 'peace', 'n_laureates': 2, 'year': '1901'},
 {'category': 'physics', 'n_laureates': 2, 'year': '1902'},
 {'category': 'physics', 'n_laureates': 1, 'year': '1901'}]

Laureates per category before 1903:
[{'_id': 'peace', 'n_laureates': 4},
 {'_id': 'physics', 'n_laureates': 3},
 {'_id': 'medicine', 'n_laureates': 2},
 {'_id': 'literature', 'n_laureates': 2},
 {'_id': 'chemistry', 'n_laureates': 2}]


In [17]:
# How to $unwind
print('\n** HOW TO $UNWIND...')
print('Laureates in 1901 present in prizes collection:')
data = list(db.prizes.aggregate([
                {"$match"  : {'year': '1901'}},
                {"$unwind" : "$laureates"},
                {"$project": {"_id": 0, 
                              "year": 1, 
                              "category": 1,
                              "laureates.id": 1,
                              "laureates.surname": 1, 
                              "laureates.share": 1}},
                {"$sort"   : {'category'   : 1}}
               #{"$limit"   : 3}
       ]))
pprint(data)


** HOW TO $UNWIND...
Laureates in 1901 present in prizes collection:
[{'category': 'chemistry',
  'laureates': {'id': '160', 'share': '1', 'surname': "van 't Hoff"},
  'year': '1901'},
 {'category': 'literature',
  'laureates': {'id': '569', 'share': '1', 'surname': 'Prudhomme'},
  'year': '1901'},
 {'category': 'medicine',
  'laureates': {'id': '293', 'share': '1', 'surname': 'von Behring'},
  'year': '1901'},
 {'category': 'peace',
  'laureates': {'id': '462', 'share': '2', 'surname': 'Dunant'},
  'year': '1901'},
 {'category': 'peace',
  'laureates': {'id': '463', 'share': '2', 'surname': 'Passy'},
  'year': '1901'},
 {'category': 'physics',
  'laureates': {'id': '1', 'share': '1', 'surname': 'Röntgen'},
  'year': '1901'}]


In [18]:
# Renormalization, anyone?
print('\n** RENORMALIZATION, ANYONE?...')
data = list(db.prizes.aggregate([
                {"$match"  : {'year'        : '1901'}},
                {"$unwind" : "$laureates"},
                {"$project": {"year"        : 1, 
                              "category"    : 1, 
                              "laureates.id": 1}},
                {"$group"  : {"_id"         : {"$concat": ["$category", ":", "$year"]},
                              "laureate_ids": {"$addToSet": "$laureates.id"}}},
                {"$sort"   : {'_id'   : 1}}
               #{"$limit": 5}
       ]))
pprint(data)


** RENORMALIZATION, ANYONE?...
[{'_id': 'chemistry:1901', 'laureate_ids': ['160']},
 {'_id': 'literature:1901', 'laureate_ids': ['569']},
 {'_id': 'medicine:1901', 'laureate_ids': ['293']},
 {'_id': 'peace:1901', 'laureate_ids': ['462', '463']},
 {'_id': 'physics:1901', 'laureate_ids': ['1']}]


In [19]:
# $unwind and count 'em, one by one
print("\n$UNWIND AND COUNT 'EM, ONE BY ONE...")
print('Laureates per category in 1901 (using "$group"):')
data = list(db.prizes.aggregate([
                {"$match"  : {'year'       : '1901'}},
                {"$project": {"category"   : 1,
                              "n_laureates": {"$size": "$laureates"}}},
                {"$group"  : {"_id"        : "$category", 
                              "n_laureates": {"$sum" : "$n_laureates"}}},
                {"$sort"   : {"n_laureates": -1}},
       ]))
pprint(data)

print('\nLaureates per category in 1901 (using "$unwind"):')
data = list(db.prizes.aggregate([
                {"$match"  : {'year'       : '1901'}},
                {"$unwind" : "$laureates"},
                {"$group"  : {"_id"        : "$category", 
                              "n_laureates": {"$sum" : 1}}},
                {"$sort"   : {"n_laureates": -1}},
       ]))
pprint(data)


$UNWIND AND COUNT 'EM, ONE BY ONE...
Laureates per category in 1901 (using "$group"):
[{'_id': 'peace', 'n_laureates': 2},
 {'_id': 'physics', 'n_laureates': 1},
 {'_id': 'chemistry', 'n_laureates': 1},
 {'_id': 'literature', 'n_laureates': 1},
 {'_id': 'medicine', 'n_laureates': 1}]

Laureates per category in 1901 (using "$unwind"):
[{'_id': 'peace', 'n_laureates': 2},
 {'_id': 'physics', 'n_laureates': 1},
 {'_id': 'chemistry', 'n_laureates': 1},
 {'_id': 'literature', 'n_laureates': 1},
 {'_id': 'medicine', 'n_laureates': 1}]
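
The two pipelines agree because summing array sizes and counting unwound elements are the same arithmetic. A small pure-Python sketch of that equivalence, using hand-written sample documents rather than the real collection:

```python
from collections import Counter

# Hand-written sample mirroring a few 1901 prize documents
prizes_1901 = [
    {"category": "chemistry", "laureates": [{"id": "160"}]},
    {"category": "peace", "laureates": [{"id": "462"}, {"id": "463"}]},
    {"category": "physics", "laureates": [{"id": "1"}]},
]

# Approach 1: $project with $size, then $group with $sum of that size
by_size = Counter()
for prize in prizes_1901:
    by_size[prize["category"]] += len(prize["laureates"])

# Approach 2: $unwind, then $group adding 1 per unwound document
by_unwind = Counter()
for prize in prizes_1901:
    for _laureate in prize["laureates"]:
        by_unwind[prize["category"]] += 1

print(by_size == by_unwind)
print(by_size.most_common())
```

The `$size` variant keeps one pipeline document per prize, so it moves less data between stages. The `$unwind` variant is the one to reach for when later stages need the individual laureates.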


In [20]:
# $lookup
print('\n$LOOKUP')
# This stage pulls in documents from another collection via what's termed a left outer join. 
# Let's collect countries of birth for the 1901 laureates.
print('Laureates per category in 1901:')
data = list(db.prizes.aggregate([
                {"$match": {'year'       : '1901'}},
                {"$unwind": "$laureates"},
                {"$lookup": {"from": "laureates", "foreignField": "id",
                             "localField": "laureates.id", "as": "laureate_bios"}},
                {"$project": {"category": 1,
                              'laureate_bios.id': 1,
                              'laureate_bios.surname':1,
                              "laureate_bios.bornCountry": 1,
                              '_id': 0}},
                {"$sort"   : {"category": 1}},
       ]))
pprint(data)

print('\nReconfiguring to group the bornCountries per category in 1901:')
data = list(db.prizes.aggregate([
                {"$match"  : {'year'       : '1901'}},
                {"$unwind" : "$laureates"},
                {"$lookup" : {"from": "laureates", "foreignField": "id",
                             "localField": "laureates.id", "as": "laureate_bios"}},
                {"$project": {"category": 1,
                              'laureate_bios.id': 1,
                              "laureate_bios.bornCountry": 1,
                              '_id': 0}},
                {"$unwind" : "$laureate_bios"},
                {"$group"  : {"_id": '$category',
                              "bornCountries": {"$addToSet": "$laureate_bios.bornCountry"}}},
                {"$sort"   : {"_id": 1, 
                              'bornCountries': 1}},
       ]))
pprint(data)

print('\nReconfiguring to group the bornCountries in 1901:')
data = list(db.prizes.aggregate([
                {"$match"  : {'year'       : '1901'}},
                {"$unwind" : "$laureates"},
                {"$lookup" : {"from": "laureates", "foreignField": "id",
                             "localField": "laureates.id", "as": "laureate_bios"}},
                {"$project": {"category": 1,
                              'laureate_bios.id': 1,
                              "laureate_bios.bornCountry": 1,
                              '_id': 0}},
                {"$unwind" : "$laureate_bios"},
                {"$group"  : {"_id": None,
                              "bornCountries": {"$addToSet": "$laureate_bios.bornCountry"}}},
                {"$sort"   : {"_id": 1, 
                              'bornCountries': 1}},
       ]))
pprint(data)
print(data[0]['bornCountries'])

print('\nTaking the data from laureates collections:')
bornCountries = db.laureates.distinct(
                    key = "bornCountry", 
                    filter = {"prizes.year": "1901"}
                )
print(bornCountries)
assert set(bornCountries) == set(data[0]['bornCountries'])


$LOOKUP
Laureates per category in 1901:
[{'category': 'chemistry',
  'laureate_bios': [{'bornCountry': 'the Netherlands',
                     'id': '160',
                     'surname': "van 't Hoff"}]},
 {'category': 'literature',
  'laureate_bios': [{'bornCountry': 'France',
                     'id': '569',
                     'surname': 'Prudhomme'}]},
 {'category': 'medicine',
  'laureate_bios': [{'bornCountry': 'Prussia (now Poland)',
                     'id': '293',
                     'surname': 'von Behring'}]},
 {'category': 'peace',
  'laureate_bios': [{'bornCountry': 'Switzerland',
                     'id': '462',
                     'surname': 'Dunant'}]},
 {'category': 'peace',
  'laureate_bios': [{'bornCountry': 'France',
                     'id': '463',
                     'surname': 'Passy'}]},
 {'category': 'physics',
  'laureate_bios': [{'bornCountry': 'Prussia (now Germany)',
                     'id': '1',
                     'surname': 'Röntgen'}]}]

Re

# 04.10 Embedding aggregation expressions

**Instructions**

The $\$$expr operator allows embedding of aggregation expressions in a normal query (or in a $\$$match stage). Which of the following expressions counts the number of laureate documents with string-valued bornCountries when passed to db.laureates.count_documents?

You can assume (and check!) that the following is true:

<code>assert all(isinstance(v, str) for v in db.laureates.distinct("bornCountry"))</code>

**Possible Answers**

1. <code>{"bornCountry": {"$in": db.laureates.distinct("bornCountry")}}</code> <font color=red>This produces the correct count, but so do the other options.</font>

2. <code>{"$expr": {"$in": ["$bornCountry", db.laureates.distinct("bornCountry")]}}</code>

3. <code>{"$expr": {"$eq": [{"$type": "$bornCountry"}, "string"]}}</code>

4. <code>{"bornCountry": {"$type": "string"}}</code>

5. __All of the above__ Correct!

**Results**

<font color=darkgreen>Correct! Though aggregation expressions differ in syntax, they often correspond to familiar query expressions.</font>
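
To see why the four filters give the same count, here is a pure-Python sketch of their logic. The sample documents are hand-written, and `distinct_countries` stands in for `db.laureates.distinct("bornCountry")`. The sketch relies on the same assumption as the exercise: every distinct value is a string.

```python
# Hand-written sample; the second document has no bornCountry
laureates = [
    {"id": "1", "bornCountry": "Prussia (now Germany)"},
    {"id": "2"},
    {"id": "3", "bornCountry": "USA"},
    {"id": "4", "bornCountry": "USA"},
]

# Stand-in for db.laureates.distinct("bornCountry")
distinct_countries = sorted(
    {doc["bornCountry"] for doc in laureates if "bornCountry" in doc}
)
assert all(isinstance(value, str) for value in distinct_countries)

# Options 1 and 2: the value is one of the known distinct values
count_in = sum(1 for doc in laureates
               if doc.get("bornCountry") in distinct_countries)

# Options 3 and 4: the value has type string
count_type = sum(1 for doc in laureates
                 if isinstance(doc.get("bornCountry"), str))

print(count_in, count_type)
```

Both counts skip the document without a `bornCountry`. If the distinct values ever contained a non-string, the membership test and the type test could diverge.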

In [21]:
#print('Exploring laureates collection...')
#pprint(db.laureates.find_one({'bornCountry': {'$exists': False}}))
#pprint(db.laureates.find_one({'prizes.year': {'$exists': False}}))

#print('\nDistinct bornCountry per year...')
#data = list(db.laureates.aggregate([
#                {"$unwind" : "$prizes"},
#                {"$project": {"prizes.year" : 1, 
#                              "bornCountry" : 1}},
#                {"$group"  : {"_id"         : '$prizes.year',
#                              "bornCountries": {"$addToSet": "$bornCountry"}}},
#                {"$sort"   : {'_id'   : 1}}
#       ]))
#pprint(data)

data = db.laureates.distinct("bornCountry", {'prizes.year': '1944'})
print('\nDistinct bornCountry in 1944: ', data)

data = db.laureates.count_documents({'bornCountry': {'$exists': False}, 'prizes.year': '1944'})
print('Documents with no bornCountry in 1944: ', data)

assert all(isinstance(v, str) for v in set(db.laureates.distinct("bornCountry")) - {None})

print('\nDocuments found in 1944 with bornCountry:')
data = db.laureates.count_documents({'bornCountry': {'$exists': True}, 
                                     'prizes.year': '1944'})
print('Result (1st option): ', data)
data = db.laureates.count_documents({"bornCountry": {"$in": db.laureates.distinct("bornCountry", {'prizes.year': '1944'})}, 
                                     'prizes.year': '1944'})
print('Result (2nd option): ', data)
data = db.laureates.count_documents({"$expr": {"$in": ["$bornCountry", db.laureates.distinct("bornCountry", 
                                                                                             {'prizes.year': '1944'})]}, 
                                     'prizes.year': '1944'})
print('Result (3rd option): ', data)
data = db.laureates.count_documents({"$expr": {"$eq": [{"$type": "$bornCountry"}, "string"]}, 
                                     'prizes.year': '1944'})
print('Result (4th option): ', data)
data = db.laureates.count_documents({"bornCountry": {"$type": "string"}, 
                                     'prizes.year': '1944'})
print('Result (5th option): ', data)


Distinct bornCountry in 1944:  ['Austria-Hungary (now Poland)', 'Denmark', 'Germany', 'USA']
Documents with no bornCountry in 1944:  1

Documents found in 1944 with bornCountry:
Result (1st option):  5
Result (2nd option):  5
Result (3rd option):  5
Result (4th option):  5
Result (5th option):  5


# 04.11 Here and elsewhere

What proportion of laureates won a prize while affiliated with an institution in their country of birth? Build an aggregation pipeline to get the count of laureates who either did or did not win a prize with an affiliation country that is a substring of their country of birth -- __for example, the prize affiliation country "Germany" should match the country of birth "Prussia \(now Germany)"__.

**Instructions**

1. Use $\$$unwind stages to ensure a single prize affiliation country per pipeline document.
2. Filter out prize-affiliation-country values that are "empty" (null, not present, etc.) -- ensure values are "$\$$in" the list of known values.
3. Produce a count of documents for each value of "affilCountrySameAsBorn" (a field we've projected for you using the $\$$indexOfBytes operator) by adding 1 to the running sum.


**Results**

<font color=darkgreen>Cool! Over a third of laureates have no detected affiliation in their recorded country of birth.</font>
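
The substring test rests on `$indexOfBytes`, which takes the string to search first and the substring second, and returns -1 when there is no match. A minimal Python sketch of that behaviour; the helper name `index_of_bytes` is made up for illustration:

```python
def index_of_bytes(string, substring):
    """Return the UTF-8 byte offset of substring in string, or -1 if absent."""
    return string.encode("utf-8").find(substring.encode("utf-8"))

born = "Prussia (now Germany)"
affiliation = "Germany"

# Is the affiliation country a substring of the country of birth?
print(index_of_bytes(born, affiliation) >= 0)

# The reverse test fails: the longer string cannot sit inside the shorter one
print(index_of_bytes(affiliation, born) >= 0)
```

Argument order matters, so it is worth checking which field is searched and which is the substring whenever the two can differ in length.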

In [22]:
print('1st stage...')
pipeline = [{"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # For exploration purpose
            {'$limit': 3}]
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n2nd stage...')
pipeline = [{"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            {'$unwind': "$prizes.affiliations"},
            # For exploration purpose
            {'$limit': 3}] 
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n3rd stage...')
pipeline = [{"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            {'$unwind': "$prizes.affiliations"},
            # Ensure values in the list of distinct values (so not empty)
            {"$match": {"prizes.affiliations.country": {'$in': db.laureates.distinct("prizes.affiliations.country")}}},
            # For exploration purpose
            {'$limit': 3}] 
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n4th stage...')
pipeline = [{"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            {'$unwind': "$prizes.affiliations"},
            # Ensure values in the list of distinct values (so not empty)
            {"$match": {"prizes.affiliations.country": {'$in': db.laureates.distinct("prizes.affiliations.country")}}},
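            # Reproject the data. $indexOfBytes takes [string, substring], so this
            # is True when bornCountry occurs within the affiliation country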
            {"$project": {"affilCountrySameAsBorn": {"$gte": [{"$indexOfBytes": ["$prizes.affiliations.country", 
                                                                                 "$bornCountry"]}, 0]}}},
            # For exploration purpose
            {'$limit': 3}] 
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n5th stage...')
pipeline = [{"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            {'$unwind': "$prizes.affiliations"},
            # Ensure values in the list of distinct values (so not empty)
            {"$match": {"prizes.affiliations.country": {'$in': db.laureates.distinct("prizes.affiliations.country")}}},
            # Reproject the data
            {"$project": {"affilCountrySameAsBorn": {"$gte": [{"$indexOfBytes": ["$prizes.affiliations.country", 
                                                                                 "$bornCountry"]}, 0]}}},
            # Count by "$affilCountrySameAsBorn" value (True or False)
            {"$group": {"_id": "$affilCountrySameAsBorn",
                        "count": {"$sum": 1}}},] 
data = list(db.laureates.aggregate(pipeline))
print(data)

1st stage...
[{'bornCountry': 'the Netherlands',
  'prizes': [{'affiliations': [{'country': 'the Netherlands'}]}]},
 {'bornCountry': 'USA', 'prizes': [{'affiliations': [{'country': 'USA'}]}]},
 {'bornCountry': 'USA', 'prizes': [{'affiliations': [{'country': 'USA'}]}]}]

2nd stage...
[{'bornCountry': 'the Netherlands',
  'prizes': {'affiliations': {'country': 'the Netherlands'}}},
 {'bornCountry': 'USA', 'prizes': {'affiliations': {'country': 'USA'}}},
 {'bornCountry': 'USA', 'prizes': {'affiliations': {'country': 'USA'}}}]

3rd stage...
[{'bornCountry': 'the Netherlands',
  'prizes': {'affiliations': {'country': 'the Netherlands'}}},
 {'bornCountry': 'USA', 'prizes': {'affiliations': {'country': 'USA'}}},
 {'bornCountry': 'USA', 'prizes': {'affiliations': {'country': 'USA'}}}]

4th stage...
[{'affilCountrySameAsBorn': True},
 {'affilCountrySameAsBorn': True},
 {'affilCountrySameAsBorn': True}]

5th stage...
[{'_id': True, 'count': 477}, {'_id': False, 'count': 261}]


In [23]:
print('\nAll process at once:')
key_ac = "prizes.affiliations.country"
key_bc = "bornCountry"
pipeline = [
    {"$project": {key_bc: 1, key_ac: 1}},

    # Ensure a single prize affiliation country per pipeline document
    {'$unwind': "$prizes"},
    {'$unwind': "$prizes.affiliations"},

    # Ensure values in the list of distinct values (so not empty)
    {"$match": {key_ac: {'$in': db.laureates.distinct(key_ac)}}},
    {"$project": {"affilCountrySameAsBorn": {
        "$gte": [{"$indexOfBytes": ["$"+key_ac, "$"+key_bc]}, 0]}}},

    # Count by "$affilCountrySameAsBorn" value (True or False)
    {"$group": {"_id": "$affilCountrySameAsBorn",
                "count": {"$sum": 1}}},
]
for doc in db.laureates.aggregate(pipeline): print(doc)


All process at once:
{'_id': False, 'count': 261}
{'_id': True, 'count': 477}


# 04.12 Countries of birth by prize category

Some prize categories have laureates hailing from a greater number of countries than do other categories. You will build an aggregation pipeline for the prizes collection to collect these numbers, using a $lookup stage to obtain laureate countries of birth.

**Instructions**

1. $\$$unwind the laureates array field to output one pipeline document for each array element.
2. After pulling in laureate bios with a $\$$lookup stage, unwind the new laureate_bios array field (each laureate has only a single biography document).
3. Collect the set of bornCountries associated with each prize category.
4. Project out the size of each category's set of bornCountries.

**Results**

<font color=darkgreen>Excellent! It seems that economics laureates hail from fewer countries than laureates in any other category, and literature laureates hail from the most.</font>
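
The four instructions can be sketched in plain Python to show what each stage contributes. The sample documents are hand-written and carry only the fields used here, and the dictionary `bios_by_id` plays the role of the `$lookup` join.

```python
from collections import defaultdict

# Hand-written samples; real documents carry many more fields
prizes = [
    {"category": "peace", "laureates": [{"id": "462"}, {"id": "463"}]},
    {"category": "physics", "laureates": [{"id": "1"}]},
    {"category": "literature", "laureates": [{"id": "569"}]},
]
laureates = [
    {"id": "1", "bornCountry": "Prussia (now Germany)"},
    {"id": "462", "bornCountry": "Switzerland"},
    {"id": "463", "bornCountry": "France"},
    {"id": "569", "bornCountry": "France"},
]

# Index the "foreign" collection by id, standing in for $lookup
bios_by_id = defaultdict(list)
for bio in laureates:
    bios_by_id[bio["id"]].append(bio)

born_countries = defaultdict(set)
for prize in prizes:
    for laureate in prize["laureates"]:                 # $unwind "$laureates"
        for bio in bios_by_id.get(laureate["id"], []):  # $lookup, then $unwind
            born_countries[prize["category"]].add(bio["bornCountry"])  # $addToSet

# $project with $size, then $sort descending
n_born_countries = {cat: len(names) for cat, names in born_countries.items()}
ranked = sorted(n_born_countries.items(), key=lambda item: item[1], reverse=True)
print(ranked)
```

A set is used for each category because `$addToSet` keeps distinct values only, which is what makes the final `$size` a count of countries rather than of laureates.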

In [24]:
print('1st stage...')
pipeline = [
    # Unwind the laureates array
    {'$unwind': "$laureates"},
    # For exploration purpose
    {'$limit': 1},
]
data = list(db.prizes.aggregate(pipeline))
pprint(data)

print('\n2nd stage...')
pipeline = [
    # Unwind the laureates array
    {'$unwind': "$laureates"},
    # Link with laureates collection
    {"$lookup": {"from": "laureates", "foreignField": "id",
                 "localField": "laureates.id", "as": "laureate_bios"}},
    # Unwind the new laureate_bios array
    {"$unwind": '$laureate_bios'},
    # For exploration purpose
    {'$limit': 1},
]
data = list(db.prizes.aggregate(pipeline))
pprint(data)

print('\n3rd stage...')
pipeline = [
    # Unwind the laureates array
    {'$unwind': "$laureates"},
    {"$lookup": {"from": "laureates", "foreignField": "id",
                 "localField": "laureates.id", "as": "laureate_bios"}},
    # Unwind the new laureate_bios array
    {"$unwind": '$laureate_bios'},
    {"$project": {"category": 1,
                  "bornCountry": "$laureate_bios.bornCountry",
                  '_id': 0}},
    # For exploration purpose
    {'$limit': 3},
]
data = list(db.prizes.aggregate(pipeline))
pprint(data)

print('\n4th stage...')
pipeline = [
    # Unwind the laureates array
    {'$unwind': "$laureates"},
    {"$lookup": {"from": "laureates", "foreignField": "id",
                 "localField": "laureates.id", "as": "laureate_bios"}},
    # Unwind the new laureate_bios array
    {"$unwind": '$laureate_bios'},
    {"$project": {"category": 1,
                  "bornCountry": "$laureate_bios.bornCountry",
                  '_id': 0}},
    # Collect bornCountry values associated with each prize category
    {"$group": {'_id': "$category",
                "bornCountries": {"$addToSet": "$bornCountry"}}},
    # For exploration purpose
    {'$limit': 1},
]
data = list(db.prizes.aggregate(pipeline))
pprint(data)

print('\n5th stage...')
pipeline = [
    # Unwind the laureates array
    {'$unwind': "$laureates"},
    {"$lookup": {"from": "laureates", "foreignField": "id",
                 "localField": "laureates.id", "as": "laureate_bios"}},
    # Unwind the new laureate_bios array
    {"$unwind": '$laureate_bios'},
    {"$project": {"category": 1,
                  "bornCountry": "$laureate_bios.bornCountry",
                  '_id': 0}},
    # Collect bornCountry values associated with each prize category
    {"$group": {'_id': "$category",
                "bornCountries": {"$addToSet": "$bornCountry"}}},
    # Project out the size of each category's (set of) bornCountries
    {"$project": {"category": 1,
                  "nBornCountries": {"$size": "$bornCountries"}}},
    {"$sort": {"nBornCountries": -1}},
]
data = list(db.prizes.aggregate(pipeline))
pprint(data)

1st stage...
[{'_id': ObjectId('6035cd48354dd8e354623018'),
  'category': 'physics',
  'laureates': {'firstname': 'Arthur',
                'id': '960',
                'motivation': '"for the optical tweezers and their application '
                              'to biological systems"',
                'share': '2',
                'surname': 'Ashkin'},
  'overallMotivation': '“for groundbreaking inventions in the field of laser '
                       'physics”',
  'year': '2018'}]

2nd stage...
[{'_id': ObjectId('6035cd48354dd8e354623018'),
  'category': 'physics',
  'laureate_bios': {'_id': ObjectId('6035cd48354dd8e35462360a'),
                    'born': '1922-09-02',
                    'bornCity': 'New York, NY',
                    'bornCountry': 'USA',
                    'bornCountryCode': 'US',
                    'died': '0000-00-00',
                    'firstname': 'Arthur',
                    'gender': 'male',
                    'id': '960',
                    'priz

In [25]:
print('\nAll process at once:')
pipeline = [
    # Unwind the laureates array
    {'$unwind': "$laureates"},
    {"$lookup": {
        "from": "laureates", "foreignField": "id",
        "localField": "laureates.id", "as": "laureate_bios"}},

    # Unwind the new laureate_bios array
    {"$unwind": '$laureate_bios'},
    {"$project": {"category": 1,
                  "bornCountry": "$laureate_bios.bornCountry"}},

    # Collect bornCountry values associated with each prize category
    {"$group": {'_id': "$category",
                "bornCountries": {"$addToSet": "$bornCountry"}}},

    # Project out the size of each category's (set of) bornCountries
    {"$project": {"category": 1,
                  "nBornCountries": {"$size": '$bornCountries'}}},
    {"$sort": {"nBornCountries": -1}},
]
for doc in db.prizes.aggregate(pipeline): print(doc)


All process at once:
{'_id': 'literature', 'nBornCountries': 55}
{'_id': 'peace', 'nBornCountries': 50}
{'_id': 'chemistry', 'nBornCountries': 48}
{'_id': 'medicine', 'nBornCountries': 44}
{'_id': 'physics', 'nBornCountries': 44}
{'_id': 'economics', 'nBornCountries': 21}


# 04.13 Something Extra: \$addFields to Aid Analysis

1. Something Extra: \$addFields to Aid Analysis
>It's time to round out our aggregation know-how and wrap up the course. In this lesson, we'll learn how to add fields in a pipeline without having to project existing fields.

2. A somber \$project
>For Nobel laureates that have died, I want to know the number of years they were alive. I can start a pipeline to compute this by projecting out the "died" and "born" fields. Skimming the MongoDB documentation, I found a handy operator, dateFromString. This will help us subtract the date of birth from the date of death. But wait! Some laureate documents have an invalid date of all zeroes. Why? This encodes that their date of birth is not recorded. To overcome this, let's insert a match stage at the start of our pipeline. Now, we include only laureates with reasonable years of both birth and death. Darn! It looks like some laureates have only their year of birth recorded. How can we accommodate this?

3. \$split and \$cond-itionally correct (with \$concat)
>Here's one way we can choose to accommodate a date of birth that is only a year. First, we can use a new stage, addFields, to provide new array fields split on the hyphen in the date strings. This gives us year, month, and day as the array elements. Why use addFields rather than project? Simple. We do not need to specify all the other fields we want to pass along in the pipeline. This enables us to use the existing born field in this next stage, also an addFields stage. Here, I re-write the born field if the string value zero-zero is in the bornArray. I fix it to be a real date by concatenating the year element of bornArray with the string suffix for January 1st. The conditional expression operator, cond, is a ternary operator. It evaluates the first expression, and, if it's true, returns the value of the next expression. Otherwise, it returns the value of the third expression. Now, at last, we are able to compute the number of years each laureate was alive.

4. A \$bucket list
>Now, let's compute the number of years between the died and born dates. I show only the last stage of our pipeline so far. First, we subtract the dates. This produces a value in milliseconds. Next, we divide by the approximate number of milliseconds in an average year. Finally, we floor the value, rounding it down to a whole number. At this point I'd verify that this stage works. I may add a limit stage to inspect a few output documents. I want to show you one last operator to get a sense of the distribution of "years" values across laureates. MongoDB's bucket operator groups values into buckets defined by a sequence of boundaries. Here, we see that one laureate died before the age of 40, and two lived to be over a hundred years old!

5. Practice \$addFields
>Let's solidify your understanding of the addFields stage. I'll be sure to fold in some of what you learned before.
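
Before running the pipeline, it may help to see the same logic in plain Python. This is a sketch under assumptions: the helper names `repair_date`, `years_alive`, and `bucket` are made up for illustration, and the second sample date is hypothetical. Like the pipeline, it repairs only the date of birth.

```python
from bisect import bisect_right
from collections import Counter
from datetime import datetime

MS_PER_YEAR = 1000 * 60 * 60 * 24 * 365.25  # 31557600000 ms in an average year

def repair_date(date_string):
    """Mimic $split, $cond and $concat: use January 1st if any part is '00'."""
    parts = date_string.split("-")
    return parts[0] + "-01-01" if "00" in parts else date_string

def years_alive(born, died):
    """Mimic $dateFromString, $subtract (milliseconds), $divide and $floor."""
    born_date = datetime.strptime(repair_date(born), "%Y-%m-%d")
    died_date = datetime.strptime(died, "%Y-%m-%d")
    milliseconds = (died_date - born_date).total_seconds() * 1000
    return int(milliseconds // MS_PER_YEAR)

def bucket(value, boundaries):
    """Return the lower bound of the [lower, upper) bucket that holds value."""
    if not boundaries[0] <= value < boundaries[-1]:
        raise ValueError("value falls outside the boundaries")
    return boundaries[bisect_right(boundaries, value) - 1]

boundaries = list(range(30, 120, 10))
ages = [
    years_alive("1853-07-18", "1928-02-04"),  # dates as stored for one laureate
    years_alive("1900-00-00", "1939-06-01"),  # hypothetical year-only birth date
]
histogram = Counter(bucket(age, boundaries) for age in ages)
print(ages, dict(histogram))
```

Each bucket includes its lower boundary and excludes its upper one, so an age of exactly 40 lands in the 40 bucket, not the 30 bucket.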

In [26]:
print('1st stage...')
pipeline = [
    {"$project": {"died": {"$dateFromString": {"dateString": "$died"}},
                  "born": {"$dateFromString": {"dateString": "$born"}}}},
    # For exploration purpose
    {'$limit': 1}
]
try: 
    data = list(db.laureates.aggregate(pipeline))
    pprint(data)
except Exception:
    print('Error found in dates!')

print('\n2nd stage...')
pipeline = [
    {"$match": {"died": {"$gt": "1700"}, 
                "born": {"$gt": "1700"}}},
    {"$project": {"died": {"$dateFromString": {"dateString": "$died"}},
                  "born": {"$dateFromString": {"dateString": "$born"}}}},
    # For exploration purpose
    {'$limit': 1}
]
try: 
    data = list(db.laureates.aggregate(pipeline))
    pprint(data)
except Exception:
    print('Error found in dates!')
    
print('\n3rd stage...')
pipeline = [
    {"$match": {"died": {"$gt": "1700"}, 
                "born": {"$gt": "1700"}}},
    {"$addFields": {"bornArray": {"$split": ["$born", "-"]},
                    "diedArray": {"$split": ["$died", "-"]}}},
    # For exploration purpose
    {'$limit': 1}
]
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n4th stage...')
pipeline = [
    {"$match": {"died": {"$gt": "1700"}, 
                "born": {"$gt": "1700"}}},
    {"$addFields": {"bornArray": {"$split": ["$born", "-"]},
                    "diedArray": {"$split": ["$died", "-"]}}},
    {"$addFields": {"born": {"$cond": [{"$in": ["00", "$bornArray"]},
                                       {"$concat": [{"$arrayElemAt": ["$bornArray", 0]}, "-01-01"]},
                                       "$born"]}}},
    # For exploration purpose
    {'$limit': 1}
]
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n5th stage...')
pipeline = [
    {"$match": {"died": {"$gt": "1700"}, 
                "born": {"$gt": "1700"}}},
    {"$addFields": {"bornArray": {"$split": ["$born", "-"]},
                    "diedArray": {"$split": ["$died", "-"]}}},
    {"$addFields": {"born": {"$cond": [{"$in": ["00", "$bornArray"]},
                                       {"$concat": [{"$arrayElemAt": ["$bornArray", 0]}, "-01-01"]},
                                       "$born"]}}},
    {"$project": {"died": {"$dateFromString": {"dateString": "$died"}},
                  "born": {"$dateFromString": {"dateString": "$born"}},
                  "_id": 0}},
    {"$project": {"years": {"$floor": {"$divide": [{"$subtract": ["$died", "$born"]},
                                                   31557600000]}}}}, # 1000 * 60 * 60 * 24 * 365.25 ms
    # For exploration purpose
    {'$limit': 3}
]
data = list(db.laureates.aggregate(pipeline))
pprint(data)

print('\n6th stage...')
pipeline = [
    {"$match": {"died": {"$gt": "1700"}, 
                "born": {"$gt": "1700"}}},
    {"$addFields": {"bornArray": {"$split": ["$born", "-"]},
                    "diedArray": {"$split": ["$died", "-"]}}},
    {"$addFields": {"born": {"$cond": [{"$in": ["00", "$bornArray"]},
                                       {"$concat": [{"$arrayElemAt": ["$bornArray", 0]}, "-01-01"]},
                                       "$born"]}}},
    {"$project": {"died": {"$dateFromString": {"dateString": "$died"}},
                  "born": {"$dateFromString": {"dateString": "$born"}},
                  "_id": 0}},
    {"$project": {"years": {"$floor": {"$divide": [{"$subtract": ["$died", "$born"]},
                                                   31557600000]}}}}, # 1000 * 60 * 60 * 24 * 365.25 ms
    {"$bucket": {"groupBy": "$years",
                 "boundaries": list(range(30, 120, 10))}}
]
data = list(db.laureates.aggregate(pipeline))
pprint(data)

1st stage...
[{'_id': ObjectId('6035cd48354dd8e354623266'),
  'born': datetime.datetime(1853, 7, 18, 0, 0),
  'died': datetime.datetime(1928, 2, 4, 0, 0)}]

2nd stage...
[{'_id': ObjectId('6035cd48354dd8e354623266'),
  'born': datetime.datetime(1853, 7, 18, 0, 0),
  'died': datetime.datetime(1928, 2, 4, 0, 0)}]

3rd stage...
[{'_id': ObjectId('6035cd48354dd8e354623266'),
  'born': '1853-07-18',
  'bornArray': ['1853', '07', '18'],
  'bornCity': 'Arnhem',
  'bornCountry': 'the Netherlands',
  'bornCountryCode': 'NL',
  'died': '1928-02-04',
  'diedArray': ['1928', '02', '04'],
  'diedCountry': 'the Netherlands',
  'diedCountryCode': 'NL',
  'firstname': 'Hendrik Antoon',
  'gender': 'male',
  'id': '2',
  'prizes': [{'affiliations': [{'city': 'Leiden',
                                'country': 'the Netherlands',
                                'name': 'Leiden University'}],
              'category': 'physics',
              'motivation': '"in recognition of the extraordinary service th

# 04.14 "...it's the life in your years"

**Instructions**

For the pipeline we developed in the last slide deck, I want you to replace the last ($bucket) stage with one such that, given the documents docs collected, we can get the following output:

<code>
from operator import itemgetter

print(max(docs, key=itemgetter("years")))
print(min(docs, key=itemgetter("years")))
</code>

{'firstname': 'Rita', 'surname': 'Levi-Montalcini', 'years': 103.0}
{'firstname': 'Martin Luther', 'surname': 'King Jr.', 'years': 39.0}

You may assume that any earlier $\$$project stage has been replaced by an equivalent $\$$addFields stage.

**Possible Answers**

1. __<code>{"$project": {"years": 1, "firstname": 1, "surname": 1, "_id": 0}}</code>__ Correct!

2. <code>{"$addFields": {"firstname": 1, "surname": 1}}</code>

3. <code>{"$project": {"firstname": 1, "surname": 1}}</code>

4. <code>{"$project": {"firstname": 1, "surname": 1, "_id": 0}}</code>


**Results**

<font color=darkgreen>Solid! You projected the three needed fields and explicitly excluded the _id field. Poor Martin.</font>

In [27]:
pipeline = [
    {"$match": {"died": {"$gt": "1700"}, 
                "born": {"$gt": "1700"}}},
    {"$addFields": {"bornArray": {"$split": ["$born", "-"]},
                    "diedArray": {"$split": ["$died", "-"]}}},
    {"$addFields": {"born": {"$cond": [{"$in": ["00", "$bornArray"]},
                                       {"$concat": [{"$arrayElemAt": ["$bornArray", 0]}, "-01-01"]},
                                       "$born"]}}},
    {"$project": {"died": {"$dateFromString": {"dateString": "$died"}},
                  "born": {"$dateFromString": {"dateString": "$born"}},
                  "firstname": 1, 
                  "surname": 1,
                  "_id": 0}},
    {"$project": {"_id": 0,
                  "firstname": 1, 
                  "surname": 1,
                  "years": {"$floor": {"$divide": [{"$subtract": ["$died", "$born"]},
                                                   31557600000]}}}}, # 1000 * 60 * 60 * 24 * 365.25 ms
    # For exploration purpose
    {'$limit': 3}
]
data = list(db.laureates.aggregate(pipeline))
pprint(data)

[{'firstname': 'Hendrik Antoon', 'surname': 'Lorentz', 'years': 74.0},
 {'firstname': 'Sir Martin', 'surname': 'Ryle', 'years': 66.0},
 {'firstname': 'Aage Niels', 'surname': 'Bohr', 'years': 87.0}]
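
As a sanity check on the arithmetic, the sketch below repeats the same calculation client-side with the standard library. It is only an approximation of what the pipeline does on the server. The helper names `clean_date` and `years_lived` are made up for illustration, and Lorentz's birth date (1853-07-18) is an assumption, since it is not shown in this section.

```python
from datetime import datetime
from math import floor

# Same divisor as in the pipeline: milliseconds in a 365.25-day year
MS_PER_YEAR = 1000 * 60 * 60 * 24 * 365.25

def clean_date(date_string):
    """Mimic the $cond stage: if month or day is "00", fall back to January 1."""
    parts = date_string.split("-")
    if "00" in parts:
        return parts[0] + "-01-01"
    return date_string

def years_lived(born, died):
    """Mimic $subtract, $divide and $floor on two date strings."""
    born_dt = datetime.strptime(clean_date(born), "%Y-%m-%d")
    died_dt = datetime.strptime(died, "%Y-%m-%d")
    elapsed_ms = (died_dt - born_dt).total_seconds() * 1000
    return float(floor(elapsed_ms / MS_PER_YEAR))

# Birth date is an assumption; the death date appears in the document above
lorentz_years = years_lived("1853-07-18", "1928-02-04")
print(lorentz_years)
```

Under that assumption, the result agrees with the 74.0 years reported for Lorentz in the pipeline output.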


# 04.15 How many prizes were awarded to immigrants?

How many prizes were awarded to people who had no affiliation in their country of birth at the time of the award?

**Instructions**

1. In your aggregation pipeline <code>pipeline</code>, use the "gender" field to limit results to people (that is, not organizations).
2. Count prizes for which the laureate's "bornCountry" is not also the "country" of any of their affiliations for the prize. Be sure to use field paths (precede a field name with "$\$$") when appropriate.

**Results**

<font color=darkgreen>Fine work! Note that because we ended up using only the "bornCountryInAffiliations" field after the $addFields stage, we could refactor that stage to be a $project stage instead.</font>
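
Before running this on the server, it can help to spell out the counting rule in plain Python. The sketch below is a client-side approximation, not the course solution. The toy documents and the function name `count_awarded_elsewhere` are hypothetical. Each prize counts once if the laureate's "bornCountry" is not among that prize's affiliation countries.

```python
# Hypothetical toy documents shaped like the laureates collection
laureates = [
    {"gender": "male", "bornCountry": "USA",
     "prizes": [{"affiliations": [{"country": "USA"}]}]},
    {"gender": "female", "bornCountry": "Russian Empire (now Poland)",
     "prizes": [{"affiliations": [{"country": "France"}]},
                {"affiliations": [[]]}]},  # second prize has no recorded affiliation
    {"gender": "org",
     "prizes": [{"affiliations": [[]]}]},
    {"gender": "male", "bornCountry": "Prussia (now Germany)",
     "prizes": [{"affiliations": [{"country": "Germany"}, {"country": "USA"}]}]},
]

def count_awarded_elsewhere(documents):
    """Count prizes whose affiliation countries do not include the birth country."""
    count = 0
    for laureate in documents:
        if laureate.get("gender") == "org":  # keep people only
            continue
        for prize in laureate.get("prizes", []):  # one iteration per prize, like $unwind
            countries = [
                affiliation["country"]
                for affiliation in prize.get("affiliations", [])
                if isinstance(affiliation, dict) and "country" in affiliation
            ]
            if laureate.get("bornCountry") not in countries:  # exact match, like $in
                count += 1
    return count

awarded_elsewhere = count_awarded_elsewhere(laureates)
print(awarded_elsewhere)
```

Note two side effects of exact matching in this toy data: a prize with no recorded affiliation is counted, and "Prussia (now Germany)" does not match "Germany".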

In [28]:
# Count documents that have more than one affiliation on a prize (result not displayed)
db.laureates.count_documents({'prizes.affiliations.1': {'$exists': True}})

# How many documents result from unwinding down to prizes.affiliations?
print('Exploring...')
pipeline = [# Keep only documents that belong to people
            {'$match': {'gender': {'$ne': 'org'}}},
            # Select only needed data
            {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            {'$unwind': "$prizes.affiliations"},
            # Add a new field
            {"$addFields": {"prizeACExist": {'$ne': ["$prizes.affiliations.country", []]}}},
            {"$addFields": {"affilCountrySameAsBorn": {"$cond": [{'$ne': ["$prizes.affiliations.country", []]},
                                                                 {"$gte": [{"$indexOfBytes": ["$prizes.affiliations.country",
                                                                                              "$bornCountry"]}, 0]},
                                                                 False]}}},
            # Reproject the data
            {"$project": {"bornCountry": 1, 
                          "prizes.affiliations.country": 1, 
                          'prizeACExist': 1,
                          'affilCountrySameAsBorn': 1,
                          '_id': 0}},
]
data = list(db.laureates.aggregate(pipeline))
print('\nNumber of docs (decompressing until prizes.affiliations):', len(data))
print('3 first docs as example:')
pprint(data[:3])

pipeline = [# Keep only documents that belong to people
            {'$match': {'gender': {'$ne': 'org'}}},
            # Select only needed data
            {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            # Add a new field
            {"$addFields": {"prizeACExist": {'$ne': ["$prizes.affiliations.country", []]}}},
            {"$addFields": {"affilCountrySameAsBorn": {"$in": ['$bornCountry', "$prizes.affiliations.country"]}}},
            # Reproject the data
            {"$project": {"bornCountry": 1, 
                          "prizes.affiliations.country": 1, 
                          'prizeACExist': 1,
                          'affilCountrySameAsBorn': 1,
                          '_id': 0}},
]
data = list(db.laureates.aggregate(pipeline))
print('\nNumber of docs (decompressing until prizes):', len(data))
print('3 first docs as example:')
pprint(data[:3])

Exploring...

Number of docs (decompressing until prizes.affiliations): 978
3 first docs as example:
[{'affilCountrySameAsBorn': True,
  'bornCountry': 'the Netherlands',
  'prizeACExist': True,
  'prizes': {'affiliations': {'country': 'the Netherlands'}}},
 {'affilCountrySameAsBorn': True,
  'bornCountry': 'USA',
  'prizeACExist': True,
  'prizes': {'affiliations': {'country': 'USA'}}},
 {'affilCountrySameAsBorn': True,
  'bornCountry': 'USA',
  'prizeACExist': True,
  'prizes': {'affiliations': {'country': 'USA'}}}]

Number of docs (decompressing until prizes): 914
3 first docs as example:
[{'affilCountrySameAsBorn': True,
  'bornCountry': 'the Netherlands',
  'prizeACExist': True,
  'prizes': {'affiliations': [{'country': 'the Netherlands'}]}},
 {'affilCountrySameAsBorn': True,
  'bornCountry': 'USA',
  'prizeACExist': True,
  'prizes': {'affiliations': [{'country': 'USA'}]}},
 {'affilCountrySameAsBorn': True,
  'bornCountry': 'USA',
  'prizeACExist': True,
  'prizes': {'affiliation

In [29]:
# For example, the prize affiliation country "Germany" should match the country of birth "Prussia (now Germany)".
# First approximation (without book help)
print('\nWithout book help...')
pipeline = [# Keep only documents that belong to people
            {'$match': {'gender': {'$ne': 'org'}}},
            # Select only needed data
            {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            # Add a new field
            {"$addFields": {"prizeACExist": {'$ne': ["$prizes.affiliations.country", []]}}},
            {"$addFields": {"affilCountrySameAsBorn": {"$in": ['$bornCountry', "$prizes.affiliations.country"]}}},
            # Reproject the data
            {"$project": {"bornCountry": 1, 
                          "prizes.affiliations.country": 1, 
                          'prizeACExist': 1,
                          'affilCountrySameAsBorn': 1,
                          'bornCountryInAffiliations': 1,
                          '_id': 0}},
            # Filtering the data with no Affiliation (special cases in data)
            {'$match': {'prizeACExist': False}},
            # For exploration purpose
            {'$limit': 3}
] 
data = list(db.laureates.aggregate(pipeline))
print('Exploring...')
pprint(data)


pipeline = [# Keep only documents that belong to people
            {'$match': {'gender': {'$ne': 'org'}}},
            # Select only needed data
            {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1, '_id': 0}},
            # Ensure a single prize affiliation country per pipeline document
            {'$unwind': "$prizes"},
            # Reproject the data
            {"$project": {"affilCountrySameAsBorn": {"$in": ['$bornCountry', "$prizes.affiliations.country"]}}},
            # Count by "$affilCountrySameAsBorn" value (True or False)
            {"$group": {"_id": "$affilCountrySameAsBorn",
                        "count": {"$sum": 1}}},
            # Show only the count of prizes with no country-of-birth affiliation
            {'$match': {'_id': False}}
] 
data = list(db.laureates.aggregate(pipeline))
print('\nFinal result...')
pprint(data)


Without book help...
Exploring...
[{'affilCountrySameAsBorn': False,
  'bornCountry': 'Russian Empire (now Poland)',
  'prizeACExist': False,
  'prizes': {'affiliations': [[]]}},
 {'affilCountrySameAsBorn': False,
  'bornCountry': 'France',
  'prizeACExist': False,
  'prizes': {'affiliations': [{}]}},
 {'affilCountrySameAsBorn': False,
  'bornCountry': 'Austria',
  'prizeACExist': False,
  'prizes': {'affiliations': [[]]}}]

Final result...
[{'_id': False, 'count': 478}]


In [30]:
print('\nBase on lesson...')
print('With $in operator')
pipeline = [
    # Limit results to people; project needed fields; unwind prizes
    {'$match': {'gender': {"$ne": "org"}}},
    {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1}},
    {"$unwind": "$prizes"},
            
    # Count prizes with no country-of-birth affiliation
    {"$addFields": {"bornCountryInAffiliations": {"$in": ['$bornCountry', "$prizes.affiliations.country"]}}},
    {'$match': {"bornCountryInAffiliations": False}},
    {"$count": "awardedElsewhere"},
]
data = list(db.laureates.aggregate(pipeline))
print(f'With $addField: {data}')

pipeline = [
    # Limit results to people; project needed fields; unwind prizes
    {'$match': {'gender': {"$ne": "org"}}},
    {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1}},
    {"$unwind": "$prizes"},
            
    # Count prizes with no country-of-birth affiliation
    {"$project": {"bornCountryInAffiliations": {"$in": ['$bornCountry', "$prizes.affiliations.country"]}}},
    {'$match': {"bornCountryInAffiliations": False}},
    {"$count": "awardedElsewhere"},
]
data = list(db.laureates.aggregate(pipeline))
print(f'With $project instead: {data}')


Base on lesson...
With $in operator
With $addField: [{'awardedElsewhere': 478}]
With $project instead: [{'awardedElsewhere': 478}]


# 04.16 Refinement: filter out "unaffiliated" people

In the previous exercise, we counted prizes awarded to people without an affiliation in their "bornCountry". However, hundreds of prizes were awarded to people without recorded affiliations; sure, their "bornCountry" is technically not the "country" of any of their affiliations, but there are no "country" values to compare against!

**Instructions**

1. Construct a stage added_stage that filters for laureate "prizes.affiliations.country" values that are non-empty, that is, are $\$$in a list of the distinct values that the field takes in the collection.
2. Insert this stage into the pipeline so that it filters out single prizes (not arrays) and precedes any test for membership in an array of countries. Recall that the first parameter to <code>list.insert</code> is the (zero-based) index for insertion.

**Results**

<font color=darkgreen>Superb! Note that further refinements are possible. For example, substring matching could mark certain countries as equivalent. When it comes to assumptions, explicit is better than implicit (import this).</font>
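
The substring refinement mentioned above could look something like the following client-side sketch. It is a deliberately naive illustration, not part of the course solution, and the helper name `same_country` is hypothetical.

```python
def same_country(born_country, affiliation_country):
    """Treat countries as equivalent if the affiliation country
    appears anywhere inside the country-of-birth string."""
    return affiliation_country in born_country

# An exact match still works
print(same_country("USA", "USA"))

# A historical name that mentions the modern country now matches
print(same_country("Prussia (now Germany)", "Germany"))

# Caveat: plain substring tests can produce false positives
print(same_country("Nigeria", "Niger"))
```

The last call shows why such assumptions should be made explicit: "Niger" is a substring of "Nigeria", so a plain substring test would wrongly treat the two as the same country.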

In [31]:
pipeline = [
    {"$match": {"gender": {"$ne": "org"}}},
    {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1}},
    {"$unwind": "$prizes"},
    {"$addFields": {"bornCountryInAffiliations": {"$in": ["$bornCountry", "$prizes.affiliations.country"]}}},
    {"$match": {"bornCountryInAffiliations": False}},
    {"$count": "awardedElsewhere"},
]

# Construct the additional filter stage
added_stage = {"$match": {'prizes.affiliations.country': {'$in': db.laureates.distinct('prizes.affiliations.country')}}}

# Insert this stage into the pipeline
pipeline.insert(3, added_stage)
print(list(db.laureates.aggregate(pipeline)))

[{'awardedElsewhere': 252}]
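
To double-check where `pipeline.insert(3, added_stage)` places the new stage, the sketch below rebuilds the same pipeline without touching the database and lists the stage operators in order. The two-country list is a hypothetical stand-in for the result of `db.laureates.distinct(...)`.

```python
pipeline = [
    {"$match": {"gender": {"$ne": "org"}}},
    {"$project": {"bornCountry": 1, "prizes.affiliations.country": 1}},
    {"$unwind": "$prizes"},
    {"$addFields": {"bornCountryInAffiliations": {
        "$in": ["$bornCountry", "$prizes.affiliations.country"]}}},
    {"$match": {"bornCountryInAffiliations": False}},
    {"$count": "awardedElsewhere"},
]

# Hypothetical short list standing in for the distinct country values
known_countries = ["USA", "Germany"]
added_stage = {"$match": {"prizes.affiliations.country": {"$in": known_countries}}}

# Index 3 is zero-based: the stage lands after $unwind and before $addFields
pipeline.insert(3, added_stage)

# Each stage is a one-key dict, so the first key is the stage operator
stage_names = [next(iter(stage)) for stage in pipeline]
print(stage_names)
```

The new `$match` sits right after `$unwind`, so it sees one prize per document, and it runs before the `$in` membership test in `$addFields`.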


# 04.17 Wrap-Up

1. Wrap-Up
>Congratulations on completing this course!

2. You now know how to...
>You now know how to work with MongoDB databases and collections. You've created and composed query filters with operators. You've used dot notation to query substructure. You've fetched distinct values, queried arrays, and used regular expressions. You've projected, sorted, and ensured performant queries with indexes. And finally, you've constructed aggregation pipelines for flexible and powerful analyses. You now have the vocabulary and experience you need to resolve issues you encounter in the wild. The official MongoDB documentation is a great resource. The pymongo driver also has great documentation. And of course, search engines are your friend.

3. Thanks!
>I hope you've had as much fun taking this course as I had making it. Thank you for your time.

# Additional material

- Datacamp course: https://learn.datacamp.com/courses/introduction-to-using-mongodb-for-data-science-with-python
- Troubleshooting example (an aggregate query that works in mongo but not in pymongo): https://stackoverflow.com/questions/38133529/aggregate-query-in-mongo-works-does-not-in-pymongo/38135029
- Pipeline operators: https://docs.mongodb.com/manual/reference/operator/aggregation/
- Documentation:
    - https://api.mongodb.com/
    - https://docs.mongodb.com/
    - https://pymongo.readthedocs.io/