# Introduction

In this project, we will play around with "map reduce" frameworks using data scraped from Wikipedia. We'll implement a simplified version of the `grep` command-line utility to search for data in 54 megabytes worth of articles scraped from Wikipedia (saved in the 'wiki' folder).

We will create three processes in total. We will run each process twice, with a 'slow' version processing the entire group of files in one batch, and a 'fast' version using a map reduce approach. We will them compare the time differences between 'slow' and 'fast' versions (and ensure they are identical). We'll test all the processes using the target word 'data.'

First, we import libraries and define our map reduce function, appropriately named `map_reduce`.

In [1]:
import math
import functools
from multiprocessing import Pool
import time
import re
import pandas as pd
import os

def make_chunks(data, num_chunks):
    chunk_size = math.ceil(len(data) / num_chunks)
    return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    chunks = make_chunks(data, num_processes)
    pool = Pool(num_processes)
    chunk_results = pool.map(mapper, chunks)
    return functools.reduce(reducer, chunk_results)

def run_fns(f1, f2):
    start = time.time()
    slow = f1([os.path.join('wiki', file) for file in files])
    end = time.time()
    slow_time = end - start

    start = time.time()
    fast = f2()
    end = time.time()
    fast_time = end - start
    
    assert slow == fast
    print(round(slow_time,2), round(fast_time,2), round(fast_time/slow_time,2))
    return (slow, fast)

In [2]:
# list the first five scraped files

files = os.listdir('wiki')
files[:5]

['Bay_of_ConcepciC3B3n.html',
 'Bye_My_Boy.html',
 'Valentin_Yanin.html',
 'Kings_XI_Punjab_in_2014.html',
 'William_Harvey_Lillard.html']

In [3]:
len(files)

999

In [4]:
# print the total number of lines by looping over every file

lines = 0
for file in [os.path.join('wiki', file) for file in files]:
    with open(file) as f:
        lines += len(f.readlines())
        
lines

499797

In [5]:
# print the total number of lines but this time by using the map_reduce function

def map_files(file_list):
    lines = 0
    for file in [os.path.join('wiki', file) for file in file_list]:
        with open(file) as f:
            lines += len(f.readlines())
    return lines

map_reduce(files, 8, map_files, lambda x,y: x+y)

499797

# `Grep` Processes

The first process goes through each file and check to see if the 'target' word appears in the file's HTML. The process returns a dictionary with the file name as key and the lines that contain the target.

In [6]:
def map_grep(file_list):
    greps = {}
    for i, file in enumerate(file_list):
        with open(file) as f:
            lines = [line for line in f.readlines()]
        for idx, line in enumerate(lines):
            if target in line:
                if file in greps: greps[file].append(idx)
                else: greps[file] = [idx]
    return greps

def reduce_grep(d1, d2):
    d1.update(d2)
    return d1

def map_reduce_grep(num_processes=8):
    file_names = [os.path.join('wiki', file) for file in files]
    return map_reduce(file_names, num_processes, map_grep, reduce_grep)

target = 'data'
grep_slow, grep_fast = run_fns(map_grep, map_reduce_grep)

0.51 0.32 0.63


# `Grep` Process with Case Insensitivity

This process goes through each file and check to see if the 'target' word appears in the file's HTML, but first converts the HTML to lower case.

In [7]:
def map_grep_insensitive(file_list):
    greps = {}
    for i, file in enumerate(file_list):
        with open(file) as f:
            lines = [line for line in f.readlines()]
        for idx, line in enumerate(lines):
            if target in line.lower():
                if file in greps: greps[file].append(idx)
                else: greps[file] = [idx]
    return greps

def map_reduce_grep_insensitive(num_processes=8):
    file_names = [os.path.join('wiki', file) for file in files]
    return map_reduce(file_names, num_processes, map_grep_insensitive, reduce_grep)

target = 'data'
grep_insensitive_slow, grep_insensitive_fast = run_fns(map_grep_insensitive, map_reduce_grep_insensitive)

0.72 0.44 0.62


