
Fix headings and code blocks for rta-hierarchical

Signed-off-by: Rick Copeland <rick@arborian.com>
commit 2e348219ba7fc6c7b90cf56d6dea41720b9ca4a4 1 parent 095d264
@rick446 authored
174 source/tutorial/usecase/real-time-analytics-hierarchical-aggregation.txt
@@ -1,14 +1,15 @@
+=============================================
Real Time Analytics: Hierarchical Aggregation
=============================================
Problem
--------
+=======
You have a large amount of event data that you want to analyze at
multiple levels of aggregation.
-Solution overview
------------------
+Solution Overview
+=================
For this solution we will assume that the incoming event data is already
stored in an incoming 'events' collection. For details on how we could
@@ -32,56 +33,55 @@ job is illustrated below:
Note that the way events roll into the hourly collection is qualitatively
different from the way hourly statistics roll into the daily collection.
-Aside: Map-Reduce Algorithm
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
+.. note::
-Map/reduce is a popular aggregation algorithm that is optimized for
-embarrassingly parallel problems. The pseudocode (in Python) of the
-map/reduce algorithm appears below. Note that we are providing the
-pseudocode for a particular type of map/reduce where the results of the
-map/reduce operation are *reduced* into the result collection, allowing
-us to perform incremental aggregation, which we'll need in this case.
+   **Map/reduce** is a popular aggregation algorithm that is optimized for
+   embarrassingly parallel problems. The pseudocode (in Python) of the
+   map/reduce algorithm appears below. Note that we are providing the
+   pseudocode for a particular type of map/reduce where the results of the
+   map/reduce operation are *reduced* into the result collection, allowing
+   us to perform incremental aggregation, which we'll need in this case.
-::
+   .. code-block:: python
-    def map_reduce(icollection, query,
-                   mapf, reducef, finalizef, ocollection):
-        '''Pseudocode for map/reduce with output type="reduce" in MongoDB'''
-        map_results = defaultdict(list)
-        def emit(key, value):
-            '''helper function used inside mapf'''
-            map_results[key].append(value)
+      def map_reduce(icollection, query,
+                     mapf, reducef, finalizef, ocollection):
+          '''Pseudocode for map/reduce with output type="reduce" in MongoDB'''
+          map_results = defaultdict(list)
+          def emit(key, value):
+              '''helper function used inside mapf'''
+              map_results[key].append(value)
-        # The map phase
-        for doc in icollection.find(query):
-            mapf(doc)
+          # The map phase
+          for doc in icollection.find(query):
+              mapf(doc)
-        # Pull in documents from the output collection for
-        # output type='reduce'
-        for doc in ocollection.find({'_id': {'$in': map_results.keys() } }):
-            map_results[doc['_id']].append(doc['value'])
+          # Pull in documents from the output collection for
+          # output type='reduce'
+          for doc in ocollection.find({'_id': {'$in': map_results.keys() } }):
+              map_results[doc['_id']].append(doc['value'])
-        # The reduce phase
-        reduce_results = {}
-        for key, values in map_results.items():
-            reduce_results[key] = reducef(key, values)
+          # The reduce phase
+          reduce_results = {}
+          for key, values in map_results.items():
+              reduce_results[key] = reducef(key, values)
-        # Finalize and save the results back
-        for key, value in reduce_results.items():
-            final_value = finalizef(key, value)
-            ocollection.save({'_id': key, 'value': final_value})
+          # Finalize and save the results back
+          for key, value in reduce_results.items():
+              final_value = finalizef(key, value)
+              ocollection.save({'_id': key, 'value': final_value})
-The embarrassingly parallel part of the map/reduce algorithm lies in the
-fact that each invocation of mapf, reducef, and finalizef is independent
-of the others and can, in fact, be distributed to different servers. In
-the case of MongoDB, this parallelism can be achieved by using sharding
-on the collection on which we are performing map/reduce.
+   The embarrassingly parallel part of the map/reduce algorithm lies in the
+   fact that each invocation of mapf, reducef, and finalizef is independent
+   of the others and can, in fact, be distributed to different servers. In
+   the case of MongoDB, this parallelism can be achieved by using sharding
+   on the collection on which we are performing map/reduce.
-Schema design
--------------
+Schema Design
+=============
When designing the schema for event storage, we need to keep in mind the
necessity to differentiate between events which have been included in
@@ -92,9 +92,9 @@ our event logging process as it has to fetch event keys one-by one.
If we are able to batch up our inserts into the events collection, we can
still use an auto-increment primary key by using the find\_and\_modify
-command to generate our \_id values:
+command to generate our ``_id`` values:
-::
+.. code-block:: python
>>> obj = db.my_sequence.find_and_modify(
... query={'_id':0},
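
In full, such a batched counter might look like the following sketch (the
counter field name ``seq`` and the batch size of 50 are illustrative
assumptions, not fixed by the text above):

.. code-block:: python

   >>> obj = db.my_sequence.find_and_modify(
   ...     query={'_id': 0},
   ...     update={'$inc': {'seq': 50}},   # reserve a batch of 50 ids
   ...     upsert=True,
   ...     new=True)
   >>> batch_of_ids = range(obj['seq'] - 50, obj['seq'])
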
@@ -110,7 +110,7 @@ we'll assume that we are calculating average session length for
logged-in users on a website. Our event format will thus be the
following:
-::
+.. code-block:: javascript
{
"userid": "rick",
@@ -124,7 +124,7 @@ the number of sessions to enable us to incrementally recompute the
average session times. Each of our aggregate documents, then, looks like
the following:
-::
+.. code-block:: javascript
{
_id: { u: "rick", d: ISODate("2010-10-10T14:00:00Z") },
@@ -140,7 +140,7 @@ document. This will help us as we incrementally update the various
levels of the hierarchy.
Operations
-----------
+==========
In the discussion below, we will assume that all the events have been
inserted and appropriately timestamped, so our main operations are
@@ -150,8 +150,8 @@ case, we will assume that the last time the particular aggregation was
run is stored in a last\_run variable. (This variable might be loaded
from MongoDB or another persistence mechanism.)
-Aggregate from events to the hourly level
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Aggregate from Events to the Hourly Level
+-----------------------------------------
Here, we want to load all the events since our last run until one minute
ago (to allow for some lag in logging events). The first thing we need
@@ -160,7 +160,7 @@ and PyMongo to interface with the MongoDB server, note that the various
functions (map, reduce, and finalize) that we pass to the mapreduce
command must be Javascript functions. The map function appears below:
-::
+.. code-block:: python
mapf_hour = bson.Code('''function() {
var key = {
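
In full, the map function might look something like this sketch, grouping
each event by user and by the hour in which it occurred (the ``length``
field and the layout of the emitted value are assumptions chosen to be
consistent with the reduce and finalize functions below):

.. code-block:: python

   mapf_hour = bson.Code('''function() {
       var key = {
           u: this.userid,
           d: new Date(
               this.ts.getFullYear(),
               this.ts.getMonth(),
               this.ts.getDate(),
               this.ts.getHours(),
               0, 0, 0) };
       // emit one session's length; mean and ts are filled in by finalize
       emit(key, { total: this.length, count: 1, mean: 0, ts: null });
   }''')
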
@@ -188,7 +188,7 @@ was performed.
Our reduce function is also fairly straightforward:
-::
+.. code-block:: python
reducef = bson.Code('''function(key, values) {
var r = { total: 0, count: 0, mean: 0, ts: null };
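
Completed, the reduce function might read as follows (a sketch that simply
sums the totals and counts, leaving ``mean`` and ``ts`` for the finalize
step as described below):

.. code-block:: python

   reducef = bson.Code('''function(key, values) {
       var r = { total: 0, count: 0, mean: 0, ts: null };
       values.forEach(function(v) {
           r.total += v.total;
           r.count += v.count;
       });
       return r;
   }''')
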
@@ -207,7 +207,7 @@ finalize results can lead to difficult-to-debug errors. Also note that
we are ignoring the 'mean' and 'ts' values. These will be provided in
the 'finalize' step:
-::
+.. code-block:: python
finalizef = bson.Code('''function(key, value) {
if(value.count > 0) {
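
Completed, the finalize function might read as follows (a sketch: it
computes the mean and stamps the document with the time the aggregation
ran):

.. code-block:: python

   finalizef = bson.Code('''function(key, value) {
       if(value.count > 0) {
           value.mean = value.total / value.count;
       }
       value.ts = new Date();
       return value;
   }''')
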
@@ -221,7 +221,7 @@ Here, we compute the mean value as well as the timestamp we will use to
write back to the output collection. Now, to bind it all together, here
is our Python code to invoke the mapreduce command:
-::
+.. code-block:: python
cutoff = datetime.utcnow() - timedelta(seconds=60)
query = { 'ts': { '$gt': last_run, '$lt': cutoff } }
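
A sketch of how this invocation might continue, using PyMongo's
``Collection.map_reduce`` (the ``stats.hourly`` output collection name
follows the surrounding text):

.. code-block:: python

   db.events.map_reduce(
       map=mapf_hour,
       reduce=reducef,
       finalize=finalizef,
       query=query,
       out={ 'reduce': 'stats.hourly' })
   last_run = cutoff
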
@@ -241,14 +241,14 @@ Because we used the 'reduce' option on our output, we are able to run
this aggregation as often as we like as long as we update the last\_run
variable.
-Index support
-^^^^^^^^^^^^^
+Index Support
+~~~~~~~~~~~~~
Since we are going to be running the initial query on the input events
frequently, we would benefit significantly from an index on the
timestamp of incoming events:
-::
+.. code-block:: python
>>> db.events.ensure_index('ts')
@@ -257,14 +257,14 @@ index has the advantage of being right-aligned, which basically means we
only need a thin slice of the index (the most recent values) in RAM to
achieve good performance.
-Aggregate from hour to day
-~~~~~~~~~~~~~~~~~~~~~~~~~~
+Aggregate from Hour to Day
+--------------------------
In calculating the daily statistics, we will use the hourly statistics
as input. Our map function looks quite similar to our hourly map
function:
-::
+.. code-block:: python
mapf_day = bson.Code('''function() {
var key = {
@@ -295,7 +295,7 @@ hourly aggregations, we can, in fact, use the same reduce and finalize
functions. The actual Python code driving this level of aggregation is
as follows:
-::
+.. code-block:: python
cutoff = datetime.utcnow() - timedelta(seconds=60)
query = { 'value.ts': { '$gt': last_run, '$lt': cutoff } }
@@ -318,27 +318,26 @@ aggregating from the stats.hourly collection into the stats.daily
collection.
-Index support
-^^^^^^^^^^^^^
+Index Support
+~~~~~~~~~~~~~
Since we are going to be running the initial query on the hourly
statistics collection frequently, an index on 'value.ts' would be nice
to have:
-::
+.. code-block:: python
>>> db.stats.hourly.ensure_index('value.ts')
Once again, this is a right-aligned index that will use very little RAM
for efficient operation.
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Other aggregations
-~~~~~~~~~~~~~~~~~~
+Other Aggregations
+------------------
Once we have our daily statistics, we can use them to calculate our
weekly and monthly statistics. Our weekly map function is as follows:
-::
+.. code-block:: python
mapf_week = bson.Code('''function() {
var key = {
@@ -360,7 +359,7 @@ subtracting days until we get to the beginning of the week. In our
weekly map function, we will choose the first day of the month as our
group key:
-::
+.. code-block:: python
mapf_month = bson.Code('''function() {
d: new Date(
@@ -381,7 +380,7 @@ are identical to one another except for the date calculation. We can use
Python's string interpolation to refactor our map function definitions
as follows:
-::
+.. code-block:: python
mapf_hierarchical = '''function() {
var key = {
@@ -426,7 +425,7 @@ as follows:
Our Python driver can also be refactored so we have much less code
duplication:
-::
+.. code-block:: python
def aggregate(icollection, ocollection, mapf, cutoff, last_run):
query = { 'value.ts': { '$gt': last_run, '$lt': cutoff } }
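
Put together, the helper might read as follows (a sketch; it reuses the
shared ``reducef`` and ``finalizef`` and the ``out={'reduce': ...}`` form
quoted in the sharding section below):

.. code-block:: python

   def aggregate(icollection, ocollection, mapf, cutoff, last_run):
       query = { 'value.ts': { '$gt': last_run, '$lt': cutoff } }
       icollection.map_reduce(
           map=mapf,
           reduce=reducef,
           finalize=finalizef,
           query=query,
           out={ 'reduce': ocollection.name })
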
@@ -439,7 +438,7 @@ duplication:
Once this is defined, we can perform all our aggregations as follows:
-::
+.. code-block:: python
cutoff = datetime.utcnow() - timedelta(seconds=60)
aggregate(db.events, db.stats.hourly, mapf_hour, cutoff, last_run)
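
The remaining levels can follow the same pattern (a sketch; ``mapf_year``
is an assumed analogue of the other map functions for the ``stats.yearly``
collection mentioned under Sharding):

.. code-block:: python

   aggregate(db.stats.hourly, db.stats.daily, mapf_day, cutoff, last_run)
   aggregate(db.stats.daily, db.stats.weekly, mapf_week, cutoff, last_run)
   aggregate(db.stats.daily, db.stats.monthly, mapf_month, cutoff, last_run)
   aggregate(db.stats.monthly, db.stats.yearly, mapf_year, cutoff, last_run)
   last_run = cutoff
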
@@ -455,20 +454,20 @@ So long as we save/restore our 'last\_run' variable between
aggregations, we can run these aggregations as often as we like since
each aggregation individually is incremental.
-Index support
-^^^^^^^^^^^^^
+Index Support
+~~~~~~~~~~~~~
Our indexes will continue to be on the value's timestamp to ensure
efficient operation of the next level of the aggregation (and they
continue to be right-aligned):
-::
+.. code-block:: python
>>> db.stats.daily.ensure_index('value.ts')
>>> db.stats.monthly.ensure_index('value.ts')
Sharding
---------
+========
To take advantage of distinct shards when performing map/reduce, our
input collections should be sharded. In order to achieve good balancing
@@ -479,22 +478,29 @@ makes sense as the most significant part of the shard key.
In order to prevent a single, active user from creating a large,
unsplittable chunk, we will use a compound shard key with (username,
-timestamp) on each of our collections: >>> db.command('shardcollection',
-'events', { ... key : { 'userid': 1, 'ts' : 1} } ) { "collectionsharded"
-: "events", "ok" : 1 } >>> db.command('shardcollection', 'stats.daily',
-{ ... key : { '\_id': 1} } ) { "collectionsharded" : "stats.daily", "ok"
-: 1 } >>> db.command('shardcollection', 'stats.weekly', { ... key : {
-'\_id': 1} } ) { "collectionsharded" : "stats.weekly", "ok" : 1 } >>>
-db.command('shardcollection', 'stats.monthly', { ... key : { '\_id': 1}
-} ) { "collectionsharded" : "stats.monthly", "ok" : 1 } >>>
-db.command('shardcollection', 'stats.yearly', { ... key : { '\_id': 1} }
-) { "collectionsharded" : "stats.yearly", "ok" : 1 }
+timestamp) on each of our collections:
+
+.. code-block:: python
+
+   >>> db.command('shardcollection', 'events',
+   ...     key={ 'userid': 1, 'ts': 1 })
+   { "collectionsharded" : "events", "ok" : 1 }
+   >>> db.command('shardcollection', 'stats.daily',
+   ...     key={ '_id': 1 })
+   { "collectionsharded" : "stats.daily", "ok" : 1 }
+   >>> db.command('shardcollection', 'stats.weekly',
+   ...     key={ '_id': 1 })
+   { "collectionsharded" : "stats.weekly", "ok" : 1 }
+   >>> db.command('shardcollection', 'stats.monthly',
+   ...     key={ '_id': 1 })
+   { "collectionsharded" : "stats.monthly", "ok" : 1 }
+   >>> db.command('shardcollection', 'stats.yearly',
+   ...     key={ '_id': 1 })
+   { "collectionsharded" : "stats.yearly", "ok" : 1 }
We should also update our map/reduce driver so that it specifies that the
output should be sharded. This is accomplished by adding 'sharded': True to
the output argument:
-… out={ 'reduce': ocollection.name, 'sharded': True }) …
+.. code-block:: python
+
+ ... out={ 'reduce': ocollection.name, 'sharded': True })...
Note that the output collection of a mapreduce command, if sharded, must
-be sharded using \_id as the shard key.
+be sharded using ``_id`` as the shard key.