In [1]:
%%file Ex2.py
from mpi4py import MPI
import os
import fnmatch
import pandas as pd
import re
import numpy as np
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string
import sys

from collections import Counter
from io import StringIO
import warnings
warnings.filterwarnings("ignore")

print_time = False

#Initialising Communicator
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#Initialising Variables
token = []
tf_token = []
idf_token = {}
tfidf_token = []
token_count = []
tokdoc_worker = None
doc_num = 4


#Read Data
def read_data(directory):
    if print_time: start = MPI.Wtime()
    filelist = []
    text_doc = []
    filename = []
    for root,dirs,files in os.walk(directory):
        files = [os.path.join(root, f) for f in files]
        for file in files:            
            filename.append(os.path.basename(file))
            f = open(file,'rb')
            text_doc.append(''.join(map(str, f.readlines())))
    if print_time: print("Rank: ",rank,"Reading Data: ",
                         round(MPI.Wtime() - start,4))
    return text_doc

#Data Cleaning and Tokenisation
def clean_token(data):
    if print_time: start = MPI.Wtime()
    stopword_set = set(stopwords.words('english'))
    text_data = []
    for i,text in enumerate(data):
        text = word_tokenize(text)
        text = [word for word in text if word.isalpha()]
        text = [t.lower() for t in text]
        text = [word for word in text if word not in stopword_set]
        text_data.append(text)
    if print_time: print("Rank: ",rank,"Cleaning Data: ",
                         round(MPI.Wtime() - start,4))
    return text_data
        
#Calculate Term Frequency (TF)
def tf(data):
    if print_time: start = MPI.Wtime()
    count = [dict(Counter(token)) for token in data]
    tf_token = [dict(Counter(token)) for token in data]

    for doc in tf_token:
        token_num = len(doc)
        for token in doc:
            doc[token] = doc[token] / token_num 
    if print_time: print("Rank: ",rank,"TF Calculation: ",
                         round(MPI.Wtime() - start,4))
    return tf_token,count


def token_doc(tf_doc):
    tokdoc_freq = {}
    for tf_token in tf_doc:
        for token in tf_token:
            if token not in tokdoc_freq.keys():
                tokdoc_freq[token] = 1
            else:
                tokdoc_freq[token] = tokdoc_freq[token] + 1
    return tokdoc_freq

#Calculate Inverse Document Frequency (IDF)
def idf(tok_list,token_doccnt):
    if print_time: start = MPI.Wtime()
    idf_doccnt = {}
    for token in tok_list:
        idf_doccnt[token] = np.log(doc_num/token_doccnt[token])
    if print_time: print("Rank: ",rank,"IDF Calculation: ",
                         round(MPI.Wtime() - start,4))
    return idf_doccnt

#Calculate Term Frequency - Inverse Document Frequency (TF-IDF)
def tfidf(tokcnt,idf_token):
    if print_time: start = MPI.Wtime()
    tfidf_token = tokcnt
    for doc in tfidf_token:
        for token in doc:
            doc[token] = doc[token] * idf_token[token]
    if print_time: print("Rank: ",rank,"TF-IDF Calculation: ",
                         round(MPI.Wtime() - start,4))
    return tfidf_token
            
if rank == 0: #Master
    directory = "G:/DA - Hildeshim/DDA Lab/Exercise 2/Dataset/"
    data = read_data(directory)
    data_workers = np.array_split(data,size-1)
    #Tokenize and TF
    for workers in range(1,size):
        comm.send(data_workers[workers-1],dest = workers,tag=1) 
        doc_token = comm.recv(source = workers,tag=10)
        token.extend(doc_token)
        comm.send(doc_token,dest = workers,tag=2)
        tf_tok= comm.recv(source = workers,tag=20)
        tf_token.extend(tf_tok[0])
        token_count.extend(tf_tok[1])

    token_doccnt = token_doc(token_count)
    tokdoc_workers = np.array_split(list(token_doccnt.keys()),size-1)
    
    #IDF
    for workers in range(1,size):
        comm.send(tokdoc_workers[workers-1],dest = workers,tag=3)
        comm.send(token_doccnt,dest = workers,tag = 4)
        idf_tok = comm.recv(source = workers, tag=30)
        idf_token.update(idf_tok)
    token_count_workers = np.array_split(token_count,size-1)
    
    #TF-IDF
    for workers in range(1,size):
        comm.send(token_count_workers[workers-1],dest = workers,tag=5)
        comm.send(idf_token,dest = workers,tag=6)
        tfidf_tok = comm.recv(source = workers, tag=40)
        tfidf_token.extend(tfidf_tok)
    
else:#Worker
    data_worker = comm.recv(source=0,tag=1)
    comm.send(clean_token(data_worker), dest = 0,tag=10)
    tf_worker = comm.recv(source=0,tag=2)
    comm.send(tf(tf_worker),dest=0,tag=20)
    tokdoc_worker= comm.recv(source = 0,tag=3)
    token_doccnt = comm.recv(source = 0,tag = 4)
    comm.send(idf(tokdoc_worker,token_doccnt),dest = 0,tag=30)    
    tokcnt_worker = comm.recv(source = 0,tag = 5)
    idf_token = comm.recv(source = 0,tag = 6)
    comm.send(tfidf(tokcnt_worker,idf_token),dest = 0,tag=40)

Overwriting Ex2.py


In [2]:
!mpiexec -n 2 python Ex2.py

Data:  ['b\'From: strom@Watson.Ibm.Com (Rob Strom)\\n\'b\'Subject: Re: [soc.motss, et al.] "Princeton axes matching funds for Boy Scouts"\\n\'b\'Distribution: usa\\n\'b\'Organization: IBM Research\\n\'b\'Lines: 15\\n\'b\'\\n\'b\'In article <N4HY.93Apr5120934@harder.ccr-p.ida.org>, n4hy@harder.ccr-p.ida.org (Bob McGwier) writes:\\n\'b\'\\n\'b\'|> [1] HOWEVER, I hate economic terrorism and political correctness\\n\'b\'|> worse than I hate this policy.  \\n\'b\'\\n\'b\'\\n\'b\'|> [2] A more effective approach is to stop donating\\n\'b\'|> to ANY organizating that directly or indirectly supports gay rights issues\\n\'b\'|> until they end the boycott on funding of scouts.  \\n\'b\'\\n\'b\'Can somebody reconcile the apparent contradiction between [1] and [2]?\\n\'b\'\\n\'b\'-- \\n\'b\'Rob Strom, strom@watson.ibm.com, (914) 784-7641\\n\'b\'IBM Research, 30 Saw Mill River Road, P.O. Box 704, Yorktown Heights, NY  10598\\n\'', 'b\'From: keith@cco.caltech.edu (Keith Allan Schneider)\\n\'b\'Subje