# Introducing Aggregation Pipelines

This notebook introduces MongoDB's aggregation pipeline, applied to the `accidents` dataset.

In a previous notebook, you saw how the memory footprint of a MongoDB results cursor remained constant, irrespective of how many query result items might be associated with the cursor.

We also mentioned that in many data processing systems it may be preferable to run computations over large datasets as close to the data as possible, rather than consuming bandwidth and local memory by transferring the data in order to analyse it.

In this notebook, you will learn how to use MongoDB aggregation pipelines to process data on the database server and return the results through cursors, rather than downloading large amounts of data and processing it in *pandas* dataframes.

Pipelines allow you to define a sequence of steps that can be used to query a Mongo database and process the records that are returned.

The pipeline may have multiple steps. The same operator may be used in multiple steps.

As ever, load in some required packages:

In [None]:
# Standard imports
import pandas as pd

import seaborn as sns

## Setting up the document database 

In the notebooks for parts 14, 15 and 16, you will be using a document database to manage data. As with the relational database you looked at in previous sections, the data in the database is *persistent*. The document database, MongoDB, is described as "NoSQL" to reflect that it does not use the tabular format of the relational database to store data. However, many of the properties of a formal RDBMS apply to MongoDB, including the need to connect to the database server.

As with PostgreSQL, the MongoDB database server runs independently from the Jupyter notebook server. To interact with it, you need to set up an explicit connection.

### Setting your database credentials

In order to work with a database, we need to create a *connection* to the database. A connection allows us to manipulate the database and query its contents (depending on what usage rights you have been granted). For the MongoDB notebooks in TM351, the details of your connection will depend upon whether you are using the OU-hosted server, accessed via [tm351.open.ac.uk](https://tm351.open.ac.uk), or a version hosted on your own computer, which you should have set up using either Vagrant or Docker.

To set up the connection, you need a login name and a password. We will use the variables `DB_USER` and `DB_PWD` to hold the user name and password, respectively, that you will use to connect to the database. Run the appropriate one of the following cells to set your credentials.

#### Connecting to the database on [tm351.open.ac.uk](https://tm351.open.ac.uk)

If you are using the Open University hosted server, you should execute the following cell, using your OUCU as the value of `DB_USER`, and the password you were given at the beginning of the module. Note that if the cell is in RAW NBconvert style, you will need to change its type to Code in order to execute it.

The variables `DB_USER` and `DB_PWD` are strings, and so you need to put them in quotes.

In this case, note that the connection string contains an additional option at the end: `?authsource=user-data`. For the MongoDB setup that we are using here, this option tells MongoDB which database to use when authenticating your user name and password.

#### Connecting to the database on a locally hosted machine

If you are running the Jupyter server on your own machine, via Docker or Vagrant, you should execute the following cell. Note that if the cell is in RAW NBconvert style, you will need to change its type to Code in order to execute it.

Note that the locally hosted versions of the environment give you full administrator rights, which is why you do not need to specify a user name or password. Such rights would not generally be granted on a multi-user database unless you are the database administrator.

### Connecting to the database

We can now set up a connection to the database. As with PostgreSQL, we use a connection string:

In [None]:
print(MONGO_CONNECTION_STRING)

The connection string is made up of several parts:

- `mongodb` : tells `pymongo` that we will use MongoDB as our database engine
- Your user name and (character-escaped) password, separated by a colon and followed by `@`, if you are using the remote server. If you are using a local server, you will be logged on as an administrator and do not need to specify a name or password.
- `localhost:27017` : the host (`localhost`) and port (`27017`) on which the database engine is listening.
- A reference to the authentication database (`?authsource=user-data`), if you are using the remote server.
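As a minimal sketch of how those parts fit together, the following function assembles a connection string from a user name and password, escaping them with the standard library's `urllib.parse.quote_plus`. The helper name `build_connection_string` and the example credentials are hypothetical; the connection string the notebook actually uses is set in the credentials cells and may differ in detail.

```python
from urllib.parse import quote_plus


def build_connection_string(user=None, password=None,
                            host="localhost", port=27017,
                            auth_source="user-data"):
    """Assemble a MongoDB connection string (hypothetical helper).

    With no user name, assume a local administrator setup that needs
    neither credentials nor an authentication database option.
    """
    if user is None:
        return f"mongodb://{host}:{port}/"
    # Escape characters such as '@' or ':' that would otherwise
    # break the parsing of the connection string
    escaped_user = quote_plus(user)
    escaped_password = quote_plus(password)
    return (f"mongodb://{escaped_user}:{escaped_password}@{host}:{port}/"
            f"?authsource={auth_source}")


# Hypothetical credentials, for illustration only
print(build_connection_string("abc123", "p@ss:word"))
print(build_connection_string())
```

The escaping matters because `:` and `@` are the characters that separate the user name, password and host in the string.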

We now connect to the database with a `pymongo.MongoClient` object.

In [None]:
from pymongo import MongoClient

In [None]:
mongo_client=MongoClient(MONGO_CONNECTION_STRING)

You should now be connected to the MongoDB database server.

## The accidents database

The accidents database takes a long time to set up, so we have already imported it into a MongoDB database so that you can work with it. Note that on the remote VCE, the database is read-only, so you will not be able to alter its contents, although you can copy the contents into your own database space as discussed in the previous MongoDB notebooks, and alter that.

The cells in the earlier section, Setting up the document database, put the name of the accidents database into the variable `ACCIDENTS_DB_NAME`. Use this value to set up the connection to the `accidents` database and collections within it:

In [None]:
accidents_db=mongo_client[ACCIDENTS_DB_NAME]

We can look at the names of the collections in the database:

In [None]:
accidents_db.list_collection_names()

We will introduce some of the different collections in the rest of the materials, but let's start with the `accidents` collection:

In [None]:
accidents_collection=accidents_db['accidents']

This collection contains information on individual accidents. We can see how many documents it contains with the `.count_documents()` method; the empty filter `{}` matches every document:

In [None]:
accidents_collection.count_documents({})

We will also set up a reference to the `labels` collection:

In [None]:
labels=accidents_db['labels']

We'll be plotting some charts, so increase the default plot size to make things easier to read:

In [None]:
# Set a larger plot size than the default
sns.set(rc={'figure.figsize':(11.7,8.27)})

## Selecting items using an aggregation pipeline

First, an example of using an aggregation pipeline to get you started.

Let's just check in advance how many accidents there are in the database for the Milton Keynes Highway Authority area (`E06000042`):

In [None]:
accidents_collection.count_documents({'Local_Authority_(Highway)': 'E06000042'})

Aggregation pipelines are constructed from a sequence of pipeline operations, or *stages*. These operations may include, but are not limited to, selection operations, projections and grouping operations.

We can create a pipeline to find a set of items using a single `$match` pipeline stage:

```python
# Define a pipeline stage
select_stage = {'$match': {'Local_Authority_(Highway)': 'E06000042'}}

# The pipeline is a list of stages
pipeline = [select_stage]

# Run the documents in a collection through the pipeline;
# this returns a cursor over the results
accidents_collection.aggregate(pipeline)
```

The pipeline itself is defined as a list, which is then evaluated by passing it to the `.aggregate()` method of a collection:

In [None]:
pipeline = [{'$match': {'Local_Authority_(Highway)': 'E06000042'}}]

# Run the pipeline and load the results into a dataframe
mk_accidents = pd.DataFrame(accidents_collection.aggregate(pipeline))
mk_accidents.head()

Check the record count: it should match the previously reported value.

In [None]:
len(mk_accidents)
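Conceptually, each stage in the pipeline list consumes the documents emitted by the previous stage. The pure-Python sketch below models that flow over a list of dictionaries, supporting only simple equality tests in `$match`; it illustrates the idea rather than how MongoDB evaluates pipelines internally, and the helper names and sample documents are made up. It also shows that the same operator can appear in more than one stage: two successive `$match` stages select the same documents as a single `$match` that combines both conditions.

```python
def apply_match(docs, criteria):
    """Keep documents whose fields equal every value in criteria."""
    return [d for d in docs
            if all(d.get(field) == value for field, value in criteria.items())]


def run_pipeline(docs, pipeline):
    """Pass the output of each stage on as the input to the next."""
    for stage in pipeline:
        # Each stage is a one-item dict: {operator: argument}
        operator, argument = next(iter(stage.items()))
        if operator == '$match':
            docs = apply_match(docs, argument)
        else:
            raise ValueError(f"Stage not supported in this sketch: {operator}")
    return docs


# Made-up documents loosely shaped like accident records
sample_docs = [
    {'Local_Authority_(Highway)': 'E06000042', 'Speed_limit': 30},
    {'Local_Authority_(Highway)': 'E06000042', 'Speed_limit': 70},
    {'Local_Authority_(Highway)': 'E09000001', 'Speed_limit': 30},
]

two_stages = [{'$match': {'Local_Authority_(Highway)': 'E06000042'}},
              {'$match': {'Speed_limit': 30}}]
one_stage = [{'$match': {'Local_Authority_(Highway)': 'E06000042',
                         'Speed_limit': 30}}]

print(run_pipeline(sample_docs, two_stages))
print(run_pipeline(sample_docs, two_stages) == run_pipeline(sample_docs, one_stage))
```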

In a simple `.find()` query, we can create a compound query by adding more fields to the selection dictionary. We can do the same in a `$match` stage of a pipeline.

For example, let's check out some weather types:

In [None]:
labels.find_one({'label': 'Weather_Conditions'})['codes']

How many Milton Keynes accidents were there in high winds?

In [None]:
accidents_collection.count_documents({'Local_Authority_(Highway)': 'E06000042',
                                      'Weather_Conditions': {"$in": [4, 5, 6]}})

We can use a similar approach with a pipeline:

In [None]:
pipeline = [{'$match': {'Local_Authority_(Highway)': 'E06000042',
                        'Weather_Conditions': {"$in": [4, 5, 6]}}}]

# Run the pipeline, load the results into a dataframe and count them
mk_high_winds = pd.DataFrame(accidents_collection.aggregate(pipeline))
len(mk_high_winds)
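As with `.find()`, listing several fields in one `$match` dictionary combines the conditions with an implicit AND, and `{'$in': [...]}` accepts any value in the given list. The following pure-Python sketch models those two rules for a single document. It handles only plain equality and `$in` on scalar fields, and the helper name `matches` and the sample documents are hypothetical.

```python
def matches(doc, criteria):
    """Return True if doc satisfies every condition (an implicit AND)."""
    for field, condition in criteria.items():
        value = doc.get(field)
        if isinstance(condition, dict) and '$in' in condition:
            # $in: the field value must be one of the listed values
            if value not in condition['$in']:
                return False
        elif value != condition:
            # Plain value: the field must be equal to it
            return False
    return True


criteria = {'Local_Authority_(Highway)': 'E06000042',
            'Weather_Conditions': {'$in': [4, 5, 6]}}

docs = [
    {'Local_Authority_(Highway)': 'E06000042', 'Weather_Conditions': 5},
    {'Local_Authority_(Highway)': 'E06000042', 'Weather_Conditions': 1},
    {'Local_Authority_(Highway)': 'E09000001', 'Weather_Conditions': 4},
]

print([matches(d, criteria) for d in docs])
```

Only the first document passes: the second fails the `$in` test and the third fails the authority test.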

### Limiting the Amount of Data Flowing Through A Pipeline

When testing a pipeline step, we often need only a single record, or a small number of records.

To limit the data flowing into a pipeline, you can use a step of the form:

`_limit1 = {'$sample': {'size': 1}}`

and include it as the first step in the pipeline.

If you need more records to test your pipeline step, increase the sample `size`.

You can use a similar trick as the last step in a pipeline to limit the amount of data emitted from the pipeline.

In [None]:
pipeline = [{'$match': {'Local_Authority_(Highway)': 'E06000042',
                        'Weather_Conditions': {"$in": [4, 5, 6]}}},
            {'$sample': {'size': 3}}]

pd.DataFrame(accidents_collection.aggregate(pipeline))
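Note that `$sample` picks documents at random, so repeated runs will usually return different records. If you want the same documents each time, MongoDB also provides a `$limit` stage, for example `{'$limit': 3}`, which passes on only the first *n* documents it receives, in the order they arrive. The sketch below contrasts the two behaviours on a plain Python list, using `random.sample` as a stand-in for `$sample`; the helper names are hypothetical and the sketch models the idea only, not MongoDB's sampling algorithm.

```python
import random

# Made-up records standing in for pipeline documents
records = [{'Accident_Index': f'A{i}'} for i in range(10)]


def limit_stage(docs, n):
    """Model of {'$limit': n}: pass on the first n documents, in order."""
    return docs[:n]


def sample_stage(docs, size, rng=random):
    """Rough model of {'$sample': {'size': size}}: a random selection."""
    return rng.sample(docs, min(size, len(docs)))


print(limit_stage(records, 3))   # always the first three records
print(sample_stage(records, 3))  # three records chosen at random
```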

### Projections in the Pipeline

As well as limiting the "length" of data returned from the pipeline (that is, the number of items returned), we can also limit the "width" by projecting just the fields we are interested in.

The `$project` operator allows us to define a projection that determines which fields are presented in the records returned from the pipeline:

In [None]:
pipeline = [{'$match': {'Local_Authority_(Highway)': 'E06000042',
                        'Weather_Conditions': {"$in": [4, 5, 6]}}},
            
            {'$project': {'Weather_Conditions':1, 'Speed_limit':1,
                          'Accident_Severity':1, 'Number_of_Vehicles':1,
                          '_id':0 }},
            
            {'$sample': {'size': 3}}]

pd.DataFrame(accidents_collection.aggregate(pipeline))
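In an inclusion projection like the one above, fields set to `1` are kept and all other fields are dropped, except `_id`, which MongoDB includes by default unless you exclude it explicitly with `'_id': 0`. Here is a minimal pure-Python sketch of that rule; it handles inclusion projections only, and the function name `project` and the sample document are hypothetical.

```python
def project(doc, spec):
    """Model an inclusion projection such as {'a': 1, '_id': 0}."""
    # _id is kept unless the projection explicitly sets it to 0
    keep_id = spec.get('_id', 1) != 0
    included = {field for field, flag in spec.items()
                if flag == 1 and field != '_id'}
    return {field: value for field, value in doc.items()
            if field in included or (field == '_id' and keep_id)}


doc = {'_id': 'abc', 'Weather_Conditions': 5, 'Speed_limit': 30,
       'Accident_Severity': 3, 'Number_of_Vehicles': 2, 'Date': '01/01/2015'}

spec = {'Weather_Conditions': 1, 'Speed_limit': 1,
        'Accident_Severity': 1, 'Number_of_Vehicles': 1, '_id': 0}

print(project(doc, spec))
print(project(doc, {'Speed_limit': 1}))  # _id retained by default
```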

## The `$unwind` Aggregation Operator

If we inspect a single document from the `accidents` collection, we notice that certain fields, such as `Casualties` and `Vehicles`, are not simple `attribute:value` pairs but instead may contain a list of dictionaries:

In [None]:
accidents_collection.find_one({ 'Number_of_Casualties': {'$eq':2}}, {'_id':1, 'Casualties':1})

You may recall that the *pandas* `.explode()` function will "unravel" a list within a particular column to create new rows, one per list item with the other column values retained.

For example, given the following dataframe:

In [None]:
df = pd.DataFrame({"items": ["item1", "item2"],
                   "listed": [["item1-element1", "item1-element2"],
                              ["item2-element1", "item2-element2"]]})
df

We can explode the *listed* column:

In [None]:
df.explode(column='listed')

The `$unwind` aggregation operator ([docs](https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/)) performs a similar operation: it "unwinds" (unpacks, or expands) an array in a document to create a new set of documents, each of which contains *one* of the array elements along with the other fields from the same original document.
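The basic behaviour can be modelled in plain Python, assuming documents are dictionaries: each element of the named array field produces its own copy of the document. By default, MongoDB's `$unwind` emits nothing for a document whose array is missing, null or empty (its `preserveNullAndEmptyArrays` option changes this), and treats a non-array value as a single-element array; the sketch follows those defaults. The helper name `unwind` and the casualty fields are hypothetical.

```python
def unwind(docs, field):
    """Model {'$unwind': '$<field>'}: one output document per array element."""
    for doc in docs:
        values = doc.get(field)
        if values is None or values == []:
            # Default behaviour: drop documents with a missing, null or empty array
            continue
        if not isinstance(values, list):
            # A non-array value is treated as a single-element array
            values = [values]
        for value in values:
            # Copy the other fields, replacing the array with one element
            yield {**doc, field: value}


accidents = [
    {'Accident_Index': 'A1',
     'Casualties': [{'Casualty_Reference': 1}, {'Casualty_Reference': 2}]},
    {'Accident_Index': 'A2', 'Casualties': []},
]

for unwound in unwind(accidents, 'Casualties'):
    print(unwound)
```

Accident `A1` yields two documents, one per casualty, while `A2`, with an empty array, yields none.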

Let's see how it works using a simple example.

To keep the example really simple, we'll limit the aggregation pipeline as follows:

- retrieve just the records containing two casualties using the `{'$match': <expr>}` operator: `{'$match': {'Number_of_Casualties': {'$eq':2}}}`;
- sample just *two* of those records using the `{'$sample': {'size': INT}}` operator: `{'$sample': {'size': 2}}`.

We'll then use the `$unwind` operator to unwind the casualties, and a `$project` operator to limit the scope of the fields we return in the final result.

In [None]:
pipeline = [{'$match': {'Number_of_Casualties': {'$eq':2}}},
            {'$sample': {'size': 2}},
            {'$unwind': '$Casualties'},
            {'$project': {'_id':0, 'Accident_Index':1, 'Casualties':1}}]

unwound_casualties = list(accidents_collection.aggregate(pipeline))
unwound_casualties

If we normalise those results to a *pandas* dataframe, we get one row per casualty:

In [None]:
pd.json_normalize(unwound_casualties)
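`pd.json_normalize()` flattens each nested dictionary into columns whose names join the keys with a dot (its default separator), so a field inside `Casualties` becomes a column named `Casualties.<field name>`. The sketch below reproduces that flattening for nested dictionaries in plain Python. It is a simplified model of what `json_normalize` does (it ignores lists, for example), and the helper name `flatten` and the casualty field names are hypothetical.

```python
def flatten(record, parent_key='', sep='.'):
    """Flatten nested dicts into a single level with dotted key names."""
    flat = {}
    for key, value in record.items():
        name = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dictionaries, prefixing their keys
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# Made-up unwound documents: one casualty dictionary per document
unwound = [
    {'Accident_Index': 'A1', 'Casualties': {'Casualty_Reference': 1, 'Age': 34}},
    {'Accident_Index': 'A1', 'Casualties': {'Casualty_Reference': 2, 'Age': 29}},
]

rows = [flatten(r) for r in unwound]
print(rows[0])
```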

## What next?

If you are working through this Notebook as part of an inline exercise, return to the module materials now.

If you are working through this set of Notebooks as a whole, move on to `15.4 Grouping and summarising operations in aggregation pipelines`.