# Map Reduce Algorithm

Map reduce is a functional-style algorithm whose three phases (map, shuffle and reduce) can easily be executed on different machines. In this assignment, we will apply it to a dataset of more than 100 million rows.

For comparison, we will have two ways of getting counts of carriers.

1. Serial way - Looping through each record and counting each airline's flight
2. Map reduce way - map, shuffle (sort) and reduce to count the flights
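Before touching the real dataset, the two approaches can be compared on a handful of records. This is a minimal sketch: the `flights` list is a made-up stand-in for the carrier column, not part of the assignment data.

```python
from functools import reduce
from itertools import groupby

# Hypothetical toy data standing in for the carrier column of the dataset.
flights = ["AA", "DL", "AA", "UA", "DL", "AA"]

# 1. Serial way: visit every record and bump that carrier's counter.
serial_counts = {}
for carrier in flights:
    serial_counts[carrier] = serial_counts.get(carrier, 0) + 1

# 2. Map reduce way.
mapped = map(lambda carrier: (carrier, 1), flights)  # map: emit (key, 1) pairs
shuffled = sorted(mapped)                            # shuffle: bring equal keys together
map_reduce_counts = {
    # reduce: sum the 1s that belong to each key
    key: reduce(lambda x, y: x + y, (value for _, value in pairs))
    for key, pairs in groupby(shuffled, key=lambda pair: pair[0])
}

print(serial_counts)      # {'AA': 3, 'DL': 2, 'UA': 1}
print(map_reduce_counts)  # {'AA': 3, 'DL': 2, 'UA': 1}
```

Both dictionaries hold the same counts; only the way they are computed differs.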

In [1]:
import os
import glob
import pandas as pd
import utils
import data_handler

## Data Preparation

In [2]:
data_handler.download_dataset()

download status:  [####################] 100% 


## Variables

In [3]:
# all files under data folder
file_list = sorted(glob.glob(os.path.join('data', '*.csv.bz2')))

In [4]:
carrier_counts = {}

## Serial Way

Here, we get the list of files under the data folder. The serial way loops through every record and updates the matching count one record at a time.

If you think your computer is fast enough, you can also try `read_as_dataframe` in the `data_handler` module. It reads all files into a single dataframe, so you can use the following code rather than looping through each file.

```py
# initial value dictionary
carrier_counts = {}

# read all datasets into one
df = data_handler.read_as_dataframe()

# unique carriers
carriers = df.UniqueCarrier.unique()

# start every carrier at zero so the += below cannot raise KeyError
for key in carriers:
    carrier_counts[key] = 0

# get the counts
for carrier in df.UniqueCarrier:
    carrier_counts[carrier] += 1
```

Since my computer wasn't able to finish it, I preferred the loop over the individual files. Hopefully, you won't run into these issues thanks to the Map Reduce Algorithm!
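The per-file idea can be sketched without pandas. The example below is only an illustration: the nested lists are hypothetical stand-ins for the carrier column of three files, and `collections.Counter` is used because it treats a missing key as zero, so no separate initialization step is needed.

```python
from collections import Counter

# Hypothetical stand-ins for the carrier column of three separate files.
files = [
    ["AA", "DL", "AA"],
    ["UA", "AA"],
    ["DL", "WN", "WN"],
]

totals = Counter()
for carriers_in_file in files:
    # Only one file's records are processed at a time; totals carries over.
    totals.update(carriers_in_file)

print(dict(totals))  # {'AA': 3, 'DL': 2, 'UA': 1, 'WN': 2}
```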

In [5]:
for ind, data_file in enumerate(file_list):
    # read current data
    df = pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True, low_memory=False) 
    
    # unique airlines in dataset
    carriers = df.UniqueCarrier.unique()
    
    # update the global carrier_count
    for key in carriers:
        if key not in carrier_counts:
            carrier_counts.update({key: 0})
    
    # loop through each row in dataframe 
    for carrier in df.UniqueCarrier:
        carrier_counts[carrier] += 1

    # info 
    prefix ='Shape: {} ; {} Mb'.format(
        df.shape, round(df.memory_usage().sum() / 1e+6,2))
    
    # track the progress
    utils.progressbar(len(file_list), ind + 1, prefix=prefix)

Shape: (7009728, 29) ; 1626.26 Mb [####################] 100% 


In [6]:
carrier_counts

{'PS': 83617,
 'TW': 3757747,
 'UA': 13299817,
 'WN': 15976022,
 'EA': 919785,
 'HP': 3636682,
 'NW': 10292627,
 'PA (1)': 316167,
 'PI': 873957,
 'CO': 8145788,
 'DL': 16547870,
 'AA': 14984647,
 'US': 14075530,
 'AS': 2878021,
 'ML (1)': 70622,
 'AQ': 154381,
 'MQ': 3954895,
 'OO': 3090853,
 'XE': 2350309,
 'TZ': 208420,
 'EV': 1697172,
 'FL': 1265138,
 'B6': 811341,
 'DH': 693047,
 'HA': 274265,
 'OH': 1464176,
 'F9': 336958,
 'YV': 854056,
 '9E': 521059}

## Map Reduce Way

In [7]:
from functools import reduce
from itertools import groupby 

columns = ['Year', 'UniqueCarrier']

# read only the needed columns from each file
frames = []
for data_file in file_list:
    df = pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True, low_memory=False, usecols=columns)
    frames.append(df)

# DataFrame.append was removed in pandas 2.0; concatenate once instead
datas = pd.concat(frames, ignore_index=True)

In [8]:
carriers = datas.UniqueCarrier

### Map Reduce Algorithm Phases

In [9]:
# Phase 1 :- Mapping

mapping = map(lambda x: (x, 1),carriers)

In [10]:
# Phase 2 :- Shuffling

sorted_mapping = sorted(mapping)
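The sort matters because `itertools.groupby`, used in the reducing phase, only groups keys that sit next to each other. A small sketch with made-up pairs shows the difference:

```python
from itertools import groupby

# Hypothetical mapper output in arrival order.
pairs = [("UA", 1), ("AA", 1), ("UA", 1), ("AA", 1)]

# Without sorting, equal keys are not adjacent, so each key appears in several groups.
unsorted_groups = [
    (key, len(list(group))) for key, group in groupby(pairs, key=lambda p: p[0])
]

# After sorting, every key forms exactly one group.
sorted_groups = [
    (key, len(list(group))) for key, group in groupby(sorted(pairs), key=lambda p: p[0])
]

print(unsorted_groups)  # [('UA', 1), ('AA', 1), ('UA', 1), ('AA', 1)]
print(sorted_groups)    # [('AA', 2), ('UA', 2)]
```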

In [11]:
# Phase 3 :- Reducing

grouper = groupby(sorted_mapping, lambda p: p[0])
# for each (carrier, pairs) group, sum the 1s to get the carrier's flight count
final = map(lambda group: (group[0], reduce(lambda x, y: x + y, map(lambda p: p[1], group[1]))), grouper)

print(list(final))

[('9E', 521059), ('AA', 14984647), ('AQ', 154381), ('AS', 2878021), ('B6', 811341), ('CO', 8145788), ('DH', 693047), ('DL', 16547870), ('EA', 919785), ('EV', 1697172), ('F9', 336958), ('FL', 1265138), ('HA', 274265), ('HP', 3636682), ('ML (1)', 70622), ('MQ', 3954895), ('NW', 10292627), ('OH', 1464176), ('OO', 3090853), ('PA (1)', 316167), ('PI', 873957), ('PS', 83617), ('TW', 3757747), ('TZ', 208420), ('UA', 13299817), ('US', 14075530), ('WN', 15976022), ('XE', 2350309), ('YV', 854056)]


## Conclusion

Here, we counted the flights of each carrier in two ways, and both produce the same counts.

1) Serial way - Looping through each record and counting each airline's flight

2) Map reduce way - map, shuffle (sort) and reduce to count the flights

The Map Reduce algorithm consists of three important tasks: mapping, sorting and reducing. Because mapping and reducing work on independent pieces of the data, the dataset can be split into small parts and assigned to multiple systems.
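The splitting idea can be sketched as follows. This is not what the notebook ran: the chunks are hypothetical, the helper names `count_chunk` and `merge_counts` are made up for illustration, and a thread pool merely stands in for separate machines.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def count_chunk(chunk):
    """Map and locally reduce one chunk: return partial counts per carrier."""
    return Counter(chunk)


def merge_counts(left, right):
    """Final reduce step: combine two partial results."""
    return left + right


# Hypothetical chunks; in practice each one could be a single data file.
chunks = [["AA", "DL", "AA"], ["UA", "AA"], ["DL", "WN", "WN"]]

# Each chunk is counted independently; threads stand in for separate machines.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(pool.map(count_chunk, chunks))

total = reduce(merge_counts, partial_counts, Counter())
print(sorted(total.items()))  # [('AA', 3), ('DL', 2), ('UA', 1), ('WN', 2)]
```

Since every chunk is counted without looking at the others, only the small partial results need to be sent back and merged.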