# Map Reduce Algorithm

Map reduce is a functional-style algorithm whose three phases (map, shuffle/sort, and reduce) can easily be executed on different machines. In this assignment, we apply it to a dataset of more than 100 million rows.

For comparison, we count the flights per carrier in two ways.

1. Serial way - looping through each record and counting each airline's flights
2. Map reduce way - map, shuffle/sort, and reduce to count the flights
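
To make the comparison concrete, here is a minimal sketch of both approaches on a toy list. The list `carriers` is hypothetical stand-in data for the `UniqueCarrier` column, not part of the real dataset, and the variable names are chosen only for illustration.

```python
from functools import reduce
from itertools import groupby

# Hypothetical toy data standing in for the UniqueCarrier column
carriers = ["AA", "DL", "AA", "UA", "DL", "AA"]

# 1. Serial way: visit every record and bump a counter
serial_counts = {}
for carrier in carriers:
    serial_counts[carrier] = serial_counts.get(carrier, 0) + 1

# 2. Map reduce way
# map: emit a (key, 1) pair for every record
mapped = map(lambda c: (c, 1), carriers)
# shuffle/sort: bring pairs with the same key next to each other
shuffled = sorted(mapped, key=lambda kv: kv[0])
# reduce: sum the values within each group of equal keys
mapreduce_counts = {
    key: reduce(lambda x, y: x + y, (value for _, value in group))
    for key, group in groupby(shuffled, key=lambda kv: kv[0])
}

print(serial_counts)
print(mapreduce_counts)
```

Both dictionaries hold the same counts; only the order in which the keys were inserted may differ.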

In [1]:
import os
import glob
import pandas as pd
import utils
import data_handler

## Data Preparation

In [2]:
data_handler.download_dataset()

data downloaded. you can skip this step or delete data folder to download again.


## Variables

In [3]:
# all files under data folder
file_list = sorted(glob.glob(os.path.join('data', '*.csv.bz2')))

In [4]:
carrier_counts = {}

# Serial Way

Here we loop over the list of files under the data folder. The serial way has to visit every record in every file and update the counts one at a time.

In [5]:
%%time
for ind, data_file in enumerate(file_list):
    # read current data
    df = pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True, low_memory=False)
    
    # unique airlines in dataset
    carriers = df.UniqueCarrier.unique()
    
    # update the global carrier_count
    for key in carriers:
        if key not in carrier_counts:
            carrier_counts.update({key: 0})
    
    # loop through each row in dataframe 
    for carrier in df.UniqueCarrier:
        carrier_counts[carrier] += 1

    # info 
    prefix ='Shape: {} ; {} Mb'.format(
        df.shape, round(df.memory_usage().sum() / 1e+6,2))
    
    # track the progress
    utils.progressbar(len(file_list), ind + 1, prefix=prefix)

Shape: (7009728, 29) ; 1626.26 Mb [####################] 100% 
Wall time: 10min 49s


In [6]:
# results
carrier_counts

{'PS': 83617,
 'TW': 3757747,
 'UA': 13299817,
 'WN': 15976022,
 'EA': 919785,
 'HP': 3636682,
 'NW': 10292627,
 'PA (1)': 316167,
 'PI': 873957,
 'CO': 8145788,
 'DL': 16547870,
 'AA': 14984647,
 'US': 14075530,
 'AS': 2878021,
 'ML (1)': 70622,
 'AQ': 154381,
 'MQ': 3954895,
 'OO': 3090853,
 'XE': 2350309,
 'TZ': 208420,
 'EV': 1697172,
 'FL': 1265138,
 'B6': 811341,
 'DH': 693047,
 'HA': 274265,
 'OH': 1464176,
 'F9': 336958,
 'YV': 854056,
 '9E': 521059}
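
The membership check and the per-row increment in the serial loop can be folded into a single structure. The sketch below uses `collections.Counter` from the standard library. The two lists are hypothetical stand-ins for the `UniqueCarrier` column of two files, so this shows the pattern only, not the real data.

```python
from collections import Counter

# Hypothetical stand-ins for the UniqueCarrier column of two files
file_a = ["AA", "DL", "AA"]
file_b = ["DL", "UA", "AA"]

# A Counter starts missing keys at zero, so no explicit initialisation is needed
carrier_counts = Counter()
for column in (file_a, file_b):
    # update() counts every element of the iterable and adds it to the totals
    carrier_counts.update(column)

print(dict(carrier_counts))
```

In the notebook, the same idea would presumably be `carrier_counts.update(df.UniqueCarrier)` inside the file loop. This is still a serial pass over every record, so it simplifies the code rather than changing the approach.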

# Map Reduce Way

This dataset is very large, with millions of records, so I load only two columns, 'FlightNum' and 'UniqueCarrier', into the dataframe used to run the MapReduce algorithm.

In [None]:
from functools import reduce
from itertools import groupby

cols_to_load = ['FlightNum', 'UniqueCarrier']
# read only the needed columns from each file
frames = [
    pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True,
                low_memory=False, usecols=cols_to_load)
    for data_file in file_list
]
# DataFrame.append was removed in pandas 2.0; concatenate once instead
data = pd.concat(frames)

In [8]:
carriers = data.UniqueCarrier

In [9]:
%%time
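# Phase 1. Mapping: emit a (carrier, 1) pair for every record
# (map is lazy in Python 3: the pairs are only produced when they are consumed)
mapping = map(lambda x: (x, 1), carriers)
# print(list(mapping))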

Wall time: 997 µs


In [13]:
%%time
# Phase 2. Shuffling or Sorting: bring pairs with the same key together

sorted_mapping = sorted(mapping)
# print(sorted_mapping)

Wall time: 0 ns


In [11]:
%%time
# Phase 3. Reducing: sum the 1s within each group of equal keys
grouper = groupby(sorted_mapping, lambda p: p[0])
final = map(lambda l: (l[0], reduce(lambda x, y: x + y, map(lambda p: p[1], l[1]))), grouper)

print(list(final))

[('9E', 521059), ('AA', 14984647), ('AQ', 154381), ('AS', 2878021), ('B6', 811341), ('CO', 8145788), ('DH', 693047), ('DL', 16547870), ('EA', 919785), ('EV', 1697172), ('F9', 336958), ('FL', 1265138), ('HA', 274265), ('HP', 3636682), ('ML (1)', 70622), ('MQ', 3954895), ('NW', 10292627), ('OH', 1464176), ('OO', 3090853), ('PA (1)', 316167), ('PI', 873957), ('PS', 83617), ('TW', 3757747), ('TZ', 208420), ('UA', 13299817), ('US', 14075530), ('WN', 15976022), ('XE', 2350309), ('YV', 854056)]
Wall time: 41 s
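
One caveat when reading the per-phase timings: in Python 3, `map` returns a lazy iterator, so timing the `map(...)` call alone measures only the creation of that iterator. The sketch below illustrates this on hypothetical toy data. The names `creation_time` and `consume_time` are illustrative, and the actual durations depend on the machine.

```python
import time

# Hypothetical toy data: one million carrier codes
carriers = ["AA", "DL", "AA", "UA"] * 250_000

start = time.perf_counter()
mapping = map(lambda c: (c, 1), carriers)  # returns at once, nothing is computed yet
creation_time = time.perf_counter() - start

start = time.perf_counter()
pairs = list(mapping)                      # the lambda actually runs here
consume_time = time.perf_counter() - start

# A map object can be consumed only once; a second pass yields nothing
leftover = list(mapping)

print(f"creating the map object: {creation_time:.6f} s")
print(f"consuming the map object: {consume_time:.6f} s")
print(f"pairs produced: {len(pairs)}, pairs left on second pass: {len(leftover)}")
```

Since the iterator is exhausted after one pass, re-running a cell that consumes `mapping` without re-running the mapping cell would operate on an empty sequence.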


## Conclusion

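Here we counted the flights per carrier in two ways. Both return the same counts for all 29 carriers.

1. Serial way - looping through each record and counting each airline's flights (time taken: 10min 49s).

2. Map reduce way - map, shuffle/sort, and reduce to count the flights. Mapping processes a large dataset, potentially in parallel, to generate <key,value> pairs. These <key,value> pairs are fed to reduce, which combines the data tuples into a smaller set. Word count is one of the simplest applications of MapReduce: given a huge dataset, we run map to generate <key,value> pairs and reduce to obtain the frequency of each word. Counting flights per carrier follows the same pattern, with the carrier code as the key. (Time taken: 997 µs + 0 ns + 41 s ≈ 41 s.)

The two timings are not directly comparable. The serial timing includes reading every file, while the map reduce timing starts from a dataframe that is already loaded. In addition, `map` is lazy in Python 3, so the 997 µs covers only the creation of the iterator and not the mapping work itself.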
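
The parallel potential mentioned above comes from the fact that each chunk of data can be mapped independently and the partial results merged afterwards. The following is a minimal single-process sketch of that structure, assuming hypothetical toy chunks in place of the yearly files. `map_chunk` and `merge_counts` are illustrative names, not functions from the notebook.

```python
from collections import Counter
from functools import reduce


def map_chunk(carriers):
    """Map step for one chunk: produce a partial count per carrier."""
    return Counter(carriers)


def merge_counts(left, right):
    """Reduce step: combine two partial counts into one."""
    return left + right


# Hypothetical toy chunks standing in for the UniqueCarrier column of each file
chunks = [
    ["AA", "DL", "AA"],
    ["DL", "UA"],
    ["AA", "UA", "UA"],
]

# Each chunk is mapped independently, so this step could run on separate workers
partial_counts = map(map_chunk, chunks)

# Merging is associative, so partial results can be combined in any grouping
total = reduce(merge_counts, partial_counts, Counter())

print(dict(total))
```

Because `map_chunk` does not depend on any other chunk, the built-in `map` could in principle be swapped for a process pool such as `concurrent.futures.ProcessPoolExecutor.map`. Whether that pays off depends on how expensive it is to read each file and ship it to a worker.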