# Big Data Analytics Final

## Pubmed Central Topic Visualization

### Leonardo Neves 
### AndrewID: lneves

For this task, I have decided to divide my problem into some subtasks:

#### 1. Preprocessing
The preprocessing will be done offline. I will tokenize the documents, remove stopwords, punctuation, numbers and special characters, and stem the words. I will then store the results back into a file.
#### 2. TF-IDF
Here I will calculate the TF-IDF of every term in every document. I plan on using this metric as a measure of importance when computing the term correlations. Instead of counting each co-occurrence as one, I plan on summing the TF-IDF values, so that the related terms are more likely to be relevant for the document and to be a topic.

Here we make the assumption that terms with higher TF-IDF would have a higher probability of being a topic.
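
As a small illustration of the metric, the sketch below computes TF-IDF in plain Python on a toy corpus. The exact weighting used by the Hadoop job is not shown in this notebook, so the formulas here (term frequency as count divided by document length, inverse document frequency as log(N/df)) are an assumption, and `tf_idf` and `toy_docs` are hypothetical names.

```python
import math
from collections import Counter

def tf_idf(docs):
    """docs: dict mapping document id -> list of preprocessed tokens.
    Returns a dict mapping (doc_id, term) -> TF-IDF score.
    Assumed weighting: tf = count / doc length, idf = log(N / df)."""
    n_docs = len(docs)
    # document frequency: number of documents that contain each term
    df = Counter()
    for tokens in docs.values():
        df.update(set(tokens))
    scores = {}
    for doc_id, tokens in docs.items():
        counts = Counter(tokens)
        total = float(len(tokens))
        for term, count in counts.items():
            tf = count / total
            idf = math.log(n_docs / float(df[term]))
            scores[(doc_id, term)] = tf * idf
    return scores

# toy corpus with made-up stemmed tokens
toy_docs = {
    "d1": ["protein", "gene", "protein"],
    "d2": ["gene", "cell"],
    "d3": ["cell", "membran"],
}
toy_scores = tf_idf(toy_docs)
```

In this toy corpus, "protein" appears twice in d1 and in no other document, so it gets a higher score in d1 than "gene", which is shared with d2.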
#### 3. Input handling
Once the TF-IDF has been computed, we can work on handling the input. The user input will be preprocessed like the data we have, and we will look for all the documents in which the input term occurs. From those documents, we will find the other terms that co-occur with this term and rank them by the sum of their TF-IDF values. From this, we get the top 10 co-occurrences per term. We can then do the same for these 10 terms to find the second level of co-occurrences and build the visualization.
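
The ranking idea can be sketched in memory, without Cassandra. The data layout (`doc_scores`, a dict of per-document term scores) and the function name `top_cooccurrences` are assumptions made for this example, and the scores are made up.

```python
from collections import defaultdict

def top_cooccurrences(term, doc_scores, k=10):
    """doc_scores: dict doc_id -> dict term -> TF-IDF score.
    Returns the k terms that co-occur with `term`, ranked by the sum of
    their TF-IDF values over the documents that contain `term`."""
    totals = defaultdict(float)
    for doc_id, term_scores in doc_scores.items():
        if term not in term_scores:
            continue  # only documents where the query term occurs
        for other, score in term_scores.items():
            if other != term:
                totals[other] += score
    # highest summed score first; ties broken alphabetically
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:k]

# toy data with made-up scores
toy = {
    "d1": {"protein": 0.5, "gene": 0.3, "cell": 0.1},
    "d2": {"protein": 0.4, "gene": 0.2},
    "d3": {"cell": 0.9, "membran": 0.7},
}
top = top_cooccurrences("protein", toy, k=2)
```

Note that "cell" has a high score in d3, but d3 does not contain "protein", so that score does not count toward the ranking.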

#### 4. Data Visualization
From the co-occurrences we find, we will generate the flare.json and create the visualization using D3.
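
For reference, this is a minimal sketch of the nested structure that ends up in flare.json, using the same `name`, `children` and `size` keys that the notebook code writes. The helper names, terms and sizes are invented for illustration.

```python
import json

def leaf(name, size):
    """A last-layer term: only a name and its summed TF-IDF score."""
    return {"name": name, "size": size}

def node(name, children, size=None):
    """An inner term with its own co-occurring terms as children."""
    d = {"name": name, "children": children}
    if size is not None:
        d["size"] = size
    return d

# made-up two-layer example rooted at the query term
flare = node("protein", [
    node("gene", [leaf("express", 1.2), leaf("cell", 0.8)], size=2.5),
    node("bind", [leaf("site", 0.9)], size=1.7),
])
flare_text = json.dumps(flare, indent=2)
```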


### Step 1 - Preprocessing

For the preprocessing part, I used the nltk library to create functions that tokenize, remove stopwords and special characters, and stem the words. joblib was used to run this process in parallel on all available cores. Besides the standard English stopwords, we have seen some research-related stopwords such as et, al and fig, so a list with those stopwords was created.

In [109]:
'''Same code as in preprocess.py'''
import os,fnmatch
from nltk import word_tokenize
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
import re
from joblib import Parallel, delayed

def process_term(term):
    '''Process an input term. Throws ValueError exception if term could not be processed.'''
    processed_list = process_tokens([term.lower()])
    if len(processed_list) == 0:
        raise ValueError('Input argument is a stopword or does not exist on the corpus')
    return processed_list[0]

def process_tokens(tokens):
    '''Treats a list of tokens, removing stopwords, special chars and stemming the terms. 
    For single term, use process_term'''
    
    research_stopwords = set(["et","al","fig","nm","imag"])
    #build the stopword set once; calling stopwords.words() for every token is slow
    all_stopwords = set(stopwords.words('english')) | research_stopwords
    
    no_special_and_stopwords = [re.sub('[^A-Za-z0-9]+', '', word) for word in tokens if word not in all_stopwords]
    #stem the terms
    porter_stemmer = PorterStemmer()
    return [porter_stemmer.stem(word) for word in no_special_and_stopwords if word != ""]

def process_file(filepath,f):
    '''For each file, creates a new file with tokenized and stemmed words.'''
    #open original file and create new file
    current = open(filepath + os.sep + f,"r")
    proc_file = open(os.getcwd() + os.sep +"processedDocuments"+os.sep + "process_"+f,"w")
    #tokenize the words and remove tabs and eol
    tokens = [word.strip("\n").strip("\t").strip() for word in word_tokenize(current.read().decode("utf8")) if word.strip("\n").strip("\t").strip()!= ""]
    #preprocess tokens
    stemmed_words = process_tokens(tokens)
    #persist changes to disk
    proc_file.write(" ".join(stemmed_words))
    proc_file.close()
    current.close()
    
def preprocess_current_folder():
    #if output folder does not exist, creates it
    if not os.path.exists(os.getcwd() + os.sep +"processedDocuments"):
        os.makedirs(os.getcwd() + os.sep +"processedDocuments")
    #Walk through all folders and subfolders looking for txt files
    for folder in os.listdir(os.getcwd()):
        #skip the folder with processed documents
        if folder == "processedDocuments":
            continue
        if os.path.isdir(folder) and not folder.startswith("."):

            for subdir, dirs, files in os.walk(os.getcwd() + os.sep + folder):

                for dir in dirs:
                    filepath = subdir + os.sep + dir
                    #parallel for to handle multiple files at once
                    Parallel(n_jobs=-1)(delayed(process_file)(filepath,f) for f in os.listdir(filepath) if f.endswith(".txt"))

### Step 2 - TF-IDF
Starting from the output of this Python code, I adapted code I had built for a previous class to compute TF-IDF using Hadoop. This code has five MapReduce passes. The first computes the word frequency of each term. The second computes the per-document word count. The results of the first two passes are used in the third pass to compute the TF-IDF and, in the fourth pass, we sort the values by TF-IDF and store them in Cassandra.
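
To make the pass structure more concrete, here is a toy single-process sketch of the first two passes. The real job is written in Java for Hadoop and its keys and values are not shown in this notebook, so the key layout, the helper `run_pass` and all function names below are assumptions.

```python
from collections import defaultdict

def run_pass(records, mapper, reducer):
    """Toy single-process stand-in for one MapReduce pass."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)  # shuffle step: group values by key
    out = []
    for key in sorted(groups):
        out.extend(reducer(key, groups[key]))
    return out

# Pass 1 (assumed): count the occurrences of each word in each document.
def map_word_freq(record):
    doc_id, text = record
    for word in text.split():
        yield (word, doc_id), 1

def reduce_sum(key, values):
    yield key, sum(values)

# Pass 2 (assumed): total number of words per document.
def map_doc_total(record):
    (word, doc_id), count = record
    yield doc_id, count

corpus = [("d1", "protein gene protein"), ("d2", "gene cell")]
word_freq = run_pass(corpus, map_word_freq, reduce_sum)
doc_totals = dict(run_pass(word_freq, map_doc_total, reduce_sum))
```

A third pass would join these two outputs to compute the TF-IDF of each (word, document) pair.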

The fifth pass was created before I had the idea of using Cassandra to optimize the process, so it creates a map from each word to the documents in which it shows up. After finishing this part, I realized that using Cassandra with the proper indexes would let me run the two queries I wanted, the documents in which a word appears and the words in a document, much more easily and quickly, since the hard work would be done by Cassandra's optimized CQL queries.

The code for this part can be found in the java_code folder. Please read the readme instructions to run it.

The code inside this jar will create a Cassandra keyspace and the necessary column family for the data to be stored. From this, we will be able to run our Python code on the output. Be aware that the MapReduce passes create temporary results in the results folder; please do not move those files before the job is complete.

### Step 3 - Input Handling

For this step, we will create the functions that query Cassandra. We also create the function that, given a term, creates the flare.json for the visualization part.

In [110]:
from cassandra.cluster import Cluster
import json
#Code similar to create_visualization.py
def process_term(term):
    '''Process an input term. Throws ValueError exception if term could not be processed.'''
    processed_list = process_tokens([term])
    if len(processed_list) == 0:
        raise ValueError('Input argument is a stopword or does not exist on the corpus')
    return processed_list[0]

def connectToCluster():
    KEYSPACE = "big_data_final"
    cluster = Cluster(protocol_version=2)
    session = cluster.connect(KEYSPACE)
    return session

def getDocuments(term,session):
    #pass the value as a bound parameter so quotes in it cannot break the query
    query = "select document FROM doc_word_map where word = %s ;"
    rows = session.execute(query, (term,))
    return [row.document for row in rows]

def getWordsAndTFIDF(document,session):
    query = "select word, tfidf from doc_word_map where document = %s;"
    rows = session.execute(query, (document,))
    return [(row.word,row.tfidf) for row in rows]
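
def generate_json(term,session, words_per_layer = 5,parent_docs = None,level = 1,layers = 2,seen_set = None):
    #use None defaults so the list and set are not shared between calls
    parent_docs = parent_docs or []
    seen_set = set() if seen_set is None else seen_set
    print term
    #preprocess only the root term typed by the user (level starts at 1)
    if level == 1:
        term = process_term(term)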
    docs = getDocuments(term,session)
    if len(docs) == 0:
         raise ValueError('Input argument is a stopword or does not exist on the corpus')
    occ_map = {}
    #only get documents where they co-occur, so each level always co-occurs with previous ones
    if len(parent_docs) > 0:
        docs = list(set(docs) & set(parent_docs))
    #add word to seen_set
    seen_set.add(term)
    research_stopwords = set(["et","al","fig","nm","pdb","imag"])
    for doc in docs:
        word_tfidf =  getWordsAndTFIDF(doc,session)
        for w in word_tfidf:
            if w[0] in seen_set or w[0] in research_stopwords:
                continue
            if not w[0] in occ_map:
                occ_map[w[0]] = 0.0
            occ_map[w[0]] += w[1]
        
    order = sorted(occ_map, key= occ_map.get)[::-1]
    flare_dict = {}
    flare_dict["name"] = term
    flare_dict["children"] = []
    
    for w in order[:words_per_layer]:
        
        if level == layers:
            level_dict = {}
            level_dict["name"] = w
            level_dict["size"] = occ_map[w]
            flare_dict["children"].append(level_dict)
        else:
            level_dict = generate_json(w,session,words_per_layer = words_per_layer,parent_docs = docs,level = level+1,layers = layers,seen_set = seen_set)
            level_dict["size"] = occ_map[w]
            flare_dict["children"].append(level_dict)
    #after full chain related to word has been computed, remove word from seen_set
    seen_set.remove(term)
    if level == 1:
        with open('flare.json', 'w') as outfile:
            json.dump(flare_dict, outfile)
    
    else:
        return flare_dict

After creating the methods, we create a command line interface for the user to input the term. Here, the user can input the term, the number of co-occurrences per term and the number of layers.

In [112]:
session = connectToCluster()
def get_user_input():
    while True:
        try:
            term = raw_input("Please input the term you want to visualize: ")
            if term == "":
                break
            words_per_layer = input("Please input the numbers of co-occurrences per term: ")
            layers = input("Please input the numbers of layers for the visualization: ")

            generate_json(term,session,layers = layers,words_per_layer = words_per_layer)
            print "Created Json!"
            break
        except ValueError:
            print "Input argument is a stopword or does not exist on the corpus, please try again"
get_user_input()


Please input the term you want to visualize: 


In [113]:
from IPython.display import IFrame

def show_plot(file_path,w,h):
    with open(file_path,'r') as f:
        s=f.read()
    return IFrame('files/'+file_path,w,h)

I faced difficulties with the size of the data. My computer does not have enough disk space to hold all the data, which makes even the sampling difficult, since I can't sample from a large number of folders. Also, even with parallel preprocessing, it still takes several hours to process a reasonable amount of documents. On the cluster, joblib did not work and the preprocessing was not parallel, which took a really long time. I also faced problems with Java heap space on my computer and with the split size on the cluster, mostly because of the amount of data.

### Step 4 - Visualization

For our first test, we computed our flare.json with 10 terms for each word and 4 layers. We got the following result (for this one, I am showing a picture because rendering the interactive version makes the notebook slow):
<img src="html_final/1000.png">


It is clear that the image has too much information. Also, there are some repetitions within the same chain. Protein -> gene -> protein is not as informative as we expected, since it is completely redundant. We will filter out those kinds of situations.
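
One way to filter such chains is to remember the terms already on the current path and skip them when expanding. The sketch below shows the idea on made-up data; `expand` and `toy_neighbours` are hypothetical names, and the ranked neighbour lists stand in for the Cassandra queries.

```python
def expand(term, neighbours, depth, path=()):
    """neighbours: dict term -> ranked list of co-occurring terms (toy data).
    Builds a nested dict and never repeats a term within one chain."""
    path = path + (term,)
    node = {"name": term}
    if depth > 0:
        node["children"] = [
            expand(other, neighbours, depth - 1, path)
            for other in neighbours.get(term, [])
            if other not in path  # skip terms already on this chain
        ]
    return node

toy_neighbours = {
    "protein": ["gene", "cell"],
    "gene": ["protein", "express"],
    "cell": ["protein", "gene"],
}
tree = expand("protein", toy_neighbours, depth=2)
```

A term may still appear in two different branches (here "gene" shows up both directly under "protein" and under "cell"), but the chain protein -> gene -> protein is no longer produced.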

In addition, words like et, al, pdb (as far as I could find on Google, it is a database for proteins, so I am not sure we should filter this one) and image should also be considered domain stopwords and will be filtered by our scripts.

On an interactive map, however, this number of terms is not so bad. We can zoom in on the terms we are looking for and, since my JSON also carries the size (the summed TF-IDF score) of each term co-occurrence, analyze their contribution. We can see an example here:




In [125]:
show_plot("html_final/interactive_1000.html",600,900)

This visualization might not be accurate, since it was only run with around 13k documents. With a larger corpus, the results could be different.

We now try smaller numbers, with 4 co-occurrences per term and only 2 layers.



In [129]:
show_plot("html_final/4_2_dendogram.html",900,800)

This looks better. We can see that some branches still repeat the words of other branches, but no more stopwords are shown and we do not have repetition within the same branch. Let's try a different type of visualization.


In [126]:
show_plot("html_final/4_2_circle.html",900,800)

This is an interesting plot, since we can check each layer and the contribution for each layer. I have tried it with the 4 layers experiment, but it gets really slow because of the number of terms. Let's try for a different word as input.

In [127]:
show_plot("html_final/6_2_dna.html",900,800)

We have looked into dna as the input term. Instead of 4 terms per word, I have decided to look into 6, in order to understand how many terms would be reasonable for this visualization. This is pretty interesting, but has the drawback of the words being rotated. A visualization that I found really interesting was this tree one.

In [128]:
show_plot("html_final/tree2.html",900,800)

With this one, although it is not possible to know the confidence level of each word, we are able to expand each branch on its own, making it easier to follow the co-occurrences.

Searching for the terms on Google, all of the terms I found seem to make sense. Since we only have a small number of documents, the repetitions are expected. Furthermore, I was expecting to see uninteresting words at this stage and was planning to try POS tagging on the initial input and keep only the nouns and verbs in the output. Since I am already getting only nouns and verbs as results, I will not pursue this experiment.

Let's now run an example on a subsample of 15k documents. I didn't do it before because it took a long time to run, since the cluster does not allow me to use joblib for the preprocessing and I can't keep all the folders on my personal machine to sample from. This new subset has 3 documents from each folder.
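
A per-folder sample like this one could be drawn with a few lines of Python. This is only a sketch: how the 3 documents per folder were actually chosen is not described here, so the random choice, the fixed seed and the function name `sample_per_folder` are assumptions.

```python
import os
import random

def sample_per_folder(root, k=3, seed=0):
    """Pick up to k .txt files from every folder under root.
    The fixed seed only makes the sketch reproducible."""
    rng = random.Random(seed)
    sampled = []
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # visit subfolders in a deterministic order
        txt_files = sorted(f for f in filenames if f.endswith(".txt"))
        for name in rng.sample(txt_files, min(k, len(txt_files))):
            sampled.append(os.path.join(dirpath, name))
    return sampled
```

Calling `sample_per_folder(corpus_root, k=3)` on a hypothetical `corpus_root` returns the list of chosen paths, which can then be passed to the preprocessing step.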

In [119]:
get_user_input()

Please input the term you want to visualize: 


In [120]:
show_plot("html_final/tree1.html",900,800)

We can see that there are differences among the top co-occurrences. This might be because of the way the sampling was done. We got only 3 documents per folder and, assuming each folder talks about a particular set of topics, we have a really small subset of documents that talk about protein. With the previous sampling, on the other hand, we took all the documents from the same folders, so the topics were more easily found for "protein". The drawback of the previous method was that it limited the options for the user, since they could only ask about topics from a specific set of papers. With the new sampling, we have a larger number of terms that can be searched. mrsa was not available with our first sampling technique, but now we can look for it (even if only 26 documents contain it). We will also try the bubble chart, to see if it helps us understand the result better.

In [122]:
get_user_input()

Please input the term you want to visualize: 


In [123]:
show_plot("html_final/tree_mrsa.html",900,800)

In [124]:
show_plot("html_final/bubbles.html",1000,800)

By reading on the topic, I found that all the terms make absolute sense. We might be missing some important terms, but this could be because we do not have enough papers on the subject for the important terms to appear often.

The bubble chart is interesting because we can see the confidence level of each topic. Clearly, the term smt19969, which is an antimicrobial agent, is more of a trending topic than pji, which is a specific infection that can be caused by MRSA. Probably, because smt19969 is a common treatment, it appears in more documents (and is important for each of those documents) than pji, which is probably the subject of fewer papers. Despite that, it is really hard to understand the relation between the words in this chart. It would be better to stick with our two visualizations, the expanding tree and the interactive sunburst visualization.

### Conclusion

This was a really interesting project for understanding how dealing with huge amounts of data can make our problems much more difficult. I was able to understand some limitations of our tools, such as Hadoop and Cassandra. Hadoop did not perform well on a single machine, even a powerful one like the cluster. This is because it is designed to work on commodity hardware, and the default configuration assumes each datanode does a small amount of work rather than a single node doing a lot of it. The same goes for Cassandra: it limits the size of a query result on a single node, which made it harder for me to use the cluster and handle more data.

Understanding how to handle text data was also really interesting. I put into practice concepts like stemming, tokenization, POS tagging (my first experiment used it, but it was too simple to be included here), stopword removal (not only the default ones, but also the ones related to the domain) and character removal.

Using Cassandra for what Cassandra does best, queries, saved me a lot of effort on coding the mappings and probably made the computation more efficient. Using Hadoop for the TF-IDF might not have given me the most efficient program, but I wanted to learn more about writing MapReduce jobs and about Hadoop to Cassandra integration. I think, from the learning perspective, this was better than looking for existing implementations of TF-IDF.

My code is organized like this. For the Hadoop part, the project is inside the folder java_code. The preprocess.py and create_visualization.py scripts are inside python_code. create_visualization.py will create the flare.json file and, if there is an HTTPServer running on http://127.0.0.1:8080, it will open two visualizations on your browser.

More information on running the code can be found in the readme.md file.

