Spark Environment

In [0]:
# install java libs and spark.
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
! tar xf spark-2.4.4-bin-hadoop2.7.tgz
! pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"


**Mounting Drive**


In [15]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


**Util Program**


In [0]:
import sys


def progressbar(total, progress, prefix=""):
    """
    Displays or updates a console progress bar.
    Original source: https://stackoverflow.com/a/15860757/1391441
    """
    barLength, status = 20, ""
    progress = float(progress) / float(total)
    if progress >= 1.:
        progress, status = 1, "\r\n"
    block = int(round(barLength * progress))
    text = "\r{} [{}] {:.0f}% {}".format(
        prefix, "#" * block + "-" * (barLength - block), round(progress * 100, 0),
        status)
    sys.stdout.write(text)
    sys.stdout.flush()

**Data_handler Program**


In [0]:
import os
import glob
import shutil
import urllib3
import functools
import pandas as pd



DATA_SOURCE = 'http://www.rdatasciencecases.org/Data/Airline/'
DATA_DIR = os.path.join('data')


def download_handler(file_path, data_url):
    """
    downloads the data from stat computing website.
    """

    with urllib3.PoolManager().request('GET', data_url, preload_content=False) as r:
        if r.status == 200:
            with open(file_path, 'wb') as w:
                shutil.copyfileobj(r, w)
        else:
            print('internet connection error.')


def check_dataset():
    file_list = []
    for year in range(1987, 2009):
        file_path = '{}/raw_{}.csv.bz2'.format(DATA_DIR, year)
        file_list.append(file_path)

    if not any(map(os.path.exists, file_list)):
        return True
    else:
        return False


def download_dataset():
    """
    download all the years of flight data into data folder
    """
    if check_dataset():

        if not os.path.exists(DATA_DIR):
            os.mkdir(DATA_DIR)

        year_range = range(1987, 2009)
        for ind, year in enumerate(year_range):
            # vars
            data_url = '{}/{}.csv.bz2'.format(DATA_SOURCE, year)
            file_path = '{}/raw_{}.csv.bz2'.format(DATA_DIR, year)

            # download
            download_handler(file_path, data_url)
            
            # progress
            progressbar(len(year_range), ind + 1, 'download status: ')
    else:
        print('data downloaded. you can skip this step or delete data folder to download again.')


def read_as_dataframe():
    """
    Read all files into one single dataframe.
    PS: this should work, unfortunately, my computer is slow to load all into one. 
    I didn't try this code, but if this works, you only need to loop for the rows.
    """
    par_func = functools.partial(pd.read_csv, compression='bz2', encoding='ISO-8859-1', memory_map=True)
    file_list = glob.glob(os.path.join(DATA_DIR, '*.csv.bz2'))
    df = pd.concat(map(par_func, file_list))

    return df


**Importing Librarie**s


In [0]:
import os
import glob
import pandas as pd
import functools
import numpy as np
from functools import reduce
from itertools import groupby

**Downloading** **Dataset**


In [20]:
download_dataset()

download status:  [####################] 100% 


In [31]:
import os
os.getcwd()

'/content'

**Creating & Sorting data files**

Funtion glob.glob is used to list the data files with their full path and then got sorted by year.

In [0]:

# all files under data folder
file_list = sorted(glob.glob(os.path.join('data', '*.csv.bz2')))

In [0]:

carrier_counts = {}

**Loading the data as dataframe**

The whole data which is stored in 22 data files in zip format(.bz2) consists of the size of 12gb. The system random access memory was limited and do not allow to load the complete data. As suggested, only the data important for map reduce is loaded to surpass the difficulty.

In [0]:
df1 = pd.DataFrame()
for ind, data_file in enumerate(file_list):
    # read current data
    df = pd.DataFrame(pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True, low_memory=False, usecols=['Year', 'UniqueCarrier']))
    df1=df1.append(df)


**Running the job without map reduce**

The Unique Carrier and their frequency is obtained without using map reduce algorithm.

