<a href="https://colab.research.google.com/github/sdkchris/Projects/blob/main/Calculate_TF_IDF_score_in_a_Distributed_setting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Calculate TF-IDF score in a Distributed setting

**Definition:**

TF-IDF or term frequency–inverse document frequency, is a numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus. [Wikipedia]

It is often used as a weighting factor in searches of information retrieval, text mining, and user modeling. The tf–idf value increases proportionally to the number of times a word appears in the document and is offset by the number of documents in the corpus that contain the word, which helps to adjust for the fact that some words appear more frequently in general. tf–idf is one of the most popular term-weighting schemes today. 

**Methodology:**

We will use the concept of parrallel distributed sustems with MPI or Message Passing Interface, which is a standardized message-passing library interface specification. MPI is a very abstract description on how messages can be exchanged between different processes. 

codingame.com claims that MPI is good for parallelism and high performance computing. In a very straightforward approach you can parallelise a code to do SIMD parallelism : Single Instruction, Multiple Data. That means all of your process will be applying the same treatment on a big pool of data that will be distributed among them. But since MPI does not force you to launch only one program, it is also very convenient when trying to do MIMD parallelism : Multiple Instruction, Multiple Data.

  >Master-Worker architecture

**Business applications:**
* Sentiment analysis,
* Information retrieval, 
* Text summarization,
* keyword extraction,
* etc. 

**Data Used:**
* fetch_20newsgroups form sklearn datasets

## Step 1: Data cleaning and text tokenization

In [2]:
 !pip3 install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.3.tar.gz (2.5 MB)
