In [1]:
# !pip install pymongo

In [2]:
from pymongo import MongoClient

In [3]:
matplotlib inline

In [4]:
mongo_client = MongoClient('18.236.138.158', 27016)
database_reference = mongo_client.twitter

In [5]:
collection_reference = database_reference.instructor_test_group

In [6]:
collection_reference.count()

20000

In [7]:
cursor_sampl = collection_reference.aggregate([{'$sample': {'size': 20}}])

In [8]:
len(list(cursor_sampl)) 

20

| | | | | |
|-|-|-|-|-|
| `_id`        | `truncated`                 |`user`            |`extended_tweet` | `favorited`                   |
| `created_at` | `in_reply_to_status_id`     |`geo`             |`quote_count`    | `retweeted`                   |
| `id`         | `in_reply_to_status_id_str` |`coordinates`     |`reply_count`    | `filter_level`                |
| `id_str`     | `in_reply_to_user_id`       |`place`           |`retweet_count`  | `lang`                        |
| `text`       | `in_reply_to_user_id_str`   |`contributors`    |`favorite_count` | `timestamp_ms`                |
| `source`     | `in_reply_to_screen_name`   |`is_quote_status` |`entities`       |                               |
























![](https://www.evernote.com/l/AAEO7gpKcKdI5YJeGrni4GhdlBiBWdTa3YgB/image.png)

![](https://i.imgflip.com/245tp9.jpg)

## The Aggregation Pipeline

A call to the aggregation framework defines a pipeline (figure 6.1), the **aggregation pipeline**, where the output from each step in the pipeline provides input to the next step. Each step executes a single operation on the input documents to transform the input and generate output documents.

![](https://www.evernote.com/l/AAGxerRxKLZNFrjqxlYK2HPz1R11tr95FFkB/image.png)

### Useful Aggregation Pipeline Operations

- `$project` // Specify fields to be placed in the output document.
- `$match` // Select documents to be processed, similar to find().
- `$limit` // Limit the number of documents to be passed to the next step.
- `$skip` // Skip a specified number of documents.
- `$unwind` // Expand an array, generating one output document for each array entry.
- `$group` // Group documents by a specified key.
- `$sort` // Sort documents.
- `$geoNear` // Select documents near a geospatial location.
- `$out` // Write the results of the pipeline to a collection (new in v2.6).
- `$redact` // Control access to certain data (new in v2.6).

In [9]:
PROJECT = "$project"
MATCH = "$match"
LIMIT = "$limit"
UNWIND = "$unwind"
GROUP = "$group"
SORT = "$sort"
COUNT = "$count"

In [10]:
test_group = database_reference.instructor_test_group

In [11]:
not_empty = { "$ne" : None }

cursor = test_group.aggregate([
    { MATCH : { "geo" : not_empty }},
    { COUNT : "geo"}
])

In [12]:
next(cursor)

{'geo': 2952}

In [13]:
match_non_null_geo = { MATCH : { "geo" : not_empty }}
count_geo = { COUNT : "geo"}

dag_count_non_null_geo = [
    match_non_null_geo,
    count_geo
]

In [14]:
next(test_group.aggregate(dag_count_non_null_geo))

{'geo': 2952}

### Group Template

    { $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
    
#### Accumulators

- `$sum`
- `$avg`
- `$first`
- `$last`
- `$max`
- `$min`
- `$stdDevPop`
- `$stdDevSamp`

In [15]:
greater_than_10 = { "$gt" : 10 }
sum_1 = { "$sum" : 1 }

def group_and_count(key):
    return { GROUP : {
                 "_id"   : key,
                 "count" : sum_1
                }
           }

match_count_gt = { MATCH : { "count" : greater_than_10 } }

sort_by_count_descending = { SORT : { "count" : -1 } }

def limit(val):
    return { LIMIT : val }

In [16]:
list(test_group.aggregate(
    [
        group_and_count('$lang'),
        match_count_gt, 
        sort_by_count_descending,
        limit(10)
    ]
))


[{'_id': 'en', 'count': 16996},
 {'_id': 'und', 'count': 1815},
 {'_id': 'es', 'count': 295},
 {'_id': 'tl', 'count': 126},
 {'_id': 'fr', 'count': 121},
 {'_id': 'pt', 'count': 76},
 {'_id': 'ht', 'count': 66},
 {'_id': 'ja', 'count': 61},
 {'_id': 'ar', 'count': 49},
 {'_id': 'it', 'count': 48}]

In [22]:
not_an_empty_array = { "$ne" : [] }
match_non_empty_hashtag_arrays = { MATCH : { "entities.hashtags" : not_an_empty_array } }
project_to_text_only = { PROJECT : { "text" : "$entities.hashtags.text" , "_id" :0} }
unwind_text = { UNWIND : "text" }

list(test_group.aggregate(
    [
        match_non_empty_hashtag_arrays,
        project_to_text_only,
        unwind_text,
        limit(10)
    ]
))


OperationFailure: path option to $unwind stage should be prefixed with a '$': text

In [18]:
list(test_group.aggregate(
    [
        match_non_empty_hashtag_arrays,
        project_to_text_only,
        unwind_text,
        group_and_count('$text'),
        match_count_gt, 
        sort_by_count_descending,
        limit(10)
    ]
))

[{'_id': 'job', 'count': 395},
 {'_id': 'Hiring', 'count': 308},
 {'_id': 'LosAngeles', 'count': 286},
 {'_id': 'CareerArc', 'count': 240},
 {'_id': 'hiring', 'count': 149},
 {'_id': 'Job', 'count': 107},
 {'_id': 'Jobs', 'count': 107},
 {'_id': 'earthquake', 'count': 67},
 {'_id': 'LA', 'count': 56},
 {'_id': 'losangeles', 'count': 49}]

In [19]:
job_hashtags = ['job', 'jobs', 'hiring', 'careerarc']
location_hashtags = ['california', 'losangeles', 'la', 'santamonica', 'glendale', 'paloalto']
project_to_lower = { PROJECT : { "text" : {"$toLower" : "$text"} } }
match_not_in_bad = { MATCH : { "_id" : { "$nin" : job_hashtags + location_hashtags}}}

list(test_group.aggregate(
    [
        match_non_empty_hashtag_arrays,
        project_to_text_only,
        unwind_text,
        project_to_lower,
        group_and_count('$text'),
        match_not_in_bad,
        match_count_gt, 
        sort_by_count_descending,
        limit(50)
    ]
))

[{'_id': 'earthquake', 'count': 67},
 {'_id': 'goldenglobes', 'count': 56},
 {'_id': 'quake', 'count': 46},
 {'_id': 'art', 'count': 40},
 {'_id': 'healthcare', 'count': 38},
 {'_id': 'superbowl', 'count': 28},
 {'_id': 'retail', 'count': 26},
 {'_id': 'sales', 'count': 25},
 {'_id': 'rn', 'count': 25},
 {'_id': 'marketing', 'count': 25},
 {'_id': 'gonancygo', 'count': 24},
 {'_id': 'hospitality', 'count': 23},
 {'_id': 'grammys', 'count': 22},
 {'_id': 'repost', 'count': 22},
 {'_id': 'it', 'count': 21},
 {'_id': 'releasethememo', 'count': 21},
 {'_id': 'timesup', 'count': 19},
 {'_id': 'nsng', 'count': 19},
 {'_id': 'love', 'count': 19},
 {'_id': 'clerical', 'count': 18},
 {'_id': 'businessmgmt', 'count': 17},
 {'_id': 'tv', 'count': 17},
 {'_id': 'hollywood', 'count': 16},
 {'_id': 'trumpshutdown', 'count': 16},
 {'_id': 'script', 'count': 16},
 {'_id': 'beverlyhills', 'count': 15},
 {'_id': 'dtla', 'count': 15},
 {'_id': 'comedy', 'count': 15},
 {'_id': 'actorslife', 'count': 14},
