# Taller 5.2  Mongo Map Reduce

Juan Navarro, <jsnavarroa@unal.edu.co>

In [21]:
from pymongo import MongoClient
from pprint import pprint
from bson.code import Code
import pandas as pd

In [22]:
# Connect to the local MongoDB server on port 27017 and select the `bda`
# database and its `restaurants` collection.
client = MongoClient(host='localhost', port=27017)
db = client['bda']
collection = db['restaurants']

# Show one document so the fields used by the map/reduce functions below
# (address.zipcode, address.coord, grades, cuisine, ...) are visible.
pprint(collection.find_one())

{'_id': ObjectId('5beca9cb22cadaee4f01cfc1'),
 'address': {'building': '2780',
             'coord': [-73.98241999999999, 40.579505],
             'street': 'Stillwell Avenue',
             'zipcode': '11224'},
 'borough': 'Brooklyn',
 'cuisine': 'American ',
 'grades': [{'date': datetime.datetime(2014, 6, 10, 0, 0),
             'grade': 'A',
             'score': 5},
            {'date': datetime.datetime(2013, 6, 5, 0, 0),
             'grade': 'A',
             'score': 7},
            {'date': datetime.datetime(2012, 4, 13, 0, 0),
             'grade': 'A',
             'score': 12},
            {'date': datetime.datetime(2011, 10, 12, 0, 0),
             'grade': 'A',
             'score': 12}],
 'name': 'Riviera Caterer',
 'restaurant_id': '40356018'}


## Cuenta los restaurantes agrupados por zipcode usando Map reduce

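A continuación se implementa el conteo de restaurantes con dos variantes: la del ejemplo, cuyo reductor devuelve la longitud del arreglo de valores (`Count_Length`), y una que emite 1 por restaurante y suma los valores (`Count_Sum`).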

In [23]:
# Count restaurants per zipcode with two MapReduce variants:
#  * Count_Length: the example's reducer, which returns values.length. It is
#    kept on purpose to show why it is wrong: MongoDB may re-reduce partial
#    results and does not call the reducer for keys with a single emitted
#    value, so the mapper's restaurant_id can leak into the output.
#  * Count_Sum: emit 1 per restaurant and add the values (safe to re-reduce).
mapper = Code("""
                function () {
                    emit(this.address.zipcode, this.restaurant_id);
                }
               """)

reducer = Code("""
                function (zipcode, restaurants) {
                    return restaurants.length;
                }
                """)

mapper_sum = Code("""
                function () {
                    emit(this.address.zipcode, 1);
                }
               """)

reducer_sum = Code("""
                function (key, values) {
                    return Array.sum(values);
                }
                """)

# (result column name, map function, reduce function, output collection)
variants = (
    ('Count_Length', mapper, reducer, 'restaurants_count'),
    ('Count_Sum', mapper_sum, reducer_sum, 'restaurants_count_sum'),
)

# Run each variant and keep its per-zipcode result as a Series named after the
# variant. Distinct loop names avoid silently rebinding `mapper`/`reducer`
# to the sum variant once the loop finishes.
counts = {}
for name, map_fn, reduce_fn, out in variants:
    result = db.restaurants.map_reduce(map_fn, reduce_fn, out=out)
    # Output documents look like {'_id': <zipcode>, 'value': <count>}.
    counts[name] = pd.Series({doc['_id']: doc['value'] for doc in result.find()},
                             name=name)

# Building a DataFrame from a dict of Series outer-joins them on the zipcode
# index, with one column per variant.
df = pd.DataFrame(counts).rename_axis('zipcode', axis='index')
df.transpose()

zipcode,10001,10002,10003,10004,10005,10006,10007,10009,10010,10011,...,11432,11433,11434,11435,11436,11691,11692,11693,11694,11697
Count_Length,38,21,65,19,8,6,8,23,11,39,...,12,40897493,6,14,40824512,2,40550548,2,7,40366356
Count_Sum,75,29,139,25,15,13,16,46,27,90,...,21,1,9,22,1,4,1,3,12,1


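La función reductora del ejemplo no es idempotente ni asociativa, y por lo tanto el valor reportado para el conteo es incorrecto. MongoDB puede volver a reducir resultados parciales, por lo que el reductor debe cumplir `reduce(key, [C, reduce(key, [A, B])]) == reduce(key, [A, B, C])`. Además, MongoDB no invoca el reductor para las llaves con un solo valor emitido. Por eso, en la fila `Count_Length` los zipcode con un solo restaurante muestran el `restaurant_id` emitido (p. ej. 40897493) en lugar de 1. Por ejemplo, si se ejecuta el siguiente código en Mongo Shell el resultado es 2, cuando debe ser 3, porque hay 3 unos en total.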

```javascript

var reducer = function (zipcode, restaurants) {
  return restaurants.length;
};

var myKey = 'myKey';
var valuesIdempotent = [1,
                         reducer(myKey, [ 1, 1 ] )
                       ];
printjson(reducer(myKey, valuesIdempotent)); // 2, should be 3
```

# Cuente los restaurantes que están entre las longitudes -75 y -74 y las latitudes 40 y 42

Para el conteo se aplica un filtrado inicial y se emite la misma llave en la función de mapeo. El filtrado inicial selecciona los restaurantes dentro del rectángulo definido por sus esquinas. Luego, la función de mapeo emite un 1 con la misma llave ("Total") por cada documento seleccionado, y la función reductora suma esos valores para obtener el total.

In [24]:
# Count the restaurants inside a longitude/latitude rectangle. Every matching
# document emits the same key, so the reducer's sum is the overall total.
mapper = Code("""
                function () {
                    emit("Total", 1);
                }
               """)

reducer = Code("""
                function (key, values) {
                    return Array.sum(values);
                }
                """)

# Search area. `address.coord` stores legacy pairs as [longitude, latitude]
# (e.g. [-73.98, 40.58]), so x is the longitude and y the latitude.
LNG_MIN, LNG_MAX = -75, -74
LAT_MIN, LAT_MAX = 40, 42

# $box is documented as [bottom-left, upper-right]. The corners are listed in
# that order rather than as [upper-left, lower-right].
geo_query = {
    "address.coord": {
        "$geoWithin": {
            "$box": [[LNG_MIN, LAT_MIN], [LNG_MAX, LAT_MAX]]
        }
    }
}

# Execute the map reduce on the filtered documents only
result = db.restaurants.map_reduce(mapper, reducer, "restaurants_geo_count",
                                   query=geo_query)

# Expand the cursor and construct the DataFrame
df = pd.DataFrame(list(result.find()))

df

Unnamed: 0,_id,value
0,Total,594.0


# Calcule el promedio de los puntajes agrupados por zipcode

El puntaje usado es el de la primera evaluación del arreglo `grades`, que se asume es la más reciente (en el documento de ejemplo las evaluaciones están ordenadas de la más nueva a la más antigua). En la función de reducción se acumula la suma de los puntajes y la cantidad de puntajes sumados por código postal (zipcode). Después de la reducción, la función de finalización calcula el promedio. Los valores usados y el resultado se almacenan en la colección de salida.

In [28]:
# Emit the first (most recent) score on grades array
mapper = Code("""
                function () {
                    var score = {sum: this.grades[0].score, quantity: 1};
                    
                    emit(this.address.zipcode, score);
                };
               """)
# Accumulate the sum and quantity of values
reducer = Code("""
                function (key, values) {
                    var reducedVal = { sum: 0, quantity: 0 };
                    
                    for(var i=0; i < values.length; i++){
                        var value = values[i]
                        reducedVal.sum += value.sum;
                        reducedVal.quantity += value.quantity;
                    }
                    return reducedVal;
                }
                """)
# Calc the average
finalizer =  Code("""
                function (key, reducedVal) {
                   reducedVal.avg = reducedVal.sum/reducedVal.quantity;
                   
                   return reducedVal;
                }
                """)

# Execute the map reduce
collection = db.restaurants.map_reduce(mapper, reducer, 
                                              "restaurants_avg_score", finalize=finalizer)
cursor = collection.find()

# Expand the cursor and construct the DataFrame
df = pd.DataFrame(list(cursor))

# Format and show results
df = pd.concat([df, pd.io.json.json_normalize(df['value'])], axis='columns') 
df = df.drop(['value'], axis='columns')
df = df.rename(index=str, columns={"_id": "zipcode"})

df[['zipcode', 'avg']].transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,180,181,182,183,184,185,186,187,188,189
zipcode,10001.0,10002.0,10003.0,10004.0,10005.0,10006.0,10007.0,10009.0,10010.0,10011.0,...,11432.0,11433,11434.0,11435.0,11436,11691,11692,11693.0,11694.0,11697
avg,9.49333,10.3793,10.705,10.36,9.66667,8.46154,9.125,11.2826,8.74074,10.2111,...,10.0476,7,7.88889,11.1818,11,12,13,6.66667,13.0833,18


# Liste el restaurante top (de acuerdo a su score) de cada tipo de cuisine

La función reductora elige el mejor. En la función de mapeo se determina el puntaje a usar y los valores que se deben conservar del documento. En la función reductora se compara por el puntaje y se conserva el primer restaurante con el puntaje más alto. Como MapReduce no garantiza el orden de los valores, en caso de empate el restaurante elegido puede variar.

In [26]:
# Emit the cusine and the first (most recent) score on grades array
mapper = Code("""
                function () {
                    var record = {restaurant_id: this.restaurant_id, 
                                    score: this.grades[0].score,
                                    name: this.name};
                    
                    emit(this.cuisine, record);
                };
               """)
# Keep the (first) restaurant with the highest score
reducer = Code("""
                function (key, values) {
                    var top = { restaurant_id: 0, score: 0, name: ""};
                    
                    for(var i=0; i < values.length; i++){
                        var value = values[i]
                        if(value.score > top.score){
                            top = value
                        }
                    }
                    return top;
                }
                """)

# Execute the map reduce
collection = db.restaurants.map_reduce(mapper, reducer, "restaurants_top_by_cuisine")
cursor = collection.find()

# Expand the cursor and construct the DataFrame
df = pd.DataFrame(list(cursor))

# Format and show results
df = pd.concat([df, pd.io.json.json_normalize(df['value'])], axis='columns') 
df = df.drop(['value'], axis='columns')
df = df.rename(index=str, columns={"_id": "cuisine"})

df.transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,60,61,62,63,64,65,66,67,68,69
cuisine,Afghan,African,American,Armenian,Asian,Bagels/Pretzels,Bakery,Bangladeshi,Barbecue,"Bottled beverages, including water, sodas, jui...",...,Soul Food,Soups & Sandwiches,Spanish,Steak,Tapas,Tex-Mex,Thai,Turkish,Vegetarian,Vietnamese/Cambodian/Malaysia
name,Khyber Pass,Ebe Ye Yie African Restaurant,West 79Th Street Boat Basin Cafe,The Malt House,Tao Restaurant,Ess-A-Bagel,Rio Dela Plata Bakery,Lahore Delicatessen,Brother Jimmy'S Bbq,Alfonso'S Bar,...,Sam'S Restaurant,Hale & Hearty Soups,Los Arrieros Restaurant,Keens Steakhouse,Cafe Ronda,The Heights Bar & Grill,Arharn Thai Cuisine,Pasha Turkish Restaurant,Angelica Kitchen,Pho Bang Restaurant
restaurant_id,40589545,40832718,40756344,40373912,40795021,40396464,40572960,40805827,40790974,40397763,...,40655946,40825546,40401810,40373656,40799203,40571128,40396267,40559546,40388281,40700664
score,12,12,89,10,13,26,48,7,38,13,...,13,26,36,19,23,25,25,12,13,26