In [29]:
for ind, data_file in enumerate(file_list):
    # read current data
    df = pd.read_csv(data_file, encoding='ISO-8859-1', memory_map=True, low_memory=False)
    
    # unique airlines in dataset
    carriers = df.UniqueCarrier.unique()
    
    # update the global carrier_count
    for key in carriers:
        if key not in carrier_counts:
            carrier_counts.update({key: 0})
    
    # loop through each row in dataframe 
    for carrier in df.UniqueCarrier:
        carrier_counts[carrier] += 1

    # info 
    prefix ='Shape: {} ; {} Mb'.format(
        df.shape, round(df.memory_usage().sum() / 1e+6,2))
    
    # track the progress
    progressbar(len(file_list), ind + 1, prefix=prefix)

Shape: (7009728, 29) ; 1626.26 Mb [####################] 100% 


For loading data, first I used above program but it was loading only 7009728 observations so I used below program that loaded 123534969 observations.

In [24]:
%%time
carrier_counts = {}
# unique airlines in dataset
carriers = df1.UniqueCarrier.unique()
    
    # update the global carrier_count
for key in carriers:
    if key not in carrier_counts:
        carrier_counts.update({key: 0})
    
# loop through each row in dataframe 
for carrier in df1.UniqueCarrier:
    carrier_counts[carrier] += 1

# info 
prefix ='Shape: {} ; {} Mb'.format(
        df1.shape, round(df.memory_usage().sum() / 1e+6,2))
    
# track the progress
progressbar(len(file_list), ind + 1, prefix=prefix)

Shape: (123534969, 2) ; 112.16 Mb [####################] 100% 
CPU times: user 27.9 s, sys: 33 ms, total: 27.9 s
Wall time: 27.9 s


In [40]:
%%time
carrier_counts

CPU times: user 3 µs, sys: 1e+03 ns, total: 4 µs
Wall time: 5.48 µs


{'9E': 521059,
 'AA': 14984647,
 'AQ': 154381,
 'AS': 2878021,
 'B6': 811341,
 'CO': 8145788,
 'DH': 693047,
 'DL': 16547870,
 'EA': 919785,
 'EV': 1697172,
 'F9': 336958,
 'FL': 1265138,
 'HA': 274265,
 'HP': 3636682,
 'ML (1)': 70622,
 'MQ': 3954895,
 'NW': 10292627,
 'OH': 1464176,
 'OO': 3090853,
 'PA (1)': 316167,
 'PI': 873957,
 'PS': 83617,
 'TW': 3757747,
 'TZ': 208420,
 'UA': 13299817,
 'US': 14075530,
 'WN': 15976022,
 'XE': 2350309,
 'YV': 854056}

**Map Reduce way**

Map reduce way - map, reduce and sort, and collect way to count the flights

In [0]:
carriers = df1.UniqueCarrier

**Map Phase**

For each line of flight data, I extracted the carrier code and make a key value pair.

In [26]:
%%time
mapping=map((lambda x: (x, 1)), carriers)

CPU times: user 76 µs, sys: 2 µs, total: 78 µs
Wall time: 83.7 µs


**Shuffle and Sort Phase**


In [27]:
%%time
sorted_mapping=sorted(mapping)


CPU times: user 26.8 s, sys: 5.23 s, total: 32 s
Wall time: 32.1 s


**Reduce Phase**

In [28]:
%%time
grouper=groupby(sorted_mapping, lambda p:p[0])
final=map(lambda l: (l[0], reduce(lambda x, y:x+y,map(lambda p: p[1], l[1]))), grouper)
print(list(final))

[('9E', 521059), ('AA', 14984647), ('AQ', 154381), ('AS', 2878021), ('B6', 811341), ('CO', 8145788), ('DH', 693047), ('DL', 16547870), ('EA', 919785), ('EV', 1697172), ('F9', 336958), ('FL', 1265138), ('HA', 274265), ('HP', 3636682), ('ML (1)', 70622), ('MQ', 3954895), ('NW', 10292627), ('OH', 1464176), ('OO', 3090853), ('PA (1)', 316167), ('PI', 873957), ('PS', 83617), ('TW', 3757747), ('TZ', 208420), ('UA', 13299817), ('US', 14075530), ('WN', 15976022), ('XE', 2350309), ('YV', 854056)]
CPU times: user 34.3 s, sys: 2.99 ms, total: 34.3 s
Wall time: 34.3 s


**Conclusion**

In this assignment, I used Spark in Google Colab to do the mapreduce way. 