In [1]:
from lib import create_mongo_client_to_database_collection

# Handle to the `tweets` collection inside the `twitter` database.
database_name, collection_name = 'twitter', 'tweets'
collection_reference = create_mongo_client_to_database_collection(database_name, collection_name)

In [2]:
# Total number of tweets: an empty filter matches every document.
collection_reference.count_documents(filter={})

310387

In [3]:
# Draw 20 pseudo-random tweets with the $sample stage.
sample_stage = {'$sample': {'size': 20}}
cursor_sampl = collection_reference.aggregate([sample_stage])

In [4]:
# Count the sampled documents; this exhausts the cursor.
sum(1 for _sampled_tweet in cursor_sampl)

20

Top-level fields of a tweet document in the `tweets` collection:

| | | | | |
|-|-|-|-|-|
| `_id`        | `truncated`                 |`user`            |`extended_tweet` | `favorited`                   |
| `created_at` | `in_reply_to_status_id`     |`geo`             |`quote_count`    | `retweeted`                   |
| `id`         | `in_reply_to_status_id_str` |`coordinates`     |`reply_count`    | `filter_level`                |
| `id_str`     | `in_reply_to_user_id`       |`place`           |`retweet_count`  | `lang`                        |
| `text`       | `in_reply_to_user_id_str`   |`contributors`    |`favorite_count` | `timestamp_ms`                |
| `source`     | `in_reply_to_screen_name`   |`is_quote_status` |`entities`       |                               |
























