# Cleanup

In this notebook, I will use the functional programming side of Python as much as possible,  
as this might make the code easier to migrate to the Spark platform.   
The first cell reads the simple data file line by line (just as Hadoop MapReduce and Spark do).

In [1]:
import re
import pandas as pd
from nltk.stem import PorterStemmer
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from itertools import groupby
import numpy as np
import math

with open('../data/data_simple.xml', 'r') as f:
    raw_data = f.readlines()

print('Number of simple data set size: {}'.format(len(raw_data)))

Number of simple data set size: 1001


In [2]:
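# use regexes to extract our data (id, content pairs)
# raw strings keep escapes such as \d from being treated as string escapes
id_pattern = re.compile(r'<row Id="(\d*)"')
content_pattern = re.compile(r'Text="([\W\w]*)"')
noise_pattern = re.compile(r'&#*\w*;')

def job_filter(input_str: str) -> bool:
    # wrap in bool() so we return True/False rather than a match object or None
    return bool(id_pattern.search(input_str) and content_pattern.search(input_str))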

def job_extract(input_str: str) -> tuple:
    postid = id_pattern.search(input_str).group(1)
    content = content_pattern.search(input_str).group(1)
    content = noise_pattern.sub('', content)
    return postid, content

def job_cleanup_format(input_tuple: tuple) -> tuple:
    postid, content = input_tuple
    return int(postid), content.strip()

# use functional programming approach as much as possible ... 
records_has_content = filter(job_filter, raw_data)
extracted_records = map(job_extract, records_has_content)
id_content_list = map(job_cleanup_format, extracted_records)

# materialize id_content_list as a list so we can inspect it
# in production we would not need to convert it to a list
id_content_list = list(id_content_list)
DOCUMENTS_COUNT = len(id_content_list)
print('The number of records now: {}'.format(len(id_content_list)))
id_content_list[:4]

The number of records now: 998


[(6,
  "The explicit cast to double in the first answer isn't necessary - identifying the constant as 5000.0 (or as 5000d) is sufficient."),
 (12, 'Binary Data in MYSQL'),
 (13, 'databasemysql'),
 (14, 'How do I store binary data in mysql?')]
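
To make the filter/extract/cleanup chain easier to follow, here is a minimal self-contained sketch that runs the same three regexes on a few made-up lines. The sample lines and the helper names `has_id_and_text` and `extract` are my own assumptions for illustration; the sketch also assumes `Text` is the last quoted attribute on the line, because the content pattern is greedy.

```python
import re

id_pattern = re.compile(r'<row Id="(\d*)"')
content_pattern = re.compile(r'Text="([\W\w]*)"')
noise_pattern = re.compile(r'&#*\w*;')

# hypothetical input lines, made up for illustration
sample_lines = [
    '<row Id="14" Text="How do I store binary data in mysql?&#xA;" />\n',
    '<row Id="15" />\n',        # no Text attribute, so it is filtered out
    '<?xml version="1.0"?>\n',  # header line, so it is filtered out
]

def has_id_and_text(line):
    # keep only lines that carry both an id and a Text attribute
    return bool(id_pattern.search(line) and content_pattern.search(line))

def extract(line):
    postid = id_pattern.search(line).group(1)
    content = content_pattern.search(line).group(1)
    # drop XML entities such as &#xA; and trim surrounding whitespace
    return int(postid), noise_pattern.sub('', content).strip()

pairs = list(map(extract, filter(has_id_and_text, sample_lines)))
print(pairs)
```

Only the first sample line survives the filter, and its trailing entity is stripped before the text is stored.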

In [3]:
tokenizer = RegexpTokenizer(r'\w+')
stemmer = PorterStemmer()
stop_words = set(stopwords.words('english'))

# lower-case, strip punctuation, stem, then filter stop words
# this Python version depends heavily on nltk; may consider something else later
def job_split_content(input_tuple: tuple) -> tuple:
    postid, input_str = input_tuple
    input_str = input_str.lower()
    raw_tokens = tokenizer.tokenize(input_str)
    stemmed_tokens = map(stemmer.stem, raw_tokens)
    # note: stop words are matched against the stemmed tokens, so a stop word
    # whose stem differs from its original form is not removed
    stemmed_tokens_without_stopword = filter(lambda i: i not in stop_words, stemmed_tokens)
    return postid, list(stemmed_tokens_without_stopword)

# this step seems to use up a lot of time!
%time id_tokens = list(map(job_split_content, id_content_list))

CPU times: user 3.48 s, sys: 15.3 ms, total: 3.5 s
Wall time: 3.51 s
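
One possible way to cut this time, assuming the same tokens recur often across posts, is to memoize the stemming function so each distinct token is stemmed only once. The sketch below uses `functools.lru_cache` with a toy `toy_stem` function as a hypothetical stand-in for `stemmer.stem`, so it runs without nltk; I have not measured the effect on the real data.

```python
from functools import lru_cache

calls = 0  # counts how often the underlying stemmer really runs

def toy_stem(token):
    """Hypothetical stand-in for stemmer.stem: strips one trailing 's'."""
    global calls
    calls += 1
    return token[:-1] if token.endswith('s') else token

# wrap the stemmer so repeated tokens are answered from the cache
cached_stem = lru_cache(maxsize=None)(toy_stem)

tokens = ['databases', 'store', 'databases', 'binary', 'store', 'databases']
stems = list(map(cached_stem, tokens))
print(stems)
print(calls)  # one real call per distinct token
```

In the notebook this would amount to passing a cached wrapper around `stemmer.stem` to `map` instead of the bare method.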


In [4]:
corpus_words = set()
for record in id_tokens:
    _, token_list = record
    corpus_words.update(token_list)

In [5]:
corpus_words = list(corpus_words)
WORD_COUNT = len(corpus_words)

In [6]:
def job_word_to_index(input_tuple):
    postid, tokens = input_tuple
    # e.g. [index1, index2, index1, index1, index3]
    content_indexed = [corpus_words.index(token) for token in tokens]
    content_freq = dict()
    # groupby only groups consecutive equal items, so sort first;
    # otherwise a repeated, non-adjacent index would be undercounted
    for key, grouped in groupby(sorted(content_indexed)):
        content_freq[key] = len(list(grouped))
    # dense term-frequency vector: one slot per corpus word, 0 if absent
    result_list = [content_freq.get(i, 0) for i in range(WORD_COUNT)]
    return postid, np.array(result_list)

%time id_freq_dicts = list(map(job_word_to_index, id_tokens))

CPU times: user 3.81 s, sys: 31.7 ms, total: 3.85 s
Wall time: 3.85 s
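
Much of the time here likely goes into `corpus_words.index(token)`, which scans the whole list for every token. A minimal sketch of an alternative, assuming a word-to-index dictionary built once up front and `collections.Counter` for the counting, is shown below. The tiny vocabulary and the names `word_to_index` and `term_frequency_vector` are hypothetical, and the result is a plain list rather than a NumPy array to keep the sketch dependency-free.

```python
from collections import Counter

# hypothetical vocabulary, made up for illustration
corpus_words = ['binari', 'data', 'mysql', 'store']

# build the lookup once: O(1) per token instead of a list scan
word_to_index = {word: i for i, word in enumerate(corpus_words)}

def term_frequency_vector(tokens):
    # Counter handles repeated tokens regardless of their order
    counts = Counter(word_to_index[token] for token in tokens)
    return [counts.get(i, 0) for i in range(len(corpus_words))]

vector = term_frequency_vector(['data', 'store', 'data', 'mysql', 'data'])
print(vector)
```

The three occurrences of `data` are counted together even though they are not adjacent in the token list.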


Now **id\_freq\_dicts** holds the term frequency (TF) matrix, one (postid, vector) pair per document. Next we will convert it into a TF-IDF matrix.   
I would say IDF is the most time-consuming part for now, because it requires aggregation across all records (which would be distributed across a cluster).
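
For reference, the quantities involved can be written as follows, where $N$ is `DOCUMENTS_COUNT`, $\mathrm{tf}(t, d)$ is the count of term $t$ in document $d$, and $\mathrm{df}(t)$ is the number of documents containing $t$. The symbols are my own notation; the natural logarithm matches the `math.log` call used in this notebook.

$$
\mathrm{idf}(t) = \ln\frac{N}{\mathrm{df}(t)}, \qquad
\mathrm{tfidf}(t, d) = \mathrm{tf}(t, d)\cdot\mathrm{idf}(t)
$$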

In [7]:
def job_idf_for_word(word):
    word_index = corpus_words.index(word)
    
    # note: in Spark only this step would be distributed, not the whole function
    # the reason I use map instead of filter is that
    # filter would return a huge data structure, while map just returns True or False per document
    documents_contain_word = np.array(list(map(lambda i:i[1][word_index] > 0, id_freq_dicts)))
    # sum the True values; the result is the document frequency (document count)
    document_frequency = np.sum(documents_contain_word)
    return math.log(DOCUMENTS_COUNT/document_frequency)
 
# get the idf array
%time idf_array = np.array([job_idf_for_word(word) for word in corpus_words])

CPU times: user 2.67 s, sys: 47.7 ms, total: 2.71 s
Wall time: 2.68 s
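
The function above looks up each word and scans every document again for it. As a rough sketch of another option, the document frequency of all words can be gathered by walking the TF rows column by column. The three toy rows and the variable names below are assumptions for illustration, and plain lists stand in for the NumPy arrays.

```python
import math

# hypothetical (postid, term-frequency row) pairs over a 3-word vocabulary
tf_rows = [
    (6,  [1, 0, 2]),
    (12, [0, 1, 1]),
    (13, [0, 0, 3]),
]
documents_count = len(tf_rows)

# zip(*rows) yields one column per word; count documents with a non-zero entry
columns = zip(*(row for _, row in tf_rows))
document_frequency = [sum(1 for tf in column if tf > 0) for column in columns]

# same formula as job_idf_for_word: log(N / df)
idf = [math.log(documents_count / df) for df in document_frequency]
print(document_frequency)
print(idf)
```

A word that appears in every document, like the third one here, gets an IDF of zero and so contributes nothing to the TF-IDF vector.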


In [8]:
# multiply each tf by idf to get the tf-idf matrix
%time id_tfidf = list(map(lambda input_tuple: (input_tuple[0], input_tuple[1] * idf_array), id_freq_dicts))

CPU times: user 24 ms, sys: 21.3 ms, total: 45.2 ms
Wall time: 46.7 ms


In [9]:
id_tfidf[0][1][id_tfidf[0][1]!=0]

array([ 5.80714099,  5.51945892,  6.90575328,  3.44001737,  2.46310202,
        4.60316818,  5.80714099,  2.86270201,  2.54904445,  6.90575328,
        4.82631173,  5.51945892])
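
As a quick sanity check, some of these values can be reproduced by hand from the IDF formula with 998 documents. The sketch below assumes a term frequency of 1 and tries document frequencies of 1, 3 and 4; those document frequencies are my guess, not read from the data, but the results agree with entries in the array above to the printed precision.

```python
import math

documents_count = 998  # from "The number of records now: 998"

# candidate document frequencies (an assumption, not taken from the data)
idf_values = {df: math.log(documents_count / df) for df in (1, 3, 4)}

for df, value in idf_values.items():
    print(df, round(value, 8))
```

So a value near 6.9058 is consistent with a word that occurs in this post only, while smaller values correspond to words shared with more posts.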

In [11]:
# the last step took 46.7 ms, i.e. about 0.0467 s
print('The approximate time of our cleanup process is about: {} s'.format(round(3.51+3.85+2.68+0.0467, 2)))

The approximate time of our cleanup process is about: 10.09 s
