# Lab: Expression Composition

## Prelude
For this lab, you'll be composing expressions together.  
The dataset for this lab can be downloaded [here](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/coursera/aggregation/movies.json) for upload to your own cluster.

In [1]:
import pymongo
import json

In [2]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)

In [3]:
movies = course_client['aggregations']['movies']

This lab will have you work with data within arrays, a common operation.

Specifically, one of the arrays you'll work with is ``writers``, from the
**movies** collection.

There are times when we want to make sure that the field is an array and that
it is not empty. We can do this within ``$match``:

  `{ "$match": { "writers": { "$elemMatch": { "$exists": true } } } }`

However, the entries within ``writers`` present another problem. Many entries
in ``writers`` look something like the following, where the writer is
attributed with their specific contribution:

  `"writers" : [ "Vincenzo Cerami (story)", "Roberto Benigni (story)" ]`

But the writer also appears in the ``cast`` array as "Roberto Benigni"!

Give it a look with the following query:

In [4]:
result = movies.find_one({"title": "Life Is Beautiful"}, { "_id": 0, "cast": 1, "writers": 1})
print(json.dumps(result, indent=4))

{
    "cast": [
        "Roberto Benigni",
        "Nicoletta Braschi",
        "Giustino Durano",
        "Giorgio Cantarini"
    ],
    "writers": [
        "Vincenzo Cerami (story)",
        "Roberto Benigni (story)"
    ]
}


This presents a problem, since comparing ``"Roberto Benigni"`` to
``"Roberto Benigni (story)"`` will never produce a match.

Thankfully there is a powerful expression to help us, ``$map``. ``$map`` lets us
iterate over an array, element by element, performing some transformation on
each element. The result of that transformation will be returned in the same
place as the original element.

Within ``$map``, the argument to ``input`` can be any expression as long as it
resolves to an array. The argument to ``as`` is the name we want to use to refer
to each element of the array inside ``in``; when referencing it, surround it
with quotes and prepend two `$` signs (``"$$writer"``). The field ``as`` is
optional; if it is omitted, each element must be referred to as ``"$$this"``.

      "writers": {
        "$map": {
          "input": "$writers",
          "as": "writer",
          "in": "$$writer"
        }
      }


``in`` is where the work is performed. Here, we use the ``$arrayElemAt``
expression, which takes two arguments: the array and the index of the element we
want. The array comes from the ``$split`` expression, which splits each value on
``" ("``.

If the string does not contain the separator, ``$split`` simply returns a
one-element array holding the original string, so ``$arrayElemAt`` with index
``0`` always works.

      "writers": {
        "$map": {
          "input": "$writers",
          "as": "writer",
          "in": {
            "$arrayElemAt": [
              {
                "$split": [ "$$writer", " (" ]
              },
              0
            ]
          }
        }
      }
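To check the logic without a cluster, here is a small pure-Python analogue of the expression above. It is a sketch that assumes Python's `str.split` with an explicit separator behaves like `$split` for these strings, and that list indexing `[0]` plays the role of `$arrayElemAt`; the function name `strip_contribution` is hypothetical and not part of the lab.

```python
def strip_contribution(writers):
    """Pure-Python analogue of the $map / $split / $arrayElemAt expression."""
    # str.split(" (") returns a one-element list when " (" is absent,
    # mirroring $split, so index 0 always exists.
    return [writer.split(" (")[0] for writer in writers]


writers = ["Vincenzo Cerami (story)", "Roberto Benigni (story)", "Roberto Benigni"]
print(strip_contribution(writers))
```

Every element is transformed in place, and names without a parenthetical pass through unchanged.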
      
Let's see it in action to get a full sense of what it does.

In [5]:
# this stage is provided for you, use it later as well
mapping = {
    "$project": {
        "_id": 0,
        "cast": 1,
        "directors": 1,
        "writers": {
            "$map": {
                "input": "$writers",
                "as": "writer",
                "in": {
                    "$arrayElemAt": [
                        { "$split": ["$$writer", " ("] },  # $$writer references the variable named in "as"
                        0
                    ]
                }
            }
        }
    }
}

In [6]:
result = movies.aggregate([
    {
        "$match": {"title": "Life Is Beautiful"}
    },
    mapping
])
print(json.dumps(list(result), indent=4))

[
    {
        "cast": [
            "Roberto Benigni",
            "Nicoletta Braschi",
            "Giustino Durano",
            "Giorgio Cantarini"
        ],
        "directors": [
            "Roberto Benigni"
        ],
        "writers": [
            "Vincenzo Cerami",
            "Roberto Benigni"
        ]
    }
]


## Question

Let's find how many movies in our **movies** collection are a "labor of love",
where the same person appears in ``cast``, ``directors``, and ``writers``.

How many movies are "labors of love"?

A function P: X -> {true, false} is called a predicate on X. In this notebook, the name `predicate` is used for the first, filtering stage of a pipeline: it keeps only the documents that pass a check, e.g. that a field exists or is not null.

In [8]:
predicate = {
    "$match": {
        "$and": [
            {"cast": {"$elemMatch": {"$exists": True } } },
            {"directors": {"$elemMatch": {"$exists": True } } },
            {"writers": {"$elemMatch": {"$exists": True } } }
        ]
    }
}

In [9]:
mapping = {
    "$project": {
        "_id": 0,
        "cast": 1,
        "directors": 1,
        "writers": {
            "$map": {  # similar to the Python map(), takes in an array
                "input": "$writers",  # an input array
                "as": "writer",       # specify a name for each element in the array
                "in": {  # the operation to apply 1 by 1 on each array element
                    "$arrayElemAt": [
                        {"$split": ["$$writer", " ("] },  # $$writer is the user variable as specified in "as"
                        0
                    ]
                }
            }
        }
    }
}

In [10]:
projection = {
    "$project": {
        "overlaps": {   # new field computed for each document
            "$size": {  # get size of an array
                "$setIntersection": ["$cast", "$directors", "$writers"]
            }
        }
    }
}
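The `projection` stage relies on `$setIntersection` treating its inputs as sets. A rough pure-Python analogue, using Python sets and a hypothetical `overlaps` helper, shows why the `mapping` stage has to run first: with the raw `writers` values, the "(story)" suffix prevents any match.

```python
def overlaps(cast, directors, writers):
    """Pure-Python analogue of {"$size": {"$setIntersection": [cast, directors, writers]}}."""
    # $setIntersection treats each array as a set: duplicates collapse and
    # only elements present in every input survive.
    return len(set(cast) & set(directors) & set(writers))


cast = ["Roberto Benigni", "Nicoletta Braschi", "Giustino Durano", "Giorgio Cantarini"]
directors = ["Roberto Benigni"]

print(overlaps(cast, directors, ["Vincenzo Cerami", "Roberto Benigni"]))                  # after $map: 1
print(overlaps(cast, directors, ["Vincenzo Cerami (story)", "Roberto Benigni (story)"]))  # raw writers: 0
```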

In [58]:
matching = {
    "$match": {
        "overlaps": {"$gte": 1}
    }
}

In [59]:
counting = {
    "$count": "labors_of_love"
}

In [60]:
from IPython.display import display

pipeline = [
    predicate,
    mapping,
    projection,
    matching,
    counting
]

display(list(movies.aggregate(pipeline)))

[{'labors_of_love': 1597}]

# Lab: Changing Document Shape

## Prelude
For this lab, you'll be using expressions to change document shape and perform an analysis.  
The dataset for this lab can be downloaded [here](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/coursera/aggregation/movies.json) for upload to your own cluster.

In [61]:
import pymongo

In [62]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)

In [63]:
movies = course_client['aggregations']['movies']

Our movies dataset has a lot of different documents, some with more convoluted
titles than others. 

If we'd like to analyze our collection to find movie titles
that are composed of only one word, we **could** fetch all the movies in the
dataset and do some processing in a client application, but the Aggregation
Framework allows us to do this on the server!

Ensure you explore the [string expressions](https://docs.mongodb.com/manual/meta/aggregation-quick-reference/#string-expressions) and the [array expressions](https://docs.mongodb.com/manual/meta/aggregation-quick-reference/#array-expressions) before attempting this lab.

## Question

Using the Aggregation Framework, find a count of the number of movies that have
a title composed of one word. To clarify, "Cinderella" and "3-25" should count,
whereas "Cast Away" would not.

Don't forget to append the following `counting` variable to your pipeline!

In [77]:
shaping = {
    "$project": {
        "title_length": {
            "$size": {
                "$split": ["$title", ' ']
            }
        },
        "_id": 0
    }
}
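As a sanity check of the `shaping` stage, here is a pure-Python analogue (the helper name `title_length` is hypothetical). It assumes `str.split(" ")` mirrors `$split` on a single space; note that neither collapses repeated spaces, so a title containing a double space would produce an empty-string element and count as more than two words.

```python
def title_length(title):
    """Pure-Python analogue of {"$size": {"$split": [title, " "]}}."""
    # An explicit separator keeps empty strings between consecutive spaces.
    return len(title.split(" "))


titles = ["Cinderella", "3-25", "Cast Away"]
print([t for t in titles if title_length(t) == 1])
```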

In [78]:
matching = {
    "$match": {
        "title_length": {"$eq": 1}
    }
}

In [79]:
counting = {
    "$count": "one_word_titles"
}

In [80]:
from IPython.display import display

pipeline = [
    shaping,
    matching,
    counting
]

display(list(movies.aggregate(pipeline)))

[{'one_word_titles': 8068}]

# Lab: Using Cursor-like aggregation stages

## Prelude
For this lab, you'll have to use cursor-like aggregation stages to find the answer for the following scenario.  
The dataset for this lab can be downloaded [here](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/coursera/aggregation/movies.json) for upload to your own cluster.

## Question: Movie Night

Your organization has a movie night scheduled, and you've again been tasked with coming up with a selection.  
HR has polled employees and assembled the following list of preferred actresses and actors.

In [1]:
import pymongo

In [2]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)

In [3]:
movies = course_client['aggregations']['movies']

In [4]:
favorites = [
  "Sandra Bullock",
  "Tom Hanks",
  "Julia Roberts",
  "Kevin Spacey",
  "George Clooney"
]

For movies released in the **USA** with a ``tomatoes.viewer.rating`` greater than or equal to __3__, calculate a new field called `num_favs` that represents how many **favorites** appear in the ``cast`` field of the movie.

Sort your results by ``num_favs``, ``tomatoes.viewer.rating``, and ``title``, all in descending order.

What is the ``title`` of the **25th** film in the aggregation result?

**Hint**: MongoDB has a great expression for quickly determining whether there are common elements in lists, ``$setIntersection``

In [41]:
predicate = {
    "$match": {
        "tomatoes.viewer.rating": {"$gte": 3},
        "cast": {"$exists": True},
        "countries": "USA"
    }
}

In [47]:
projection = {
    "$project": {
        "num_favs": {
            "$size": {
                "$setIntersection": [favorites, "$cast"]  # the Python list is embedded in the pipeline as a literal array
            }
        },
        "title": 1,
        "rating": "$tomatoes.viewer.rating",  # copy the nested rating into a top-level field
        "_id": 0
    }
}

In [48]:
sorting = {
    "$sort": {
        "num_favs": -1,
        "rating": -1,
        "title": -1
    }
}

In [49]:
skipping = {
    "$skip": 24
}

In [50]:
limiting = {
    "$limit": 1
}
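The interplay of `$setIntersection`, a three-key descending `$sort`, and `$skip`/`$limit` can be sketched in plain Python on a few made-up documents (the titles and ratings below are hypothetical). With `$skip: n` followed by `$limit: 1`, the pipeline returns the (n + 1)-th document, which is why the lab skips 24 to reach the 25th. Python compares strings by code point, which should agree with MongoDB's default binary string ordering for plain ASCII titles.

```python
favorites = ["Sandra Bullock", "Tom Hanks", "Julia Roberts", "Kevin Spacey", "George Clooney"]

# Hypothetical documents that already passed the $match stage.
sample_movies = [
    {"title": "Movie A", "cast": ["Tom Hanks", "Julia Roberts"], "rating": 3.5},
    {"title": "Movie B", "cast": ["Tom Hanks"], "rating": 4.0},
    {"title": "Movie C", "cast": ["Sandra Bullock"], "rating": 4.0},
    {"title": "Movie D", "cast": ["Someone Else"], "rating": 4.5},
]

# $project: num_favs is the size of the intersection with favorites
projected = [
    {"title": m["title"], "rating": m["rating"],
     "num_favs": len(set(favorites) & set(m["cast"]))}
    for m in sample_movies
]

# $sort on num_favs, rating, title, all descending (-1)
ordered = sorted(projected, key=lambda d: (d["num_favs"], d["rating"], d["title"]), reverse=True)

# $skip n followed by $limit 1 keeps only the (n + 1)-th document
n = 1
print(ordered[n:n + 1])
```

Ties on `num_favs` and `rating` (Movie B and Movie C) are broken by `title` descending, so Movie C comes before Movie B.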

In [51]:
from IPython.display import display

pipeline = [
    predicate,
    projection,
    sorting,
    skipping,
    limiting
]

display(list(movies.aggregate(pipeline)))

[{'num_favs': 1, 'rating': 3.8, 'title': 'The Heat'}]

# Lab: Group Accumulators

## Prelude
For this lab, you'll be using group accumulators.

In [52]:
import pymongo

In [53]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)

In [54]:
movies = course_client['aggregations']['movies']

## Question

In this lab, you will need to capture the highest `imdb.rating`, lowest `imdb.rating`, average, and **sample** standard deviation for all films that won an Oscar.

You may find documentation on [group accumulators](https://docs.mongodb.com/manual/reference/operator/aggregation-group/#group-accumulator-operators) helpful!

The matching stage to find films with Oscar wins is provided below.

In [55]:
matching = {
    "$match": {
        "awards": {"$regex": "Won \\d{1,2} Oscars?"}
    }
}

In [57]:
grouping = {
    "$group": {
        "_id": "null",  # any constant groups every document together; None is the idiomatic choice
        "high": {"$max": "$imdb.rating"},
        "low": {"$min": "$imdb.rating"},
        "average": {"$avg": "$imdb.rating"},
        "ssd": {"$stdDevSamp": "$imdb.rating"}
    }
}
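To see what the `matching` and `grouping` stages compute, here is a pure-Python sketch over a few hypothetical documents. `$regex` matching is unanchored, so `re.search` is the closest analogue (the notebook's `"Won \\d{1,2} Oscars?"` is the same pattern as the raw string `r"Won \d{1,2} Oscars?"` below). Python's `statistics.stdev` computes the sample standard deviation (dividing by n - 1), which corresponds to `$stdDevSamp`; `statistics.pstdev` would correspond to `$stdDevPop`.

```python
import re
import statistics

# Hypothetical documents; only the fields used here are included.
films = [
    {"awards": "Won 1 Oscar. Another 3 wins.", "imdb": {"rating": 7.0}},
    {"awards": "Won 11 Oscars. Another 80 wins.", "imdb": {"rating": 9.0}},
    {"awards": "Won 2 Oscars.", "imdb": {"rating": 8.0}},
    {"awards": "Nominated for 1 Oscar.", "imdb": {"rating": 6.0}},
]

# $match with an unanchored $regex
pattern = re.compile(r"Won \d{1,2} Oscars?")
ratings = [f["imdb"]["rating"] for f in films if pattern.search(f["awards"])]

# $group with a single constant key
summary = {
    "high": max(ratings),                 # $max
    "low": min(ratings),                  # $min
    "average": statistics.mean(ratings),  # $avg
    "ssd": statistics.stdev(ratings),     # sample std dev (n - 1), like $stdDevSamp
}
print(summary)
```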

In [58]:
from IPython.display import display

pipeline = [
    matching,
    grouping
]

display(list(movies.aggregate(pipeline)))

[{'_id': 'null',
  'average': 7.527024070021882,
  'high': 9.2,
  'low': 4.5,
  'ssd': 0.5988145513344504}]

# Lab: Using `$unwind` and `$group`

## Prelude
For this lab, you'll be using both the ``$unwind`` and ``$group`` stages.  
The dataset for this lab can be downloaded [here](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/coursera/aggregation/movies.json) for upload to your own cluster.

In [59]:
import pymongo

In [60]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)

In [61]:
movies = course_client['aggregations']['movies']

## Question

Let's use our increasing understanding of the Aggregation Framework to explore our
movies collection in more detail. We'd like to calculate how many movies every
**cast** member has been in, and get an average ``imdb.rating`` for each
``cast`` member.

Which cast member has been in the most movies with **English** as an available language?

To verify that you've successfully completed this exercise please submit your answer as the sum of the number of films and average rating for this cast member.

For example, if the cast member was output like so:

    { "_id": "James Dean", "numFilms": 11, "average": 7.1 }
Then the answer would be 18.1.

In [123]:
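predicate = {
    "$match": {
        "cast": {"$exists": True},
        # A Python dict keeps only the last value of a repeated key, so a separate
        # {"$exists": True} on "languages" would be silently dropped; matching
        # "English" already implies that the field exists.
        "languages": {"$in": ["English"]}
    }
}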

In [124]:
unwinding = {
    "$unwind": "$cast"
}

In [125]:
grouping = {
    "$group": {
        "_id": "$cast",
        "numFilms": {"$sum": 1},
        "average": {"$avg": "$imdb.rating"}
    }
}
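The `$unwind` + `$group` combination can be sketched in plain Python: unwinding produces one copy of each document per cast member, and grouping then counts and averages per member. The documents below are hypothetical, and unlike `$avg`, which ignores non-numeric or missing values, this sketch assumes every document has an `imdb.rating`.

```python
from collections import defaultdict

# Hypothetical documents that already passed the English-language $match.
docs = [
    {"cast": ["Actor A", "Actor B"], "imdb": {"rating": 7.0}},
    {"cast": ["Actor A"], "imdb": {"rating": 8.0}},
    {"cast": ["Actor C"], "imdb": {"rating": 6.0}},
]

# $unwind: one output document per cast member
unwound = [{**d, "cast": member} for d in docs for member in d["cast"]]

# $group by cast member: count films ($sum: 1) and collect ratings for $avg
ratings_by_member = defaultdict(list)
for d in unwound:
    ratings_by_member[d["cast"]].append(d["imdb"]["rating"])

groups = [
    {"_id": member, "numFilms": len(r), "average": sum(r) / len(r)}
    for member, r in ratings_by_member.items()
]

# $sort by numFilms descending, then $limit 1
top = sorted(groups, key=lambda g: g["numFilms"], reverse=True)[0]
print(top)
```

For this made-up data the lab's answer format would give 2 + 7.5 = 9.5.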

In [126]:
shaping = {
    "$project": {
        "score": {"$add": ["$numFilms", "$average"]},
        "_id": 1,
        "numFilms": 1,
        "average": 1
    }
}

In [127]:
sorting = {
    "$sort": {"numFilms": -1}
}

In [128]:
limiting = {
    "$limit": 1
}

In [129]:
from IPython.display import display

pipeline = [
    predicate,
    unwinding,
    grouping,
    shaping,
    sorting,
    limiting
]

In [130]:
display(list(movies.aggregate(pipeline))[0])

{'_id': 'John Wayne',
 'average': 6.424299065420561,
 'numFilms': 107,
 'score': 113.42429906542056}

# Lab: Using ``$lookup``

## Prelude
For this lab, you'll be using the ``$lookup`` stage.  
The dataset for this lab can be downloaded by clicking the following links - [air_alliances](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/coursera/aggregation/air_alliances.json), [air_routes](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/coursera/aggregation/air_routes.json) - for upload to your own cluster.

In [1]:
import pymongo

In [2]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)

In [3]:
alliances = course_client['aggregations']['air_alliances']
routes = course_client['aggregations']['air_routes']

## Question

Which alliance from ``air_alliances`` flies the most **routes** with either a
Boeing 747 or an Airbus A380 (abbreviated 747 and 380 in ``air_routes``)?

**Note**: Begin from the ``air_routes`` collection!

In [8]:
# the easier way: start from `air_alliances`, and join `air_routes`.
unwinding = {
    "$unwind": "$airlines"
}

lookup = {
    "$lookup": {
        "from": "air_routes",
        "localField": "airlines",
        "foreignField": "airline.name",
        "as": "route"  # this will be an array of all matching routes
    }
}

unwinding2 = {
    "$unwind": "$route"  # so we need to unwind again
}

match = {
    "$match": {
        "route.airplane": {"$regex": "747|380"}
    }
}

grouping = {
    "$group": {
        "_id": "$name",
        "NumRoutes": {"$sum": 1}
    }
}

sorting = {
    "$sort": {"NumRoutes": -1}
}
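Both pipelines follow the same join-then-count pattern. Here is a pure-Python sketch of the first ("easier") one over two tiny, hypothetical collections; the airline names and `airplane` strings are made up for illustration. `$lookup` returns an array of matches per input document, which is why a second `$unwind` is needed before filtering individual routes.

```python
import re

# Hypothetical miniature versions of the two collections.
air_alliances = [
    {"name": "SkyTeam", "airlines": ["Air France", "KLM"]},
    {"name": "Star Alliance", "airlines": ["Lufthansa"]},
]
air_routes = [
    {"airline": {"name": "Air France"}, "airplane": "380 77W"},
    {"airline": {"name": "KLM"}, "airplane": "747 777"},
    {"airline": {"name": "KLM"}, "airplane": "737"},
    {"airline": {"name": "Lufthansa"}, "airplane": "744 380"},
]

pattern = re.compile(r"747|380")
counts = {}
for alliance in air_alliances:
    for airline in alliance["airlines"]:              # $unwind "$airlines"
        matches = [r for r in air_routes               # $lookup: array of matching routes
                   if r["airline"]["name"] == airline]
        for route in matches:                          # $unwind "$route"
            if pattern.search(route["airplane"]):      # $match on "route.airplane"
                name = alliance["name"]                # $group on "$name" with $sum: 1
                counts[name] = counts.get(name, 0) + 1

# $sort by count descending
result = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(result)
```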

In [9]:
pipeline = [
    unwinding,
    lookup,
    unwinding2,
    match,
    grouping,
    sorting
]

In [10]:
from IPython.display import display

display(list(alliances.aggregate(pipeline)))

[{'NumRoutes': 16, '_id': 'SkyTeam'},
 {'NumRoutes': 11, '_id': 'OneWorld'},
 {'NumRoutes': 11, '_id': 'Star Alliance'}]

In [31]:
# the harder way: start from `air_routes`, and join `air_alliances`.
predicate = {
    "$match": {
        "airplane": {"$regex": "747|380"}
    }
}

lookup = {
    "$lookup": {
        "from": "air_alliances",
        "localField": "airline.name",
        "foreignField": "airlines",
        "as": "alliance"  # array of alliances whose "airlines" list contains this airline
    }
}

unwinding = {
    "$unwind": "$alliance"
}

grouping = {
    "$group": {
        "_id": "$alliance.name",
        "NumRoutes": {"$sum": 1}
    }
}

sorting = {
    "$sort": {"NumRoutes": -1}
}

In [33]:
pipeline = [
    predicate,
    lookup,
    unwinding,
    grouping,
    sorting
]

In [34]:
display(list(routes.aggregate(pipeline)))

[{'NumRoutes': 16, '_id': 'SkyTeam'},
 {'NumRoutes': 11, '_id': 'Star Alliance'},
 {'NumRoutes': 11, '_id': 'OneWorld'}]

# Lab: `$graphLookup`

For this lab, you'll be calculating the [degrees of separation](https://en.wikipedia.org/wiki/Six_degrees_of_separation) of directors to "Steven Spielberg".  

This is a bit like calculating a ["Kevin Bacon" number](https://en.wikipedia.org/wiki/Six_Degrees_of_Kevin_Bacon), but instead of all connections you will only consider connections through the `directors` graph nodes.  

Complete the `$graphLookup` and `$project` stages by correctly constructing the `graph_lookup` and `project_cast` variables below.  

To optimize the execution of the `$graphLookup` stage, use a `maxDepth` of 6.  

For the solution, only provide the numeric portion of the returned output to the validator.  

**HINT**: `$reduce` is a powerful expression!

In [38]:
import pymongo

course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = pymongo.MongoClient(course_cluster_uri)
movies = course_client['aggregations']['movies']

In [48]:
predicate = {
    "$match": {
        "directors": "Steven Spielberg"  # matches docs whose "directors" array contains
    }                                    # "Steven Spielberg" (not an exact array match)
}

projecting = {
    "$project": {
        "directors": 1
    }
}

graph_lookup = {
    "$graphLookup": {
        "from": "movies",
        "startWith": "$directors",
        "connectFromField": "directors",
        "connectToField": "directors",
        "as": "network",
        "maxDepth": 6,
        "depthField": "network_level",
    }
}
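`$graphLookup` performs a breadth-first search: documents reached directly from `startWith` are recorded at depth 0, documents reached through their `connectFromField` values at depth 1, and so on up to `maxDepth`. The function below is a simplified pure-Python model of that behavior for the case used here, where `connectFromField` and `connectToField` are the same array field. The function name and the sample documents are hypothetical, and the real stage has options (for example `restrictSearchWithMatch`) that this sketch ignores.

```python
def graph_lookup(docs, start_values, field, max_depth):
    """Simplified model of $graphLookup with connectFromField == connectToField == field."""
    found = {}                    # doc index -> depth at which it was first reached
    frontier = set(start_values)  # values to search for at the current depth
    seen_values = set()
    depth = 0
    while frontier and depth <= max_depth:
        seen_values |= frontier
        next_frontier = set()
        for i, doc in enumerate(docs):
            values = set(doc.get(field, []))
            if i not in found and frontier & values:  # connectToField matches
                found[i] = depth                      # stored as depthField
                next_frontier |= values               # follow connectFromField
        frontier = next_frontier - seen_values
        depth += 1
    return [{**docs[i], "network_level": d} for i, d in found.items()]


sample = [
    {"title": "M1", "directors": ["Steven Spielberg"]},
    {"title": "M2", "directors": ["Steven Spielberg", "Director X"]},
    {"title": "M3", "directors": ["Director X", "Director Y"]},
    {"title": "M4", "directors": ["Director Y"]},
]
network = graph_lookup(sample, ["Steven Spielberg"], "directors", max_depth=6)
print({d["title"]: d["network_level"] for d in network})
```

Lowering `max_depth` to 1 would drop M4, which is only reachable at depth 2.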

unwinding = {
    "$unwind": "$network"
}

reshaping = {
    "$project": {
        "cast": "$network.cast",
        "level": "$network.network_level"
    }
}

grouping = {
    "$group": {
        "_id": "$level",
        "cast": {"$addToSet": "$cast"}
    }
}

project_cast = {
    "$project": {
        "cast": {
            "$reduce": {
                "input": "$cast",
                "initialValue": [],
                "in": {"$concatArrays": ["$$value", "$$this"]}
            }
        }
    }
}
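After `grouping`, each level's `cast` is an array of cast arrays (one per movie, deduplicated by `$addToSet`), and `$reduce` with `$concatArrays` flattens it. In `$reduce`, `$$value` is the running accumulator and `$$this` is the current element; Python's `functools.reduce` has the same shape, as this small sketch with hypothetical actor names shows.

```python
from functools import reduce

# Shape of "cast" after $group + $addToSet: an array of cast arrays.
cast_per_level = [["Actor A", "Actor B"], ["Actor C"], ["Actor A"]]

# {"$reduce": {"input": "$cast", "initialValue": [],
#              "in": {"$concatArrays": ["$$value", "$$this"]}}}
flat = reduce(lambda value, this: value + this, cast_per_level, [])
print(flat)  # ['Actor A', 'Actor B', 'Actor C', 'Actor A']
```

Duplicates can remain after flattening, which is harmless here because the following `$match` only checks whether "Woody Harrelson" is present.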
    
matching = {
    "$match": {
        "cast": "Woody Harrelson"
    }
}

sorting = {
    "$sort": {
        "_id": 1
    }
}

resulting = {
    "$project": {
        "_id": 0,
        "answer": "$_id"
    }
}

limiting = {
    "$limit": 1
}

In [49]:
from IPython.display import display

pipeline = [
    predicate,
    projecting,
    graph_lookup,
    unwinding,
    reshaping,
    grouping,
    project_cast,
    matching,
    sorting,
    resulting,
    limiting
]

In [59]:
display(list(movies.aggregate(pipeline)))

[{'answer': 2}]

# Schemas and accumulators

## Question

Consider the following document from the `coursera-agg.orders` collection on the course cluster:

In [None]:
{
  "_id": 536366,
  "date": ISODate("2010-12-01T08:28:00Z"),
  "customer_id": 17850,
  "country": "United Kingdom",
  "purchases": [
    {
      "description": "HAND WARMER UNION JACK",
      "quantity": 6,
      "stock_code": "22633",
      "unit_price": 1.85
    },
    {
      "description": "HAND WARMER RED POLKA DOT",
      "quantity": 6,
      "stock_code": "22632",
      "unit_price": 1.85
    }
  ]
}

The documents in this collection represent orders made to an online retail store. The purchases field is an array containing each item ordered with its associated quantity and unit price.

Your goal for this exercise is to add the following fields to each document using the aggregation framework:

- order_total - This field represents the total for the order, i.e., the sum of each item's unit_price multiplied by its quantity.
- order_quantity - This field represents the total number of products ordered, i.e., the sum of each item's quantity.
- mean_order_unit_price - This field represents the mean unit price of the products ordered, i.e., the average of each item's unit_price.
- mean_order_quantity - This field represents the mean quantity of the products ordered, i.e., the average of each item's quantity.

For example, the example document above would become:

In [None]:
{
  "_id": 536366,
  "date": ISODate("2010-12-01T08:28:00Z"),
  "customer_id": 17850,
  "country": "United Kingdom",
  "mean_order_quantity": 6.0,
  "mean_order_unit_price": 1.85,
  "order_quantity": 12,
  "order_total": 22.20,
  "purchases": [
    {
      "description": "HAND WARMER UNION JACK",
      "quantity": 6,
      "stock_code": "22633",
      "unit_price": 1.85
    },
    {
      "description": "HAND WARMER RED POLKA DOT",
      "quantity": 6,
      "stock_code": "22632",
      "unit_price": 1.85
    }
  ]
}
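The four field definitions can be checked against the example above with a short pure-Python function (the name `order_summary` is hypothetical). Because the unit prices are binary floating-point numbers, the total may come out as 22.200000000000003 rather than exactly 22.20, which is also the kind of value that appears in the outputs later on.

```python
order = {
    "_id": 536366,
    "purchases": [
        {"description": "HAND WARMER UNION JACK", "quantity": 6, "stock_code": "22633", "unit_price": 1.85},
        {"description": "HAND WARMER RED POLKA DOT", "quantity": 6, "stock_code": "22632", "unit_price": 1.85},
    ],
}


def order_summary(order):
    """Compute the four requested fields for one order document."""
    items = order["purchases"]
    quantities = [p["quantity"] for p in items]
    unit_prices = [p["unit_price"] for p in items]
    return {
        "order_total": sum(p["unit_price"] * p["quantity"] for p in items),
        "order_quantity": sum(quantities),
        "mean_order_unit_price": sum(unit_prices) / len(unit_prices),
        "mean_order_quantity": sum(quantities) / len(quantities),
    }


print(order_summary(order))
```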

Hint: You don't need to use $unwind to make this work.  
To verify that you've successfully completed this exercise, what is the order_total of the largest single order?

## Solution

In [1]:
from pymongo import MongoClient
import pprint

In [2]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = MongoClient(course_cluster_uri)

In [3]:
orders = course_client['coursera-agg']['orders']

In [15]:
add_field = {
    "$addFields": {
        "order_total": {"$sum": {"$multiply": ["$purchases.unit_price", "$purchases.quantity"]}},
        "order_quantity": {"$sum": "$purchases.quantity"},
        "mean_order_unit_price": {"$avg": "$purchases.unit_price"},
        "mean_order_quantity": {"$avg": "$purchases.quantity"}
    }
}

In [16]:
pipeline = [
    add_field
]

In [17]:
cursor = orders.aggregate(pipeline)

OperationFailure: $multiply only supports numeric types, not array

Oops! What the hell is going on? I checked the `$addFields` stage many times and was pretty sure my code was correct, so why did MongoDB throw this error? Calm down and read the error message: `$multiply only supports numeric types, not array`. So at least one operand that reaches `$multiply` is an array rather than a number.

My first guess was that some documents have a non-numeric value in `purchases.unit_price` or `purchases.quantity`. We can check this either by filtering documents in Compass or by running some queries here. A Stack Overflow answer suggested that one document has a blank `purchases.unit_price` which MongoDB interprets as an array, so let's start by filtering out such documents.  

__Remember, it's very important to validate your data before applying operations to the whole collection.__  
__In general, it's good practice to start a pipeline with a validation (predicate) stage that filters out malformed data.__

In [62]:
predicate = {
    "$match": {
        "$and": [
            {"purchases.unit_price": {"$exists": True, "$ne": None}},  # None is sent as BSON null, not the string "null"
            {"purchases.quantity": {"$exists": True, "$ne": None}}
        ]
    }
}

In [66]:
pipeline = [
    predicate,
    add_field
]

In [67]:
cursor = orders.aggregate(pipeline)

OperationFailure: $multiply only supports numeric types, not array

Error again. So how do you write "not null" queries in MongoDB? You can check:  
https://stackoverflow.com/questions/4057196/how-do-you-query-for-is-not-null-in-mongo

In [72]:
# let's try another predicate
predicate = {
    "$match": {
        "$and": [
            {"purchases.unit_price": {"$type": "double" }},
            {"purchases.quantity": {"$type": "int" }}
        ]
    }
}

In [73]:
pipeline = [
    predicate,
    add_field
]

In [74]:
cursor = orders.aggregate(pipeline)

OperationFailure: $multiply only supports numeric types, not array

Ugh, the same error. One problem with MongoDB is that a pipeline can sometimes be very hard to debug: even when you get an error, it's not always clear where the bug lies. Often the problem you are focusing on is not the real cause, so take a break, clear your mind, and check the problem again, looking at the data types more carefully.  

After a while, I realized that `purchases` is not an embedded document but an array of embedded documents. When a field path such as `$purchases.unit_price` passes through an array, it resolves to an array of the values from every element (for the example order, `[1.85, 1.85]`), so `$multiply` receives two arrays instead of two numbers. That is the real cause of the error, and no filter on the values can fix it. So, we have 2 choices: either unroll the array with the `$unwind` stage, which creates one document per array element and makes the pipeline heavier, or use the `$map` operator to deal with the array element by element, and then `$reduce` to calculate a single value from each entire array. When it comes to dealing with array fields, __map + reduce__ is a very good choice. Let's do it step by step.
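The failure, and the fix, can be reproduced in plain Python. The `resolve_path` helper below is a hypothetical, simplified model of field-path resolution (it ignores nested arrays and other edge cases): stepping through an array collects the field from every element, so `$purchases.unit_price` yields a list. The last lines mirror the `$map` and `$reduce` stages that follow, using made-up quantities and prices chosen so the arithmetic is exact.

```python
from functools import reduce


def resolve_path(doc, path):
    """Simplified model of how a field path like "$purchases.unit_price"
    resolves when it passes through an array of embedded documents."""
    value = doc
    for key in path.lstrip("$").split("."):
        if isinstance(value, list):
            # Crossing an array collects `key` from every element that has it.
            value = [item[key] for item in value if isinstance(item, dict) and key in item]
        elif isinstance(value, dict):
            value = value.get(key)
        else:
            return None
    return value


order = {"purchases": [{"quantity": 4, "unit_price": 2.5},
                       {"quantity": 2, "unit_price": 1.5}]}

print(resolve_path(order, "$purchases.unit_price"))  # [2.5, 1.5]: a list, not a number

# $map: one product per purchase, referring to each element as $$this
item_total = [this["unit_price"] * this["quantity"] for this in order["purchases"]]
# $reduce: fold the list into a single value, starting from initialValue 0
order_total = reduce(lambda value, this: value + this, item_total, 0)
print(order_total)  # 13.0
```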

In [86]:
# for each element in the array, compute the per-item total and pull out the arrays we need
mapping = {
    '$addFields': {
        'item_total': {
            '$map': {
                'input': '$purchases',
                'in': { '$multiply': ['$$this.unit_price', '$$this.quantity'] } 
            } 
        },
        "quantities":'$purchases.quantity',
        "unit_prices": '$purchases.unit_price',
        "num_purchases": {
            '$size': '$purchases'
        }
    }
}

limiting = {
    '$limit': 3
}

In [87]:
cursor = orders.aggregate([mapping,limiting])

In [88]:
for doc in cursor:
    pprint.pprint(doc)

{'_id': 536369,
 'country': 'United Kingdom',
 'customer_id': 13047,
 'date': datetime.datetime(2010, 12, 1, 8, 35),
 'item_total': [17.85],
 'num_purchases': 1,
 'purchases': [{'description': 'BATH BUILDING BLOCK WORD',
                'quantity': 3,
                'stock_code': '21756',
                'unit_price': 5.95}],
 'quantities': [3],
 'unit_prices': [5.95]}
{'_id': 536367,
 'country': 'United Kingdom',
 'customer_id': 13047,
 'date': datetime.datetime(2010, 12, 1, 8, 34),
 'item_total': [54.08,
                12.600000000000001,
                30.0,
                12.600000000000001,
                25.5,
                14.850000000000001,
                19.9,
                17.85,
                17.85,
                9.899999999999999,
                31.8,
                31.8],
 'num_purchases': 12,
 'purchases': [{'description': 'ASSORTED COLOUR BIRD ORNAMENT',
                'quantity': 32,
                'stock_code': '84879',
                'unit_price': 

In [91]:
mapping2 = {
    '$addFields': {
        "order_total": { 
            '$reduce': {  # now calculate `order_total` by `$reduce`
                'input': '$item_total',
                'initialValue': 0,
                'in': { '$add': ['$$value', '$$this' ] }  # sum up `item_total` to get `order_total`
            }
        },
        
        "order_quantity": {
            '$reduce': {
                'input': '$quantities',
                'initialValue': 0,
                'in': { '$add': ['$$value', '$$this'] }  # sum up `quantities` to get `order_quantity`
            }
        },
        
        "mean_order_unit_price": {
            '$divide': [
                {'$reduce': {
                    'input': '$unit_prices',
                    'initialValue': 0,
                    'in': {'$add': ['$$value', '$$this'] }  # sum up `unit_prices` and divide by number of items
                 }
                },
                {'$size': '$unit_prices'} 
            ]
        }
    }
}
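`mapping2` computes three of the four requested fields; `mean_order_quantity` is still missing. One possible addition, sketched below as a hypothetical `mapping3` stage, uses `$avg` on the `quantities` array built by `mapping`: in `$addFields`, `$avg` with a single array operand averages the array's numeric elements. The same shortcut could replace the `$divide`/`$reduce`/`$size` combination used for `mean_order_unit_price`.

```python
# Hypothetical extra stage; relies on the "quantities" array added by `mapping`.
mapping3 = {
    "$addFields": {
        # With one array operand, $avg averages that array's numeric elements.
        "mean_order_quantity": {"$avg": "$quantities"}
    }
}
```

Appended after `mapping2`, e.g. `orders.aggregate([mapping, mapping2, mapping3, limiting])`, it should add the missing field to each document.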

In [92]:
cursor = orders.aggregate([mapping,mapping2,limiting])

In [93]:
for doc in cursor:
    pprint.pprint(doc)

{'_id': 536369,
 'country': 'United Kingdom',
 'customer_id': 13047,
 'date': datetime.datetime(2010, 12, 1, 8, 35),
 'item_total': [17.85],
 'mean_order_unit_price': 5.95,
 'num_purchases': 1,
 'order_quantity': 3,
 'order_total': 17.85,
 'purchases': [{'description': 'BATH BUILDING BLOCK WORD',
                'quantity': 3,
                'stock_code': '21756',
                'unit_price': 5.95}],
 'quantities': [3],
 'unit_prices': [5.95]}
{'_id': 536367,
 'country': 'United Kingdom',
 'customer_id': 13047,
 'date': datetime.datetime(2010, 12, 1, 8, 34),
 'item_total': [54.08,
                12.600000000000001,
                30.0,
                12.600000000000001,
                25.5,
                14.850000000000001,
                19.9,
                17.85,
                17.85,
                9.899999999999999,
                31.8,
                31.8],
 'mean_order_unit_price': 4.853333333333334,
 'num_purchases': 12,
 'order_quantity': 83,
 'order_total': 278.

In [95]:
grouping = {
    '$group': {
        "_id": "null",  # any constant groups every document together; None is the idiomatic choice
        "orders_total": { '$push': '$order_total' }  # now push all '$order_total' into an array
    }
}
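Pushing every `order_total` into one array works for eyeballing, but the question only asks for the largest order. A hedged alternative is to let `$group` compute it with the `$max` accumulator, as in the hypothetical `largest_order` stage below; sorting on `order_total` descending and limiting to one document would also work.

```python
# Hypothetical alternative to `grouping`: ask MongoDB for the maximum directly.
largest_order = {
    "$group": {
        "_id": None,  # a constant key puts every document into a single group
        "largest_order_total": {"$max": "$order_total"}
    }
}
```

It would take the place of `grouping` in `orders.aggregate([mapping, mapping2, largest_order])`.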

In [96]:
cursor = orders.aggregate([mapping,mapping2,grouping])

In [97]:
for doc in cursor:
    pprint.pprint(doc)

{'_id': 'null',
 'orders_total': [17.85,
                  278.73,
                  350.4,
                  328.8,
                  22.200000000000003,
                  259.86,
                  34.8,
                  444.98,
                  430.59999999999997,
                  204.0,
                  70.05000000000001,
                  22.200000000000003,
                  22.200000000000003,
                  139.12,
                  259.86,
                  489.6,
                  508.20000000000005,
                  130.85,
                  3193.92,
                  449.97999999999996,
                  79.6,
                  318.14000000000004,
                  226.14,
                  1024.6800000000003,
                  279.0,
                  22.200000000000003,
                  17.4,
                  507.88000000000005,
                  1825.74,
                  376.36000000000007,
                  426.56000000000006,
                  357.0,
        

                  426.6,
                  574.02,
                  292.62999999999994,
                  250.67999999999992,
                  664.4500000000003,
                  3336.7599999999993,
                  116.09,
                  2346.3399999999997,
                  181.13000000000005,
                  9.6,
                  7.8,
                  141.48000000000005,
                  432.90000000000003,
                  312.55,
                  212.51999999999998,
                  254.12,
                  94.0,
                  825.0400000000001,
                  666.8899999999999,
                  411.01999999999987,
                  185.0,
                  341.19999999999993,
                  450.78000000000014,
                  259.2,
                  172.79999999999998,
                  723.63,
                  622.88,
                  271.67999999999995,
                  618.8900000000001,
                  284.97,
                  ...,
                  450.5199999999999,
                  534.6500000000001,
                  489.59999999999997,
                  312.49,
                  626.1,
                  214.14999999999998,
                  467.7,
                  62.28,
                  157.75,
                  8.5,
                  195.1,
                  344.7,
                  106.66,
                  63.910000000000004,
                  115.95,
                  311.13,
                  481.64,
                  309.35,
                  299.80999999999995,
                  374.0,
                  1110.7199999999996,
                  478.51000000000005,
                  8142.75,
                  303.61000000000007,
                  1132.95,
                  494.84999999999997,
                  797.2599999999999,
                  173.07999999999998,
                  76.65,
                  275.93000000000006,
                  337.37999999999994,
                  565.0,
                  ...,
                  457.26000000000005,
                  533.0200000000001,
                  327.19999999999993,
                  365.0199999999999,
                  165.07000000000002,
                  865.62,
                  178.38000000000002,
                  21.96,
                  30.0,
                  1080.6499999999999,
                  213.7,
                  301.26000000000005,
                  299.69999999999993,
                  307.96999999999997,
                  114.94999999999999,
                  15.0,
                  1212.26,
                  351.70000000000005,
                  32.95,
                  1180.1199999999997,
                  214.25,
                  550.4000000000001,
                  5.8,
                  373.40000000000003,
                  168.95,
                  850.4600000000002,
                  793.8799999999999,
                  141.98,
                  156.84999999999997,
                  1305.6,
                  ...,
                  125.55,
                  346.98,
                  676.36,
                  99.4,
                  117.06,
                  310.70000000000005,
                  305.96000000000004,
                  260.49000000000007,
                  5590.670000000002,
                  121.23000000000003,
                  77.31,
                  191.73999999999995,
                  112.75000000000001,
                  207.35999999999999,
                  114.60000000000001,
                  149.35,
                  373.79999999999995,
                  821.0,
                  121.92,
                  387.52,
                  386.74,
                  207.72999999999996,
                  301.2,
                  1250.4,
                  1047.12,
                  401.99999999999994,
                  1585.9199999999998,
                  822.24,
                  399.34000000000003,
                  301.8500000000001,
                  89.06,
                  ...,
                  377.76,
                  647.7700000000001,
                  225.77000000000012,
                  893.46,
                  99.6,
                  151.2,
                  335.55,
                  118.80000000000001,
                  290.1,
                  381.95,
                  393.60000000000014,
                  74.65,
                  300.16,
                  1204.3599999999997,
                  283.87,
                  183.70000000000002,
                  416.4600000000001,
                  439.05999999999995,
                  141.12,
                  51.78000000000001,
                  401.1,
                  951.48,
                  460.27000000000004,
                  695.0300000000001,
                  142.44,
                  148.72,
                  212.0,
                  756.72,
                  227.07999999999998,
                  229.98999999999998,
                  39.95,
                  244.5,
                  ...,
                  61.199999999999996,
                  41.989999999999995,
                  99.66,
                  203.10000000000008,
                  116.10000000000001,
                  33.0,
                  678.2,
                  82.03999999999999,
                  262.12,
                  153.45000000000002,
                  3.2,
                  309.21999999999997,
                  101.08999999999999,
                  312.62,
                  430.7000000000001,
                  114.61000000000001,
                  179.5,
                  202.87,
                  109.75,
                  3895.86,
                  557.4399999999999,
                  208.00000000000009,
                  471.9,
                  730.38,
                  221.87,
                  336.4299999999999,
                  83.2,
                  102.80000000000001,
                  356.40000000000003,
                  137.08999999999997,
                  231.74,
                  ...]}

In [98]:
projecting = {
    '$project': {
        "max_orders_total": {'$max': '$orders_total'}
    }   
}

In [100]:
cursor = orders.aggregate([mapping,mapping2,grouping,projecting])

In [101]:
for doc in cursor:
    pprint.pprint(doc)

{'_id': 'null', 'max_orders_total': 168469.6}
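Pushing every total into one array and then taking `$max` works. However, the array grows with the collection, and the grouped result is a single document subject to MongoDB's 16 MB BSON size limit. One possible alternative is to let `$group` compute the maximum directly with the `$max` accumulator. This is an assumption on our part and not the lab's intended answer. The snippet only builds that stage and checks the logic with a plain-Python stand-in (`emulate_group_max`, hypothetical) on made-up values.

```python
# Single-stage alternative: the $max accumulator keeps only a running maximum,
# so no array of all totals is built.
grouping_max = {
    "$group": {
        "_id": None,  # constant _id: one group for the whole collection
        "max_orders_total": {"$max": "$order_total"},
    }
}


def emulate_group_max(docs, field):
    """Plain-Python stand-in for grouping_max: keep a running maximum of `field`."""
    best = None
    for doc in docs:
        value = doc[field]  # assumes every document has `field`
        if best is None or value > best:
            best = value
    return {"_id": None, "max_orders_total": best}


# Hypothetical documents, not taken from the orders collection.
sample = [{"order_total": 22.2}, {"order_total": 81.5}, {"order_total": 9.6}]
print(emulate_group_max(sample, "order_total"))  # {'_id': None, 'max_orders_total': 81.5}
```

With the lab's objects, the call would be `orders.aggregate([mapping, mapping2, grouping_max])`. Here `mapping` and `mapping2` are the stages defined earlier in the notebook.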


# Entity resolution

__Example from the people_master collection__

In [None]:
{
  "_id": ObjectId("57d7a180fa937f710a7dfab0"),
  "last_name": "Austin",
  "job": "Further education lecturer",
  "ssn": "868-50-7592",
  "first_name": "Darren",
  "company_id": ObjectId("57d7a180fa937f710a7df9f9"),
  "birthday": ISODate("2014-06-09T21:37:46Z"),
  "email": "davidyoung@martinez-thomas.com"
}

__Example from the people_import collection__

In [None]:
{
  "_id": ObjectId("57d7a12bfa937f710a7d5a06"),
  "last_name": "Whitney",
  "address": {
    "city": "Whitetown",
    "state": "Connecticut",
    "street": "32940 Potter Burgs",
    "zip": "27841-1320"
  },
  "first_name": "Linda",
  "birthday": ISODate("2015-04-01T00:59:10Z"),
  "email": "stewartjessica@barber.com"
}

Except for `address`, every field on the documents in the people_import collection also appears in the people_master collection. You'd like to find all of the documents in people_import that refer to the same person as a document in the people_master collection.

The catch is that the data in people_import was entered manually and likely contains small typos. Instead of performing a `$lookup` that matches exactly on all 4 fields the two collections share (`first_name`, `last_name`, `birthday`, and `email`), you're going to use the Aggregation Framework to find all documents that match on at least 3 of those fields.
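The matching rule itself is simple: compare the four shared fields one by one and count how many agree. Below is a plain-Python sketch of that rule using the two example documents shown above. Here `datetime` stands in for `ISODate`, the fields the collections don't share are dropped, and `match_score` is a hypothetical helper name.

```python
from datetime import datetime, timezone

# The four fields shared by people_master and people_import.
FIELDS = ("first_name", "last_name", "birthday", "email")


def match_score(master, imported):
    """Count how many of the shared fields have equal values in both documents."""
    return sum(1 for f in FIELDS if master[f] == imported[f])  # assumes all 4 fields exist


master = {
    "first_name": "Darren",
    "last_name": "Austin",
    "birthday": datetime(2014, 6, 9, 21, 37, 46, tzinfo=timezone.utc),
    "email": "davidyoung@martinez-thomas.com",
}
imported = {
    "first_name": "Linda",
    "last_name": "Whitney",
    "birthday": datetime(2015, 4, 1, 0, 59, 10, tzinfo=timezone.utc),
    "email": "stewartjessica@barber.com",
}

print(match_score(master, imported))  # 0: these two records are different people

# A hypothetical import record with a typo in the email still scores 3.
typo = dict(master, email="davidyoung@martinez-thomas.co")
print(match_score(master, typo))  # 3
```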

To do this, you're going to build an aggregation pipeline that uses `$lookup`. Most of this pipeline has already been built for you; there are two parts that you need to fill in.

First, you're going to need to build a `$match` stage for your `$lookup` that returns all documents that match on any of the 4 fields.

Hint: Remember, this stage uses variables defined via `let` inside `$lookup`, so you're going to need to use `$expr`.
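One way to write the `$match` described in the first part keeps a candidate if it agrees on any one of the four fields. It uses `$expr` wrapping `$or` over four `$eq` comparisons. Each comparison pits the imported document's field (`$field`) against the `let` variable carrying the master document's value (`$$field`). This is a sketch only; the helper name `any_field_match` is hypothetical, and the notebook's own `greedy_match` further down applies a stricter filter.

```python
FIELDS = ("first_name", "last_name", "birthday", "email")


def any_field_match(fields=FIELDS):
    """Build a $match stage for a $lookup sub-pipeline that keeps documents
    agreeing with the outer document on at least one of `fields`.

    Inside the sub-pipeline, "$name" refers to the joined (people_import)
    document and "$$name" to a variable declared in the $lookup's `let`.
    """
    return {
        "$match": {
            "$expr": {
                "$or": [{"$eq": ["$" + f, "$$" + f]} for f in fields]
            }
        }
    }


stage = any_field_match()
print(stage["$match"]["$expr"]["$or"][0])  # {'$eq': ['$first_name', '$$first_name']}
```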

Second, you're going to need to build a stage that adds a field called `matchScore`: the number of the 4 shared fields whose values match the source document.
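A field-by-field way to compute `matchScore` turns each comparison into 1 or 0 with `$cond` and adds the results with `$sum`. The sketch below builds such an `$addFields` stage. It is an alternative to the `$setIntersection` approach used later in the notebook, and the helper name `positional_match_score` is hypothetical.

```python
FIELDS = ("first_name", "last_name", "birthday", "email")


def positional_match_score(fields=FIELDS):
    """Build an $addFields stage whose matchScore counts how many of `fields`
    are equal between the joined document ("$f") and the let variable ("$$f")."""
    return {
        "$addFields": {
            "matchScore": {
                "$sum": [
                    # 1 when this field agrees, 0 otherwise
                    {"$cond": [{"$eq": ["$" + f, "$$" + f]}, 1, 0]}
                    for f in fields
                ]
            }
        }
    }


stage = positional_match_score()
print(stage["$addFields"]["matchScore"]["$sum"][-1])
# {'$cond': [{'$eq': ['$email', '$$email']}, 1, 0]}
```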

To verify that you've successfully completed this exercise, note that the final stage of the pipeline in the notebook keeps only the people_master documents that have a match scoring exactly 3. How many documents is that?

In [109]:
from pymongo import MongoClient
import pprint

In [110]:
course_cluster_uri = "mongodb://agg-student:agg-password@cluster0-shard-00-00-jxeqq.mongodb.net:27017,cluster0-shard-00-01-jxeqq.mongodb.net:27017,cluster0-shard-00-02-jxeqq.mongodb.net:27017/test?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin"
course_client = MongoClient(course_cluster_uri)

In [114]:
people_master = course_client['coursera-agg']['people_master']

In [135]:
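greedy_match = {
    "$match": {
        "$expr": {
            # Keep an imported document only if at least 3 of its 4 values also
            # appear among the master document's 4 values (passed in via let).
            "$gte": [
                {"$size": {
                    "$setIntersection": [
                        ["$first_name", "$last_name", "$email", "$birthday"],     # people_import fields
                        ["$$first_name", "$$last_name", "$$email", "$$birthday"]  # people_master values from let
                    ]
                }},
                3
            ]
        }
    }
}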

In [143]:
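match_score_calculation = {
    "$addFields": {
        # matchScore = number of distinct values shared by the two 4-value lists
        "matchScore": {
            "$size": {
                "$setIntersection": [
                    ["$first_name", "$last_name", "$email", "$birthday"],     # people_import fields
                    ["$$first_name", "$$last_name", "$$email", "$$birthday"]  # people_master values from let
                ]
            }
        }
    }
}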
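Both `greedy_match` and `match_score_calculation` treat the four values as a set. `$setIntersection` only asks whether a value appears anywhere in the other list, not whether it sits in the same field, and it removes duplicates. The plain-Python sketch below compares that set-based count with a field-by-field count on hypothetical records (made-up values, with dates written as strings for simplicity) to show where the two can disagree. The notebook doesn't show whether such cases occur in the lab's data.

```python
FIELDS = ("first_name", "last_name", "email", "birthday")


def set_score(a, b):
    """Mimic {"$size": {"$setIntersection": [[a's 4 values], [b's 4 values]]}}."""
    return len({a[f] for f in FIELDS} & {b[f] for f in FIELDS})


def positional_score(a, b):
    """Count fields whose values are equal in the same field."""
    return sum(a[f] == b[f] for f in FIELDS)


# Hypothetical records: first and last names swapped between the two entries.
master = {"first_name": "Lee", "last_name": "Ann", "email": "x@example.com", "birthday": "2001-01-01"}
imported = {"first_name": "Ann", "last_name": "Lee", "email": "x@example.com", "birthday": "2001-01-01"}
print(set_score(master, imported))         # 4: the swapped names still intersect
print(positional_score(master, imported))  # 2: only email and birthday agree in place

# Hypothetical record whose first and last names are identical: a perfect match
# collapses to 3 distinct values, so the set-based score undercounts.
same = {"first_name": "Lee", "last_name": "Lee", "email": "lee@example.com", "birthday": "1990-05-05"}
print(set_score(same, dict(same)))         # 3
print(positional_score(same, dict(same)))  # 4
```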

In [144]:
cursor = people_master.aggregate([
    {
        "$lookup": {
            "from": "people_import",
            "let": {
                "first_name": "$first_name",
                "last_name": "$last_name",
                "email": "$email",
                "birthday": "$birthday",
            },
            "pipeline": [
                greedy_match,
                match_score_calculation,
                {
                    "$match": {
                        "matchScore": { "$gte": 3 }
                    }
                },
                {
                    "$sort": { "matchScore": -1 }
                },
                {
                    "$limit": 5
                }
            ],
            "as": "matches"
        }
    },
    {
        "$match": {
            "matches.matchScore": 3  # true if ANY element of matches has matchScore 3
        }
    }
])

In [145]:
len(list(cursor))

19
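The final `$match` on `"matches.matchScore": 3` is an array query. A people_master document passes if *any* element of `matches` scores 3, even if another element scores 4. The count above is therefore a number of people_master documents, not of matched pairs. The plain-Python sketch below mimics that filter on hypothetical lookup results (the `_id` values are placeholders). It also includes a stricter `all(...)` variant for comparison; that variant is an illustration only, not part of the lab.

```python
# Hypothetical $lookup output: each master document carries its list of matches.
docs = [
    {"_id": "m1", "matches": [{"matchScore": 3}]},
    {"_id": "m2", "matches": [{"matchScore": 4}, {"matchScore": 3}]},
    {"_id": "m3", "matches": [{"matchScore": 4}]},
    {"_id": "m4", "matches": []},
]


def any_score_equals(doc, score):
    """Mimic {"matches.matchScore": score}: true if ANY array element matches."""
    return any(m.get("matchScore") == score for m in doc["matches"])


def all_scores_equal(doc, score):
    """Stricter variant: every match scores `score` (and there is at least one)."""
    return bool(doc["matches"]) and all(m.get("matchScore") == score for m in doc["matches"])


print([d["_id"] for d in docs if any_score_equals(d, 3)])  # ['m1', 'm2']
print([d["_id"] for d in docs if all_scores_equal(d, 3)])  # ['m1']
```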