<h1> MapReduce implementation in Python for analyzing health news tweets </h1>

<h3> Objective </h3> 
To implement a MapReduce framework in Python for parallel processing.

<h3> Dataset </h3>

The data was collected in 2015 using the Twitter API. The dataset contains health news tweets from more than 15 major health news agencies, such as BBC, CNN, and NYT.


<p>Source: https://archive.ics.uci.edu/ml/datasets/Health+News+in+Twitter</p>

<p>Reference: Karami, A., Gangopadhyay, A., Zhou, B., & Kharrazi, H. (2017). Fuzzy approach topic discovery in health and medical corpora. International Journal of Fuzzy Systems, 1-12.</p>


<h3> Setting up </h3>

In [8]:
import os 
import pandas as pd 
import time 
import math 
import functools 
from multiprocessing import Pool 

In [9]:
# Locate the source files (skip any sub-directories)
src_folder = 'data/source'
src_files = [i for i in os.listdir(src_folder)
             if not os.path.isdir(os.path.join(src_folder, i))]

raw_folder = 'data/raw'

In [10]:
# Data encoding

from chardet import detect

def get_encoding(file):
    """ Given a file path, find the encoding type of the file """
    with open(file, 'rb') as f:
        data = f.read()
    return detect(data)['encoding']

# Convert each source file to UTF-8 and store the result as a new file in the raw folder
for file_name in src_files: 
    src_file = os.path.join(src_folder, file_name)
    encoded_file = os.path.join(raw_folder, file_name)
    # get the original encoding 
    current_encoding = get_encoding(src_file)
    with open(src_file, 'r', encoding=current_encoding, errors='ignore') as f:
        with open(encoded_file, 'w+', encoding='utf-8', errors='ignore') as e: 
            data = f.read()
            e.write(data)

<h3> MapReduce framework </h3>

In this step, we will implement our MapReduce function for processing the data. The steps are: 
1. Break data into chunks
2. Use a mapper function to process each chunk of data 
3. Use a reducer function to combine results produced by mapper function 

The mapper and reducer functions can be tailored depending on the type of processing or analysis we want to perform on the data. The three steps described above can be generalised into a map_reduce function.
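The three steps can be illustrated on a toy input before any parallelism is involved. The snippet below is only a sketch: the sample lines, `count_flu`, and `add_counts` are made up for illustration and are not part of the notebook.

```python
import functools

# Hypothetical toy input: four short "tweets"
lines = ["flu shot", "flu season", "shot clinic", "flu"]

# Step 1: break the data into chunks (here: two chunks of two lines each)
chunks = [lines[0:2], lines[2:4]]

# Step 2: the mapper processes one chunk and returns a partial result
def count_flu(chunk):
    """Count the lines in a chunk that mention 'flu'."""
    return sum(1 for line in chunk if "flu" in line)

# Step 3: the reducer combines two partial results into one
def add_counts(left, right):
    return left + right

partial_counts = list(map(count_flu, chunks))
total = functools.reduce(add_counts, partial_counts)
print(partial_counts, total)
```

Here the built-in `map` runs the mapper sequentially; a parallel version would swap it for a process pool while keeping the mapper and reducer unchanged.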


In [11]:
# Implement function to break data into chunks 
def create_chunks(data, num_chunks):
    """
    Break data into desired number of chunks 
    Args:
    data(list): The data to break into chunks 
    num_chunks(int): Number of chunks 
    
    Returns: 
    list: a list of chunks of data 
    """
    # calculate chunk size 
    chunk_size = math.ceil(len(data)/num_chunks) 
    return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
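
A quick check on small lists shows how the chunk size is rounded up. The function is repeated here only to keep the sketch self-contained, and the inputs are arbitrary examples.

```python
import math

def create_chunks(data, num_chunks):
    """Same logic as above: slices of ceil(len(data) / num_chunks) items."""
    chunk_size = math.ceil(len(data) / num_chunks)
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

# 10 items in 5 chunks: chunk size 2, every chunk is full
even_split = create_chunks(list(range(10)), 5)

# 10 items in 3 chunks: chunk size ceil(10 / 3) = 4, the last chunk is shorter
uneven_split = create_chunks(list(range(10)), 3)

# 5 items in 4 chunks: chunk size ceil(5 / 4) = 2, so only 3 chunks come back
fewer_chunks = create_chunks(list(range(5)), 4)

print(even_split)
print(uneven_split)
print(fewer_chunks)
```

Because the chunk size is rounded up, the function can return fewer chunks than requested, in which case some worker processes simply receive no work.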

In [12]:
# Implement MapReduce function 
def map_reduce(data, num_processes, mapper, reducer): 
    
    """Run processes on a given set of data
    Args: 
    data: the data to process 
    num_processes: number of processes to run in parallel 
    mapper: the function to map to each process 
    reducer: the function to combine results of mapper 
    
    Returns:
    The global result produced from reducer. 
    
    """
    # break data into chunks 
    chunks = create_chunks(data, num_processes) 
    # apply mapper function to each chunk in parallel fashion 
    with Pool(num_processes) as pool: # create a pool of processes
        chunk_results = pool.map(mapper, chunks)
    # apply reducer function to results of mapper
    overall_result = functools.reduce(reducer, chunk_results)
    return overall_result

In [13]:
# Read the UTF-8 encoded raw files, one list of lines per file
raw_files = [i for i in os.listdir(raw_folder)
             if not os.path.isdir(os.path.join(raw_folder, i))]
data = []
for file in raw_files:
    with open(os.path.join(raw_folder, file), encoding='utf-8') as f:
        data.append(f.readlines())

<h3> Searching for occurrence of a topic </h3> 

Given a keyword as the topic, count the tweets that contain it and keep track of their tweet IDs.
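
The `mapper` and `reducer` modules used for this search are not shown in this notebook. The sketch below is one possible shape for such a pair, not the actual implementation. It assumes each line has the form `id|date|content`, that matching is case-insensitive, and that the keyword is a default argument; the function names and sample lines are hypothetical.

```python
import functools

def count_topic_mapper_sketch(chunk, keyword="vaccines"):
    """Hypothetical mapper: find the tweets in one chunk that mention the keyword."""
    ids = []
    for line in chunk:
        fields = line.rstrip("\n").split("|", 2)  # assumed layout: id|date|content
        if len(fields) < 3:
            continue  # skip malformed lines
        tweet_id, _date, content = fields
        if keyword.lower() in content.lower():  # assumption: case-insensitive match
            ids.append(tweet_id)
    return {"keyword": keyword, "count": len(ids), "ids": ids}

def count_topic_reducer_sketch(left, right):
    """Hypothetical reducer: merge two partial results into one."""
    return {
        "keyword": left["keyword"],
        "count": left["count"] + right["count"],
        "ids": left["ids"] + right["ids"],
    }

# Made-up sample data, already split into two chunks
sample_chunks = [
    ["1|Mon Jan 01|New vaccines tested\n", "2|Mon Jan 01|Diet tips\n"],
    ["3|Tue Jan 02|Doctors delay Vaccines\n"],
]
sketch_result = functools.reduce(
    count_topic_reducer_sketch, map(count_topic_mapper_sketch, sample_chunks)
)
print(sketch_result)
```

The result dictionary mirrors the `keyword`, `count`, and `ids` keys that appear in the search output. Because a process pool passes only the chunk to the mapper, a different keyword could be bound beforehand with `functools.partial`.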

In [14]:
import itertools 
# import the mapper and reducer functions
from mapper import mapper_count_topic 
from reducer import reducer_count_topic

# prepare the data 
tweets_from_all_agencies = list(itertools.chain(*data))

In [15]:
# Search for tweets with the keyword 'vaccines' 
result = map_reduce(tweets_from_all_agencies, 5, mapper_count_topic, reducer_count_topic)
result

{'keyword': 'vaccines',
 'count': 57,
 'ids': ['575049588454858752',
  '572575901025759232',
  '572528136426004480',
  '572509390789599232',
  '572504407927078913',
  '563170245184339969',
  '563156321470799872',
  '563133823014416384',
  '512386620340043777',
  '491373383708594177',
  '484092846803517440',
  '484084952745906176',
  '459761474647646208',
  '459453569813716992',
  '459425265501085696',
  '426072899012030464',
  '292382429497421824',
  '161992104862679040',
  '581215093372878850',
  '576030577008181248',
  '571026422896173056',
  '563093546367205376',
  '563002551097102338',
  '562335004802711552',
  '561860255005822976',
  '561278423424057344',
  '560096182631157760',
  '558336774599348224',
  '557406204474163200',
  '547533001677373441',
  '530331709284548608',
  '530076621752254466',
  '526770084224970752',
  '525303540869496833',
  '525293713279692802',
  '525209668550139904',
  '525001078635769856',
  '524591933864554497',
  '517369325003747328',
  '5173549372240076

<h3> Displaying the contents of the matching tweets </h3> 


In [16]:
def display_content_as_df(tweet_ids=None):
    """Take a list of tweet ids and return the matching tweets' content as a dataframe."""
    # Each tweet line has the form: id|date|content
    # Split each line on "|" and compare the first field with the requested ids.
    # On a match, grab the third field as the content.
    wanted_ids = set(tweet_ids or [])
    matched_tweets = {'ids': [], 'contents': []}
    for tweet in tweets_from_all_agencies:
        # maxsplit=2 keeps any "|" inside the tweet text intact
        split_tweet = tweet.split("|", 2)
        if len(split_tweet) == 3 and split_tweet[0] in wanted_ids:
            matched_tweets['ids'].append(split_tweet[0])
            matched_tweets['contents'].append(split_tweet[2].rstrip("\n"))
    return pd.DataFrame(matched_tweets)
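
The lookup above scans every tweet on each call. If many lookups are needed, one option is to build an id-to-content index once and query it afterwards. The sketch below uses only the standard library; `build_tweet_index` and the sample lines are hypothetical, and the `id|date|content` layout is assumed.

```python
def build_tweet_index(tweets):
    """Map each tweet id to its content (assumed line layout: id|date|content)."""
    index = {}
    for tweet in tweets:
        fields = tweet.rstrip("\n").split("|", 2)
        if len(fields) == 3:
            index[fields[0]] = fields[2]
    return index

# Made-up sample lines
sample_tweets = [
    "1|Mon Jan 01|New vaccines tested\n",
    "2|Mon Jan 01|Diet tips\n",
    "3|Tue Jan 02|Doctors delay vaccines\n",
]
index = build_tweet_index(sample_tweets)

# Look up a few ids; unknown ids (here "99") are skipped
wanted = ["3", "1", "99"]
found = [tweet_id for tweet_id in wanted if tweet_id in index]
matched = {"ids": found, "contents": [index[tweet_id] for tweet_id in found]}
print(matched)
```

A dictionary in this shape can be passed to `pd.DataFrame` in the same way as `matched_tweets`; rows then follow the order of the requested ids rather than the order of the tweets in the files.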

In [17]:
ids = ['575049588454858752',
  '572575901025759232',
  '572528136426004480',
  '572509390789599232',
  '572504407927078913',
  '563170245184339969',
  '563156321470799872',
  '563133823014416384',
  '512386620340043777'] 

result_df = display_content_as_df(ids)
result_df.columns = ['Tweet ID', 'Content'] 
result_df

   Tweet ID            Content
0  575049588454858752  With Ebola crisis easing, efforts to test new ...
1  572575901025759232  74% of doctors surveyed said they have complie...
2  572528136426004480  Doctors often delay vaccines for young childre...
3  572509390789599232  Pediatricians and family medicine doctors feel...
4  572504407927078913  Vaccine-wary parents pressure doctors to delay...
5  563170245184339969  When it comes to vaccines, random online comme...
6  563156321470799872  The good: People believed @CDCgov's advice abo...
7  563133823014416384  On the Internet, the @CDCgov and random online...
8  512386620340043777  Not having ANY Ebola drugs, vaccines is a prob...
