# Week 10: Aggregations and MapReduce

# Today

- Professional Requirements
- Small useful skills
- MapReduce
- Aggregations in MongoDB

# Announcements, Updates


# Last Week's Lab

- Questions about Mongo?

## Review

## Library and Data Curation Skills

*From job analysis of Earth Science data managers (under preparation; Cowan, Collier, Bishop, Mayernik, Organisciak):*
    
Please indicate any programming languages and/or scripting that you use in your position (check all that apply)

| Language | Selections |
|---|---|
| Markup Languages (e.g., HTML, XML) | 34 |
| Python | 26 |
| R | 24 |
| Other | 18 |
| JavaScript | 13 |
| **Total Selections** | 115 |

---
Other: LAMP, API, ETL, PERL, SQL (2), GRASS GIS, Visual FoxPro (2), C, C#, PHP (2), BASH shell (2), Java (4), MATLAB

*From job analysis of Earth Science data managers (under preparation; Cowan, Collier, Bishop, Mayernik, Organisciak):*

Please indicate any software and tools you use in your position (check all that apply)

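| Software / Tool | Selections |
|---|---|
| Jupyter Notebook | 23 |
| RStudio | 22 |
| Any GIS | 21 |
| NetCDF | 17 |
| Other | 14 |
| Scrum | 8 |
| Apache Spark | 4 |
| Apache Hadoop | 3 |
| **Total Selections** | 112 |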

---
Other: Eclipse (2), IDLE (2), Notepad ++ (2), GIT (2), Excel (2), Confluence (2), Kibana, Oracle, FreeFileSync, GeoMapApp, PostgreSQL, Panoply, ToolsUI, Linux shell, Google Docs, Zoom, RazorSQL, GitHub, JIRA, VisualFoxPro


*From Nickoal Eichmann-Kalwara: [Digital Scholarship Activities at CU Boulder: Preliminary Results of a Campus Survey (2019)](https://osf.io/vpdhc/)*

![](https://github.com/organisciak/Scripting-Course/blob/master/images/questions1.png?raw=true)

![](https://github.com/organisciak/Scripting-Course/blob/master/images/questions2.png?raw=true)

### Job posting: Metadata Services Manager

May 27

![](https://github.com/organisciak/Scripting-Course/blob/master/images/job1.png?raw=true)

# Toy Box of Useful Skills

## File Paths

`.` - Current folder

`..` - Parent folder

`folder1/folder2` - Directory named `folder2`, within a `folder1` directory, within the current directory

`../data` - Go up one directory, then into the 'data' folder
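
A minimal sketch of how these relative paths resolve, using only the standard library. The starting folder `/home/student/notebooks` is a made-up example, not a path from the course.

```python
import posixpath

# Hypothetical current folder, chosen only for illustration
current = "/home/student/notebooks"

def resolve(relative):
    """Join a relative path onto `current`, then collapse '.' and '..' parts."""
    return posixpath.normpath(posixpath.join(current, relative))

for rel in [".", "..", "folder1/folder2", "../data"]:
    print(rel, "->", resolve(rel))
```

In a notebook, relative paths start from the working directory, which you can check with `os.getcwd()`.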

## `groupby(...).apply()`

In [10]:
import pandas as pd
example = pd.DataFrame({'A': 'a a b'.split(), 'B': [1,2,3], 'C': [4,6, 5]})
example

|   | A | B | C |
|---|---|---|---|
| 0 | a | 1 | 4 |
| 1 | a | 2 | 6 |
| 2 | b | 3 | 5 |


In [15]:
def difference(x):
    return x['C'].max() - x['B'].min()

example.groupby('A').apply(difference)

A
a    5
b    2
dtype: int64

In [4]:
# Example from a project - when one string holds two people's names, split them apart (i.e. on the second space)

def split_multiple_names(x):
    names = x.split(' ')
    
    full_names = []
    while len(names) > 0:
        if len(names) == 3:
            name = " ".join(names)
            names = []
        else:
            name = " ".join(names[:2])
            names = names[2:]
        full_names.append(name)
    return full_names
    
split_multiple_names('Jada Pinkett Smith')

['Jada Pinkett Smith']

In [5]:
split_multiple_names('Will Smith Jada Pinkett Smith')

['Will Smith', 'Jada Pinkett Smith']

## JSON Auto-indent in Jupyter / Colab

Add a line break:

- after open braces
- after commas
- before and after close braces

In [6]:
{ "test": {
    "key1": "value",
    "key2": "value" }
}

{'test': {'key1': 'value', 'key2': 'value'}}

## Geographic Queries in MongoDB

In [7]:
from pymongo import MongoClient

# Loading my credentials from a file
with open('credentials.txt', mode='r') as f:
    user, mongopw, cluster_url = [l.strip() for l in f.readlines()]

client = MongoClient("mongodb+srv://{}:{}@{}/test?retryWrites=true&w=majority".format(user, mongopw, cluster_url))
db = client.week10

Create a collection called 'fastfood', and tell Mongo that the 'geometry' key of each document is geographic:

In [8]:
db.fastfood.create_index([('geometry', '2dsphere')])

'geometry_2dsphere'

Import data of fast food restaurants. (The data is not posted publicly - you can download it from the Files section of Canvas.)

*Thanks Jennings Anderson, PhD student in the EPIC lab, CU Boulder*

In [11]:
import json
with open('../data/fastfood.geojson', encoding='utf-8') as f:
    jsondata = json.load(f)
db.fastfood.insert_many(jsondata)

<pymongo.results.InsertManyResult at 0x7f197b2f2680>

In [None]:
# Delete all documents if needed
db.fastfood.delete_many({})

In [12]:
db.fastfood.estimated_document_count()

42073

Here's what the data looks like:

In [13]:
db.fastfood.find_one({})

{'_id': ObjectId('6227cf5055f4393b5c0eb981'),
 'type': 'Feature',
 'properties': {'@id': 'relation/1059567',
  'amenity': 'fast_food',
  'building': 'yes',
  'cuisine': 'american',
  'name': 'Sonic Drive-In',
  'type': 'multipolygon'},
 'geometry': {'type': 'Polygon',
  'coordinates': [[[-96.1059697, 35.9987409],
    [-96.1059488, 35.9982857],
    [-96.1059014, 35.9982873],
    [-96.1059078, 35.9984123],
    [-96.1058381, 35.9984147],
    [-96.1058317, 35.9982896],
    [-96.1057768, 35.9982915],
    [-96.1057978, 35.9987468],
    [-96.1058574, 35.9987447],
    [-96.1058517, 35.9986359],
    [-96.1059198, 35.9986336],
    [-96.1059255, 35.9987424],
    [-96.1059697, 35.9987409]]]},
 'id': 'relation/1059567'}

To find restaurants near me, create a point according to the GeoJSON schema:

In [14]:
my_location =  { 'type' : "Point" ,
                 'coordinates' : [-104.961896, 39.676617] 
                }

In [15]:
query = {
    'geometry': {
        '$near': {
            '$geometry' : my_location
        }
    }
}
results = db.fastfood.find(query).limit(10)
list(results)

[{'_id': ObjectId('6227cf5055f4393b5c0f121c'),
  'type': 'Feature',
  'properties': {'@id': 'node/631845194',
   'amenity': 'fast_food',
   'name': "Ben and Jerry's Ice Cream"},
  'geometry': {'type': 'Point', 'coordinates': [-104.9597847, 39.6785848]},
  'id': 'node/631845194'},
 {'_id': ObjectId('6227cf5055f4393b5c0f1238'),
  'type': 'Feature',
  'properties': {'@id': 'node/638620276',
   'addr:housenumber': '2081',
   'addr:street': 'South University Boulevard',
   'amenity': 'fast_food',
   'name': "Mustard's Last Stand"},
  'geometry': {'type': 'Point', 'coordinates': [-104.9596249, 39.6789283]},
  'id': 'node/638620276'},
 {'_id': ObjectId('6227cf5055f4393b5c0f1733'),
  'type': 'Feature',
  'properties': {'@id': 'node/976518442',
   'amenity': 'fast_food',
   'description': 'sandwich shop',
   'name': "Jimmy John's"},
  'geometry': {'type': 'Point', 'coordinates': [-104.959803, 39.6790585]},
  'id': 'node/976518442'},
 {'_id': ObjectId('6227cf5055f4393b5c0f10eb'),
  'type': 'Feat

See Jennings Anderson's website for [examples of web tools built with Mongo and Geospatial data](https://www.townsendjennings.com/maps/)

![Screenshot of map tools](https://github.com/organisciak/Scripting-Course/blob/master/images/jennings-maps.png?raw=true)
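
`$near` returns documents ordered from nearest to farthest. To get a feel for the distances involved, here is a rough sketch that computes the great-circle distance to the first result with the haversine formula, then builds a variant of the query with a `$maxDistance` cut-off. The `haversine_m` helper and the 500 meter cut-off are illustrative assumptions, not part of the original lecture. Note that GeoJSON coordinates are ordered `[longitude, latitude]`.

```python
from math import asin, cos, radians, sin, sqrt

def haversine_m(point_a, point_b, radius_m=6_371_000):
    """Great-circle distance in meters between two [longitude, latitude] pairs."""
    lon1, lat1 = map(radians, point_a)
    lon2, lat2 = map(radians, point_b)
    h = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * radius_m * asin(sqrt(h))

my_location = {"type": "Point", "coordinates": [-104.961896, 39.676617]}
first_result = [-104.9597847, 39.6785848]  # coordinates of the first result above

distance = haversine_m(my_location["coordinates"], first_result)
print(round(distance), "meters")

# Hypothetical cut-off: only match restaurants within 500 meters
max_meters = 500
query = {
    "geometry": {
        "$near": {"$geometry": my_location, "$maxDistance": max_meters}
    }
}
# results = db.fastfood.find(query)  # needs the `db` connection created earlier
```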

## Geographic data in Pandas

There's a spin-off from Pandas called `GeoPandas`.

[Tutorial: Getting Started on Geospatial Analysis with Python, GeoJSON and GeoPandas](https://www.twilio.com/blog/2017/08/geospatial-analysis-python-geojson-geopandas.html)

# MapReduce

MapReduce is a framework for processing really large datasets, distributed across multiple threads, processes, or machines.

Two parts:

*Map*: Split the input into segments, and process each segment independently.

*Reduce*: Simplify the individually processed segments into one output.

(Reduce is not simply the 'combine' step that we saw with split-apply-combine, or SAC.)

### Archetypal Example: Counting Words

![MapReduce Example](https://github.com/organisciak/Scripting-Course/blob/master/images/mapreduce_example2.png?raw=true)

*via http://hci.stanford.edu/courses/cs448g/a2/files/map_reduce_tutorial.pdf*

Some examples to consider:
    
- *Sorting*: How would we sort a *reeaaally* big list? e.g. Sort everything on Amazon by price.
- *Searching*: How do we determine how often a word shows up in 1 billion web pages?

MapReduce generally works by simplifying the problem to key-value pairs. 
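
As a rough illustration, the word-count example can be sketched in plain Python on a single machine. The sample sentences and the function names (`map_step`, `shuffle`, `reduce_step`) are made up for this sketch; a real MapReduce system would run the map and reduce steps in parallel across many workers.

```python
from collections import defaultdict
from functools import reduce

# Made-up input: each string stands in for one segment of a large dataset
documents = ["deer bear river", "car car river", "deer car bear"]

def map_step(doc):
    """Map: emit a (word, 1) key-value pair for every word in one segment."""
    return [(word, 1) for word in doc.split()]

def shuffle(pairs):
    """Collect all values that share the same key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_step(values):
    """Reduce: collapse the list of values for one key into a single number."""
    return reduce(lambda a, b: a + b, values)

mapped = [pair for doc in documents for pair in map_step(doc)]
counts = {word: reduce_step(values) for word, values in shuffle(mapped).items()}
print(counts)
```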

MongoDB Example, via https://docs.mongodb.com/manual/tutorial/map-reduce-examples/: **Return the Total Price Per Customer**

Data structure:
```
{
     cust_id: "abc123",
     ord_date: new Date("Oct 04, 2012"),
     status: 'A',
     price: 25,
     items: [ { sku: "mmm", qty: 5, price: 2.5 },
              { sku: "nnn", qty: 5, price: 2.5 } ]
}
```

- Map step: return key-value pair of `cust_id: price`
- Reduce step: sum all the values for alike keys
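
The two steps above can be sketched in plain Python. The orders are made-up sample data that keep only the fields this example needs; this is an analogy for the logic, not MongoDB's map-reduce API.

```python
from collections import defaultdict

# Made-up orders in the shape shown above (only the fields used here)
orders = [
    {"cust_id": "abc123", "price": 25},
    {"cust_id": "abc123", "price": 50},
    {"cust_id": "xyz789", "price": 10},
]

# Map: emit one (cust_id, price) pair per order
pairs = [(order["cust_id"], order["price"]) for order in orders]

# Group the values that share a key, then reduce each group with sum()
grouped = defaultdict(list)
for cust_id, price in pairs:
    grouped[cust_id].append(price)

totals = {cust_id: sum(prices) for cust_id, prices in grouped.items()}
print(totals)
```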

**Calculate Average Quantity Per Item**

Data structure:
```
{
     cust_id: "abc123",
     ord_date: new Date("Oct 04, 2012"),
     status: 'A',
     price: 25,
     items: [ { sku: "mmm", qty: 5, price: 2.5 },
              { sku: "nnn", qty: 5, price: 2.5 } ]
}
```

- Map step: return key-value pairs where the key is the 'sku' of each item, and the value is an object of `{count: 1, quantity: X}`
- Reduce step: Sum count and quantity for each key
- Finalize: Divide quantity/count for an average
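
A plain-Python sketch of the map, reduce, and finalize steps above. The sample orders and the helper names (`map_order`, `reduce_values`, `finalize`) are assumptions made for illustration. Carrying both `count` and `quantity` through the reduce step matters: partial results can then be merged in any order, and the division happens only once at the end.

```python
from collections import defaultdict

# Made-up orders; each one holds a list of items, as in the structure above
orders = [
    {"cust_id": "abc123", "items": [{"sku": "mmm", "qty": 5}, {"sku": "nnn", "qty": 5}]},
    {"cust_id": "xyz789", "items": [{"sku": "mmm", "qty": 1}]},
]

def map_order(order):
    """Map: emit (sku, {count, quantity}) for every item in one order."""
    for item in order["items"]:
        yield item["sku"], {"count": 1, "quantity": item["qty"]}

def reduce_values(values):
    """Reduce: sum count and quantity across all values for one sku."""
    return {
        "count": sum(v["count"] for v in values),
        "quantity": sum(v["quantity"] for v in values),
    }

def finalize(reduced):
    """Finalize: turn the summed totals into an average."""
    return reduced["quantity"] / reduced["count"]

grouped = defaultdict(list)
for order in orders:
    for sku, value in map_order(order):
        grouped[sku].append(value)

averages = {sku: finalize(reduce_values(values)) for sku, values in grouped.items()}
print(averages)
```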

![](../images/mapreduce_example.jpg)

*https://www.tutorialspoint.com/map_reduce/map_reduce_introduction.htm*

# Lab Exercises

Work on the first three questions of the lab.

Snowball help:
Once you get an answer look around and see if anybody is still trying to get there. Help them out!

# Data Aggregation Pipeline

*Aggregation* in MongoDB uses a pipeline that chains together data processing actions, where the output of each action feeds into the next.

Things you may want to do:

- **match**: Select a subset of data (as you can do with 'find')
- **sort**: Order data by the values of a certain key
- **group**: Group data based on a key - like 'groupby' in Pandas
- **limit**: Trim the number of documents in the dataset
- **unwind**: Deconstruct an array, so that there is a document for every value of the array
- **project**: Select specific fields (like with the second argument to 'find')

These are in fact the names of *stages* of the pipeline:

- **\$match**: Select a subset of data (as you can do with 'find')
- **\$sort**: Order data by the values of a certain key
- **\$group**: Group data based on a key - like 'groupby' in Pandas
- **\$limit**: Trim the number of documents in the dataset
- **\$unwind**: Deconstruct an array, so that there is a document for every value of the array
- **\$project**: Select specific fields (like with the second argument to 'find')

Let's consider these in detail, in practice.

In [16]:
db = client.week9
db.cooking.find_one({})

{'_id': ObjectId('5ec70b98585a751f26094e3b'),
 'id': 12734,
 'cuisine': 'italian',
 'ingredients': ['chopped tomatoes',
  'fresh basil',
  'garlic',
  'extra-virgin olive oil',
  'kosher salt',
  'flat leaf parsley']}

Basics of the aggregations pipeline:

`db.collectionName.aggregate(pipeline)`

where

```python
pipeline = [
    stage1,
    stage2,
    ...
    and_so_on
]
```

# $match

Same as `find`, but a good place to start

In [17]:
pipeline = [
    {
        "$match": { "cuisine": "greek" }
    }
]

results = db.cooking.aggregate(pipeline)
list(results)[:2]

[{'_id': ObjectId('5ec70b98585a751f26094e99'),
  'id': 4635,
  'cuisine': 'greek',
  'ingredients': ['minced garlic',
   'dried oregano',
   'red wine vinegar',
   'olive oil',
   'boneless chop pork',
   'lemon juice']},
 {'_id': ObjectId('5ec70b98585a751f26094eec'),
  'id': 18031,
  'cuisine': 'greek',
  'ingredients': ['fresh dill',
   'yoghurt',
   'salt',
   'myzithra',
   'large eggs',
   'cheese',
   'feta cheese',
   'phyllo',
   'kefalotyri',
   'ground black pepper',
   'extra-virgin olive oil',
   'onions']}]

# $sort

Provide an object where the keys are the field names to sort by, and the values are `1` for ascending order or `-1` for descending order.

Here, we sort by reverse alphabetical order on 'cuisine' (note the `-1`) - we'll try something more useful shortly.

In [18]:
pipeline = [
    {
        "$sort": { "cuisine": -1 }
    }
]

results = db.cooking.aggregate(pipeline)
list(results)[:1]

[{'_id': ObjectId('5ec70b98585a751f26094e7f'),
  'id': 4715,
  'cuisine': 'vietnamese',
  'ingredients': ['sweetened condensed milk', 'ice', 'espresso']}]
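
For comparison, here is a plain-Python analogy for the two sort directions, using a few made-up documents. It only mirrors what `1` and `-1` mean; it is not how MongoDB performs the sort.

```python
# Made-up documents for illustration
docs = [
    {"id": 3, "cuisine": "greek"},
    {"id": 1, "cuisine": "thai"},
    {"id": 2, "cuisine": "italian"},
]

# Like {"$sort": {"cuisine": 1}}: ascending (A to Z)
ascending = sorted(docs, key=lambda d: d["cuisine"])

# Like {"$sort": {"cuisine": -1}}: descending (Z to A)
descending = sorted(docs, key=lambda d: d["cuisine"], reverse=True)

print([d["cuisine"] for d in ascending])
print([d["cuisine"] for d in descending])
```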

# $project

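Select the fields that you want in the results (with `1`), or exclude fields that you don't want (with `0`). Apart from `_id`, a single `$project` can't mix inclusion and exclusion.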

In [44]:
pipeline = [
    { "$match": { "cuisine": "greek" }  },
    {
        "$project": {"_id": 0, "ingredients": 0}
    }
]

results = db.cooking.aggregate(pipeline)
list(results)[:2]

[{'cuisine': 'greek', 'id': 10259}, {'cuisine': 'greek', 'id': 34471}]

In [21]:
pipeline = [
    { "$match": { "cuisine": "greek" } },
    { "$project": {"cuisine": 1}  }
]

results = db.cooking.aggregate(pipeline)
list(results)[:2]

[{'_id': ObjectId('5ec70b6b585a751f26094e32'), 'cuisine': 'greek'},
 {'_id': ObjectId('5ec70b98585a751f26094e8f'), 'cuisine': 'greek'}]

`$project` is usually for your own benefit (the output is more readable!), and that's a fine reason to use it. If you're only focused on one or two pieces of information, it's easier to see them with `$project`.

## Lab Exercise

Work on the next question of the lab (writing an aggregation pipeline to answer the previous question).

# $limit

Same as `limit(n)`.

In [42]:
pipeline = [
    {
        "$match": { "cuisine": "greek" }
    },
    {
        "$limit": 1
    }
]

results = db.cooking.aggregate(pipeline)
len(list(results))

1

# $unwind

Expand each item in a list to its own document.

Before:
    
```python
[{
  'cuisine': 'greek',
  'ingredients': ['romaine lettuce',
   'black olives',
   'feta cheese crumbles']
}]
```

After:

```python
[
    {'cuisine': 'greek', 'ingredients': 'romaine lettuce'},
    {'cuisine': 'greek', 'ingredients': 'black olives'},
    {'cuisine': 'greek', 'ingredients': 'feta cheese crumbles'}
]
```

Step by step: what do the results below represent?

In [20]:
pipeline = [
    {"$match": {"cuisine": "greek" }},
    { "$limit": 2 },
    {"$project": {"_id":0, "id":0 }}
]
  
results = db.cooking.aggregate(pipeline)
list(results)

[{'cuisine': 'greek',
  'ingredients': ['romaine lettuce',
   'black olives',
   'grape tomatoes',
   'garlic',
   'pepper',
   'purple onion',
   'seasoning',
   'garbanzo beans',
   'feta cheese crumbles']},
 {'cuisine': 'greek',
  'ingredients': ['ground pork',
   'finely chopped fresh parsley',
   'onions',
   'salt',
   'vinegar',
   'caul fat']}]

**When you're referring to a field in a value (rather than a *key*), precede the name with '\$'**

'cuisine' is referred to in a key here:

```
{ "$match": { "cuisine": "greek" } }
```

'ingredients' is referred to in a value:

```
{ "$unwind": "$ingredients" }
```

*How does this differ?*

```
pipeline = [
    { "$match": { "cuisine": "greek" } },
    { "$limit": 1 },
    { "$unwind": '$ingredients' }
]
```
vs.

In [54]:
pipeline = [
    { "$match": { "cuisine": "greek" } },
    { "$unwind": '$ingredients' },
    { "$limit": 1 }
]

results = db.cooking.aggregate(pipeline)
list(results)

[{'_id': ObjectId('5ec70b6b585a751f26094e32'),
  'cuisine': 'greek',
  'id': 10259,
  'ingredients': 'romaine lettuce'}]

```python
[
    { "$match": { "cuisine": "greek" } },
    { "$unwind": '$ingredients' },
    { "$limit": 1 }
]
```

This is quick, because MongoDB is smart - it sees that there's a `$limit` after the `$unwind`, so internally it optimizes the search and doesn't unwind all 40k results.
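
A rough plain-Python analogy for why stage order matters, using two made-up recipes. The `unwind` generator here is a hypothetical stand-in for `$unwind` that only handles list fields; it is not how MongoDB is implemented. Because generators are lazy, the second version stops after producing one document, which is similar in spirit to the optimization described above.

```python
from itertools import islice

# Made-up recipes for illustration
recipes = [
    {"id": 1, "cuisine": "greek", "ingredients": ["feta", "olives", "oregano"]},
    {"id": 2, "cuisine": "greek", "ingredients": ["lamb", "lemon"]},
]

def unwind(docs, field):
    """Yield one copy of each document per value in its list field."""
    for doc in docs:
        for value in doc.get(field, []):
            yield {**doc, field: value}

# $limit 1, then $unwind: one recipe, expanded into all of its ingredients
limit_then_unwind = list(unwind(islice(recipes, 1), "ingredients"))

# $unwind, then $limit 1: only the first unwound document
unwind_then_limit = list(islice(unwind(recipes, "ingredients"), 1))

print(len(limit_then_unwind), len(unwind_then_limit))
```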

### Exercises

Do the next question in the lab. (How much do the following categories show up?)

# $group

It's our split-apply-combine pattern in MongoDB.

In [19]:
pipeline = [
    { 
        "$group": {
            "_id": "$cuisine",
            "num_matching": { "$sum": 1 }
        }
    }
]

results = db.cooking.aggregate(pipeline)
list(results)

[{'_id': 'mexican', 'num_matching': 6438},
 {'_id': 'spanish', 'num_matching': 989},
 {'_id': 'southern_us', 'num_matching': 4320},
 {'_id': 'british', 'num_matching': 804},
 {'_id': 'indian', 'num_matching': 3003},
 {'_id': 'moroccan', 'num_matching': 821},
 {'_id': 'greek', 'num_matching': 1175},
 {'_id': 'korean', 'num_matching': 830},
 {'_id': 'filipino', 'num_matching': 755},
 {'_id': 'jamaican', 'num_matching': 526},
 {'_id': 'irish', 'num_matching': 667},
 {'_id': 'italian', 'num_matching': 7838},
 {'_id': 'french', 'num_matching': 2646},
 {'_id': 'cajun_creole', 'num_matching': 1546},
 {'_id': 'japanese', 'num_matching': 1423},
 {'_id': 'vietnamese', 'num_matching': 825},
 {'_id': 'chinese', 'num_matching': 2673},
 {'_id': 'brazilian', 'num_matching': 467},
 {'_id': 'russian', 'num_matching': 489},
 {'_id': 'thai', 'num_matching': 1539}]

Sort and limit:

In [20]:
pipeline = [
    { 
        "$group": {
            "_id": "$cuisine",
            "num_matching": { "$sum": 1 }
        } 
    },
    {
        "$sort": {"num_matching": -1 }
    },
    {
        "$limit": 3
    }
]

results = db.cooking.aggregate(pipeline)
list(results)

[{'_id': 'italian', 'num_matching': 7838},
 {'_id': 'mexican', 'num_matching': 6438},
 {'_id': 'southern_us', 'num_matching': 4320}]

In [21]:
pipeline = [
    { "$unwind": "$ingredients" },
    { 
        "$group": {
            "_id": "$ingredients",
            "count": { "$sum": 1 }
        }
    },
    { "$sort": {"count": -1 }  },
    { "$limit": 10 }
]

results = db.cooking.aggregate(pipeline)
list(results)

[{'_id': 'salt', 'count': 18049},
 {'_id': 'onions', 'count': 7972},
 {'_id': 'olive oil', 'count': 7972},
 {'_id': 'water', 'count': 7457},
 {'_id': 'garlic', 'count': 7380},
 {'_id': 'sugar', 'count': 6434},
 {'_id': 'garlic cloves', 'count': 6237},
 {'_id': 'butter', 'count': 4848},
 {'_id': 'ground black pepper', 'count': 4785},
 {'_id': 'all-purpose flour', 'count': 4632}]

`$group` uses the following pattern:

```python
"$group": {
            "_id": GROUPING_CONDITIONS,
            FIELD: ACCUMULATOR
        }
```

You always need an `_id`. It can be a string such as `"$cuisine"` (to group by a single field), an object where the keys are new names and the values are the fields that you're grouping by, or `None`, which groups the entire dataset into a single group.
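
To make the three kinds of grouping key concrete, here is a plain-Python analogy using `collections.Counter` on a few made-up, already-unwound rows. Each `Counter` plays the role of `$group` with `{ "$sum": 1 }`; this is a sketch of the idea, not MongoDB code.

```python
from collections import Counter

# Made-up rows, shaped like recipes after an $unwind on 'ingredients'
rows = [
    {"cuisine": "greek", "ingredients": "salt"},
    {"cuisine": "greek", "ingredients": "feta"},
    {"cuisine": "thai", "ingredients": "salt"},
]

# Like "_id": "$cuisine" - one group per value of a single field
by_cuisine = Counter(row["cuisine"] for row in rows)

# Like "_id": {"cuisine": "$cuisine", "ingredients": "$ingredients"} - one group per combination
by_pair = Counter((row["cuisine"], row["ingredients"]) for row in rows)

# Like "_id": None - everything lands in one group
overall = Counter(None for row in rows)

print(by_cuisine, by_pair, overall)
```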

The `_id` can be an object with multiple values.

In [22]:
pipeline = [
    { "$unwind": "$ingredients" },
    { 
        "$group": {
            "_id": {"cuisine": "$cuisine", "ingredients": "$ingredients"},
            "num_matching": { "$sum": 1 }
        }
    },
    { "$sort": {"num_matching": -1 }  },
    { "$limit": 4 }
]

results = db.cooking.aggregate(pipeline)
list(results)

[{'_id': {'cuisine': 'italian', 'ingredients': 'salt'}, 'num_matching': 3454},
 {'_id': {'cuisine': 'italian', 'ingredients': 'olive oil'},
  'num_matching': 3111},
 {'_id': {'cuisine': 'mexican', 'ingredients': 'salt'}, 'num_matching': 2720},
 {'_id': {'cuisine': 'southern_us', 'ingredients': 'salt'},
  'num_matching': 2290}]

Here, I gave the id keys the same names as the incoming fields:
    
```
"_id": {"cuisine": "$cuisine", "ingredients": "$ingredients"}
```

But they can be named whatever you like. e.g.

```
"_id": {"foo": "$cuisine", "bar": "$ingredients"}
```

Count total number of ingredients by grouping on `None`:

In [23]:
pipeline = [
    { "$unwind": "$ingredients" },
    { 
        "$group": {
            "_id": None,
            "num_matching": { "$sum": 1 }
        }
    },
    { "$sort": {"num_matching": -1 }  },
    { "$limit": 3 }
]

results = db.cooking.aggregate(pipeline)
list(results)

[{'_id': None, 'num_matching': 428275}]

## Lab Exercise

Next question - writing an aggregation pipeline that needs '$group' to count categories in the data.

## Groupby operators

- `$sum`
  - Using `{ "$sum": 1 }` returns a count, but you can also sum a numeric set of values with `{ "$sum": "$keyName" }`
- `$avg`
- `$first`
- `$last`
- `$min`
- `$max`

How would you get the average number of ingredients per cuisine type?

- First step: how do you get a count of ingredients for each individual recipe?

In [None]:
pipeline = [
    { "$unwind": "$ingredients" },
    { 
        "$group": {
            "_id": {'cuisine':'$cuisine', 'id': '$id'},
            "num_ingredients": { "$sum": 1 }
        }
    },
    { 
        "$group": {
            "_id": {'cuisine':'$_id.cuisine'},
            "average_ingredients": { "$avg": "$num_ingredients" }
        }
    },
    {
        "$sort": { "average_ingredients": 1}
    }
]

results = db.cooking.aggregate(pipeline)
list(results)
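
As a possible alternative, the `$size` operator returns the length of an array, so the per-recipe count may not need an `$unwind` at all. The pipeline below is a sketch that has not been run against the course database, and it assumes every recipe has an `ingredients` array (`$size` raises an error otherwise). The small made-up `sample` list checks the same logic in plain Python.

```python
from collections import defaultdict
from statistics import mean

# Sketch of a shorter pipeline: average the array length directly
pipeline = [
    {
        "$group": {
            "_id": "$cuisine",
            "average_ingredients": {"$avg": {"$size": "$ingredients"}},
        }
    },
    {"$sort": {"average_ingredients": 1}},
]
# results = db.cooking.aggregate(pipeline)  # needs the `db` connection created earlier

# Plain-Python check of the same idea on made-up recipes
sample = [
    {"id": 1, "cuisine": "greek", "ingredients": ["feta", "olives", "oregano"]},
    {"id": 2, "cuisine": "greek", "ingredients": ["lamb"]},
    {"id": 3, "cuisine": "thai", "ingredients": ["basil", "chili"]},
]

sizes = defaultdict(list)
for recipe in sample:
    sizes[recipe["cuisine"]].append(len(recipe["ingredients"]))

averages = {cuisine: mean(counts) for cuisine, counts in sizes.items()}
print(averages)
```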