In [1]:
from pymongo import MongoClient

%matplotlib inline

# Open a client against the 'this_mongo' host, then select the "twitter"
# database by subscript (equivalent to attribute access on the client).
mongo_client = MongoClient('this_mongo')
database_reference = mongo_client["twitter"]

In [2]:
# `Database.collection_names()` was deprecated in PyMongo 3.7 and removed in
# 4.0; `list_collection_names()` is its replacement and returns the names of
# the collections in this database as a list of strings.
database_reference.list_collection_names()

['vineland_test_group', 'tweets']

In [3]:
# Handle on the "vineland_test_group" collection (subscript form of the
# attribute lookup).
collection_reference = database_reference["vineland_test_group"]

In [4]:
# `Collection.count()` was deprecated in PyMongo 3.7 and removed in 4.0.
# `count_documents` requires a filter; the empty filter `{}` matches every
# document, so this still reports the total size of the collection.
collection_reference.count_documents({})

10015

## The Aggregation Pipeline

A call to the aggregation framework defines a pipeline, the **aggregation pipeline** (illustrated in the figure below), where the output from each step in the pipeline provides the input to the next step. Each step executes a single operation on its input documents to transform them and generate output documents.

![](https://www.evernote.com/l/AAGxerRxKLZNFrjqxlYK2HPz1R11tr95FFkB/image.png)

### Useful Aggregation Pipeline Operations

- `$project` // Specify fields to be placed in the output document.
- `$match` // Select documents to be processed, similar to find().
- `$limit` // Limit the number of documents to be passed to the next step.
- `$skip` // Skip a specified number of documents.
- `$unwind` // Expand an array, generating one output document for each array entry.
- `$group` // Group documents by a specified key.
- `$sort` // Sort documents.
- `$geoNear` // Select documents near a geospatial location.
- `$out` // Write the results of the pipeline to a collection (new in v2.6).
- `$redact` // Control access to certain data (new in v2.6).

In [5]:
# Aggregation stage operator names, bound to constants so that pipeline
# stages can be written as `{ MATCH: ... }` instead of repeating "$"-strings.
PROJECT, MATCH, LIMIT = "$project", "$match", "$limit"
UNWIND, GROUP, SORT = "$unwind", "$group", "$sort"
COUNT = "$count"

In [16]:
# Collection that every aggregation below runs against.
test_group = database_reference["vineland_test_group"]

In [17]:
# Two-stage pipeline: filter on "geo", then count what is left.
cur = test_group.aggregate(
    [
        # Keep only documents whose "geo" field is not null.
        {MATCH: {"geo": {"$ne": None}}},
        # Collapse the remaining documents into a count stored under "geo".
        {COUNT: "geo"},
    ]
)

In [18]:
# Fetch the next document from `cur`, the cursor returned by the aggregate
# call in the previous cell. `next` advances the cursor, so re-run that cell
# first if this one needs to be executed again.
next(cur)

{'geo': 985}

In [19]:
# The same two stages as before, now named so they can be reused and composed.

# Stage: keep only documents whose "geo" field is not null.
match_non_null_geo = {MATCH: {"geo": {"$ne": None}}}

# Stage: count the documents that reach it; the total is reported as "geo".
count_geo = {COUNT: "geo"}

# Pipeline: filter first, then count.
dag_count_non_null_geo = [match_non_null_geo, count_geo]

In [20]:
# `$count` emits no document at all when no input reaches it, in which case a
# bare `next(...)` would raise StopIteration. Supplying a default makes the
# empty case display a zero count instead. The cursor is created fresh on
# every run, so the default can only be hit by a genuinely empty result.
next(test_group.aggregate(dag_count_non_null_geo), {"geo": 0})

{'geo': 985}

### Group Template

    { $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
    
#### Accumulators

- `$sum`
- `$avg`
- `$first`
- `$last`
- `$max`
- `$min`
- `$stdDevPop`
- `$stdDevSamp`

In [21]:
# Reusable operator fragments for the stages defined below.
sum_1 = {"$sum": 1}            # accumulator: add 1 for every grouped document
greater_than_10 = {"$gt": 10}  # comparison: strictly greater than 10

def group_and_count(key):
    """Build a ``$group`` stage that counts documents per distinct ``key``.

    Args:
        key: The grouping expression used as the stage's ``_id``
            (for example the field path ``'$lang'``).

    Returns:
        A single-stage dict ``{GROUP: {"_id": key, "count": sum_1}}``.
    """
    grouping = {"_id": key, "count": sum_1}
    return {GROUP: grouping}

# Stage: order documents by their "count" field, largest first.
sort_by_count = {SORT: {"count": -1}}

# Stage: keep only documents whose "count" is greater than 10.
match_count_gt = {MATCH: {"count": greater_than_10}}

def limit(val):
    """Build a ``$limit`` stage.

    Args:
        val: The value placed under the ``$limit`` key, i.e. the maximum
            number of documents to pass on to the next stage.

    Returns:
        A single-stage dict ``{LIMIT: val}``.
    """
    stage = {}
    stage[LIMIT] = val
    return stage

In [22]:
# Ten most frequent values of "lang", counting only those seen more than
# 10 times.
list(
    test_group.aggregate([
        group_and_count('$lang'),  # one bucket per distinct "lang" value
        match_count_gt,            # keep buckets with count > 10
        sort_by_count,             # largest count first
        limit(10),                 # top ten only
    ])
)


[{'_id': 'en', 'count': 8812},
 {'_id': 'und', 'count': 845},
 {'_id': 'es', 'count': 81},
 {'_id': 'tl', 'count': 49},
 {'_id': 'ht', 'count': 28},
 {'_id': 'in', 'count': 21},
 {'_id': 'pt', 'count': 20},
 {'_id': 'fr', 'count': 19},
 {'_id': 'tr', 'count': 16},
 {'_id': 'nl', 'count': 12}]

In [23]:
# Comparison fragment: "is not equal to the empty array".
not_an_empty_array = {"$ne": []}

# Stage: drop documents whose "entities.hashtags" is an empty array.
match_non_empty_hashtag_arrays = {
    MATCH: {"entities.hashtags": not_an_empty_array}
}

# Stage: keep only the hashtag texts (as "text") and suppress "_id".
project_to_text_only = {
    PROJECT: {
        "text": "$entities.hashtags.text",
        "_id": 0,
    }
}

# Stage: expand the "text" array into one document per entry.
unwind_text = {UNWIND: "$text"}

# Peek at the first ten individual hashtags produced by the stages above.
list(
    test_group.aggregate([
        match_non_empty_hashtag_arrays,  # skip empty hashtag arrays
        project_to_text_only,            # keep just the hashtag texts
        unwind_text,                     # one document per hashtag
        limit(10),
    ])
)


[{'text': 'job'},
 {'text': 'Roanoke'},
 {'text': 'Sales'},
 {'text': 'Hiring'},
 {'text': 'CareerArc'},
 {'text': 'ImpeachTrump'},
 {'text': 'Resist'},
 {'text': 'SuperBowl'},
 {'text': 'ImWithKap'},
 {'text': 'StopPoliceBrutality'}]

In [24]:
# Ten most frequent hashtags (case-sensitive), counting only those that occur
# more than 10 times.
list(
    test_group.aggregate([
        match_non_empty_hashtag_arrays,  # skip empty hashtag arrays
        project_to_text_only,            # keep just the hashtag texts
        unwind_text,                     # one document per hashtag
        group_and_count('$text'),        # one bucket per distinct hashtag
        match_count_gt,                  # keep buckets with count > 10
        sort_by_count,                   # largest count first
        limit(10),
    ])
)

[{'_id': 'job', 'count': 269},
 {'_id': 'CareerArc', 'count': 215},
 {'_id': 'Hiring', 'count': 199},
 {'_id': 'hiring', 'count': 119},
 {'_id': 'Job', 'count': 71},
 {'_id': 'Jobs', 'count': 71},
 {'_id': 'SuperBowl', 'count': 45},
 {'_id': 'Wilmington', 'count': 41},
 {'_id': 'ImWithKap', 'count': 33},
 {'_id': 'BoycottSuperBowlSponsors', 'count': 31}]

In [25]:
# Lower-cased hashtags to exclude from the ranking.
job_hashtags = ['job', 'jobs', 'hiring', 'careerarc']
location_hashtags = [
    'california', 'losangeles', 'la', 'santamonica', 'glendale', 'paloalto',
]

# Stage: lower-case "text" so differently-cased hashtags share a bucket.
project_to_lower = {PROJECT: {"text": {"$toLower": "$text"}}}

# Stage: applied after grouping, where "_id" holds the hashtag; drops every
# bucket whose hashtag appears in either exclusion list.
match_not_in_bad = {
    MATCH: {"_id": {"$nin": [*job_hashtags, *location_hashtags]}}
}

# Up to fifty of the most frequent hashtags, case-insensitive, with the job
# and location hashtags removed and only counts above 10 kept.
list(
    test_group.aggregate([
        match_non_empty_hashtag_arrays,  # skip empty hashtag arrays
        project_to_text_only,            # keep just the hashtag texts
        unwind_text,                     # one document per hashtag
        project_to_lower,                # normalise case before grouping
        group_and_count('$text'),        # one bucket per distinct hashtag
        match_not_in_bad,                # remove excluded hashtags
        match_count_gt,                  # keep buckets with count > 10
        sort_by_count,                   # largest count first
        limit(50),
    ])
)

[{'_id': 'superbowl', 'count': 51},
 {'_id': 'flyeaglesfly', 'count': 46},
 {'_id': 'wilmington', 'count': 41},
 {'_id': 'imwithkap', 'count': 33},
 {'_id': 'boycottsuperbowl2018', 'count': 31},
 {'_id': 'takeaknee', 'count': 31},
 {'_id': 'stoppolicebrutality', 'count': 31},
 {'_id': 'boycottsuperbowlsponsors', 'count': 31},
 {'_id': 'eagles', 'count': 24},
 {'_id': 'columbia', 'count': 24},
 {'_id': 'superbowllii', 'count': 23},
 {'_id': 'retail', 'count': 22},
 {'_id': 'healthcare', 'count': 19},
 {'_id': 'vineland', 'count': 16},
 {'_id': 'hospitality', 'count': 16},
 {'_id': 'bristol', 'count': 16},
 {'_id': 'labor', 'count': 15},
 {'_id': 'it', 'count': 14},
 {'_id': 'vawx', 'count': 14},
 {'_id': 'engineering', 'count': 14},
 {'_id': 'swvawx', 'count': 14},
 {'_id': 'stillmissinghotch', 'count': 13},
 {'_id': 'veterans', 'count': 13},
 {'_id': 'nohotchnowatch', 'count': 13},
 {'_id': 'sales', 'count': 13},
 {'_id': 'nursing', 'count': 12},
 {'_id': 'rn', 'count': 11},
 {'_id': '