# Accident analysis map-reduce
This Notebook shows you how to create map-reduce-based queries in MongoDB.

Note that this database only just fits in the memory of the VM. Before you start the activities in this Notebook, make sure you have no other running Notebooks. 

If any of the queries takes more than a few minutes to complete, it's probably because one of the shard servers has run out of memory and failed. Rerun the first three cells and try again. 

If you get more than a couple of failures, reboot the whole VM (_not_ suspend) and try again.

Stop the single-server Mongo instance and start the shard cluster. Look at the output of this command: it could well have failures the first time you run it. If so, just run it again until it works.

In [None]:
!sudo service mongod stop
!sudo /etc/mongo-shards-down
!sudo /etc/mongo-shards-up

In [None]:
# Import the required libraries (the connection to Mongo is opened in the next cell)

import collections
import datetime
import matplotlib as mpl

import pandas as pd
import scipy.stats

import folium
import uuid

import pymongo

# Needed to create map-reduce jobs
from bson.code import Code

In [None]:
# Open a connection to the Mongo server, open the accidents database and name the collections of accidents and labels

# Note the different port number for this cluster
client = pymongo.MongoClient('mongodb://localhost:27017/')

db = client.accidents
accidents = db.accidents
labels = db.labels
roads = db.roads

## Rerun cells above
If a map-reduce query fails, try rerunning the cells above to restart the Mongo shard cluster.

In [None]:
# Load the expanded names of keys and human-readable codes into memory

expanded_name = collections.defaultdict(str)
for e in labels.find({'expanded': {"$exists": True}}):
    expanded_name[e['label']] = e['expanded']
    
label_of = collections.defaultdict(str)
for l in labels.find({'codes': {"$exists": True}}):
    for c in l['codes']:
        try:
            label_of[l['label'], int(c)] = l['codes'][c]
        except ValueError: 
            label_of[l['label'], c] = l['codes'][c]

## Map-reduce
To start, here is a very simple query that finds the number of casualties in each local authority district.

These functions will serve as a template for all the later examples.
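Before looking at the MongoDB version, it may help to see the same map, group and reduce pattern in plain Python. This is only a sketch of the idea, not how MongoDB implements it: the documents, the district codes `E1` and `E2`, and the helper names `map_doc`, `reduce_values` and `run_map_reduce` are all invented for illustration.

```python
from collections import defaultdict

# Toy accident documents; the district codes and casualty counts are invented.
docs = [
    {'Local_Authority_(Highway)': 'E1', 'Number_of_Casualties': 2},
    {'Local_Authority_(Highway)': 'E2', 'Number_of_Casualties': 1},
    {'Local_Authority_(Highway)': 'E1', 'Number_of_Casualties': 3},
]

def map_doc(doc):
    """Map phase: yield (key, value) pairs for a single document."""
    yield (doc['Local_Authority_(Highway)'],
           {'Number_of_Casualties': doc['Number_of_Casualties']})

def reduce_values(key, values):
    """Reduce phase: combine all the values that share one key."""
    return {'Number_of_Casualties':
            sum(v['Number_of_Casualties'] for v in values)}

def run_map_reduce(docs, map_fn, reduce_fn):
    """Group the emitted values by key, then reduce each group."""
    grouped = defaultdict(list)
    for doc in docs:
        for key, value in map_fn(doc):
            grouped[key].append(value)
    return {key: reduce_fn(key, values) for key, values in grouped.items()}

totals = run_map_reduce(docs, map_doc, reduce_values)
print(totals)
```

The mapper only ever sees one document at a time, and the reducer only ever sees values that share a key. That separation is what lets the work be spread across shards.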

In [None]:
# Mapping: for each document, emit the number of casualties for this accident, 
#  with the key of the district code.
# Note that we can't use dot notation for every key, because JavaScript
#  doesn't allow brackets in dot-notation property names; use this['...'] instead.

mapper = Code("""
    function () {
        emit(this['Local_Authority_(Highway)'], 
            {Number_of_Casualties: this.Number_of_Casualties});
    }
""")

In [None]:
# Reducing: this will be passed a set of casualty numbers, all with the 
#  same district code. Reducing them is easy: just add up all the numbers

reducer = Code("""
    function(key, emits) {
        var total = {Number_of_Casualties : 0};
        for (var i = 0; i < emits.length; i++) {
            total.Number_of_Casualties += emits[i].Number_of_Casualties;
        }
        return total;
    }
""")

In [None]:
result = accidents.map_reduce(mapper, reducer, 'myresults')
result
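MongoDB may call the reduce function more than once for the same key, passing the output of an earlier reduce back in as one of the inputs. That is why the reducer returns a document with the same shape as the values the mapper emits. The plain-Python sketch below illustrates that property. The function `reduce_values`, the key `'E1'` and the counts are invented for illustration.

```python
def reduce_values(key, values):
    """Python stand-in for the JavaScript reducer: sum the casualty counts."""
    return {'Number_of_Casualties':
            sum(v['Number_of_Casualties'] for v in values)}

# Values a mapper might emit for one (invented) district key.
emitted = [{'Number_of_Casualties': n} for n in (2, 1, 3, 4)]

# Reduce everything in a single call...
all_at_once = reduce_values('E1', emitted)

# ...or reduce two partial batches, then reduce the partial results.
partial_a = reduce_values('E1', emitted[:2])
partial_b = reduce_values('E1', emitted[2:])
in_stages = reduce_values('E1', [partial_a, partial_b])

print(all_at_once == in_stages)
```

If the reducer returned a bare number instead of a `{Number_of_Casualties: ...}` document, the second-stage call would fail, because its inputs would no longer look like mapper output.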

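The result is stored in the `myresults` collection, which we treat as temporary: every later `map_reduce` call in this Notebook writes to the same name and replaces its contents. We can either use the `result` variable to access that collection, or refer to it by name as `db.myresults`.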

In [None]:
[r for r in result.find(limit=5)]

In [None]:
# Place the results in a pandas Series for plotting.
casualties_by_district_ss = pd.Series(
    {label_of['Local_Authority_(Highway)', r['_id']] : 
     r['value']['Number_of_Casualties']
     for r in result.find()})
casualties_by_district_ss

In [None]:
casualties_by_district_ss[['Aberdeenshire', 'Bedford', 'Wolverhampton']].plot(kind='bar', legend=False)

## Map-reduce with a query
We can also add a `query` to the `map_reduce` call. Only documents that match the query will be included in the mapping phase. In this case, we can use the query to restrict the processing to only accidents that occurred in 2009.

In [None]:
result = accidents.map_reduce(mapper, reducer, 'myresults', 
            query = {'Datetime': {"$gte": datetime.datetime(2009, 1, 1), 
                                 "$lt": datetime.datetime(2010, 1, 1)}})
result
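The `$gte`/`$lt` pair in the query selects a half-open interval: midnight on 1 January 2009 is included, midnight on 1 January 2010 is not. A plain-Python sketch of the same test follows. The documents and the helper `matches` are invented for illustration.

```python
import datetime

# Toy documents placed on either side of the year boundaries (values invented).
docs = [
    {'Datetime': datetime.datetime(2008, 12, 31, 23, 59), 'Number_of_Casualties': 1},
    {'Datetime': datetime.datetime(2009, 1, 1, 0, 0), 'Number_of_Casualties': 2},
    {'Datetime': datetime.datetime(2009, 12, 31, 23, 59), 'Number_of_Casualties': 3},
    {'Datetime': datetime.datetime(2010, 1, 1, 0, 0), 'Number_of_Casualties': 4},
]

start = datetime.datetime(2009, 1, 1)
end = datetime.datetime(2010, 1, 1)

def matches(doc):
    """Equivalent of {'Datetime': {'$gte': start, '$lt': end}}."""
    return start <= doc['Datetime'] < end

# Only documents that pass the filter would reach the map phase.
selected = [d for d in docs if matches(d)]
print([d['Number_of_Casualties'] for d in selected])
```

Using `$lt` with the first instant of the next year avoids having to work out the last representable timestamp in 2009.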

In [None]:
[r for r in result.find(limit=5)]

In [None]:
# Place the results in a pandas Series for plotting.
casualties_by_district_09_ss = pd.Series(
    {label_of['Local_Authority_(Highway)', r['_id']] : 
     r['value']['Number_of_Casualties']
     for r in result.find()})
casualties_by_district_09_ss

In [None]:
casualties_by_district_09_ss[['Aberdeenshire', 'Bedford', 'Wolverhampton']].plot(kind='bar', legend=False)

## Map-reduce with a composite key for intermediate results
There are a lot of district authorities. Let's look at the number of casualties per district authority, split by year. To keep the combinations of authority and year separate, we need to include both values in the key for the intermediate result documents.

In [None]:
# A very similar mapping function to the one above, but this time the key is a compound one, 
#  consisting of the district code and the year of the accident.

mapper = Code("""
    function () {
        emit({district : this['Local_Authority_(Highway)'], year : this.Datetime.getFullYear()}, 
            {Number_of_Casualties: this.Number_of_Casualties});
    }
""")

In [None]:
# The reducer is the same as above. The data for the different categories is kept separate
#  by the different values for the key.
reducer = Code("""
    function(key, emits) {
        var total = {Number_of_Casualties : 0};
        for (var i = 0; i < emits.length; i++) {
            total.Number_of_Casualties += emits[i].Number_of_Casualties;
        }
        return total;
    }
""")

In [None]:
result = accidents.map_reduce(mapper, reducer, 'myresults')
result
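The effect of the compound key can be sketched in plain Python, where a tuple plays the role of the `{district, year}` key document. The documents, the field name `district` and the codes `E1` and `E2` are invented for illustration.

```python
import datetime
from collections import defaultdict

# Toy documents; district codes, dates and counts are invented.
docs = [
    {'district': 'E1', 'Datetime': datetime.datetime(2009, 3, 1), 'Number_of_Casualties': 2},
    {'district': 'E1', 'Datetime': datetime.datetime(2009, 7, 9), 'Number_of_Casualties': 1},
    {'district': 'E1', 'Datetime': datetime.datetime(2010, 1, 5), 'Number_of_Casualties': 4},
    {'district': 'E2', 'Datetime': datetime.datetime(2009, 5, 2), 'Number_of_Casualties': 3},
]

totals = defaultdict(int)
for doc in docs:
    # Both parts of the key must match for two values to be added together.
    key = (doc['district'], doc['Datetime'].year)
    totals[key] += doc['Number_of_Casualties']

print(dict(totals))
```

Because the key is the pair, the same district in two different years ends up in two separate groups, and the unchanged reducer sums each group independently.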

In [None]:
# How many district-year pairs are there?
result.find().count()

In [None]:
# What do the results look like?
[r for r in result.find(limit=5)]

We can now put these results in a DataFrame.

In [None]:
casualties_by_district_year_long_df = pd.DataFrame([
    {'District name': label_of['Local_Authority_(Highway)', r['_id']['district']],
     'District code': r['_id']['district'],
     'Year': datetime.datetime(int(r['_id']['year']), 12, 31),
     'Number_of_Casualties': r['value']['Number_of_Casualties']}
    for r in result.find()])
casualties_by_district_year_long_df

In [None]:
casualties_by_district_year_df = casualties_by_district_year_long_df.pivot(
    index='Year', columns='District name', values='Number_of_Casualties')
casualties_by_district_year_df

In [None]:
casualties_by_district_year_df[['Aberdeenshire', 'Bedford', 'Wolverhampton']].plot()

## Activity 1
Use map-reduce to find the number of casualties per police force per year.

The solution is in the [`16.3solutions`](16.3solutions.ipynb) Notebook.

In [None]:
# Insert your solution here.

## Activity 2

Use map-reduce to find the number of accidents from 2009 to 2012, broken down by month. Place the results in a *pandas* Series and plot it. Ensure you get the same answers as we found using the aggregation pipeline earlier. (This pipeline is repeated below.)

Notes:
1. The JavaScript method `getMonth()`, called as `this.Datetime.getMonth()`, returns the month number.
2. JavaScript numbers its months 0-11, not the 1-12 that Python's `datetime.datetime` expects.
3. Construct the *pandas* Series by passing it a `dict` of `<date>: <count>` pairs so that each count stays attached to its date, whatever order the results are returned by Mongo. If the Series does not come out in date order, call `.sort_index()` on it.
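The month conversion in the notes can be sketched in plain Python as below. The `results` list stands in for map-reduce output: its document shape (`year` and `month` in `_id`, a `count` inside `value`) and all the numbers are assumptions made for illustration, not the activity's solution.

```python
import datetime

# Hypothetical map-reduce output, deliberately out of date order.
# Months are 0-based, as returned by JavaScript's getMonth().
results = [
    {'_id': {'year': 2009, 'month': 11}, 'value': {'count': 30}},
    {'_id': {'year': 2009, 'month': 0}, 'value': {'count': 10}},
    {'_id': {'year': 2010, 'month': 0}, 'value': {'count': 20}},
]

# Add 1 to each month so that 0-11 becomes the 1-12 Python expects.
# int() guards against numbers that come back from JavaScript as floats.
counts_by_month = {
    datetime.datetime(int(r['_id']['year']), int(r['_id']['month']) + 1, 1):
        r['value']['count']
    for r in results
}

for month_start in sorted(counts_by_month):
    print(month_start.date(), counts_by_month[month_start])
```

Keying the `dict` by date means the counts can be put in chronological order afterwards, regardless of the order in which the results arrive.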

The solution is in the [`16.3solutions`](16.3solutions.ipynb) Notebook.

In [None]:
# Solution using aggregation pipeline
results = accidents.aggregate([
    {'$project': {'month': {'$month': '$Datetime'},
                  'year': {'$year': '$Datetime'}}},
    {'$group': {'_id': {'month': '$month', 'year': '$year'},
                'count': {'$sum': 1}}},
    {'$sort': {'_id': 1}}
])

accidents_by_month_ap_ss = pd.Series({datetime.datetime(m['_id']['year'], m['_id']['month'], 1):
                                m['count'] for m in results})
# A hack to change the dates to the end of the month
accidents_by_month_ap_ss.index = accidents_by_month_ap_ss.index.to_period('M').to_timestamp('M')
accidents_by_month_ap_ss

In [None]:
accidents_by_month_ap_ss.plot()

In [None]:
# Insert your solution here.

## Multiple emissions
The `map` function can emit any number of results for each document, though it cannot look at any other documents in the database. For this example, we can look at the number of casualties in each age band over time. The mapper here emits one intermediate result document for each casualty sub-document in each accident.
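A plain-Python sketch of this one-to-many mapping follows. The documents are invented and simplified: the top-level `year` field and the age band codes 4 and 6 are assumptions for illustration only.

```python
from collections import Counter

# Toy accidents with 2, 1 and 0 casualty sub-documents (values invented).
docs = [
    {'year': 2011, 'Casualties': [{'Age_Band_of_Casualty': 4},
                                  {'Age_Band_of_Casualty': 6}]},
    {'year': 2011, 'Casualties': [{'Age_Band_of_Casualty': 4}]},
    {'year': 2012, 'Casualties': []},
]

def map_doc(doc):
    """Emit one ((age_band, year), 1) pair per casualty in the accident."""
    for casualty in doc['Casualties']:
        yield (casualty['Age_Band_of_Casualty'], doc['year']), 1

emitted = [pair for doc in docs for pair in map_doc(doc)]

# Summing the 1s per key gives the casualty count for each band and year.
totals = Counter()
for key, value in emitted:
    totals[key] += value

print(len(emitted), dict(totals))
```

Three documents produce three emissions here only by coincidence: the first contributes two, the second one, and the third none.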

In [None]:
mapper = Code("""
    function () {
        for (var i in this.Casualties) {
            emit({age_band : this.Casualties[i].Age_Band_of_Casualty, year : this.Datetime.getFullYear()}, 
                {Number_of_Casualties: 1});
        }
    }
""")

In [None]:
reducer = Code("""
    function(key, emits) {
        var total = {Number_of_Casualties : 0};
        for (var i = 0; i < emits.length; i++) {
            total.Number_of_Casualties += emits[i].Number_of_Casualties;
        }
        return total;
    }
""")

In [None]:
result = accidents.map_reduce(mapper, reducer, 'myresults')
result

In [None]:
[r for r in result.find(limit=5)]

In [None]:
casualties_by_age_year_long_df = pd.DataFrame([
        {'Age_Band_of_Casualty': r['_id']['age_band'],
         'year': r['_id']['year'],
         'Number_of_Casualties': r['value']['Number_of_Casualties']}
        for r in result.find()
    ])
casualties_by_age_year_long_df

In [None]:
casualties_by_age_year_df = casualties_by_age_year_long_df.pivot(index='Age_Band_of_Casualty', 
                                                                 columns='year', 
                                                                 values='Number_of_Casualties')
casualties_by_age_year_df.index = ["Unknown"] + [label_of['Age_Band_of_Casualty', i] 
                                                 for i in casualties_by_age_year_df.index[1:]]
casualties_by_age_year_df.columns = [datetime.datetime(int(y), 12, 31) 
                                     for y in casualties_by_age_year_df.columns]
casualties_by_age_year_df

We can plot the age distribution of casualties, first for 2012 alone and then for all years, with each age band shown as a group of bars, one per year:

In [None]:
# Just 2012 data
casualties_by_age_year_df[datetime.datetime(2012, 12, 31)].plot(kind='bar')

In [None]:
ax = casualties_by_age_year_df.plot(kind='bar')
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))

With this data, it's difficult to determine if some age groups are more prone to be involved in accidents than others, as the road census data doesn't include information about the ages of the vehicle occupants.

For instance, the drop in casualty numbers for 16–20 year olds could be because of increased road safety measures, or it could be because fewer young people were driving.

## Activity 3
Do particular ages of people drive particular types of vehicle?

Use a map-reduce query to show how `Age_Band_of_Driver` correlates with `Vehicle_Type`. Use a chi-squared test to see if there are significant differences between vehicle types.

Note: Take account of how many people there are for each driver age/vehicle type combination. Remember that every cell in the results you use for the chi-squared test should have a value of at least 5.
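The arithmetic behind a chi-squared test on a table of counts can be sketched in plain Python as follows. The `observed` table is invented and is not taken from the accidents data. In practice a library routine such as `scipy.stats.chi2_contingency` would normally do this work and also supply the p-value.

```python
# Invented 2 x 3 table of counts: rows could be driver age bands,
# columns vehicle types.
observed = [
    [10, 20, 30],
    [20, 20, 20],
]

row_totals = [sum(row) for row in observed]
col_totals = [sum(col) for col in zip(*observed)]
grand_total = sum(row_totals)

# Expected count for each cell if rows and columns were independent.
expected = [[r * c / grand_total for c in col_totals] for r in row_totals]

# The rule of thumb from the note: every cell used should be at least 5.
all_cells_ok = all(cell >= 5 for row in observed for cell in row)

# Chi-squared statistic: sum of (observed - expected)^2 / expected.
chi2 = sum((o - e) ** 2 / e
           for obs_row, exp_row in zip(observed, expected)
           for o, e in zip(obs_row, exp_row))
dof = (len(observed) - 1) * (len(col_totals) - 1)

print(all_cells_ok, round(chi2, 3), dof)
```

If some cells fall below 5, one common remedy is to merge sparse categories or drop them before running the test.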

The solution is in the [`16.3solutions`](16.3solutions.ipynb) Notebook.

In [None]:
# Insert your solution here.

## Activity 4
It's a bit of a cliché that middle-aged men suffer a mid-life crisis and deal with it by buying a large motorbike in a vain attempt to recapture their lost youth. Does this actually happen?

Develop a map-reduce query to show the number of accidents involving motorcycles, split by age of driver, sex of driver, and capacity of motorcycle.

Do the proportions of motorcycle engine size vary by age? Use a statistical test to see if any change is significant. As the numbers of male and female riders are very different, do this test for all riders together, and then for male and female riders separately.

Notes: 

* You're after vehicle types 2-5 inclusive.
* This query is right on the limit of what this VM will support. When you do the map-reduce, restrict the results to just 2011 and 2012, rather than all the data. Add `query = {'Datetime': {"$gte": datetime.datetime(2011, 1, 1)}}` to the `map_reduce()` call.

The solution is in the [`16.3solutions`](16.3solutions.ipynb) Notebook.

In [None]:
# Insert your solution here.

## Clean up

In [None]:
!sudo /etc/mongo-shards-down
!sudo service mongod start

## What next?
If you are working through this Notebook as part of an inline exercise, return to the module materials now.

If you are working through this set of Notebooks as a whole, you've completed the Part 16 Notebooks.