# Data Analytics with MongoDB Workshop

## Welcome

Welcome to the Data Analytics with MongoDB Workshop. It is our hope that you'll leave this workshop with a more thorough understanding of using the MongoDB Aggregation Framework to accomplish data transformation and analysis goals.

For this workshop we're putting our own data up on the block. We'll be using real data from MongoDB University (scrubbed and anonymized!) to ask some questions both entertaining and very serious.

## Overview

In this workshop, we'll be using both Jupyter Notebooks and MongoDB Compass to explore our data, construct pipelines to transform and query it, and to visualize the results.

You aren't required or expected to know `pymongo` or Python deeply. If you have a question as we go along, or something (either Python or Aggregation) isn't explained to your satisfaction, please don't hesitate to ask! The instructors and TAs are here to help, and your fellow attendees may know a thing or two as well!

## Outline

### Core Aggregation
We'll be covering the core stages of the Aggregation Framework as well as key expressions. You'll gain an understanding of how to weave stages and expressions together to perform some very interesting analysis.

### Capstones
We've prepared a set of Capstone exercises, with visualizations baked in:
- Capstone 1 - Problem Problems
- Capstone 2 - Cliff Problems

Without further ado, let's jump in.

This phase is the workshop setup and the beginning of the aggregation section. This will prime the environment with our helper methods and fragments to load in (answers), and ensure students are set up correctly. This shouldn't take more than 15 minutes.

In [None]:
# setting up our workshop environment
%reload_ext autoreload
%autoreload 2
%matplotlib inline
from utils.workshop import *
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(color_codes=True)
from IPython.core.magic import register_cell_magic
@register_cell_magic
def handle(line, cell):
    try:
        exec(cell)
    except Exception as e:
        print(str(e))

In [None]:
# let's import our MongoClient class
from pymongo import MongoClient

In [None]:
# For Local or Vagrant execute this cell
client = MongoClient()

In [None]:
# For Docker use execute this cell
client = MongoClient(host="mongo", port=27017, serverSelectionTimeoutMS=200)

This is the basic setup.
Depending on their base setup, students will run one of the prior two cells.


In [None]:
%%bash
# Run this cell if you are using either Local or Docker
mongorestore --host mongo --port 27017 dump --gzip

In this cell, students will import the dataset used throughout the workshop.

For Vagrant, the dataset is already pre-loaded, so students using Vagrant do not need to run this cell.

In [None]:
# this lets us refer to the mdbw database as mdbw
mdbw = client.mdbw
# and this lets us refer to the workshop database for exercises
workshop = client.workshop
# this lets us refer to the mdbw.student_course collection as student_course
student_course = mdbw.student_course

* client = ... is just initializing our connection to the database, akin to starting the mongo shell
* mdbw = ... is giving us a reference to the `mdbw` database, like `use mdbw` in the shell

## Mongo shell and workshop environment equivalents

This workshop environment and the Mongo shell differ in a few ways.

Under the hood, this workshop environment is in Python as it runs in a Jupyter Notebook.


## Displaying database names and collections

### Shell

In the Mongo shell we would use the familiar commands

    show dbs
    show collections
   
to show databases and collections.

![shell](assets/shell_show_dbs_and_colls.png)

### In the Notebook

In the workshop environment, you can use the `pymongo` methods `list_database_names()` and `list_collection_names()` to show the same information.

In [None]:
m_print(client.list_database_names())
m_print(mdbw.list_collection_names())

`m_print` is a utility function that uses a custom JSON encoder and `dumps` from the json package to print out our results in an easier to read format. An optional parameter `justone` can be supplied to print the first result.
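
For the curious, a helper along these lines could be sketched as follows. This is not the actual `utils.workshop` code: the names `SketchEncoder` and `sketch_format`, and the choice to fall back to `str()` for unknown types, are assumptions made purely for illustration.

```python
import json
from datetime import datetime
from itertools import islice


class SketchEncoder(json.JSONEncoder):
    """Hypothetical encoder: handles values the json module can't serialize."""

    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        return str(o)  # assumption: anything else is shown as a string


def sketch_format(results, justone=False):
    """Return results as indented JSON; with justone=True, only the first item."""
    if isinstance(results, dict):
        docs = results
    else:
        # islice stops early, so a lazy cursor is not consumed in full
        docs = list(islice(results, 1 if justone else None))
        if justone:
            docs = docs[0] if docs else None
    return json.dumps(docs, cls=SketchEncoder, indent=2)


print(sketch_format([{"name": "a", "seen": datetime(2018, 4, 3)}, {"name": "b"}], justone=True))
```

Stopping after the first item when `justone` is set keeps the helper cheap on large result sets.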

## MongoDB Compass

Throughout this workshop, you are encouraged to use Compass. With it, you can easily:

* Visualize your schema
* Tinker with aggregations to visually see the pipeline
* Export your aggregations to the clipboard for easy use in the workshop

Take this time to explain the Compass window by launching Compass on your own computer.

Go over the connection screen, the database and collection views, and the various tabs.

**Try to time box this to ~10 minutes.**

## $match

The first stage we'll learn about - and arguably the most important stage in the Aggregation Framework - is `$match`.
Unlike the rest of the stages in the Aggregation Framework, `$match` uses the query operator syntax you are most likely familiar with.

    db.course.findOne({"key": "value"})
    

    db.course.find({"key": { "$gt": 5 }})
    

    db.course.findOne({"key": { "$type": "string" }})

    db.course.find({"key": { "$gte": 5 }, "key2": "some_exact_value"})

These cells correspond to:
* A standard equality predicate
* A standard range predicate
* Using the type as the predicate
* Multiple predicates

Refer to these examples using this naming.

Let's look at how we'd do it with the standard query syntax using `findOne` (`find_one` in `pymongo`) in our workshop environment.

In [None]:
m_print(student_course.find_one({"offering": "M103/2018_April"}))

Now, let's do it with aggregation, using the `$match` stage.

But before we do, let's cover some basic rules.

## The Rules (of thumb)

### 1. Friends don't let friends write monolithic pipelines.

* Assign your stages to semantically meaningful variable names

The benefits of this are many:
* the ability to select which stages to execute
* a lighter cognitive workload when reading the pipeline
* better debuggability of the individual stages

Pipelines can become very long and it's easy to forget what a particularly complex stage does when followed by several other equally complex stages.
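
As a minimal sketch of this rule, a pipeline can be assembled from named stages like so. The stage names are made up for illustration, and the `$project` and `$limit` stages are only there to give the list more than one entry.

```python
# Each stage lives in its own descriptively named variable.
only_april_m103 = {"$match": {"offering": "M103/2018_April"}}
keep_user_and_offering = {"$project": {"_id": 0, "user_id": 1, "offering": 1}}
first_five = {"$limit": 5}

# The full pipeline is just a list, so stages are easy to reorder or leave out.
full_pipeline = [only_april_m103, keep_user_and_offering, first_five]

# While debugging, run only the first stage(s) by slicing the list.
debug_pipeline = full_pipeline[:1]

print([list(stage)[0] for stage in full_pipeline])
```

Either list can then be passed to `aggregate` as-is.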

### 2. Always keep a reference tab open to the [Aggregation Quick Reference](https://docs.mongodb.com/manual/meta/aggregation-quick-reference/)

* There are entirely too many stages and expressions available to try to remember
* The stages and expressions linked on this page show examples on how to use them!

OK, now that the ceremony is out of the way, let's use the `$match` stage.

In [None]:
predicate = {
    "$match": {
        "offering": "M103/2018_April"
    }
}

# .next() is a Python/pymongo specific thing
m_print(student_course.aggregate([predicate]).next())

`.next()` is a Python/pymongo feature to iterate the cursor element by element.

It's like shifting off the first element in a list and using the value.

The benefit of using this approach is that it doesn't consume the entire cursor.
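
The same behavior can be seen with any Python iterator. In this sketch, `fake_cursor` is a hypothetical stand-in for a real cursor so that the example runs without a database.

```python
def fake_cursor():
    """Stand-in for a cursor: yields documents lazily, one at a time."""
    for user_id in range(1, 4):
        yield {"user_id": user_id}


cursor = fake_cursor()
first = next(cursor)       # pulls only the first document
remaining = list(cursor)   # the rest are still available afterwards

print(first)
print(remaining)
```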

The `$match` stage in the Compass Aggregation Builder.

![compass_match](assets/compass_match.png)

Let's look at the flow of the Aggregation Builder in Compass to get a sense of what is going on.

![agg_explanation](assets/compass_explanation.png)

#### Reinforce to students:
* Input documents flow in from the previous stage
  * If it is the first stage, input documents flow in from the collection
* The stage transforms the document stream in some way.
* This process repeats until the final stage.

## Think of `$match` as a filter!



As documents flow into the `$match` stage, those that don't meet the predicate(s) will be "filtered" out.

`$match` should come early and often in your Aggregation pipelines. Only work with the data you need.

![agg_factor](assets/agg_factory.gif)

#### Address the following questions/topics:

- Why think of `$match` as a filter?

Think of an Aggregation Pipeline like an assembly line where documents pass through stations.

- Each "station" is a stage.

- In this example, the first stage is `$match`, and it is set up to filter out anything green.

- The second stage is `$project`, and it is set up to transform the shape from square to circle.

- The third stage is `$group`, and it is set up to collect all input and give us a ratio of red to blue circles.
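
The assembly line can be mimicked in plain Python to make the flow concrete. This is only an analogy using made-up shape data, not how the server executes a pipeline.

```python
from collections import Counter

items = [
    {"shape": "square", "color": "red"},
    {"shape": "square", "color": "green"},
    {"shape": "square", "color": "blue"},
    {"shape": "square", "color": "red"},
    {"shape": "square", "color": "blue"},
    {"shape": "square", "color": "red"},
]

# Station 1 ($match-like): drop anything green.
not_green = (item for item in items if item["color"] != "green")

# Station 2 ($project-like): reshape each square into a circle.
circles = ({"shape": "circle", "color": item["color"]} for item in not_green)

# Station 3 ($group-like): collect everything and summarize it.
counts = Counter(circle["color"] for circle in circles)
ratio_red_to_blue = counts["red"] / counts["blue"]

print(counts, ratio_red_to_blue)
```

Because the first two stations are generators, each item flows through them one at a time, much like documents streaming between stages.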

## Exercise: `$match`

For this exercise, find all documents that match the following criteria:

* **offering** is either *"M103/2018_April"* _or_ *"M121/2018_April"*
* **user_id** is greater than or equal to **75000** and less than **150000**

The print statements are provided for you to display your results.

In [None]:
predicate = {
    "$match": {
        <your code here!!>
    }
}
m_len(collection=student_course, aggregation=[predicate])

m_print(student_course.aggregate([predicate]))


```
predicate = {
    "$match": {
        "offering": { "$in": ["M103/2018_April", "M121/2018_April"] },
        "user_id": { "$gte": 75000, "$lt": 150000 }
    }
}
```

Students should get **88**

`m_len` is a utility function to print out how many results we get. Internally it just appends the `$count` stage to an aggregation.
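
A rough sketch of that idea follows. The helper name `with_count` and the output field name `count` are assumptions for illustration; the real `m_len` may differ.

```python
def with_count(pipeline, field="count"):
    """Return a copy of the pipeline with a $count stage appended."""
    return list(pipeline) + [{"$count": field}]


predicate = {"$match": {"offering": "M103/2018_April"}}
counting_pipeline = with_count([predicate])
print(counting_pipeline)
```

Building a new list leaves the caller's pipeline untouched, so it can be reused afterwards.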

## Solution: `$match`

In [None]:
# %load fragments/match_exercise.py
predicate = {
    "$match": {
        "offering": { "$in": ["M103/2018_April", "M121/2018_April"] },
        "user_id": { "$gte": 75000, "$lt": 150000 }
    }
}


In [None]:
m_len(collection=student_course, aggregation=[predicate])


## Expressions

The next area we'll cover is expressions. If stages like `$match`, `$project`, and `$group` are the assembly stations of an aggregation pipeline, expressions are the individual tools at each station.

Expressions are functions. With them we can add, multiply, get square roots, join sets, and much, much more. Here are functionally identical examples of a function called add.

- Python Syntax

```python
    def add(a, b):
        return a + b
```

- JavaScript Syntax

```javascript
    function add(a, b) {
      return a + b
    }
```

- Aggregation Syntax

```js
    { "$add": [ "$a", "$b" ] }
```

Here's another example for a function called `floor`, which rounds the input down to the nearest integer.

Python:

```python
    import math

    def floor(a):
        return math.floor(a)  # int(a) would truncate toward zero instead
```

JavaScript:

```javascript
    function floor(a) {
      return Math.floor(a)
    }
```

Aggregation:

```json
    { "$floor": "$a" }
```

And just like functions, expressions can be composed together. MongoDB versions before 4.2 have no built-in rounding expression (`$round` was added in 4.2), but it's easy to make one.

    { "$floor": { "$add": [ "$a", 0.5 ] } }

Adding `0.5` to a non-negative number and then flooring it rounds the number to the nearest integer, with halves rounding up.

```
.4 + .5 -> .9 -> floor(.9) -> 0
.5 + .5 -> 1 -> floor(1) -> 1
.9 + .5 -> 1.4 -> floor(1.4) -> 1
```

There is a unit test on the next slide to show that this works.

In [None]:
import numpy as np
# note: this deliberately shadows Python's built-in round for the demo
def round(a):
    return int(a + 0.5)
    
expected = [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
actual = [round(a) for a in np.arange(0.0, 2.0, 0.1)]
assert actual == expected
# print([a for a in np.arange(0.0, 2.0, 0.1)])

Here we're using a utility function from the `numpy` library called `arange` to give ourselves values from `0` to `1.9`, increasing by `0.1`.

We pass each value through our round function to show that function composition gives us the value we want.

Feel free to have students uncomment the print line to see the list produced from `arange`.

Let's take a closer look at our custom `round` expression to get a feel for aggregation semantics.

    { "$floor": { "$add": [ "$a", 0.5 ] } }

    { "$floor": { "$add": [ "$a", 0.5 ] } }
    ^           ^                       ^ ^
    
Unless an expression is entirely self contained, it is surrounded by curly brackets. These delimit the scope of the expression.

Expressions are evaluated from the inside out. So, the `$add` expression is evaluated before the `$floor` expression.

    { "$floor": { "$add": [ "$a", 0.5 ] } }
                          ^           ^

As a general rule, expressions that take multiple arguments accept them in an array. Some expressions take a fixed number of arguments, like `$subtract`, which takes exactly two, while others are variadic (the number of arguments isn't fixed), like `$add`.

    { "$floor": { "$add": [ "$a", 0.5 ] } }
                              ^
                              
Lastly, let's cover field path expressions. Field path expressions are self contained, hence there's no need to surround them with curly braces when composing them with other expressions or using them to supply an argument to an expression.

Field path expressions refer to a value in the current document. In our custom `round` expression, `"$a"` refers to the value in the current document that has a key of **a**. 
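
To make these semantics concrete, here is a toy evaluator written in Python. It is a sketch for intuition only: the function name `evaluate` is hypothetical, it supports just `$add`, `$floor`, top-level field paths, and literals, and it is not how the server is implemented.

```python
import math


def evaluate(expr, doc):
    """Evaluate a tiny subset of aggregation expressions against one document."""
    if isinstance(expr, str) and expr.startswith("$"):
        return doc[expr[1:]]                 # field path: look up the key
    if isinstance(expr, dict):
        (operator, args), = expr.items()     # one operator per expression
        if operator == "$add":
            # arguments are evaluated first, i.e. from the inside out
            return sum(evaluate(arg, doc) for arg in args)
        if operator == "$floor":
            return math.floor(evaluate(args, doc))
        raise ValueError(f"unsupported operator: {operator}")
    return expr                              # literal value such as 0.5


custom_round = {"$floor": {"$add": ["$a", 0.5]}}
print(evaluate(custom_round, {"a": 1.4}))
print(evaluate(custom_round, {"a": 1.5}))
```

Note how the recursion mirrors the nesting: `$floor` cannot produce a value until `$add` has, and `$add` cannot until `"$a"` has been looked up in the document.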


Reinforce that field path expressions let you use the values of fields in the incoming document within expressions and stages.

Field path expressions will be covered in more depth in the upcoming sections.


## Expression "Categories"

There are many built-in expressions in the Aggregation Framework that fall into several categories.

![expression categories](assets/expression-categories.png)

Walk students through the main groups, but don't spend too much time here. Give a couple of examples rather than going through the extended list.

* Arithmetic - add, subtract, divide, multiply, square root, etc...
* Array - filter, map, reduce, arrayElemAt, etc...
* Boolean - and, or, not
* Comparison - eq, ne, gte, lte, etc...
* Conditional - switch, cond, ifNull
* Date - dayOfWeek, dayOfMonth, month, second, etc...
* Object - objectToArray, mergeObjects
* Set - setUnion, setIntersection, setDifference
* String - substr, indexOfCP, concat, etc...
* Accumulators - avg, min, max, stdDevPop, etc...

The best way to learn is by doing. We'll be using expressions heavily throughout this workshop, and pointing out interesting functionality as we go.

Remember:

* Expressions are just functions
* Expressions can be composed together to form complex and powerful logic.
* Refer to the [aggregation quick reference](https://docs.mongodb.com/manual/meta/aggregation-quick-reference/) often!

We are now transitioning into coverage of `$addFields` stage.

## Calculating values and storing them for later use

Let's consider the following document

    {
        name: "John Smith",
        yearly_income: 60000,
        dependents: 3
    }
    
We'd like to engineer a feature, a derived point of data. In Aggregation, we call these **computed fields**. The field should be called **income_pressure**, and is simply the `yearly_income` divided by the number of dependents plus 1 to account for John Smith himself.

    income_pressure = yearly_income / (dependents + 1)

We use two field path expressions to refer to values directly in the document, `$yearly_income` and `$dependents`.

Because expressions are evaluated inside out, the value of `$dependents` is inserted into position, then 1 is added to that.

The resultant value is put in place of the entire `$add` expression, so in our example the value 4 is now there. Then, the value of `yearly_income` is put into place (60000).

Lastly, the `$divide` expression is evaluated. 60000 / 4 == 15000. 15000 is returned from this expression composition.

    { "$divide": [ "$yearly_income", { "$add": [ "$dependents", 1 ] } ] }
    
The above expresses the logic we want, first adding one to the value of **dependents**, then using that amount to divide the value in **yearly_income**.
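
The same arithmetic can be checked in plain Python. The function below is only a mirror of the expression for comparison; its name and the intermediate variable `household_size` are illustrative.

```python
def income_pressure(yearly_income, dependents):
    """Mirror of the $divide / $add composition, evaluated inside out."""
    household_size = dependents + 1          # { "$add": [ "$dependents", 1 ] }
    return yearly_income / household_size    # { "$divide": [ ..., ... ] }


john = {"name": "John Smith", "yearly_income": 60000, "dependents": 3}
print(income_pressure(john["yearly_income"], john["dependents"]))
```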

**How do we now assign this value to a field in the document for use in later stages?**

Make sure students understand what "_assigning this value to a field in the document_" means. 

We are asking them to assign the value to a new field so it can be used in later pipeline stages.

## `$addFields`

One possible solution is to use the `$addFields` stage.

`$addFields` - and `$project`, which you'll learn about in the next section - should be considered the main transformative stages, akin to `map` in many programming languages.

Put simply, `$addFields` adds fields to the existing document, or modifies existing fields.

Here's what the `$addFields` stage would look like to calculate and assign the value from the previous expression to a field called **income_pressure**

```
{
    "$addFields": {
        "income_pressure": {
            "$divide": [
                "$yearly_income",
                { "$add": [ "$dependents", 1 ] }
            ]
        }
    }
}
```

## `$addFields` Exercise 1

Let's give it a try! First create a collection handle, using your initials or a nickname as the collection name.

In [None]:
%%capture
my_coll = workshop.mongonathan
my_coll.drop()
my_coll.insert_one({"name": "John Smith", "yearly_income": 60000, "dependents": 3})

`%%capture` suppresses the output of the cell.

We are using the drop method to ensure what we insert is the only document in the collection.

In [None]:
income_pressure = {
    "$addFields": {
        "income_pressure": <your code here!!>
    }
}
m_print(my_coll.aggregate([income_pressure]).next())

This is what the stage should look like:

```
income_pressure = {
    "$addFields": {
        "income_pressure": {
            "$divide": [
                "$yearly_income",
                { "$add": [ "$dependents", 1 ] }
            ]
        }
    }
}
```

## `$addFields` Exercise 2

For this exercise, you'll use expressions from the **Accumulator** family to accumulate results, then assign the results to computed fields in an `$addFields` stage.

Given the following documents...

In [None]:
%%capture
yearly_sales_2016 = [
    { "quarter": 1, "sales": 8675309 },
    { "quarter": 2, "sales": 6309212 },
    { "quarter": 3, "sales": 9755867 },
    { "quarter": 4, "sales": 8812392 }
]
yearly_sales_2017 = [
    { "quarter": 1, "sales": 8027581 },
    { "quarter": 2, "sales": 7792839 },
    { "quarter": 3, "sales": 6690023 },
    { "quarter": 4, "sales": 10023977 }
]
my_coll.drop()
my_coll.insert_many([
    {"year": 2016, "yearly_sales": yearly_sales_2016},
    {"year": 2017, "yearly_sales": yearly_sales_2017}
])

We're creating some fake data and assigning it to variables `yearly_sales_2016` and `yearly_sales_2017`.

We then insert two documents into our test collection with a year key and our fake data corresponding to that year.

Calculate the following:
* The average quarterly sales. Add this value to a new field called **avg_quarterly_sales**
* The total yearly sales. Add this value to a new field called **total_yearly_sales**

In [None]:
add_sales_information = {
    "$addFields": {
        "avg_quarterly_sales": <your code here!>,
        "total_yearly_sales": <your code here!>
    }
}
m_print(my_coll.aggregate([add_sales_information]))

This is what the second `$addFields` stage can look like:

```
add_sales_information = {
    "$addFields": {
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" }
    }
}
```

## Solution


In [None]:
%load fragments/addFields.py

### A note about field path expressions

As you can see, field path expressions are incredibly versatile.

We can use dot notation (.) in a field path to specify a child key.

If the field we specify is an array of documents like **yearly_sales** is, the field path expression
`"$yearly_sales.sales"` will extract the value at that key for every document in the array.

Let's look at a quick example to illustrate this.

In [None]:
just_sales = {
    "$addFields": {
        "just_sales": "$yearly_sales.sales"
    }
}
m_print(my_coll.aggregate([just_sales]))
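
For comparison, here is roughly what the field path and the two accumulators do, expressed in plain Python over the 2016 data. This is an analogy only; the variable names are chosen to match the stage fields.

```python
yearly_sales = [
    {"quarter": 1, "sales": 8675309},
    {"quarter": 2, "sales": 6309212},
    {"quarter": 3, "sales": 9755867},
    {"quarter": 4, "sales": 8812392},
]

# "$yearly_sales.sales": pull the `sales` value out of every array entry.
just_sales = [entry["sales"] for entry in yearly_sales]

# { "$sum": ... } and { "$avg": ... } then reduce that list to one number.
total_yearly_sales = sum(just_sales)
avg_quarterly_sales = total_yearly_sales / len(just_sales)

print(just_sales, total_yearly_sales, avg_quarterly_sales)
```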

### Assigning a computed value to an existing field

If we specify the name of an existing field within `$addFields`, the computed value will replace the existing value in the pipeline.

The following example assigns only the sales values to the field **yearly_sales**.

In [None]:
yearly_sales = {
    "$addFields": {
        "yearly_sales": "$yearly_sales.sales"
    }
}
m_print(my_coll.aggregate([yearly_sales]))

Because the key `yearly_sales` already exists, specifying it as the target of the expression result overwrites the existing value.

Stress to students that all of these transformations are taking place only within the pipeline, and they do not impact documents on disk. While it is possible to write using the `$out` stage, that will come later in the workshop.

## `$project`

You've now seen the basics of computing fields with expressions and transformation by either overwriting existing keys or assigning computed values to new keys with `$addFields`.

Another stage that is **very** similar to `$addFields` is the `$project` stage, but there are some nuanced differences.

Let's explore the similarities and differences.

`$project` is also capable of document transformation by computing fields.
Using an answer to a previous exercise, we'll use a `$project` stage to illustrate a key difference.

In [None]:
add_sales_information = {
    "$project": {
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" }
    }
}
m_print(my_coll.aggregate([add_sales_information]))

If necessary, point out that aside from the **_id** field, all of the other fields except those specified are now absent from the documents.

The `$project` stage also implicitly functions like the projection portion of a standard query.

As you can see, every field other than **`_id`** that we didn't specify was removed from the documents.

`$project` implicitly applies the same projection mechanics as standard query operations.

We must be explicit to remove the **`_id`** field.

In [None]:
add_sales_information = {
    "$project": {
        "_id": 0,
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" }
    }
}
m_print(my_coll.aggregate([add_sales_information]))

If we only want to remove fields, we can exclude them the same way we just excluded the **`_id`** field.

However, there is one caveat to this.

In [None]:
# will work
remove_id_and_sales = {
    "$project": {
        "_id": 0,
        "yearly_sales": 0
    }
}
m_print(my_coll.aggregate([remove_id_and_sales]))

In [None]:
# won't work!
transform = {
    "$project": {
        "_id": 0,
        "yearly_sales": 0,
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" },
        "year": 1
    }
}
m_print(my_coll.aggregate([transform]))

This will fail because a `$project` stage can't combine field exclusion with field inclusion or computed fields.

The only field we can exclude when computing fields or specifying inclusion is the **`_id`** field.
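
The rule can be modeled with a small check in Python. This is a simplified sketch for reasoning about your own stages: the function name is hypothetical, and the server's real validation covers more cases than this.

```python
def mixes_inclusion_and_exclusion(spec):
    """Simplified model of the rule: True if the spec would be rejected."""
    excludes = includes = False
    for field, value in spec.items():
        if field == "_id":
            continue                      # _id may be excluded in any spec
        if value in (0, False):
            excludes = True
        else:
            includes = True               # 1/True or a computed expression
    return excludes and includes


will_work = {"_id": 0, "yearly_sales": 0}
wont_work = {
    "_id": 0,
    "yearly_sales": 0,
    "avg_quarterly_sales": {"$avg": "$yearly_sales.sales"},
    "year": 1,
}
print(mixes_inclusion_and_exclusion(will_work))
print(mixes_inclusion_and_exclusion(wont_work))
```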

## Exercise

Given the previous `$project` stage and a desire for the following shape

    {
        avg_quarterly_sales: ...,
        total_yearly_sales: ...,
        year: ...
    }
    
Correct the stage to output the desired shape.

In [None]:
transform = {
    "$project": {
        "_id": 0,
        "yearly_sales": 0,
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" },
        "year": 1
    }
}
m_print(my_coll.aggregate([transform]))

Solution - remove the yearly_sales exclusion

```
transform = {
    "$project": {
        "_id": 0,
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" },
        "year": 1
    }
}
```

## Solution

Remove the exclusion for **yearly_sales**

In [None]:
transform = {
    "$project": {
        "_id": 0,
        "avg_quarterly_sales": { "$avg": "$yearly_sales.sales" },
        "total_yearly_sales": { "$sum": "$yearly_sales.sales" },
        "year": 1
    }
}
m_print(my_coll.aggregate([transform]))

## `$addFields` vs `$project`

Let's summarize the similarities and differences between `$addFields` and `$project`

![project-vs-addfields](assets/project-vs-addfields.png)

Both stages are capable of computing values.

The main difference is in the projection mechanics. `$project` can explicitly include/exclude fields, and will implicitly perform exclusion of all unspecified fields when computing a value except for the `_id` field.

As a general rule, prefer `$addFields` when you wish to only add a new computed value to the document or reassign an existing field without modifying other fields.
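
The difference can be summed up with two tiny Python functions. They are only an analogy for the default behavior of each stage when computing a value; the function names and sample document are made up.

```python
def add_fields(doc, computed):
    """$addFields-like: keep every existing field, add or overwrite computed ones."""
    return {**doc, **computed}


def project(doc, computed):
    """$project-like: keep only _id plus the fields named in the stage."""
    return {"_id": doc["_id"], **computed}


doc = {"_id": 1, "year": 2016, "sales": [10, 20, 30, 40]}
computed = {"total_yearly_sales": sum(doc["sales"])}

print(add_fields(doc, computed))
print(project(doc, computed))
```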

# Timing

In previous deliveries this is where the first 15 minute break occurred.

# Data Cleansing

Let's begin by getting student scores on problems offered on MongoDB University.

A large part of "data science" is cleaning and transforming data into a usable form. We'll now begin work to transform a collection into a more usable form, throwing away data we're not interested in.

## Initial Exploration

Begin by opening MongoDB Compass and connecting to the **mdbw** database.

We'll initially be working with the **student_course** collection. Explore the schema and poke around within individual documents to get a sense of the layout.

At this point, open Compass and walk through the collection with students. This is a critical point for shaping students' understanding of the problem. Spend roughly 10-15 minutes. Collections with **etl** in their name and the **correlated_answers** collection do not need to be shown.

Be sure to visit
* The Schema tab
* The Documents tab

Point out the nesting.

**tab_map** is where the student's submissions to problems are stored. 

Unfortunately, **tab_map** may be an empty object. We're going to filter out documents with an empty **tab_map** object.

To begin with, we'll want to filter out those documents that have an empty **tab_map**. Each document has a **tab_map** entry, but some are empty.

    ...,
    tab_map: {},
    ...

To make it more interesting, there isn't a universal property within **tab_map** that we can check either. The entries in **tab_map** are the ids of gradeable problems. This makes the following impossible:


```
student_course.aggregate([
    {
        "$match": {
            "tab_map.some_universal_property": { "$exists": True }
        }
    }
])
```

## `$objectToArray` to the Rescue!

Thankfully, we have the `$objectToArray` expression. Put simply, `$objectToArray` converts an object to an array of key value pairs.

```
{
    "name": "Jane Smith",
    "age": 28
}
```
becomes
```
[ {"k": "name", "v": "Jane Smith"}, {"k": "age", "v": 28 } ]
```

![objectToArray](assets/objecttoarray2.gif)

So, with `$objectToArray` we can convert an object into an array where each entry is an object mapping to one key/value pair.

With that, we can use the `$size` expression to return the size of the resultant array to us.

```
{ "$size": { "$objectToArray": "$tab_map" } }
```

And then use that in a comparison.

```
{ "$gt": [ {"$size": { "$objectToArray": "$tab_map" } }, 0 ] }
```
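
In Python terms, the conversion and the emptiness check look roughly like this. The helper names and the `problem_1` key are hypothetical and used only for illustration.

```python
def object_to_array(obj):
    """Python analogue of $objectToArray: one {"k", "v"} entry per key."""
    return [{"k": key, "v": value} for key, value in obj.items()]


def is_non_empty(obj):
    """Analogue of { "$gt": [ { "$size": { "$objectToArray": ... } }, 0 ] }."""
    return len(object_to_array(obj)) > 0


print(object_to_array({"name": "Jane Smith", "age": 28}))
print(is_non_empty({}), is_non_empty({"problem_1": {"index": 2}}))
```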

Let's give it a try!

In [None]:
ensure_tab_map_size = {
    "$match": <your code here!!>
}

Without using `$expr`, this isn't expressible.

However, encourage students to think about this.

Unfortunately this isn't directly expressible in a `$match` stage.

This brings us to a common pattern in Aggregation, computing a value in either `$addFields` or `$project` and following immediately with a `$match` stage.

## Exercise - Filter based on a computed value

How many documents in the **student_course** collection have a non-empty **tab_map** property?

In [None]:
compute_tab_map_size = {
    "$addFields": <your code here!!>
}
filter_out_no_tab_map = {
    "$match": <your code here!!>
}
m_len(collection=student_course, aggregation=[compute_tab_map_size, filter_out_no_tab_map])

In [None]:
compute_tab_map_size = {
    "$addFields": {
        "tab_map_not_empty": {
            "$gte": [{"$size": {"$objectToArray": "$tab_map"}}, 1]
        }
    }
}
filter_out_no_tab_map = {
    "$match": {
        "tab_map_not_empty": True
    }
}
m_len(collection=student_course, aggregation=[
      compute_tab_map_size, filter_out_no_tab_map])


## Solution

In [None]:
%load fragments/filter_based_on_computed_value.py

Talk to students about common patterns that emerge in aggregation. This is the first one. Computing a value and then filtering on it.

## But it can be expressed in `$match` (as of 3.6)

This pattern of computing a field and then following with a `$match` stage was so common, an operator was added in MongoDB 3.6 to allow the expression of rich logic within a `$match` stage (actually within the query operator framework!).

## Introducing `$expr`

The `$expr` operator tells MongoDB to switch from query operator syntax to expression syntax.

```
some_field: { "$gte": 5 }
```

becomes

```
"$expr": { "$gte": [ "$some_field", 5 ] }
```

Let's use `$expr` to find all entries in **student_course** where the **updated_at** field falls on a Tuesday (the day assignments are due on MongoDB University and the day new Chapters become available).

To do this, we'll use the `$dayOfWeek` Aggregation expression. `$dayOfWeek` will return a value between 1 (Sunday) and 7 (Saturday).
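
If you want to sanity-check the numbering, the same mapping can be reproduced with Python's `datetime`. The helper name `day_of_week` is made up for this sketch.

```python
from datetime import datetime


def day_of_week(moment):
    """Map a datetime to the 1 (Sunday) .. 7 (Saturday) numbering."""
    # isoweekday() runs from 1 (Monday) to 7 (Sunday), so shift it by one.
    return moment.isoweekday() % 7 + 1


print(day_of_week(datetime(2018, 4, 3)))  # 3 April 2018 was a Tuesday
```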

In [None]:
tuesday = 3
updated_on_tuesday = {
    "$match": {
        "$expr": {
            "$eq": [ { "$dayOfWeek": "$updated_at"}, tuesday]
        }
    }
}
m_len(collection=student_course, aggregation=[updated_on_tuesday])

14670 documents where the **updated_at** value is a Tuesday. For fun, let's look at activity by hour for Tuesdays. For now we'll use a utility function, but you'll learn how to do this later in the workshop.

Incidentally, this is built in to Compass in the Schema tab!

Also keep in mind times are in UTC, and our assignments are due at 1700!

Show students this functionality in the Compass Schema tab.

In [None]:
from utils.workshop import get_distribution_for_tuesdays
hours = get_distribution_for_tuesdays(student_course)
sns.set(style="whitegrid")
ax = sns.barplot(x="hour", y="count", data=hours)

Our students appear to like to wait until the last moment!

Ask students:

- Are any attendees data points in this graph?

## Exercise - Use `$expr` in `$match` to filter out documents with an empty `tab_map`

Using a single `$match` stage with an `$expr` within, filter out documents with an empty **tab_map** object.

In [None]:
no_empty_tab_maps = {
    "$match": {
        "$expr": <your code here!!>
    }
}
m_len(collection=student_course, aggregation=[no_empty_tab_maps])

In [None]:
# solution
no_empty_tab_maps = {
    "$match": {
        "$expr": { "$gt": [ { "$size": { "$objectToArray": "$tab_map" } }, 0 ] }
    }
}
m_len(collection=student_course, aggregation=[no_empty_tab_maps])

## Solution - no_empty_tab_maps

In [None]:
%load fragments/no-empty-tab-maps-solution.py

`$expr` is awesome. 

Ask students:

- Why not just use it exclusively?

It is awesome; however, it won't use an index unless the comparison is a strict equality. Later in the pipeline this doesn't matter, but in a first-stage `$match` with a range query, it is better to use the compute/match pattern.

## Shaping

Ok! Now that we've ensured we're only working with documents with a **tab_map** that actually has entries, it's time to shape.

Since we're performing an ETL to use for later analysis, it's ok if we throw away the **_id** field. We'll also want to convert the **tab_map** object into an array for later use. Again, this is because we don't know the top level key as it's an **_id** of the tab itself. However, we _are_ interested in its value! We also want to keep the **offering** and **user_id** values!

We need to add two more stages. The first should remove the **_id** entry and convert **tab_map** into an array, and the second should extract all of the **v** entries and reassign those to **tab_map**

In [None]:
remove_id_and_convert_to_array =  {
    "$project": {
        "_id": 0,
        "offering": 1,
        "user_id": 1,
        "tab_map": {
          "$objectToArray": "$tab_map"
        }
    }
}

remap_tab_map_values_to_parent = {
    "$addFields": {
      "tab_map": "$tab_map.v"
    }
  }
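
To build some intuition for what these two stages do, here is a rough pure-Python sketch of the same reshaping. It is only an analogy, not how MongoDB implements it, and the sample document (including the tab ids and the **offering** value) is made up for illustration.

```python
# Hypothetical input document; real tab_map keys are tab _id values.
doc = {
    "_id": "abc",
    "offering": "M001/2018_January",
    "user_id": 42,
    "tab_map": {
        "tab-id-1": {"id": "p1", "attempts": []},
        "tab-id-2": {"id": "p2", "attempts": []},
    },
}

def object_to_array(obj):
    """Mimic $objectToArray: one {"k": key, "v": value} entry per field."""
    return [{"k": k, "v": v} for k, v in obj.items()]

# Stage 1: drop _id, keep offering and user_id, convert tab_map to an array.
projected = {
    "offering": doc["offering"],
    "user_id": doc["user_id"],
    "tab_map": object_to_array(doc["tab_map"]),
}

# Stage 2: "$tab_map.v" on an array of documents collects every v value.
reshaped = dict(projected, tab_map=[entry["v"] for entry in projected["tab_map"]])
print(reshaped["tab_map"])
```

The keys we didn't know in advance end up in **k** and are simply left behind when we keep only the **v** values.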

Prior to running these in the notebook, open Compass and add these stages to your **student-map-elt** pipeline.

Demonstrate this in Compass and point out that this **tab_map** is now an array

Let's define and run our pipeline and preview the results in the notebook!

In [None]:
pipeline = [
    no_empty_tab_maps,
    remove_id_and_convert_to_array,
    remap_tab_map_values_to_parent
]
m_print(student_course.aggregate(pipeline), justone=True)

Ok, we're making progress! Looking at the results, there's yet more to clean.
The only values we really need from the **tab_map** documents are **_cls**, **text**, **indices**, **choices**, and **index** as these will help us map these student attempts later to whether the attempt was correct.

So, how do we work with the values inside the **tab_map** array? There are many options, all of which we'll cover shortly.

## `$unwind`

One of the easiest ways to work with data in an array is to use the `$unwind` aggregation stage.

`$unwind` accepts one argument, a field path expression that points to an array. We'll be using it on the **tab_map** field, so it looks like this
```
"$unwind": "$tab_map"
```

`$unwind` "unwinds" or expands an array, creating a new document for **every** entry in the array.
The value of the entry will be assigned to the key that was pointed to by the field path expression given to `$unwind`.  Let's view this in action. We're going to append an `$unwind` stage to a copy of our pipeline and preview a single result for demonstration purposes.

In [None]:
unwinding_tab_map= {
    "$unwind": "$tab_map"
}

# making a copy of our pipeline to test with! Always do this
unwind = pipeline[:]
unwind.append(unwinding_tab_map)

m_print(student_course.aggregate(unwind), justone=True)

Point out the transition, **tab_map** is now a document rather than an array of documents.
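
If it helps to see the idea outside of MongoDB, the short form of `$unwind` behaves roughly like the generator below. This is a simplified sketch that only handles fields holding a list; the sample data is made up.

```python
def unwind(docs, field):
    """Mimic the short form of $unwind for a field that holds a list."""
    for doc in docs:
        for element in doc.get(field, []):
            # Copy the document, replacing the array with one of its elements.
            yield {**doc, field: element}

docs = [{"user_id": 1, "tab_map": [{"id": "p1"}, {"id": "p2"}]}]
unwound = list(unwind(docs, "tab_map"))
for doc in unwound:
    print(doc)
```

One input document with a two-element array becomes two output documents, each carrying a copy of the other fields.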

As we iterate through this next section, we'll be assigning the current value of `pipeline` to another temporary variable that we can add stages to as we test. This is so that as we explore and validate the transformations we are preserving the original pipeline and can append/extend it with finished stages.

What's with the funny `[:]` at the end of the assignments? It's so that we clone the current pipeline. If we didn't add that, we'd just be aliasing `pipeline`.
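
A quick way to convince yourself of the difference between cloning and aliasing is the small sketch below. The stages are placeholders chosen only for illustration.

```python
pipeline = [{"$match": {"x": 1}}]

alias = pipeline       # same list object under a second name
clone = pipeline[:]    # new list that holds the same stage dicts

alias.append({"$limit": 1})   # this also changes pipeline
clone.append({"$limit": 5})   # this leaves pipeline untouched

print(alias is pipeline)   # the alias is the very same list
print(clone is pipeline)   # the clone is a different list
print(pipeline)
```

Note that `[:]` makes a shallow copy: the stage dictionaries themselves are shared. That's fine when we only append stages, but if you plan to edit a stage in place, `copy.deepcopy` is the safer choice.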

Alright, it looks like we've arrived at a document shape that we can work with. However, one thing we need to take into consideration is whether the transformed **tab_map** is empty.

We'll modify our usage of `$unwind` slightly, taking advantage of a different form. We can also specify to `$unwind` whether to `preserveNullAndEmptyArrays`, which defaults to `false`, as well as an option to `includeArrayIndex`, which takes a string as a key we'd like defined with the position the element was at in the array. Lastly, to use this form of `$unwind`, we must specify a `path`.

Let's look at an example.

In [None]:
# no need to specify includeArrayIndex. We don't want it
unwinding_tab_map_discard_empties = {
    "$unwind": {
        "path": "$tab_map",
        "preserveNullAndEmptyArrays": False
    }
}
# again making a copy of the pipeline
discard_empties = pipeline[:]
discard_empties.append(unwinding_tab_map_discard_empties)
m_print(student_course.aggregate(discard_empties), justone=True)

And we can see that the effect is identical; however, now we don't have to worry about encountering a document whose **tab_map** is empty, like `"tab_map": []`.
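
To see how the options change the output, here is a simplified pure-Python stand-in for the document form of `$unwind`. It is a sketch under assumptions: it only handles list fields, the function and parameter names are our own, and for preserved documents it does not add the index field.

```python
def unwind_with_options(docs, path, preserve_null_and_empty=False,
                        include_array_index=None):
    """Rough stand-in for the document form of $unwind (list fields only)."""
    for doc in docs:
        values = doc.get(path)
        if not values:  # missing, None, or an empty list
            if preserve_null_and_empty:
                # Keep the document; an empty array is dropped from the output.
                yield {k: v for k, v in doc.items()
                       if not (k == path and v == [])}
            continue
        for position, element in enumerate(values):
            out = {**doc, path: element}
            if include_array_index:
                out[include_array_index] = position
            yield out

docs = [
    {"user_id": 1, "tab_map": [{"id": "p1"}]},
    {"user_id": 2, "tab_map": []},
    {"user_id": 3},
]
dropped = list(unwind_with_options(docs, "tab_map"))
kept = list(unwind_with_options(docs, "tab_map", preserve_null_and_empty=True))
indexed = list(unwind_with_options(docs, "tab_map", include_array_index="position"))
print(len(dropped), len(kept))
```

With the option off, documents with nothing to unwind simply disappear from the output, which is the behavior we want for our ETL.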

So, now each document contains a **tab_map** object and is representative of one student's work on a single problem.

- The **id** refers to the unique problem id.
- Within the **attempts** array:
  - The **_cls** refers to the type of problem it is
  - **date** is the date of attempt
  - **indices**, **index**, or **text** is their submission
  

This shape looks good to work with to transform the answers into just the data we need, namely the id of the problem, and the student's attempts and submissions. We'll correlate this information with another collection later that contains the answers. Let's ensure we save this stage back to the `pipeline` variable we're working with.

In [None]:
# making a temporary copy of our pipeline
tmp_pip = pipeline[:]
pipeline.append(unwinding_tab_map_discard_empties)

Make sure to let students know to only run cells with `append` once (with shift-enter).

Also remind students where we are.

- We began with a document representing one student's **problems** per course
- With the `$unwind` we're increasing dimensionality and moving to a document representing one problem per student per course.

Continue to frame the dimensionality for attendees as necessary as we continue.

## Going Deep into Expression Composition

This next section will introduce several new expressions as well as compose them together to extract the information we want and produce the desired shape. Here is a sample record of what we are going for.

![desired_shape](assets/desired_shape.png)

We'll be transforming the **tab_map** document and assigning the transformation to a key called **submission**. Within, we'll compute the **num_attempts** and **problem_id** fields. We'll also clean up the **attempts** to only get the relevant data we need as described previously. Keep in mind that because we want to remove **tab_map** and instead have a key called **submission**, we'll need to use the `$project` stage.

Let's begin!

First, we'll express that we want to retain the **user_id** and **offering**, then specify the new field we want, **submission**, along with the **problem_id**, and **num_attempts**.

```
"$project": {
    "user_id": 1,
    "offering": 1,
    "submission": {
        "problem_id": "$tab_map.id",
        "num_attempts": ....
```

We want the **num_attempts** field to be the size of the **attempts** array within **tab_map**. How do we get this information?

We can use the `$size` expression, and specify the attempts array as its input.

`"num_attempts": { "$size": "$tab_map.attempts" }`

Let's give it a try.

In [None]:
%%handle
attempts_size = pipeline[:]
calculate_num_attempts = {
    "$project": {
        "user_id": 1,
        "offering": 1,
        "submission": {
            "problem_id": "$tab_map.id",
            "num_attempts": { "$size": "$tab_map.attempts"}
        }
    }
}
attempts_size.append(calculate_num_attempts)
m_print(student_course.aggregate(attempts_size), justone=True)

Enterprising students might notice the magic `%%handle`. 

This aggregation will indeed throw an error from Mongo because some **tab_maps** do not have an **attempts** field!

Unfortunately, we're encountering an error because some documents do not have a **tab_map.attempts** field. 

This may be because the entry wasn't actually a problem. In order to avoid this, we'll need to use a `$match` stage prior to this `$project` to eliminate those records.

## Exercise - Ensure attempts exists

In [None]:
attempts_size_no_error = pipeline[:]
calculate_num_attempts = {
    "$project": {
        "user_id": 1,
        "offering": 1,
        "submission": {
            "problem_id": "$tab_map.id",
            "num_attempts": { "$size": "$tab_map.attempts"}
        }
    }
}
ensure_attempts_exist = {
    "$match": {
        <your code here!!!>
    }
}
attempts_size_no_error.extend([ensure_attempts_exist, calculate_num_attempts])
m_print(student_course.aggregate(attempts_size_no_error), justone=True)

This is how the stage should look:

```js
    ensure_attempts_exist = {
        "$match": {
            "tab_map.attempts": { "$type": "array"  }
        }
    }
```
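
To see why the guard matters, here is a pure-Python sketch of the same idea. The two sample documents are made up: one is a problem with attempts, the other stands in for a tab entry that has no **attempts** field at all.

```python
docs = [
    {"tab_map": {"id": "p1", "attempts": [{"index": 0}, {"index": 2}]}},
    {"tab_map": {"id": "intro-video"}},  # hypothetical non-problem entry
]

def has_attempts_array(doc):
    """Mimic {"tab_map.attempts": {"$type": "array"}}."""
    return isinstance(doc.get("tab_map", {}).get("attempts"), list)

# Filter first, then it is safe to take the size of every attempts array.
safe_docs = [doc for doc in docs if has_attempts_array(doc)]
num_attempts = [len(doc["tab_map"]["attempts"]) for doc in safe_docs]
print(num_attempts)
```

Matching on the array type is a little stricter than checking that the field exists: it also screens out documents where **attempts** is present but isn't an array, which `$size` would reject as well.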



## Solution

In [None]:
%load fragments/ensure_attempts_exist.py

Ok, let's go ahead and add `ensure_attempts_exist` to our pipeline. Then, let's perform the computation again.

In [None]:
pipeline.append(ensure_attempts_exist)

In [None]:
attempts_size = pipeline[:]
calculate_num_attempts = {
    "$project": {
        "user_id": 1,
        "offering": 1,
        "submission": {
            "problem_id": "$tab_map.id",
            "num_attempts": { "$size": "$tab_map.attempts"}
        }
    }
}
attempts_size.append(calculate_num_attempts)
m_print(student_course.aggregate(attempts_size), justone=True)

# Timing

Suggested break. You should be anywhere between 1:45 and 2:15 into the workshop here.

Now we're able to compute the value for **num_attempts** without throwing an error. Excellent.

Next, we need to clean up the **attempts** array.

For reference, this is what we're going for.

![desired_shape](assets/desired_shape.png)

We'll need to iterate over the elements within the **attempts** array. To do so, we'll use the `$map` expression. 

## `$map`

![map](assets/$map.gif)

Map works by taking a supplied array and a specified expression, then applying that expression, one by one, to each element in the array.

In this example, `$map` accepts the THING array and squares each element by applying the `$pow` of 2 expression to each value, returning the array with the squared values, maintaining order.


In the Aggregation Framework, `$map` has the following form

```
"$map": {
    input: <expression>,
    as: <string>,
    in: <expression>
}
```

`input` is the input array that we want to map over. Any expression could be specified here, most typically it will be a field path expression to an array in the document.

`as` is an optional field. If we wanted to refer to the current element being transformed by a different name other than `$$this`, we would specify that name here.

`in` is the expression or composition of expressions that the value will be transformed with.

So, the transformation so far will look like
```
"$project": {
    "user_id": 1,
    "offering": 1,
    "submission": {
        "problem_id": "$tab_map.id",
        "num_attempts": { "$size": "$tab_map.attempts"},
        "attempts": {
            "$map": {
                "input": "$tab_map.attempts",
                "in": {
                    ....
                }
            }
        }
    }
}
```

We aren't bothering with the optional `as` field here because it's easy to reference the element with `$$this`.
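
If you know Python, `$map` should feel familiar: it's close in spirit to a list comprehension. Here is the squaring example from the animation as a rough Python analogy; the values in `thing` are made up.

```python
thing = [1, 2, 3, 4]

# Aggregation equivalent:
# {"$map": {"input": "$thing", "in": {"$pow": ["$$this", 2]}}}
squared = [this ** 2 for this in thing]
print(squared)

# With "as": "n", the expression would refer to "$$n" instead of "$$this",
# much like renaming the loop variable in the comprehension.
squared_again = [n ** 2 for n in thing]
```

The output array has the same length and order as the input; only the elements change.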

Alright, we're ready to process the data in the **attempts** array. When working with an expression like `$map`, it's oftentimes easier to isolate the transformation within the `in` section, so we'll do just that.

Let's take one document from the **attempts** array from the output of our saved pipeline and work with it in isolation.

In [None]:
%%capture
just_one_attempt = {
    "_cls": "HgCheckAllThatApplyAttempt",
    "date": "2017-11-27 21:40:17",
    "git_stamp": {
      "date": "2017-11-27 21:40:11",
      "revision": "789c694b49dd6d7dfbb245af7ab71bf509b0ba38"
    },
    "indices": [
      1,
      0
    ]
}
my_coll.drop()
my_coll.insert_one(just_one_attempt)

Although this document is limited and doesn't include all problem types, it will work for our needs. As a reminder, the keys we want to keep are **_cls**, **text**, **indices**, **choices**, and **index**.

In order to do this, we'll use the `$objectToArray` expression you've learned about already. Let's run an aggregation, transforming this document into an array to see the results. We'll use a `$project` stage, but keep in mind that this is only for testing our transformation. In the real pipeline, this transformation will take place in the `in` section of `$map`.

In [None]:
transform_attempt = {
    "$project": {
        "_id": 0,
        "attempt": { "$objectToArray": "$$ROOT"}
    }
}
m_print(my_coll.aggregate([transform_attempt]), justone=True)

We've just casually introduced `$$ROOT` to students! Explain to them that `$$ROOT` is a system variable that refers to the root document. It's a very useful way to transform the entire document at once when there's no field path to reference it by.

Additionally, the document was given an _id by MongoDB when we inserted it, hence why we projected it away. This won't be the case when we finish building out this transformation and use it within the real pipeline.

Alright, this is promising. The **k** keys now have values of the key name in the original object. We can now filter out the keys we aren't interested in. Unfortunately, we can't use `$match` semantics because we're firmly executing within the context of an expression. Thus, we're introduced to our next expression, `$filter`.

## `$filter`

![filter](assets/$filter.gif)

Filter works a lot like map. However, rather than applying a transformation to each element in the array, again referred to as THING in this example, filter takes a predicate expression that must resolve to true or false.

Here, filter is taking every element in the array and testing it with this expression that composes the `$mod` modulo operator and checks if the element is even. If it is it will return it back, otherwise it will be filtered out.

In the aggregation framework, `$filter` has the following form
```
"$filter": {
    "input": <expression>,
    "as": <string>,
    "cond": <expression>
}
```

`input` is the input array that we want to iterate over. Any expression could be specified here, most typically it will be a field path expression to an array in the document.

`as` is an optional field. If we wanted to refer to the current element being transformed by a different name other than $$this, we would specify that name here.

`cond` is the expression or composition of expressions that the value will be tested with.
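
As with `$map`, there is a close Python analogy: a list comprehension with a condition. The sketch below mirrors the even-number example from the animation, with made-up values.

```python
thing = [1, 2, 3, 4, 5, 6]

# Aggregation equivalent:
# {"$filter": {"input": "$thing",
#              "cond": {"$eq": [{"$mod": ["$$this", 2]}, 0]}}}
evens = [this for this in thing if this % 2 == 0]
print(evens)
```

Elements that pass the test come back unchanged and in their original order; everything else is dropped.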

Now that we know how to filter elements in an array, let's do so. Your task is to use `$filter` to keep only the keys we're interested in.

## Exercise - Filter out undesired keys
A shell is provided to help get you started.

In [None]:
transform_attempt = {
    "$project": {
        "_id": 0,
        "attempt": {"$objectToArray": "$$ROOT"}
    }
}
keys_to_keep = ["_cls", "text", "indices", "choices", "index"]
filter_keys = {
    "$filter": {
        <your code here!!!>
    }
}
filter_attempt = {
    "$addFields": {
        "attempt": filter_keys
    }
}
m_print(my_coll.aggregate([transform_attempt, filter_attempt]), justone=True)

```
"$filter": {
    "input": "$attempt",
    "cond": {
        "$in": [ 
            "$$this.k",
            keys_to_keep
        ]
    }
}
```
Specify the **attempt** array as input with a field path.
Use the `$in` operator to determine whether `$$this.k`, the current element's **k**, is in the keys to keep array.

## Solution

In [None]:
%load fragments/filter_out_undesired_keys.py

Great job! We've now successfully cleaned the array to only retain the values that we want, but there is a more succinct method.

Recall that expressions are composable. As the `input` to `$filter`, we can specify the same transformation we applied in the `$project` stage we called **transform_attempt**.

Run the next code cell to see it in action.

In [None]:
%load fragments/filter_keys_one_shot.py

So now that we have a cleaned array, we need to get it back into an object. We want to be able to reference the values by key name for later operations.

You guessed it, there's another expression for that.

Introducing `$arrayToObject`

## `$arrayToObject`

`$arrayToObject` has the following form
```
"$arrayToObject": <expression>
```
The expression can be anything that resolves to an array of one of the following forms.

[["key1", "value1"], ["key2", "value2"]]
    - or -
[{"k": "key1", "v": "value1"}, {"k": "key2", "v": "value2"}]

In either case, they will both end up as
```
{
    "key1": "value1",
    "key2": "value2"
}
```

We know that our filter expression results in an array of the required form, so let's specify that entire expression as the input expression to `$arrayToObject`!
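
For intuition, here is a small pure-Python sketch of what `$arrayToObject` does with either input form. The helper name is our own and the data mirrors the example above.

```python
pairs = [["key1", "value1"], ["key2", "value2"]]
kv_docs = [{"k": "key1", "v": "value1"}, {"k": "key2", "v": "value2"}]

def array_to_object(items):
    """Mimic $arrayToObject for either accepted input form."""
    result = {}
    for item in items:
        if isinstance(item, dict):
            # {"k": ..., "v": ...} form
            result[item["k"]] = item["v"]
        else:
            # two-element [key, value] form
            key, value = item
            result[key] = value
    return result

print(array_to_object(pairs))
print(array_to_object(kv_docs))
```

The second form is exactly what `$objectToArray` produces, which is why the two expressions pair up so nicely.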

We'll re-use the `filter_keys` variable defined previously to save on typing. This is good practice when building up complex aggregations!

In [None]:
filter_with_object_conversion = {
    "$project": {
        "_id": 0,
        "attempt": {
            "$arrayToObject": filter_keys
        }
    }
}
m_print(my_coll.aggregate([filter_with_object_conversion]), justone=True)

Excellent, we're now ready to combine them together. Let's go ahead and assign our array cleaning operation to a variable called **array_cleaning**.

In [None]:
array_cleaning = {
    "$arrayToObject": filter_keys
}
# and to test for good measure
test_step = {
    "$project": {
        "_id": 0,
        "attempt": array_cleaning
    }
}
m_print(my_coll.aggregate([test_step]), justone=True)

Looks good! We're now ready to combine this portion together with our map stage.

Let's test this out in isolation, just as we've done with our work with `$arrayToObject` and `$filter`.

In [None]:
%%capture
my_coll.drop()
my_coll.insert_one({
  "user_id": 1,
  "offering": "M036/2017_November",
  "tab_map": {
    "_cls": "HgStudentAttemptableProblem",
    "id": "5a1c7e44f62672abf36e4ba3",
    "given_credit": False,
    "attempts": [
      {
        "_cls": "HgCheckAllThatApplyAttempt",
        "date": "2017-11-27 21:40:17",
        "git_stamp": {
          "date": "2017-11-27 21:40:11",
          "revision": "789c694b49dd6d7dfbb245af7ab71bf509b0ba38"
        },
        "indices": [
          1,
          0
        ]
      },
      {
        "_cls": "HgCheckAllThatApplyAttempt",
        "date": "2017-11-29 14:55:01",
        "git_stamp": {
          "date": "2017-11-29 14:54:52",
          "revision": "94556bc6f03ac374a48e4df79808b8b059c806e7"
        },
        "indices": [
          3,
          2
        ]
      }
    ]
  }
})

## Composing the Expressions

Now we should be able to compose our `$filter` with a `$map` to test the combined functionality. Let's summarize what we're going to do
- We'll test it using a `$project` stage, using `$map` to iterate over every element in the **attempts** array
  - Convert each element to an array with `$objectToArray`, 
  - `$filter` out undesirable keys, 
  - Convert that back into an object with `$arrayToObject`, 
- Putting that result back into position. 

Lastly, we'll assign the entire computed value to a computed key called **submission**.

Prior to that, though, we need to make a small modification. The **filter_keys** expression we defined earlier currently is looking for the field path expression `$attempts`. Because we're iterating over the **attempts** array with `$map`, we'll redefine it to look for `$$this`, the currently iterated element in `$map`.

We'll assign the entire composition to a variable called `combined_map_and_filter`. If it works, we'll be able to reuse this in our main pipeline.

In [None]:
filter_keys = {
    "$filter": {
        "input": {"$objectToArray": "$$this"},
        "cond": {
            "$in": [ "$$this.k", keys_to_keep ]
        }
    }
}
array_cleaning = {
    "$arrayToObject": filter_keys
}
combined_map_and_filter = {
    "$map": {
        "input": "$tab_map.attempts",
        "in": array_cleaning
    }
}

Recap with students:

- `filter_keys` accepts as its input `{$objectToArray: "$$this"}` because `$map` is iterating over the **tab_map.attempts** array. 
- It's safe to reference the element as `$$this` because until we drop into the context of `$filter`'s **cond** block, `$$this` is scoped within the **in** block of `$map`.

Also remind students:

- if they are confused about referring to the input for  `$map` that the field path `$tab_map.attempts` will point directly to the **attempts** array in **tab_map**.
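
If the nesting feels hard to follow, it can help to read the same composition as plain Python. The sketch below is only an analogy for `combined_map_and_filter`, with helper names of our own and a trimmed-down version of the sample document.

```python
keys_to_keep = ["_cls", "text", "indices", "choices", "index"]

def clean_attempt(attempt):
    # $objectToArray, then $filter on k, then $arrayToObject.
    as_array = [{"k": k, "v": v} for k, v in attempt.items()]
    kept = [entry for entry in as_array if entry["k"] in keys_to_keep]
    return {entry["k"]: entry["v"] for entry in kept}

def clean_attempts(doc):
    # $map over tab_map.attempts, applying clean_attempt to each element.
    return [clean_attempt(attempt) for attempt in doc["tab_map"]["attempts"]]

doc = {
    "tab_map": {
        "attempts": [
            {"_cls": "HgCheckAllThatApplyAttempt",
             "date": "2017-11-27 21:40:17", "indices": [1, 0]},
            {"_cls": "HgCheckAllThatApplyAttempt",
             "date": "2017-11-29 14:55:01", "indices": [3, 2]},
        ]
    }
}
cleaned = clean_attempts(doc)
print(cleaned)
```

Each inner function sees its own "current element", which is the same scoping idea as `$$this` inside `$map` versus `$$this` inside `$filter`.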

In [None]:
composing_project = {
    "$project": {
        "_id": 0,
        "user_id": 1,
        "offering": 1,
        "submission": combined_map_and_filter
    }
}
m_print(my_coll.aggregate([composing_project]), justone=True)

It looks like it works! Let's go ahead and try it out with the pipeline we've assembled so far. Again we'll create a copy of the pipeline to test everything out before we assign it back.

We'll also ensure we assign to the correct keys.

In [None]:
testing_composition = pipeline[:]
shaping_submissions_stage = {
    "$project": {
        "user_id": 1,
        "offering": 1,
        "submission": {
            "problem_id": "$tab_map.id",
            "attempts": combined_map_and_filter
        }
    }
}
testing_composition.append(shaping_submissions_stage)
m_print(student_course.aggregate(testing_composition), justone=True)

Excellent, we've reached our goal. Now every output document corresponds to one student's submissions for one problem in a course.

Let's ensure we capture this work.

In [None]:
pipeline.append(shaping_submissions_stage)

The next step is to group documents together that have equal **offering**s and **user_id**s as they were originally. This way, each document is a record of one student's performance in an entire course.

This is a common pattern in Aggregation, especially after an `$unwind`. The Aggregation Framework provides a stage for this, too.

Introducing `$group`

# Timing

You should now be between 3:00 and 3:30 into the presentation.

Suggested break.

## `$group`

![agg factory](assets/agg_factory.gif)

`$group` has the following form
```
"$group": {
    "_id": <expression>,
    "fieldN": { <accumulatorN>: <expressionN> },
    ...
}
```

**_id** is any expression or composition of expressions to match similar documents together on.

**fieldN** is an optional field, and as hinted at by the postfixed "N", can be any number of fields. 

Each field specified accepts an accumulator and an expression to accumulate.

When we say that the **_id** field in `$group` can be any expression or composition of expressions to group similar documents together with, we really mean it.

Let's explore `$group` in isolation to get a sense for how powerful it is, as well as how accumulator expressions work and some very common patterns.

First, we'll insert 100 documents into our test collection. They'll be of the shape
```
{
    "_id": <assigned by mongod>,
    "value": 0..99
}
```

In [None]:
my_coll.drop()
my_coll.insert_many([{"value": n} for n in range(100)])
# count() was removed in PyMongo 4; count_documents({}) counts every document
print(my_coll.count_documents({}))
m_print(my_coll.find_one())

Alright, we have 100 documents inserted into our collection, with values ranging from 0 to 99.

First, let's `$group` similar documents together. We're also going to use a very common accumulator expression to count how many documents were grouped together, called `$sum`.

We're about to introduce control flow expressions as an exercise to explore accumulator expressions!

In [None]:
grouping_by_odd_or_even = {
    "$group": {
        "_id": {
            "$cond": [ 
                { "$eq": [{"$mod": [ "$value", 2 ] }, 0] }, 
                "even", 
                "odd" 
            ]
        },
        "count": { "$sum": 1 }
    }
}
m_print(my_coll.aggregate([grouping_by_odd_or_even]))

OK let's break this down.

Within the **_id** argument, we specified one of the _control flow_ expressions, `$cond`. 

It can be read as

**if** `{ "$eq": [{"$mod": [ "$value", 2 ] }, 0] },`

**then** `"even",`

**else** `"odd",`

The next portion is the accumulator expression block. We specified a field called `count`, but it could have been any name. Within, we use the accumulator expression `$sum`.

`"count": { "$sum": 1 }` can be thought of as the instruction "for every document that matches, **sum** 1 with the current value of **count**". So the first time it matched, it summed 1 and 0, then 1 and 1, then 1 and 2, and so on.

We would expect that both **count** fields in the output documents were 50, as there are 50 even and 50 odd numbers in the interval **[0, 99]**
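
We can sanity-check that expectation with a few lines of plain Python. This is just an analogy: `Counter` plays the role of `$group` with a `{"$sum": 1}` accumulator.

```python
from collections import Counter

values = range(100)  # same values as our test collection

# The generator computes the group key; Counter adds 1 per matching value.
counts = Counter("even" if value % 2 == 0 else "odd" for value in values)
print(dict(counts))
```

Each distinct key becomes one group, just as each distinct **_id** value becomes one output document.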

Following is an almost identical example, however it won't use the `$cond` expression and will use the name **documents_matched** instead of count. What do you expect the output to be?

In [None]:
grouping_by_odd_or_even = {
    "$group": {
        "_id": { "$eq": [{"$mod": [ "$value", 2 ] }, 0] }, 
        "documents_matched": { "$sum": 1 }
    }
}
m_print(my_coll.aggregate([grouping_by_odd_or_even]))

Encourage students to answer before they run the code.

Because we didn't specify the `$cond`, we got the raw output from the expression, which is a boolean true/false. We can see that the **documents_matched** key is what we expected, 50.

Alright, let's have a little bit of fun.

We're going to solve a variation of **FizzBuzz** with aggregation.

If you aren't familiar, FizzBuzz is a programming problem where you take the interval [0, 99] or [1, 100] and do the following:
- If the number is divisible by 3, output "Fizz"
- If the number is divisible by 5, output "Buzz"
- If the number is divisible by 15, output "FizzBuzz"
- If the number doesn't meet one of the above criteria, output the number itself

We'll group on these criteria instead, and use an accumulator expression called `$push` to push values into an array.

To do this, we're going to use a `$switch` expression. We could just nest `$cond` expressions but the syntax starts to get a bit unwieldy.

![haduken_code](assets/haduken_code.jpg)

In [None]:
fizz_buzz = {
    "$group": {
        "_id": {
            "$switch": {
                "branches": [
                    { "case": { "$eq": [{"$mod": [ "$value", 15 ] }, 0] }, "then": "FizzBuzz" },
                    { "case": { "$eq": [{"$mod": [ "$value", 5 ] }, 0] }, "then": "Buzz" },
                    { "case": { "$eq": [{"$mod": [ "$value", 3 ] }, 0] }, "then": "Fizz" }
                ],
                "default": "plain"
            }
        },
        "matching_nums": { "$push": "$value"}
    }
}
m_print(my_coll.aggregate([fizz_buzz]))

Alright, we see that documents where the value was divisible by 15 were matched and the **_id** resulted in "FizzBuzz", and the individual values were pushed into a field called **matching_nums**.

Same for "Fizz" and "Buzz", and because we need a common value to match other values together with, we chose to use "plain" if no branches in our `$switch` matched.
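
It's worth noticing why the divisible-by-15 branch is listed first: `$switch` takes the first branch whose `case` is true, just like a chain of `if` statements. Here is the same grouping as a pure-Python sketch, useful for predicting how many numbers land in each group.

```python
from collections import defaultdict

def label(value):
    # First true test wins, so the 15 check must come before 5 and 3.
    if value % 15 == 0:
        return "FizzBuzz"
    if value % 5 == 0:
        return "Buzz"
    if value % 3 == 0:
        return "Fizz"
    return "plain"  # plays the role of "default"

groups = defaultdict(list)
for value in range(100):
    groups[label(value)].append(value)  # plays the role of $push

sizes = {name: len(nums) for name, nums in groups.items()}
print(sizes)
```

If the branches were reordered so that the 3 or 5 check came first, nothing would ever reach the "FizzBuzz" group.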

We can also `$group` all input documents together. To do so, we specify a constant for **_id**, so that every document falls into the same group. By convention, we tend to use the value **0**.

Let's look at an example. We'll `$group` all of the documents together, and use some mathematical accumulator expressions to find the `$max`, `$min`, `$avg`, `$sum`, and `$stdDevPop`.

In [None]:
the_stats = {
    "$group": {
        "_id": 0,
        "max": { "$max": "$value" },
        "min": { "$min": "$value" },
        "avg": { "$avg": "$value" },
        "sum": { "$sum": "$value" },
        "sd": { "$stdDevPop": "$value" }
    }
}
m_print(my_coll.aggregate([the_stats]))
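
If you'd like to check the numbers independently, the same statistics can be computed in plain Python with the standard library. This is a cross-check, assuming the collection still holds the values 0 through 99; `expected_stats` is a name of our own.

```python
import statistics

values = list(range(100))

expected_stats = {
    "max": max(values),
    "min": min(values),
    "avg": statistics.mean(values),
    "sum": sum(values),
    # pstdev is the population standard deviation, like $stdDevPop
    "sd": statistics.pstdev(values),
}
print(expected_stats)
```

Note the use of the population standard deviation here; the sample version (`statistics.stdev` in Python, `$stdDevSamp` in aggregation) would give a slightly larger value.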

`$group` and accumulator expressions are very powerful and full of utility and will be used further in this workshop.

Ok, back to our pipeline. We have just the data we want, and we want to group documents together matching on the **offering** and **user_id**. Let's do it!

In [None]:
test_grouping = {
    "$group": {
        "_id": {
            "offering": "$offering",
            "user_id": "$user_id"
        }
    }
}
test_pipeline = pipeline[:]
test_pipeline.append(test_grouping)
m_print(student_course.aggregate(test_pipeline), justone=True)

Prior to running this cell, ask students what might be wrong.

With no accumulator, we're losing the data we've been cleaning.

Well, it kind of worked. But, we lost all of the **submission** data we so meticulously cleaned and derived.

Let's fix that by specifying a field to accumulate values in.

In [None]:
%%handle
group_by_offering_and_student = {
    "$group": {
        "_id": {
            "offering": "$offering",
            "user_id": "$user_id"
        },
        "submissions": {
            "$push": "$submission"
        }
    }
}
test_pipeline = pipeline[:]
test_pipeline.append(group_by_offering_and_student)
m_print(student_course.aggregate(test_pipeline), justone=True)

![uh_oh](assets/beaker_fire.jpg)

We may encounter errors like this once our pipeline becomes sufficiently complex.

The Aggregation Framework limits each pipeline stage to 100MB of RAM in order to avoid negatively impacting performance for other database operations.

This is typically why MongoDB recommends analytics be performed on a secondary.

In order to get around this, we'll do exactly what the error message suggests and allow the Aggregation Framework to use the disk!

In [None]:
group_by_offering_and_student = {
    "$group": {
        "_id": {
            "offering": "$offering",
            "user_id": "$user_id"
        },
        "submissions": {
            "$push": "$submission"
        }
    }
}
test_pipeline = pipeline[:]
test_pipeline.append(group_by_offering_and_student)
m_print(student_course.aggregate(test_pipeline, allowDiskUse=True), justone=True)

And it works! We have one last operation to perform before saving our transformed documents, namely extracting the **offering** and **user_id** fields from within the **_id** field that was assigned by `$group` to label our matches.

In [None]:
last_transforming_project = {
    "$project": {
        "_id": 0,
        "offering": "$_id.offering",
        "user_id": "$_id.user_id",
        "submissions": 1
    }
}
test_pipeline = pipeline[:]
test_pipeline.extend([group_by_offering_and_student, last_transforming_project])
m_print(student_course.aggregate(test_pipeline, allowDiskUse=True), justone=True)

Looks good, let's save it to our pipeline.

In [None]:
# only run this once
pipeline.extend([group_by_offering_and_student, last_transforming_project])
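
As a recap of these last two stages, here is a pure-Python sketch of grouping on a compound key, pushing values into an array, and then flattening the key back out. The rows are made up and stand in for the documents leaving our shaping stage.

```python
from collections import defaultdict

rows = [
    {"offering": "M001", "user_id": 1, "submission": {"problem_id": "p1"}},
    {"offering": "M001", "user_id": 1, "submission": {"problem_id": "p2"}},
    {"offering": "M001", "user_id": 2, "submission": {"problem_id": "p1"}},
]

grouped = defaultdict(list)
for row in rows:
    key = (row["offering"], row["user_id"])   # the compound _id
    grouped[key].append(row["submission"])    # the $push accumulator

# The final $project: lift the key parts back to the top level.
result = [
    {"offering": offering, "user_id": user_id, "submissions": submissions}
    for (offering, user_id), submissions in grouped.items()
]
print(result)
```

Three per-problem rows collapse into two per-student documents, which is the dimensionality we started with before the `$unwind`.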

Alright, we've come a long way in a short amount of time!

We've used a combination of expressions and different stages to transform our data into the shape we want. The last step would be to save our entire transformation using the `$out` stage.

## `$out`

`$out` is a simple stage that lets us save the results of an aggregation. It has the following form
```
"$out": <string> # the name of the collection you'd like to save the output to
```

If the name of an existing collection is specified, that collection will be overwritten but existing indexes will be preserved and rebuilt.

There's no need to add a `$out` to the pipeline you've built as it's already been done for you! The **courses_etl** collection was built using the transformation we've put together so far in this workshop.

Let's look at the pipeline to see what we've built.

In [None]:
m_print(pipeline)

We've covered a lot of ground so far, well done!

The next step is to correlate our transformed data with another collection, called **answers_etl** (don't worry, we've performed the transformation of this collection for you!), to determine whether the student got an answer correct.

We're now going to cover a few more extremely useful stages and expressions that were used to prepare the datasets for this workshop.

You can see the full aggregation pipelines used to transform the source collections in the `scratch` directory in your handout.

## `$lookup`

`$lookup` has two forms. In this workshop, we'll focus on the new form. It looks like this

```
"$lookup": {
    "from": <string>,
    "let": <expressions>,
    "pipeline": [<aggregation pipeline>],
    "as": <string>
}
```

This form of `$lookup` was introduced in MongoDB 3.6 and is **very** powerful. Here's an example of this form being used in one of the pipelines that was used to prepare the data for this workshop!

```
...
    {
        "$lookup": {
            "from": "answers_etl",
            "let": {"source": "$submissions.problem_id"},
            "pipeline": [
                {
                    "$match": {
                        "$expr": {"$eq": ["$problem_id", "$$source"]}
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "answer": 1
                    }
                }
            ],
            "as": "answer"
        }
    },
...
```

Let's break this down.

- First, we specify the `from` collection as **answers_etl**, a prepared collection that has problem answers.
- In `let`, we bind the value of the current document's **$submissions.problem_id** to the name **source**. We'll reference this name within the pipeline.
- In `pipeline`, we define an aggregation pipeline that executes in the context of the collection that we specified in `from`.
- This pipeline matches documents in the **answers_etl** collection whose **problem_id** equals the **$$source** value, the value we bound in `let`. It then projects away the **_id** and keeps the **answer** value.
- Lastly, we return the results and store them in a field called **answer**.
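
To make the steps above concrete, here is a minimal sketch of the same idea in plain Python. It is only an analogy for how `let`, `pipeline`, and `as` relate to each other, not how the server executes a `$lookup`. The sample documents and the `lookup_answers` helper are made up for illustration.

```python
# Toy stand-ins for the two collections; the values are made up for illustration.
student_docs = [
    {"user_id": 1, "submissions": {"problem_id": "p1"}},
    {"user_id": 2, "submissions": {"problem_id": "p9"}},
]
answers_etl = [
    {"_id": "a", "problem_id": "p1", "answer": [1, 0]},
    {"_id": "b", "problem_id": "p2", "answer": [0, 1]},
]


def lookup_answers(doc, from_collection):
    # "let": bind a value from the current document to a name
    source = doc["submissions"]["problem_id"]
    # "pipeline": runs against the "from" collection
    matched = [d for d in from_collection if d["problem_id"] == source]  # $match
    projected = [{"answer": d["answer"]} for d in matched]  # $project
    # "as": store the resulting array on a copy of the current document
    return {**doc, "answer": projected}


joined = [lookup_answers(doc, answers_etl) for doc in student_docs]
print(joined[0]["answer"])  # [{'answer': [1, 0]}]
print(joined[1]["answer"])  # []
```

Note that a document with no match still comes back, just with an empty array in the `as` field.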

If this is moving too quickly for students, feel free to run through more examples than what we're about to do.

Let's run through a quick exercise. We'd like to use a `$lookup` to get a list of students who have taken the **most** MongoDB Certified Developer/DBA exams.

In [None]:
exams_by_student = [
    {
        "$project": {
            "_id": 0,
            "user_id": 1
        }
    },
    {
        "$lookup": {
            <your code here!!>
        }
    },
    {
        "$match": {
            "$expr": {
                "$gt": [{"$size": "$exams"}, 0]
            }
        }
    },
    {
        "$sortByCount": {"$size": "$exams"}
    }
]
m_print(student_course.aggregate(exams_by_student), justone=True)

This is how it should look:


```json
    {
        "$lookup": {
            "from": "exam_results_etl",
            "let": {"source": "$user_id"},
            "pipeline": [
                {
                    "$match": {
                        "$expr": {
                            "$eq": ["$user_id", "$$source"]
                        }
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "user_id": 1
                    }
                }
            ],
            "as": "exams"
        }
    }
```

Next, we will be covering ``$sortByCount``

You may be wondering what that `$sortByCount` stage is doing. We'll look at that momentarily! Let's focus on `$lookup`.

- The `$lookup` stage is "looking up" documents **from** the **exam_results_etl** collection.
- Within `let`, it is binding the value of its own **user_id** field to the name **source**.
- The pipeline executes in the context of the **exam_results_etl** collection. The `$match` stage is filtering out documents where **user_id** in the document doesn't match the value of **$$source**. It then uses a `$project` stage to project away every field except for the **user_id**. This is done so that the documents returned to the `as` field are very tiny.
- `as` tells `$lookup` to collect matching documents into a field called **exams** in the current document being iterated over in the **student_course** collection.

Ok, that `$sortByCount` stage we saw. It's a useful utility stage that does grouping and sorting in one shot, very handy when you want to group documents together just to get a count on some matching criteria.

`"$sortByCount": <expression>`

is the same as

```
{
    "$group": {
        "_id": <expression>,
        "count": { "$sum": 1 }
    }
},
{
    "$sort": {
        "count": -1
    }
}
```


`$sort` is a stage that does exactly what the cursor method `find().sort()` does.
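
As a rough analogy in plain Python, the group-then-sort behaviour looks like the sketch below. The `values` list is made up and stands in for the result of evaluating `<expression>` against each document.

```python
from collections import Counter

# Made-up values standing in for <expression> evaluated per document
values = ["Java", "Python", "Java", "C#", "Python", "Java"]

# Counter does the job of the $group with {"$sum": 1}
counts = Counter(values)

# most_common() orders by count, descending, like the $sort on "count": -1
result = [{"_id": key, "count": n} for key, n in counts.most_common()]
print(result)
# [{'_id': 'Java', 'count': 3}, {'_id': 'Python', 'count': 2}, {'_id': 'C#', 'count': 1}]
```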

## Exercise - `$lookup`

Let's have a little bit of fun.

Within the **user_profile** collection there is a field called **devlang** where a user has stated their development language.

Of people who have taken the **offering** *M103/2018_April* (found in the **student_course** collection), what is the most popular **devlang**?

In [None]:
only_m103_april = {
    "$match": {
        "offering": "M103/2018_April"
    }
}
just_user_id = {
    "$project": {
        "_id": 0,
        "user_id": 1
    }
}
unwind_devlangs = {
    "$unwind": "$devlangs"
}
sort_devlangs = {
    "$sortByCount": "$devlangs"
}

In [None]:
lookup = {
    "$lookup": {
        <your code here!!>
    }
}


m_print(
    student_course.aggregate(
        [only_m103_april, just_user_id, lookup, unwind_devlangs, sort_devlangs]
    )
)

The answer is Java with 389.

```js
lookup = {
    "$lookup": {
        "from": "user_profile",
        "let": {"source": "$user_id"},
        "pipeline": [
            {
                "$match": {
                    "$expr": {
                        "$eq": ["$user_id", "$$source"]
                    }
                }
            },
            {
                "$project": {
                    "_id": 0,
                    "devlang": 1
                }
            }
        ],
        "as": "devlangs"
    }
}
```

## Solution - `$lookup`

In [None]:
%load fragments/lookup_exercise.py

Another useful expression when dealing with arrays is `$reduce`. Let's take a look at it.

```
"$reduce": {
    input: <expression that results in an array>,
    initialValue: <expression>,
    in: <expression>
}
```

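**input** works the same way as it does in `$map` and `$filter`, and **in** is the expression evaluated for each element, as in `$map`. Where `$reduce` differs is **initialValue**, the starting value of the accumulator, which can be anything we want.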

Here is an example where we used `$reduce` to correlate answers with student submissions. We reduced the array to a boolean value!

```
"$reduce": {
    "input": "$$this.indices",
    "initialValue": True,
    "in": {
        "$cond": [
            { "$eq": [ {"$arrayElemAt": [ "$answer", "$$this"] }, 1 ] },
            { "$and": [ "$$value", True] },
            { "$and": [ "$$value", False] }
        ]
    }
}
```

Within `$reduce`'s **in** block, we refer to the **`initialValue`** as **`$$value`**. 

As `$reduce` iterates over the array elements, it returns the result of the **in** expression block back to **$$value**. 

When it finishes iterating, it returns the result of whatever **`$$value`** is. We call **`$$value`** the **accumulator**.

Here, we use each index in **indices** to look up a value in some **$answer** array. If that value is equal to 1, the answer choice at that index is correct (0 means incorrect).

If it is, we return the logical AND of the current **`$$value`** and **True**; otherwise, the AND of **`$$value`** and **False**.

This has the effect that even though we start with True, if False is ever returned to the **accumulator**, it can **never** switch back to True.

`$reduce` is a powerful expression. In fact, `$map` and `$filter` are expressible as a `$reduce`.
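
If you know Python's `functools.reduce`, the same pattern may feel familiar. The sketch below is plain Python rather than an aggregation: `acc` plays the role of **`$$value`** and the current element plays the role of **`$$this`**. The `answer` list and the `all_correct` helper are made up for illustration.

```python
from functools import reduce

answer = [1, 1, 0, 1]


def all_correct(indices):
    # acc plays the role of $$value, idx the role of $$this
    return reduce(lambda acc, idx: acc and answer[idx] == 1, indices, True)


print(all_correct([0, 1, 3]))     # True
print(all_correct([0, 1, 2, 3]))  # False, index 2 flips the accumulator for good

# map and filter written as reductions that build up a list
doubled = reduce(lambda acc, x: acc + [x * 2], [1, 2, 3], [])
evens = reduce(lambda acc, x: acc + [x] if x % 2 == 0 else acc, [1, 2, 3, 4], [])
print(doubled)  # [2, 4, 6]
print(evens)    # [2, 4]
```

In an aggregation, the list-building step would use `$concatArrays` in place of `+`, and `$cond` in place of the inline `if`.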

Let's experiment with it to develop a sense.

In [None]:
%%capture
my_coll.drop()
my_coll.insert_many([
    {
        "choices": [0, 1, 2, 3],
        "answer": [1, 1, 0, 1]
    },
    {
        "choices": [0, 1, 3],
        "answer": [1, 1, 0, 1]
    }
])

Now, let's `$reduce` over **choices**, using each entry as an index position into the **answer** array. If the value at that position is 1, the **accumulator** keeps its current value; otherwise it becomes False.

In [None]:
is_correct = {
    "$addFields": {
        "correct": {
            "$reduce": {
                "input": "$choices",
                "initialValue": True,
                "in": {
                    "$and": ["$$value", {"$eq": [ { "$arrayElemAt": ["$answer", "$$this"] }, 1 ]}]
                }
            }
        }
    }
}
m_print(my_coll.aggregate([is_correct]))

Your turn. Use `$reduce` to get the sum of values in each **choices** array. Yes, you could use the `$sum` operator to do this, and outside of a learning exercise you should.

In [None]:
choices_sum = [
    {
        "$addFields": {
            "sum_of_choices": {
                "$reduce": <your code here!!>
            }
        }
    }
]
m_print(my_coll.aggregate(choices_sum))

In [None]:
# students should get 6 for the first document, and 4 for the second
choices_sum = [
    {
        "$addFields": {
            "sum_of_choices": {
                "$reduce": {
                    "input": "$choices",
                    "initialValue": 0,
                    "in": {
                        "$add": ["$$value", "$$this"]
                    }
                }
            }
        }
    }
]
m_print(my_coll.aggregate(choices_sum))

Lastly, let's look at **set** expressions.

Set expressions are useful when dealing with arrays when you want to find overlap, remove duplicates, and so on.

Let's look at a few examples. We'll insert some test data.

In [None]:
%%capture
my_coll.drop()
my_coll.insert_one(
    {
        "values": [1, 1, 2, 3, 4, 4, 4, 5, 5, 6, 7, 7, 7],
        "evens": [0, 2, 4, 6, 8],
        "odds": [1, 3, 5, 7, 9]
    }
)

Let's remove duplicates from the **values** field.

In [None]:
remove_dups = [{
    "$addFields": {
        "values": { "$setUnion": ["$values", [] ] }
    }
}]
m_print(my_coll.aggregate(remove_dups))

We've used the `$setUnion` expression to express a union between the **values** array and an empty array. 

Because it's a set operation, it implicitly removes duplicates.
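
The set expressions behave much like Python's built-in `set` type, which gives a quick way to predict what they'll return for our test data. This is a plain-Python analogy rather than an aggregation. Sorting the results is an assumption of this sketch that keeps the printed output stable, since the set expressions don't guarantee the order of the output array.

```python
values = [1, 1, 2, 3, 4, 4, 4, 5, 5, 6, 7, 7, 7]
evens = [0, 2, 4, 6, 8]
odds = [1, 3, 5, 7, 9]

# like $setUnion with an empty array: duplicates collapse
deduped = sorted(set(values) | set())
# like $setIntersection: elements present in both arrays
overlap = sorted(set(odds) & set(evens))
# like $setDifference with [4]: elements of the first array not in the second
no_fours = sorted(set(values) - {4})

print(deduped)   # [1, 2, 3, 4, 5, 6, 7]
print(overlap)   # []
print(no_fours)  # [1, 2, 3, 5, 6, 7]
```

Notice that the difference also comes back without duplicates. `$setDifference` does the same, so it isn't the right tool if you need to keep repeated values.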


Now, let's find the intersection between odds and evens. We expect there to be no intersection!

In [None]:
intersect_odds_and_evens = [{
    "$addFields": {
        "hopefully_empty": {
            "$setIntersection": ["$odds", "$evens"]
        }
    }
}]
m_print(my_coll.aggregate(intersect_odds_and_evens))

Let's look at one last example before moving on. What if we want to remove a single value we know might exist from an array?

**$setDifference** is a great operator for this.

In [None]:
remove_4_from_values = [{
    "$addFields": {
        "no_fours": {
            "$setDifference": ["$values", [4]]
        }
    }
}]
m_print(my_coll.aggregate(remove_4_from_values), justone=True)

## `$bucket`

Let's get introduced to two more useful stages that perform a very similar task.

`$bucket` is similar to the `$group` stage in that both group documents together based on some value.

`$bucket` allows us to pre-define boundaries. Let's look at an example of `$bucket` in action. Let's get the distribution of student problem submissions by the `$dayOfWeek`.

In [None]:
bucketing = {
    "$bucket": {
        "groupBy": { "$dayOfWeek": "$updated_at"},
        "boundaries": [1, 2, 3, 4, 5, 6, 7],
        "default": "uncaught range",
        "output": {
            "count": { "$sum": 1 }
        }
    }
}

Pause and ask attendees what they think the output will be.

They may be surprised that we get output in "uncaught range". This is because we didn't define an upper bound of 8.

The `$dayOfWeek` expression gives us the day of the week from a Date, Timestamp, or ObjectId value. 1 is Sunday, 7 is Saturday.

`groupBy` is similar to the `_id` block in `$group`.

`boundaries` are the boundaries we define for bucket ranges. Each boundary is the inclusive lower bound of that bucket, and the next boundary is the exclusive upper bound. In order for a full bucket to be defined there must be a lower and upper bound.

`default` is for documents that don't fall into a defined bucket boundary. If `default` isn't specified, `$bucket` will error if any document doesn't fall into a bucket boundary.

`output` is a block where we can specify accumulator expressions, just like `$group`.
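
The boundary rules are easy to get wrong, so here is a small plain-Python sketch of how a value is assigned to a bucket. It is a simplified model with a hypothetical `bucket` helper and made-up day values, and it only counts documents.

```python
from bisect import bisect_right
from collections import Counter


def bucket(values, boundaries, default):
    counts = Counter()
    for v in values:
        if boundaries[0] <= v < boundaries[-1]:
            # the bucket is identified by its inclusive lower bound
            lower = boundaries[bisect_right(boundaries, v) - 1]
            counts[lower] += 1
        else:
            # outside every [lower, upper) range
            counts[default] += 1
    return dict(counts)


days = [1, 2, 2, 6, 7, 7]  # made-up $dayOfWeek values

without_upper = bucket(days, [1, 2, 3, 4, 5, 6, 7], "uncaught range")
with_upper = bucket(days, [1, 2, 3, 4, 5, 6, 7, 8], "uncaught range")
print(without_upper)  # {1: 1, 2: 2, 6: 1, 'uncaught range': 2}
print(with_upper)     # {1: 1, 2: 2, 6: 1, 7: 2}
```

With 7 as the last boundary, the final bucket is 6 up to but not including 7, so a value of 7 has nowhere to go except `default`.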

In [None]:
m_print(student_course.aggregate([bucketing]))

When defining bucket boundaries, we have to specify both the lower and upper bounds. From the output we can see that submissions on Saturdays were caught in the **"uncaught range"** bucket. Let's fix that.

In [None]:
bucketing = {
    "$bucket": {
        "groupBy": { "$dayOfWeek": "$updated_at"},
        "boundaries": [1, 2, 3, 4, 5, 6, 7, 8],
        "default": "uncaught range",
        "output": {
            "count": { "$sum": 1 }
        }
    }
}
m_print(student_course.aggregate([bucketing]))

Excellent, we've now caught all encountered values in a bucket.

Earlier in the workshop we had a bit of fun showing the distribution of problem submissions on Tuesdays and promised we'd reveal our methods later. This is how.

Let's have a bit more fun with this. We'll graph out the distribution of problem submissions by day and by hour. We can see the majority of submissions are on Mondays, so maybe most MongoDB University students don't wait until the last moment.

Let's think about how we'd want to do this. We could just issue multiple queries to the database, something like match where the submission is a Monday and then use `$bucket` to get the distribution by hour.

This approach would indeed solve the task. However, there's a more efficient way.

Introducing `$facet`...

## `$facet`

`$facet` is a very powerful stage and perhaps one of the most misunderstood. `$facet` allows us to execute multiple pipelines within a single aggregation.

This is very useful if we want to perform more than one transformation of our data but each transformation alone would prevent the other. It's also useful when we want to get multiple different metrics. For example, an ecommerce site might use facet to return results based on review rating and price. This takes a lot of the heavy lifting off of the UI and allows the UI to focus on what it should, giving end users a great experience.

So, rather than issue multiple different queries, let's issue one `$facet` aggregation that performs the sub-aggregations. In each sub-aggregation we'll `$match` on the `$dayOfWeek`, then use `$bucket` to get information by the hour.

We'll use functions to build out our query, otherwise it would be pretty tedious.

In [None]:
def generate_sub_agg(dayOfWeek):
    return [
        {
            "$match": {
                "$expr": { "$eq": [ {"$dayOfWeek": "$updated_at" }, dayOfWeek] }
            }
        },
        {
            "$bucket": {
                "groupBy": { "$hour": "$updated_at" },
                "boundaries": [x for x in range(25)],
                "output": {
                    "count": { "$sum": 1 }
                }
            }
        }
    ]

Ok, we've defined a function to generate a sub-aggregation to use within a `$facet`. It accepts a **dayOfWeek** that we can pass in, uses `$match` to filter out documents that aren't that day, then a `$bucket` stage. We use a list comprehension to generate our boundaries, 0-24. This is simply to reduce typing.

Let's build our complete pipeline.

In [None]:
first_facet = [
    {
        "$facet": {
            "sunday": generate_sub_agg(1),
            "monday": generate_sub_agg(2),
            "tuesday": generate_sub_agg(3),
            "wednesday": generate_sub_agg(4),
            "thursday": generate_sub_agg(5),
            "friday": generate_sub_agg(6),
            "saturday": generate_sub_agg(7)
        }
    }
]

`$facet` itself does not have a lot of ceremony in form like other stages. We specify the stage, `$facet`, and then specify the pipeline names. These names will be the key used to assign the value of the sub-aggregation.
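
One way to picture `$facet` is as a dictionary of named sub-pipelines that all receive the same input documents. The sketch below models that in plain Python. The documents and the `count_by_hour` helper are made up, and each "sub-pipeline" is just a function here.

```python
docs = [
    {"day": 2, "hour": 9},
    {"day": 2, "hour": 9},
    {"day": 3, "hour": 14},
]


def count_by_hour(day):
    # returns a "sub-pipeline": a function from input documents to a result list
    def run(documents):
        counts = {}
        for d in documents:
            if d["day"] == day:  # the $match
                counts[d["hour"]] = counts.get(d["hour"], 0) + 1  # the grouping
        return [{"_id": h, "count": n} for h, n in sorted(counts.items())]

    return run


facets = {"monday": count_by_hour(2), "tuesday": count_by_hour(3)}

# every sub-pipeline sees the same input; the output is a single document
result = {name: run(docs) for name, run in facets.items()}
print(result)
# {'monday': [{'_id': 9, 'count': 2}], 'tuesday': [{'_id': 14, 'count': 1}]}
```

Because the whole result is one document keyed by pipeline name, we only need the first document the cursor returns.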

Let's see it in action.

In [None]:
m_print(student_course.aggregate(first_facet), justone=True)

And now we have a breakdown, by hour, of student submissions. Let's visualize the results. The hardest part now is selecting a color scheme...

In [None]:
# dependencies
import pandas as pd

# rather than print out the results, store it to a variable
by_hour_breakdown = list(student_course.aggregate(first_facet))[0]

In [None]:
"""
we're creating a 3d dataframe, remapping every day entry to the key "day", and a value of "stats"
each "stats" value is itself a dataframe, where the _id has been mapped to "hour" and the count is
remapped to "count"
This is all for display
"""

data =[{"day": k, "stats": pd.DataFrame(list(map(lambda x: {"hour": x["_id"], "count": x["count"]}, v)))} for (k, v) in by_hour_breakdown.items()]
df = pd.DataFrame(data)

In [None]:
"""
Plotting the data. We use a for loop to create a subplot for every "day" entry.
The rest is formatting
"""
fig, axs = plt.subplots(ncols=3, nrows=3, figsize=(16, 16))
for i, (day, stats) in enumerate(zip(df['day'], df['stats'])):
    # sns.barplot returns the matplotlib Axes it drew on
    ax = sns.barplot(
        x="hour",
        y="count",
        data=stats,
        ax=axs[i // 3, i % 3]
    )
    ax.set_title(day)
    # use the same y-axis range on every subplot so days are comparable
    ax.set_ylim(0, 1200)

Hmm... it looks like there is an outlier on Monday. Perhaps we ran a script to grant credit to users for a problem at that time, heavily skewing the **updated_at** value that produced this bucket.

Either way, very interesting to look at the usage pattern of our users!

There is another bucketing stage, `$bucketAuto`. `$bucketAuto` automatically determines bucket boundaries, attempting to evenly distribute results.

`$bucketAuto` has the following form
```
"$bucketAuto": {
    "groupBy": <expression>,
    "buckets": <number>,
    "output": {
        fieldN: { <accumulator expression> },
        ...
    }
}
```

**groupBy** and **output** work the same way in `$bucketAuto` as they do in `$bucket`. Instead of defining boundaries, we give **buckets** a number, and `$bucketAuto` will attempt to distribute documents evenly into that many buckets.

Let's experiment. Let's aggregate over the **correlated_answers** collection, using `$bucketAuto` to get an idea of the distribution of the **num_attempts** each problem has.

In [None]:
bucket_auto = [
    {
        "$unwind": "$submissions"
    },
    {
        "$bucketAuto": {
            "groupBy": "$submissions.num_attempts",
            "buckets": 5,
            "output": {
                "count": { "$sum": 1 }
            }
        }
    }
]
m_print(mdbw.correlated_answers.aggregate(bucket_auto, allowDiskUse=True))

Very interesting. The vast majority of attempted problems fall into the less than 2 bucket, with another large portion falling into the less than 5 attempts bucket. The rest range from 5 attempts all the way up to 83.

We cringe thinking about the frustration that user probably had.

`$bucketAuto` was unable to give us 5 buckets because there wasn't enough distribution in our data.
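
To get a feel for why fewer buckets can come back than were requested, here is a deliberately simplified plain-Python model. It is not the server's actual algorithm. It assumes only that equal values are never split across buckets, and the `bucket_auto_sketch` helper and the `attempts` values are made up.

```python
def bucket_auto_sketch(values, buckets):
    ordered = sorted(values)
    target = -(-len(ordered) // buckets)  # ceiling division: ideal bucket size
    result = []
    start = 0
    while start < len(ordered):
        end = min(start + target, len(ordered))
        # assumption: equal values always land in the same bucket
        while end < len(ordered) and ordered[end] == ordered[end - 1]:
            end += 1
        upper = ordered[end] if end < len(ordered) else ordered[-1]
        result.append(
            {"_id": {"min": ordered[start], "max": upper}, "count": end - start}
        )
        start = end
    return result


attempts = [1] * 10 + [2] * 3 + [5, 83]  # made-up num_attempts values
auto_buckets = bucket_auto_sketch(attempts, 5)
for b in auto_buckets:
    print(b)
# {'_id': {'min': 1, 'max': 2}, 'count': 10}
# {'_id': {'min': 2, 'max': 5}, 'count': 3}
# {'_id': {'min': 5, 'max': 83}, 'count': 2}
```

We asked for 5 buckets but the heavily repeated value 1 fills far more than its share, leaving too few remaining values to spread across the other buckets.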

There's one more optional field that can be specified to `$bucketAuto`, **granularity**. We won't delve into it in this workshop, but if you are interested you can [refer to the documentation](https://docs.mongodb.com/manual/reference/operator/aggregation/bucketAuto/#pipe._S_bucketAuto)

## Exercise - `$facet`

In this exercise, we'll find the **user_id**s that are enrolled in every shortform offering on MongoDB University for the month of April.

Your job will be to fill out the `$facet` portion of the aggregation.

In [None]:
offerings = [
    "M001/2018_April", "M121/2018_April", "M103/2018_April", 
    "M201/2018_April", "M310/2018_April", "M312/2018_April"
]
def pipeline_builder(offering):
    return [
        {
            "$match": {
                "_id.offering": offering
            }
        },
        {
            "$project": {
                "_id": 0,
                "user_id": "$_id.user_id"
            }
        }
    ]

def after_projection():
    projection = {}
    for offering in offerings:
        # this will strip the offering to just the course code
        # i.e. M001/2018_April will become M001
        projection[offering[:4]] = f"${offering[:4]}.user_id"
    return projection

In [None]:
subpipelines = {}
for offering in offerings:
    subpipelines[offering[:4]] = pipeline_builder(offering)
pipeline = [
    {
        "$facet": subpipelines
    },
    {
        "$project": after_projection()
    },
    {
        "$project": {
            "concurrent_students": {
                "$setIntersection": [f"${offering[:4]}" for offering in offerings]
            }
        }
    }
    
]
m_print(mdbw.correlated_answers.aggregate(pipeline))


Now roughly 5 to 5.5 hours in.

# Summary

Congratulations, you've made it! We've covered a lot of ground in a short amount of time.

Nothing reinforces learning like using newly acquired knowledge. It's time to jump into the Capstone exercises.

## Capstones

- [Capstone 1 - Problem problems?](http://localhost:8888/notebooks/capstone1.ipynb)
- [Capstone 2 - Cliff Problems](http://localhost:8888/notebooks/capstone2.ipynb)