![](https://www.evernote.com/l/AAEO7gpKcKdI5YJeGrni4GhdlBiBWdTa3YgB/image.png)

![](https://i.imgflip.com/245tp9.jpg)

## The Aggregation Pipeline

A call to the aggregation framework defines a pipeline — the **aggregation pipeline** (figure 6.1) — in which each step's output becomes the next step's input. Each step performs a single operation on its input documents, transforming them into output documents.

![Aggregation pipeline diagram (figure 6.1)](https://www.evernote.com/l/AAGxerRxKLZNFrjqxlYK2HPz1R11tr95FFkB/image.png)

### Useful Aggregation Pipeline Operations

- `$project` — specify fields to be placed in the output document.
- `$match` — select documents to be processed, similar to `find()`.
- `$limit` — limit the number of documents passed to the next step.
- `$skip` — skip a specified number of documents.
- `$unwind` — expand an array, generating one output document for each array entry.
- `$group` — group documents by a specified key.
- `$sort` — sort documents.
- `$geoNear` — select documents near a geospatial location.
- `$out` — write the results of the pipeline to a collection (new in v2.6).
- `$redact` — control access to certain data (new in v2.6).
- `$sample` — randomly select a specified number of documents.
- `$count` — count the documents reaching this stage and output the total in a single named field.

In [5]:
# Aggregation-stage operator names, bound to constants so pipelines read cleanly.
(PROJECT, MATCH, LIMIT, UNWIND, GROUP, SORT, COUNT) = (
    "$project",
    "$match",
    "$limit",
    "$unwind",
    "$group",
    "$sort",
    "$count",
)

In [6]:
# Query operator: `$ne: None` rejects documents whose field is null or missing.
not_empty = dict([("$ne", None)])

In [7]:
# Keep tweets with a non-null `geo`, then count them into a field named "geo".
non_null_geo_pipeline = [
    {MATCH: {"geo": not_empty}},
    {COUNT: "geo"},
]
cursor = collection_reference.aggregate(non_null_geo_pipeline)

In [8]:
# `$count` emits no document at all when nothing matches, so fall back to a
# zero count instead of letting `next` raise StopIteration.
next(cursor, {"geo": 0})

{'geo': 38344}

In [9]:
# The same geo-count pipeline, assembled from named, reusable stages.
match_non_null_geo = {MATCH: {"geo": not_empty}}
count_geo          = {COUNT: "geo"}

dag_count_non_null_geo = [match_non_null_geo, count_geo]

In [10]:
# `$count` yields no document when the match stage filters everything out, so
# default to a zero count keyed by the same field name `count_geo` writes.
next(collection_reference.aggregate(dag_count_non_null_geo), {count_geo[COUNT]: 0})

{'geo': 38344}

### Group Template

    { $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
    
#### Accumulators

- `$sum`
- `$avg`
- `$first`
- `$last`
- `$max`
- `$min`
- `$stdDevPop`
- `$stdDevSamp`

In [11]:
# Reusable operator fragments: a per-document counter and a "> 10" comparison.
sum_1, greater_than_10 = {"$sum": 1}, {"$gt": 10}

def group_and_count(key):
    """Build a `$group` stage that counts documents per distinct `key`.

    `key` is a group-by expression such as "$lang". Each output document has
    the shape {"_id": <group value>, "count": <documents in that group>}.
    """
    stage_body = {"_id": key, "count": sum_1}
    return {GROUP: stage_body}

def match_count_gt(threshold):
    """Build a `$match` stage keeping documents whose `count` exceeds `threshold`.

    Meant to follow `group_and_count`, whose output documents carry `count`.
    """
    return {MATCH: {"count": {"$gt": threshold}}}

# Same filter as before ({"count": {"$gt": 10}}), now built from the factory.
match_count_gt_10 = match_count_gt(10)

# Order groups from most to least frequent (-1 means descending).
sort_by_count_descending = {SORT: dict(count=-1)}

def limit(val):
    """Build a `$limit` stage that passes at most `val` documents to the next stage."""
    limit_stage = {LIMIT: val}
    return limit_stage

In [12]:
# Inspect the stage document the helper builds for grouping on `text`.
group_and_count(key="$text")

{'$group': {'_id': '$text', 'count': {'$sum': 1}}}

In [13]:
# Five most common tweet languages among those appearing more than ten times.
language_pipeline = [
    group_and_count('$lang'),
    match_count_gt_10,
    sort_by_count_descending,
    limit(5),
]
list(collection_reference.aggregate(language_pipeline))


[{'_id': 'en', 'count': 257374},
 {'_id': 'und', 'count': 29395},
 {'_id': 'es', 'count': 5480},
 {'_id': 'ko', 'count': 2584},
 {'_id': 'fr', 'count': 2456}]

In [14]:
# Stage fragments for extracting hashtags from tweets.
not_an_empty_array = {"$ne": []}
match_non_empty_hashtag_arrays = {MATCH: {"entities.hashtags": not_an_empty_array}}

# NOTE: this projection overwrites `text` with the hashtag texts taken from
# `entities.hashtags.text` (not the tweet body) and drops `_id`.
project_to_text_only = {PROJECT: {"text": "$entities.hashtags.text", "_id": 0}}

# Emit one document per hashtag text.
unwind_text = {UNWIND: "$text"}

In [15]:
# Peek at the first ten unwound hashtag documents.
hashtag_sample_pipeline = [
    match_non_empty_hashtag_arrays,
    project_to_text_only,
    unwind_text,
    limit(10),
]
list(collection_reference.aggregate(hashtag_sample_pipeline))

[{'text': 'Dodgers'},
 {'text': 'DeadBirds'},
 {'text': 'Dodgers'},
 {'text': 'DeadBirds'},
 {'text': 'thestruggleisreal'},
 {'text': 'actorslife'},
 {'text': 'poolday'},
 {'text': 'heatwave'},
 {'text': 'littlethings'},
 {'text': 'LHHH'}]

In [16]:
# Ten most frequent hashtags (case-sensitive) among those seen more than ten times.
top_hashtags_pipeline = [
    match_non_empty_hashtag_arrays,
    project_to_text_only,
    unwind_text,
    group_and_count('$text'),
    match_count_gt_10,
    sort_by_count_descending,
    limit(10),
]
list(collection_reference.aggregate(top_hashtags_pipeline))

[{'_id': 'job', 'count': 1702},
 {'_id': 'Hiring', 'count': 1343},
 {'_id': 'CareerArc', 'count': 1300},
 {'_id': 'LosAngeles', 'count': 1244},
 {'_id': 'losangeles', 'count': 789},
 {'_id': 'earthquake', 'count': 659},
 {'_id': 'hiring', 'count': 637},
 {'_id': 'TeenChoice', 'count': 572},
 {'_id': 'ChoiceFandom', 'count': 550},
 {'_id': 'california', 'count': 506}]

In [17]:
# Hashtags to exclude, written in lower case so they compare equal to
# hashtag texts after the `$toLower` projection.
job_hashtags      = ['job', 'jobs', 'hiring', 'careerarc']
location_hashtags = ['california', 'losangeles', 'la', 'santamonica', 'glendale', 'paloalto']
excluded_hashtags = job_hashtags + location_hashtags

project_to_lower = {PROJECT: {"text": {"$toLower": "$text"}}}

# Filters on `_id`, so it belongs after a `$group` keyed on the hashtag text.
match_not_in_bad = {MATCH: {"_id": {"$nin": excluded_hashtags}}}

In [18]:
# Top 50 lower-cased hashtags seen more than ten times, excluding the job and location tags.
filtered_top_hashtags_pipeline = [
    match_non_empty_hashtag_arrays,
    project_to_text_only,
    unwind_text,
    project_to_lower,
    group_and_count('$text'),
    match_not_in_bad,
    match_count_gt_10,
    sort_by_count_descending,
    limit(50),
]
list(collection_reference.aggregate(filtered_top_hashtags_pipeline))

[{'_id': 'earthquake', 'count': 659},
 {'_id': 'teenchoice', 'count': 575},
 {'_id': 'choicefandom', 'count': 554},
 {'_id': 'repost', 'count': 511},
 {'_id': 'directioners', 'count': 451},
 {'_id': 'votevela', 'count': 449},
 {'_id': 'quake', 'count': 422},
 {'_id': 'gazagenocide', 'count': 400},
 {'_id': 'gazaunderattack', 'count': 399},
 {'_id': 'bds', 'count': 399},
 {'_id': 'boycottisrael', 'count': 396},
 {'_id': 'greatreturnmarch', 'count': 392},
 {'_id': 'mtvhottest', 'count': 379},
 {'_id': 'hollywood', 'count': 374},
 {'_id': 'music', 'count': 320},
 {'_id': 'love', 'count': 311},
 {'_id': 'art', 'count': 251},
 {'_id': 'summer', 'count': 213},
 {'_id': 'dtla', 'count': 198},
 {'_id': 'bb20', 'count': 197},
 {'_id': 'tbt', 'count': 192},
 {'_id': '8yearsofonedirection', 'count': 190},
 {'_id': 'lafc', 'count': 185},
 {'_id': 'dodgers', 'count': 179},
 {'_id': 'lafcvla', 'count': 174},
 {'_id': 'hiphop', 'count': 148},
 {'_id': 'wynonnaearp', 'count': 148},
 {'_id': 'silverlak