# Importing libraries for Map Reduce Assignment

In [1]:
import os
import glob
import pandas as pd
import data_handler
import util
import functools
import numpy as np
from functools import reduce
from itertools import groupby

# Data Preparation

The data was downloaded from the "http://stat-computing.org/dataexpo/2009/" website using the user-defined library data_handler. The download status shows that the task completed 100%.

In [2]:
data_handler.download_dataset()


download status:  [####################] 100% 


# Listing & Sorting data files

The function glob.glob lists the data files with their paths under the data folder, and sorted() then puts them in order by year.
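Sorting by path gives year order because `sorted` compares strings character by character, and four-digit year names all have the same length. A minimal sketch, assuming the files are named like `1987.csv.bz2`; the list of paths below is hypothetical and is not read from disk:

```python
import os

# Hypothetical file names, in the arbitrary order a directory listing might return them
unsorted_paths = [
    os.path.join('data', '2008.csv.bz2'),
    os.path.join('data', '1987.csv.bz2'),
    os.path.join('data', '1999.csv.bz2'),
]

# Lexicographic order of equal-length year names is the same as chronological order
file_list = sorted(unsorted_paths)

# Recover the year from each file name to check the ordering
years = [int(os.path.basename(p).split('.')[0]) for p in file_list]
print(years)  # [1987, 1999, 2008]
```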

In [2]:
# all files under data folder
file_list = sorted(glob.glob(os.path.join('data', '*.csv.bz2')))

# Loading the data as a DataFrame

The complete dataset is stored in 22 bzip2-compressed (.bz2) files, one per year, and is about 12 GB uncompressed. The system's RAM was too limited to load all of it, so, as suggested, only the two columns the map-reduce job needs (Year and UniqueCarrier) are loaded.
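The `usecols` argument of `pd.read_csv` is what keeps memory down: only the listed columns are kept in the resulting DataFrame. A minimal sketch on a tiny in-memory CSV; the sample rows and the extra columns (`Month`, `ArrDelay`) are hypothetical stand-ins for one yearly file:

```python
import io

import pandas as pd

# Tiny hypothetical stand-in for one yearly file; the real files have many more columns
csv_text = """Year,Month,UniqueCarrier,ArrDelay
1987,10,PS,-3
1987,10,PS,5
1987,11,TW,12
"""

# Only Year and UniqueCarrier end up in the DataFrame; Month and ArrDelay are dropped
sample_df = pd.read_csv(io.StringIO(csv_text), usecols=['Year', 'UniqueCarrier'])

print(sample_df.columns.tolist())  # ['Year', 'UniqueCarrier']
```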

In [3]:
col_names = ['Year', 'UniqueCarrier']  # only the columns needed for the job
frames = []
for ind, data_file in enumerate(file_list):
    # read the two needed columns from the current yearly file
    df = pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True,
                     low_memory=False, usecols=col_names)
    frames.append(df)

# concatenate once at the end; DataFrame.append was removed in pandas 2.0
df1 = pd.concat(frames)

# Looking at the shape of the loaded data

len() shows that the data contains 123,534,969 records from 1987 to 2008. head() and tail() show rows from 1987 and 2008, which confirms that both the first and the last files were loaded.

In [6]:
print(df1.tail())
print(df1.head())
len(df1)

         Year UniqueCarrier
7009723  2008            DL
7009724  2008            DL
7009725  2008            DL
7009726  2008            DL
7009727  2008            DL
   Year UniqueCarrier
0  1987            PS
1  1987            PS
2  1987            PS
3  1987            PS
4  1987            PS


123534969

# Running the job without map reduce

The unique carriers and their flight counts are obtained here without the map-reduce algorithm, by looping over every row of the UniqueCarrier column.
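The same loop can be sketched on a small hypothetical list of carrier codes and checked against `collections.Counter` from the standard library, which performs the same tally in a single call:

```python
from collections import Counter

# Hypothetical sample of the UniqueCarrier column
carrier_column = ['PS', 'TW', 'PS', 'UA', 'TW', 'PS']

# Start every unique carrier at zero, then add one per row, as in the notebook cell
sample_counts = {key: 0 for key in set(carrier_column)}
for carrier in carrier_column:
    sample_counts[carrier] += 1

print(sample_counts == Counter(carrier_column))  # True
print(sample_counts['PS'])  # 3
```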

In [30]:
%%time
carrier_counts = {}
# unique airlines in the dataset
carriers = df1.UniqueCarrier.unique()

# start every carrier's count at zero
for key in carriers:
    carrier_counts[key] = 0

# loop through each row of the dataframe and add one to that carrier's count
for carrier in df1.UniqueCarrier:
    carrier_counts[carrier] += 1

# info: shape of the full dataframe and memory usage of the last file read (`df`)
prefix = 'Shape: {} ; {} Mb'.format(
    df1.shape, round(df.memory_usage().sum() / 1e+6, 2))

# progress bar (reuses `ind` from the loading loop, so it reports 100%)
util.progressbar(len(file_list), ind + 1, prefix=prefix)

Shape: (123534969, 2) ; 112.16 Mb [####################] 100% 
Wall time: 1min


In [31]:
%%time
carrier_counts

Wall time: 0 ns


{'PS': 83617,
 'TW': 3757747,
 'UA': 13299817,
 'WN': 15976022,
 'EA': 919785,
 'HP': 3636682,
 'NW': 10292627,
 'PA (1)': 316167,
 'PI': 873957,
 'CO': 8145788,
 'DL': 16547870,
 'AA': 14984647,
 'US': 14075530,
 'AS': 2878021,
 'ML (1)': 70622,
 'AQ': 154381,
 'MQ': 3954895,
 'OO': 3090853,
 'XE': 2350309,
 'TZ': 208420,
 'EV': 1697172,
 'FL': 1265138,
 'B6': 811341,
 'DH': 693047,
 'HA': 274265,
 'OH': 1464176,
 'F9': 336958,
 'YV': 854056,
 '9E': 521059}

In [None]:
#df1.to_csv("result.csv") ## Saving the data in the system.

# Map Reduce way

The map-reduce way: map, then shuffle and sort, then reduce to count the flights per carrier.
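The three phases can be sketched end to end on a small hypothetical list of carrier codes, using the same built-ins the notebook relies on (`map`, `sorted`, `itertools.groupby`, `functools.reduce`):

```python
from functools import reduce
from itertools import groupby

# Hypothetical sample of carrier codes, one per flight record
sample_carriers = ['UA', 'PS', 'UA', 'TW', 'PS', 'UA']

# Map: emit a (key, 1) pair for every record
mapped = map(lambda c: (c, 1), sample_carriers)

# Shuffle and sort: sorting places pairs with the same key next to each other
shuffled = sorted(mapped, key=lambda pair: pair[0])

# Reduce: add up the 1s within each run of equal keys
counts = [
    (key, reduce(lambda x, y: x + y, (one for _, one in group)))
    for key, group in groupby(shuffled, key=lambda pair: pair[0])
]

print(counts)  # [('PS', 2), ('TW', 1), ('UA', 3)]
```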

In [5]:
carriers = df1.UniqueCarrier
len(carriers)


123534969

# Map Phase

For each flight record, I extract the carrier code and emit a key-value pair (carrier, 1), as below.
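Python's built-in `map` is lazy: it returns a one-shot iterator, so timing it measures only the creation of that iterator, and once something consumes it (for example `set(...)` or `sorted(...)`) the same object yields nothing more. A minimal sketch on a hypothetical list:

```python
# Hypothetical sample of carrier codes
sample_carriers = ['DL', 'AA', 'DL']

# map() builds no pairs yet; it only returns an iterator
sample_mapping = map(lambda x: (x, 1), sample_carriers)

# The iterator can be consumed only once
first_pass = list(sample_mapping)
second_pass = list(sample_mapping)

print(first_pass)   # [('DL', 1), ('AA', 1), ('DL', 1)]
print(second_pass)  # []
```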

In [27]:
%%time
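# map() is lazy: this only creates the iterator, hence the ~0 ns timing;
# the (carrier, 1) pairs are produced when the iterator is consumed by the sort phase
mapping = map(lambda x: (x, 1), carriers)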


Wall time: 0 ns


In [28]:
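# use a fresh iterator: `mapping` is one-shot, and consuming it here
# would leave nothing for the shuffle and sort phase
uniqueValues = set(map(lambda x: (x, 1), carriers))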

In [29]:
for value in uniqueValues:  
    print(value)  
print(len(uniqueValues))

('TW', 1)
('HA', 1)
('UA', 1)
('PA (1)', 1)
('TZ', 1)
('FL', 1)
('ML (1)', 1)
('DL', 1)
('AS', 1)
('WN', 1)
('PI', 1)
('EV', 1)
('US', 1)
('YV', 1)
('AA', 1)
('XE', 1)
('F9', 1)
('OH', 1)
('MQ', 1)
('NW', 1)
('AQ', 1)
('EA', 1)
('PS', 1)
('CO', 1)
('9E', 1)
('B6', 1)
('OO', 1)
('DH', 1)
('HP', 1)
29


# Shuffle and Sort Phase

In this phase, the (carrier, 1) pairs from the map phase are sorted by key. Sorting places all pairs for the same carrier next to each other, so the reduce phase can group them.
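The sort matters because `itertools.groupby` only merges consecutive items that share a key. A minimal sketch on hypothetical (carrier, 1) pairs shows that unsorted input splits one carrier into several groups:

```python
from itertools import groupby

# Hypothetical (carrier, 1) pairs as produced by the map phase, not yet sorted
pairs = [('UA', 1), ('PS', 1), ('UA', 1)]


def group_keys(items):
    # groupby starts a new group whenever the key changes from one item to the next
    return [key for key, _ in groupby(items, key=lambda p: p[0])]


unsorted_groups = group_keys(pairs)
sorted_groups = group_keys(sorted(pairs))

print(unsorted_groups)  # ['UA', 'PS', 'UA']  -> 'UA' split into two groups
print(sorted_groups)    # ['PS', 'UA']        -> one group per carrier
```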

In [23]:
%%time
# sort the (carrier, 1) pairs by carrier code; this consumes the `mapping` iterator
sorted_mapping = sorted(mapping)



Wall time: 4.85 s


# Reduce Phase

In this phase, each key is read together with its group of values from the shuffle and sort phase, and the ones in each carrier's group are added together to give that carrier's total number of flights.
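For a single key, the reduce step folds the list of ones pairwise into a total. A minimal sketch on a hypothetical group of values, compared with the built-in `sum`:

```python
from functools import reduce

# Hypothetical group of values for one carrier after shuffle and sort
ones = [1, 1, 1, 1]

# reduce folds the list pairwise: ((1 + 1) + 1) + 1
total = reduce(lambda x, y: x + y, ones)

# The built-in sum gives the same result and is the more common idiom
print(total, sum(ones))  # 4 4
```

Without an initial value, `reduce` raises a `TypeError` on an empty sequence; that cannot happen here, because `groupby` never yields an empty group.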

In [8]:
%%time
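# group consecutive pairs that share the same carrier code
grouper = groupby(sorted_mapping, lambda p: p[0])
# for each group, add up the 1s with reduce to get (carrier, count)
final = map(lambda l: (l[0], reduce(lambda x, y: x + y,
                                    map(lambda p: p[1], l[1]))), grouper)

print(list(final))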

[('9E', 521059), ('AA', 14984647), ('AQ', 154381), ('AS', 2878021), ('B6', 811341), ('CO', 8145788), ('DH', 693047), ('DL', 16547870), ('EA', 919785), ('EV', 1697172), ('F9', 336958), ('FL', 1265138), ('HA', 274265), ('HP', 3636682), ('ML (1)', 70622), ('MQ', 3954895), ('NW', 10292627), ('OH', 1464176), ('OO', 3090853), ('PA (1)', 316167), ('PI', 873957), ('PS', 83617), ('TW', 3757747), ('TZ', 208420), ('UA', 13299817), ('US', 14075530), ('WN', 15976022), ('XE', 2350309), ('YV', 854056)]
Wall time: 24.8 s


# Conclusion

Map-reduce is a functional programming model whose three phases (map, shuffle and sort, reduce) are each simple to execute. In this assignment, I implemented the map-reduce algorithm on a dataset of more than 100 million rows: 1.6 GB compressed (12 GB uncompressed), with 123,534,969 records. The goal was to use map-reduce to count the number of flights for each carrier.

For comparison, I used two ways of counting the flights per carrier:

1. Looping through each record and counting each airline's flights.
2. The map-reduce way: map, then shuffle and sort, then reduce to count the flights.

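In this single-machine run, the map-reduce version took about 30 s (4.85 s for the shuffle and sort step, which also consumes the lazy map, plus 24.8 s for the reduce step), compared with about 1 min for the plain loop, and both produced identical counts for all 29 carriers. Map-reduce was also easy to implement with Python's built-in map, sorted, itertools.groupby and functools.reduce.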