Here, we'll print out the files for which the case insensitive process yielded more matches than the original process.

In [8]:
for file in sorted(list(grep_insensitive_fast.keys())):
    if grep_insensitive_fast[file] != grep_fast[file]:
        print(f"{file[5:-5]}: {len(grep_insensitive_fast[file]) - len(grep_fast[file])}")

1953E2809354_FA_Cup_qualifying_rounds: 1
83_(number): 1
A_Beautiful_Valley: 1
Acceptance_(Heroes): 1
Agaritine_gammaglutamyltransferase: 2
Alex_Kurtzman: 1
Amborella: 1
Antibiotic_use_in_livestock: 1
Appa_(film): 1
Avengers_Academy: 1
Bahmanabade_Olya: 1
Battle_of_Wattignies: 1
Benny_Lee: 1
Bias: 1
Bibiana_Beglau: 1
Blue_SWAT: 1
Boardman_Township_Mahoning_County_Ohio: 1
Brownfield_(software_development): 1
C11orf30: 1
C389cole_des_Mines_de_Douai: 1
Camp_Nelson_Confederate_Cemetery: 1
Claire_Danes: 2
Claudia_Neidig: 1
Cobble_Hill_Brooklyn: 1
Code_page_1023: 3
Colchester_Village_Historic_District: 1
Companys_procC3A9s_a_Catalunya: 1
Copamyntis_infusella: 1
Craig_Chester: 1
Cryptographic_primitive: 1
CurtissWright_Hangar_(Columbia_South_Carolina): 1
Daniel_Cerone: 1
Danish_Maritime_Safety_Administration: 1
Dean_Kukan: 1
Demographics_of_American_Samoa: 1
Derek_Acorah: 1
Devil_on_Horseback: 1
Dewoitine_D.21: 1
Don_Parsons_(ice_hockey): 1
Don_Raye: 1
Doumanaba: 1
Dragnet_(franchise): 6
Ek_Di

# `Grep` Process with Match Indices

The final process does the same as the previous one, except it returns the line number per file, and the index of the line (possibly more than one) for each match. 

In [9]:
def map_grep_indices(file_list):
    greps = {}
    for i, file in enumerate(file_list):
        with open(file) as f:
            lines = [line for line in f.readlines()]
        for idx, line in enumerate(lines):
            matches = [m.start() for m in re.finditer(target, line.lower())]
            if len(matches) > 0:
                for m in matches:
                    if file in greps: greps[file].append((idx, m))
                    else: greps[file] = [(idx, m)]
    return greps

def map_reduce_grep_indices(num_processes=8):
    file_names = [os.path.join('wiki', file) for file in files]
    return map_reduce(file_names, num_processes, map_grep_indices, reduce_grep)

target = 'data'
grep_indices_slow, grep_indices_fast = run_fns(map_grep_indices, map_reduce_grep_indices)

2.92 1.37 0.47


Finally, we'll convert the results to a `pandas` dataframe and sample 10 rows.

In [10]:
df = pd.DataFrame()
for file in grep_indices_fast:
    for entry in grep_indices_fast[file]:
        row = pd.DataFrame([{'file': file, 'line': entry[0], 'index' : entry[1]}])
        df = pd.concat([df, row])

df = df.reset_index()
df.sample(10).iloc[:, -3:]

Unnamed: 0,file,line,index
20587,wiki/Cyclone_Gamede.html,36,608
10039,wiki/Bulbophyllum_biflorum.html,121,453
3927,wiki/Tim_Spencer_(singer).html,143,159
19290,wiki/United_Nations_Security_Council_Resolutio...,392,1139
19277,wiki/United_Nations_Security_Council_Resolutio...,113,409
8398,wiki/List_of_lakes_in_Independence_County_Arka...,107,787
10662,wiki/Kendalia_Texas.html,62,593
9170,wiki/Asparuh_Peak.html,143,785
14756,wiki/List_of_Polish_consorts.html,825,645
6499,wiki/List_of_Bellator_MMA_events.html,2071,1152