[K     |████████████████████████████████| 2.5 MB 4.6 MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (PEP 517) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.3-cp37-cp37m-linux_x86_64.whl size=2185301 sha256=ea4a95f830dcb7d3376d5ff9a7c7754398f03018f5cfb6fc5838bf315e9dfc92
  Stored in directory: /root/.cache/pip/wheels/7a/07/14/6a0c63fa2c6e473c6edc40985b7d89f05c61ff25ee7f0ad9ac
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.3


In [3]:
import numpy as np
import re
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from mpi4py import MPI
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from sklearn.datasets import fetch_20newsgroups

#we will work with Poter Stemmer for word stems 
poter_stemmer = nltk.PorterStemmer()

#mpi routines
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
master = 0

#text processing
tokens  = []

#replace unwated characters with an empty space and convert text to lower char
def text_process(text):
    #remove unwanted characters
    text = re.sub('[^a-zA-A]', ' ', text)
    text = text.lower()
    return(text)

#tokenize cleaned text and remove stopwords    
def txt_tokenize(text):
    #tokenize works by calling word_tokenize
    words = word_tokenize(text)
    #remove stopwords
    stop_words = set(stopwords.words('english'))
    words = [word for word in words if not word in stop_words]
    #capture only stems
    words = [poter_stemmer.stem(word) for word in words]
    return (words)
    
#set starting time
start_time = MPI.Wtime()

if rank == 0:
    #get dataset
    data_set = fetch_20newsgroups(subset='train', remove=('headers','footers','quotes'))
    #convert it into a diction format
    data_set = dict(data_set)
    #get number of documents
    data_array = data_set['data']
    
    #get number of slices to be sent to works
    d_slices = np.array_split(data_array,size)
    
    #send slice to workers 
    for worker in range (size-1): #for every worker except master
        comm.send(d_slices[worker+1],dest=worker+1)
    
    

else: #for every other worker who is not a master, do receive data
    
    
    worker_data = comm.recv(source = master)
    k = 0
    tokenized_dt = {}
    #for every slice of data starting from the first slice, process data
    for data in worker_data: 
        tokenized_dt[k] = txt_tokenize(text_process(data))
        #increament k
        k +=1 
        
    #after each worker has processes its corresponding slice, send it back to master
    dt = [v for v in tokenized_dt.values()]
    comm.send(dt,master)

    
#in case of only one process
if rank==0:
    t = 0
    tokenized_dt = {}
    for data in d_slices[0]:
        tokenized_dt[t] = txt_tokenize(text_process(data))
        t = t+1
    tokens = tokens+[v for v in tokenized_dt.values()]
    for i in range(size-1):
        w_data = comm.recv(source = i+1)
        tokens = tokens + w_data
#end time
end_time = MPI.Wtime()
proecessing_time = end_time - start_time
print('Processing time: {}'. format(proecessing_time))
       
print("\nThe Number of tokenized documents:"+str(len(tokens)))
#print("Sample Tokens for :"+str(data_array[0])+"\n"+str(tokens[0]))
#f = open("tokens.txt","w+")
#f.write(str(tokens))
#f.close()








[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Processing time: 57.79683037299998

The Number of tokenized documents:11314


## Step 2: Calculate Term Frequency (TF) 

Term frequency, tf(t,d), is the relative frequency of term t within document d,

<img src="https://wikimedia.org/api/rest_v1/media/math/render/svg/dd4f8a91dd0d28a11c00c94a13a315a5b49a8070" width=300 height=80>


* Where $ft,d$ is the raw count of a term in a document, i.e., the number of times that term $t$ occurs in document $d$. 

* Boolean "frequencies": tf(t,d) = 1 if $t$ occurs in $d$ and 0 otherwise;

* Term frequency adjusted for document length: $tf(t,d) = ft,d ÷ (number of words in d )$

* Logarithmically scaled frequency: $tf(t,d) = log (1 + ft,d)$
* Augmented frequency, to prevent a bias towards longer documents

source: [https://en.wikipedia.org/wiki/Tf%E2%80%93idf]


In [5]:
import numpy as np
from mpi4py import MPI

#mpi routines
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
master = 0

#calculate term frequency
term_f  = {}
#count frequency of words in documents
def term_freq(data):
    return ([[i, data.count(i)] for i in set(data)])
#set starting time
start_time = MPI.Wtime()

#message passing
if rank==0:
    
    
    print('Number of tokens received:' + str(len(tokens)))
    dt_slices = np.array_split(tokens,size)
    
    #send slices of the array to each workers 
    for worker in range(size-1):
        comm.send(dt_slices[worker+1],dest=worker+1)
    

else:
    #receove data
    worker_dt = comm.recv(source=master)
    #once data received, each worker will calculate term frenq
    tf = {}
    pointer = len(worker_dt)*rank 
    for val in worker_dt:
        tf[pointer] = term_freq(val)
        #move pointer for next worker
        pointer = pointer+1
    
    #send resultant data back to master
    comm.send(tf,dest= master)
    
#in case we have only one worker
if rank ==0:
    t = 0
    tf = {}
    for val in dt_slices[0]:
        tf[t] = term_freq(val)
        #increament index
        t = t+1
    
    #update the intial term frequency dictionary
    term_f.update(tf)
    
    for i in range(size-1):
        w_data = comm.recv(source = i+1)
        term_f.update(w_data)
    for j in term_f:
        term_f[j] = sorted(term_f[j])
    
    print('\nNumber of documents processes:' +str(len(term_f)))
    print('\nSample tokens term frequencies \n' +str(term_f[0]))
    
#end time
end_time = MPI.Wtime()
proecessing_time = end_time - start_time
print('Processing time: {}'. format(proecessing_time))
    

Number of tokens received:11314


  result = getattr(asarray(obj), method)(*args, **kwds)



Number of documents processes:11314

Sample tokens term frequencies 
[['addit', 1], ['anyon', 2], ['bodi', 1], ['bumper', 1], ['call', 1], ['car', 4], ['could', 1], ['day', 1], ['door', 2], ['e', 1], ['earli', 1], ['engin', 1], ['enlighten', 1], ['f', 1], ['front', 1], ['funki', 1], ['histori', 1], ['info', 1], ['know', 1], ['late', 1], ['look', 2], ['made', 1], ['mail', 1], ['model', 1], ['n', 1], ['name', 1], ['pleas', 1], ['product', 1], ['realli', 1], ['rest', 1], ['ricklin', 1], ['saw', 1], ['separ', 1], ['small', 1], ['spec', 1], ['sport', 1], ['tellm', 1], ['whatev', 1], ['wonder', 1], ['year', 1]]
Processing time: 8.542964453999957


## Step 3: Calculate Inverse Document Frequency (IDF)

The inverse document frequency is a measure of how much information the word provides, i.e., if it is common or rare across all documents. It is the logarithmically scaled inverse fraction of the documents that contain the word. 

<img src="https://wikimedia.org/api/rest_v1/media/math/render/svg/ac67bc0f76b5b8e31e842d6b7d28f8949dab7937" width=300 height=80>

* N: total number of documents in the corpus ${\displaystyle N={|D|}}$ 
* ${\displaystyle |\{d\in D:t\in d\}|}$ : number of documents where the term ${\displaystyle t}$ appears. 
* If the term is not in the corpus, this will lead to a division-by-zero. It is therefore common to adjust the denominator to $$ {\displaystyle 1+|\{d\in D:t\in d\}|}.$$


In [None]:
import math

#mpi routines
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
master = 0

#calculate inverse document frequency (IDF)
token_values =[]
IDF = {}
N = 0

#function to calculate idf
def idf(data, token_values):
    lst = []
    for d in data:
        count = 0
        for s in token_values:
            if len({d}.intersection(s)) > 0:
                count = count+1
        lst.append([d, math.log(N/count)])
    return(lst)
    
#set starting time
start_time = MPI.Wtime()

#message passing
if rank ==0:
    
    
    print('Tokens received:'+str(len(tokens)))
    N = len(tokens)
    for t in tokens:
        token_values.append(set(t))
    print('\nset token values:' +str(len(token_values)))
    
    dt_slices = np.array_split(tokens,size)
    
    #send slices to workers
    for worker in range(size-1):
        comm.send([dt_slices[worker+1], token_values], dest = worker+1)
        
        
else:

    N = len(tokens)
    #receive data
    worker_data = comm.recv(source=master)
    #calculate idf
    idf_ = {}
    pointer = len(worker_data[0])*rank
    
    for val in worker_data[0]:
        idf_[pointer] = idf(val,worker_data[1])
        #move pointer
        pointer = pointer+1
    #send data to master
    comm.send(idf_,dest = master)
    
    
if rank == 0:
    t= 0
    idf_={}
    for val in dt_slices[0]:
        idf_[t] = idf(val,token_values)
        #increament index
        t = t+1
    #update idf dictionary
    IDF.update(idf_)
    for i in range(size-1):
        w_data = comm.recv(source = i+1)
        IDF.update(w_data)
        
    for j in IDF:
        IDF[j] = sorted(IDF[j])
    print('\nNumber if documents processes:'+str(len(IDF)))
    print('\nSample tokens inv document frequencies\n' +str(IDF[0]))

#end time
end_time = MPI.Wtime()
proecessing_time = end_time - start_time
print('Processing time: {}'. format(proecessing_time))

Tokens received:11314

set token values:11314


  result = getattr(asarray(obj), method)(*args, **kwds)


## Step 4: Calculate Term Frequency Inverse Document Frequency (TF-IDF) scores

Formula:  
<img src="https://wikimedia.org/api/rest_v1/media/math/render/svg/10109d0e60cc9d50a1ea2f189bac0ac29a030a00" width=300 height=80>

In [None]:
#calculate TF-IDF of token in document
TF_IDF ={}

#set starting time
start_time = MPI.Wtime()

#message passing
if rank ==0:
    
    
    #slice both tf array and idf array into chunks to send to workers
    tf_slices = np.array_split(term_f[0:100],size)
    idf_slices = np.array_split(IDF[0:100],size)
    #send slices to workers
    for worker in range(size-1):
        comm.send([tf_slices[worker+1],idf_slices[worker+1]], dest = worker +1)
    
        
else:
    #receive data
    worker_data = comm.recv(source = master)
    
    #calculate tf-idf on slice received
    local_tf_idf = {}
    pointer = len(worker_data[0])*rank
    for val in worker_data[0]:
        local_tf_idf[pointer] = worker_data[0] * worker_data[1]
        #move pointer
        pointer = pointer + 1
    #send resultant product to master
    comm.send(local_tf_idf, dest = master)

    
#otehrwise 
if rank ==0:
    local_tf_idf={}
    pointer = 0
    #multipliply every value in tf_slice with a corresponding value in idf_slice
    for val in tf_slices[0]:
        local_tf_idf[pointer] = tf_slices[val][1]*idf_slices[val][1]
        #move pointer
        pointer = pointer + 1
    #update TF-IDF dict by adding all local_tf_idf values 
    TF_IDF.update(local_tf_idf)
    
    for j in range(size-1):
        #receive form next worker
        w_data = comm.recv(source= j+1)
        #update TF-IDF dict by adding all worker values 
        TF_IDF.update(w_data)
    print('\nNumber if documents processed:' + str(len(TF_IDF)))
    print('\nSample tokens TF_IDF\n' +str(TF_IDF[0]))
    
 #end time
 end_time = MPI.Wtime()
 proecessing_time = end_time - start_time
 print('Processing time: {}'. format(proecessing_time))
        