# Neroma Kossi : 3A ENSAE, AS-DS
> Project for the course on software elements for massive data processing

In [1]:
%%html
<img src="Apache_Spark_logo.svg.png" width="10" height="10">

Our project consists of parallelising the Latent Dirichlet Allocation (**LDA**) algorithm. The base paper is [PLDA, a parallel Gibbs-sampling-based algorithm](https://www.semanticscholar.org/paper/PLDA%3A-Parallel-Latent-Dirichlet-Allocation-for-Wang-Bai/376ffb536c3dc5675e9ab875b10b9c4a1437da5d).

The main idea is to run several Gibbs samplers concurrently. This can be done with a distributed framework such as MPI or MapReduce; we use the latter in this project, with PySpark as the library implementing the MapReduce architecture.

# Table of contents

>## 1. Create the Spark context

 > ## 2. Data pre-processing
  * **2.1. Load the data from file**
  * **2.2. Preprocessing**
  * **2.3. Building the vocabulary and the set of docs**
    * 2.3.1. Building the vocabularies (one per partition)
    * 2.3.2. Building docMaps : the set of all the documents (one per partition)
    * 2.3.3. Test if vocabularies and docMaps are correctly built
  * **2.4. Prepare the data for the Gibbs samplers**
      * 2.4.1. Encode corpus
      * 2.4.2. Save the whole work
      
>## 3. Parallel LDA with mapReduce
  * **3.1. Set some parameters**
  * **3.2. Run the algorithm**
  * **3.3. Post-training analysis**

>## 4. Conclusion

In [2]:
from pyspark import SparkConf,  SparkContext  # Spark

In [3]:
import numpy as np # math ops
import os, shutil, json #File ops
import pickle as pkl # Serialiser

from datetime import datetime
import time

In [4]:
# Some utilities saved into custom modules

from nlp import preprocessAndGetTokens
from fileUtils import load, pickleLoader, dump, saveByPartition

# 1. Create the Spark Context

In [5]:
driver_memory = '2g' # Max memory available for the driver
executor_memory = '400m' # Max memory by executor
# We have to set these params before instantiating the SparkContext; otherwise it would be too late
pyspark_submit_args = ' --driver-memory {0} --executor-memory {1} pyspark-shell'\
                                .format(driver_memory, executor_memory)
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [6]:
conf = SparkConf().setAll([
    ('spark.app.name', 'pLDA'),
    ('spark.master', 'local[*]'),  # use all available cores
    ('spark.scheduler.mode', 'FAIR')])

In [7]:
spark = SparkContext(conf = conf) # Here we create the Spark context

In [8]:
spark._conf.getAll()

[('spark.rdd.compress', 'True'),
 ('spark.app.name', 'pLDA'),
 ('spark.driver.memory', '2g'),
 ('spark.driver.host', '192.168.0.41'),
 ('spark.scheduler.mode', 'FAIR'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '37025'),
 ('spark.app.id', 'local-1549608841145'),
 ('spark.ui.showConsoleProgress', 'true')]

# 2. Data pre-processing


> Our dataset is made of **abc-news** article headlines available on [Kaggle](https://www.kaggle.com/therohk/million-headlines). The data contains headlines published over a period of 15 years by the reputable Australian news source ABC (Australian Broadcasting Corp.).

> Here, we will focus on inferring interesting topics from this corpus.

## 2.1. Load the data from file

In [9]:
def processDoc(doc):
    """Wrapper around preprocessAndGetTokens, which applies some basic NLP
    techniques to the document text: lowercasing, stop-word removal, stemming..."""
    
    return np.array(preprocessAndGetTokens(doc))
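
The `nlp` module is not shown in this notebook. As a rough illustration of the kind of pipeline the docstring describes, here is a minimal sketch. The stop-word list, the suffix-stripping rule, and the name `simple_preprocess` are hypothetical stand-ins, not the project's actual `preprocessAndGetTokens`.

```python
import re

# Hypothetical, deliberately tiny stop-word list (a real one would be much longer).
STOPWORDS = {"a", "an", "the", "over", "to", "in", "with", "against", "of", "and"}

# Crude suffix stripping standing in for a real stemmer.
SUFFIXES = ("ing", "ed", "s")


def crude_stem(word):
    """Strip one common suffix, keeping at least three characters of stem."""
    for suffix in SUFFIXES:
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            return word[: -len(suffix)]
    return word


def simple_preprocess(text):
    """Lowercase, tokenize on letters, drop stop words, then stem."""
    tokens = re.findall(r"[a-z]+", text.lower())
    return [crude_stem(t) for t in tokens if t not in STOPWORDS]


print(simple_preprocess("Man jailed over keno fraud"))
```

A production pipeline would more likely rely on an established stemmer and stop-word list; the point here is only the order of the steps.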

In [10]:
nbPartitions = 10  # Set the number of partitions; this is important as our Gibbs sampler is designed to
                # launch one sampler per partition

> **Let's read the data**

In [11]:
# data = spark.pickleFile("corpus/bigSample/part-00000")
data = spark.textFile("corpus/abc-news/abc-news.csv")\
            .repartition(nbPartitions)\
                            .map(lambda x : tuple(x.split(",")))

In [12]:
%%time
# format == (doc_id, headline)
data.take(3)

CPU times: user 892 µs, sys: 10.8 ms, total: 11.7 ms
Wall time: 2.21 s


[('000090-20030219', 'man jailed over keno fraud'),
 ('000091-20030219', 'man with knife hijacks light plane'),
 ('000092-20030219', 'martin to lobby against losing nt seat in fed')]

In [13]:
# %%time
# if os.path.exists("matrix/docTitles/") :
#     shutil.rmtree("matrix/docTitles/")
# data.map(lambda x :  (x[0], x[2])).saveAsPickleFile("matrix/docTitles/") # Save doc titles

## 2.2. Preprocessing

In [14]:
%%time

# Now we do all the preprocessing and save the dataset
folder = "corpus/train/"
if os.path.exists(folder):
    shutil.rmtree(folder)

data = data.mapValues(processDoc)\
           .filter(lambda x: len(x[1]) > 0)  # drop documents left with no tokens

data.saveAsPickleFile(folder, 500)  # 500 is the pickling batch size

CPU times: user 9.41 ms, sys: 4.93 ms, total: 14.3 ms
Wall time: 14.6 s


In [15]:
data.take(1) # A sample of the tokenized dataset

[('000090-20030219', array(['fraud', 'keno', 'jail', 'man'], dtype='<U5'))]

> ***At this point, our dataset is in the raw format `(docId, docTokens)`. Next, we will assign a random topic to each word in a document. We will also need to build the vocabulary and the set of documents.***

## 2.3. Building the vocabulary and the set of docs (one per partition)

**Reloading and partitioning the dataset**

In [16]:
corpus = spark.pickleFile("corpus/train" ).repartition(nbPartitions)
corpus.getNumPartitions()

10

In [17]:
%%time
corpus.take(1)

CPU times: user 1.14 ms, sys: 8.25 ms, total: 9.38 ms
Wall time: 2.39 s


[('000920-20030223',
  array(['pope', 'war', 'avert', 'urg', 'blair'], dtype='<U5'))]

### 2.3.1. Build the vocabularies (one per partition)

In [18]:
import importlib, builder
importlib.reload(builder)

<module 'builder' from '/home/nerk/Documents/3A_ENSAE/mapReduceLda/builder.py'>

In [19]:
from builder  import makeVocabularies, makeVocabulariesFolder, getUniqueWords, getUniqueWords2

In [20]:
makeVocabulariesFolder() # Instantiate the vocabularies' folder

In [21]:
%%time

# Here we compute the set of unique words. As words can sometimes be very long, we'd rather keep only their ids.
# In the next steps, we will assign to each word a number ranging from 0 to V-1, where V == size of our vocab
uniqueWordsByPartition = corpus.mapPartitionsWithIndex(getUniqueWords2).collect()

CPU times: user 13 ms, sys: 14 ms, total: 27 ms
Wall time: 2.52 s


In [22]:
# corpus.glom().map(len).collect()

In [23]:
# Number of documents & words per partition

L = [{"Partition": "%02d"%i, "ndocs": len(x[0]), "nvocabs": len(x[1])} for x, i 
             in zip(uniqueWordsByPartition, range(nbPartitions))  ]
L[:5]

[{'Partition': '00', 'ndocs': 6, 'nvocabs': 4},
 {'Partition': '01', 'ndocs': 3, 'nvocabs': 3},
 {'Partition': '02', 'ndocs': 2, 'nvocabs': 4},
 {'Partition': '03', 'ndocs': 3, 'nvocabs': 7},
 {'Partition': '04', 'ndocs': 3, 'nvocabs': 3}]

In [24]:
print("Total docs : %d "%sum(l["ndocs"] for l in L))

Total docs : 30 


In [25]:
%%time
# Here we build the vocabularies, one per partition

makeVocabularies(uniqueWordsByPartition) # Build and save the vocabularies

Vocabulary 0 successfully built
Vocabulary 1 successfully built
Vocabulary 2 successfully built
Vocabulary 3 successfully built
Vocabulary 4 successfully built
Vocabulary 5 successfully built
Vocabulary 6 successfully built
Vocabulary 7 successfully built
Vocabulary 8 successfully built
Vocabulary 9 successfully built

 Global vocabulary  built too
CPU times: user 1.24 s, sys: 68.2 ms, total: 1.31 s
Wall time: 1.05 s
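
`makeVocabularies` lives in the `builder` module, which is not reproduced here. The idea stated in the comments (map each distinct word to an integer in 0..V-1, once per partition and once globally) can be sketched in plain Python. The function name and the toy partitions below are illustrative assumptions, and the real module may assign ids in a different order.

```python
def build_vocabulary(words):
    """Map each distinct word to an integer id in 0..V-1 (sorted for determinism)."""
    return {word: idx for idx, word in enumerate(sorted(set(words)))}


# Toy stand-in for two partitions of tokenized documents.
partitions = [
    [["fraud", "keno", "jail", "man"], ["man", "knife", "plane"]],
    [["pope", "war", "avert"], ["war", "plane"]],
]

# One vocabulary per partition.
local_vocabs = [
    build_vocabulary(w for doc in part for w in doc) for part in partitions
]

# One global vocabulary over the union of all partitions.
global_vocab = build_vocabulary(
    w for part in partitions for doc in part for w in doc
)

print([len(v) for v in local_vocabs], len(global_vocab))
```

Note that a word shared by several partitions ("plane" here) gets a different local id in each partition, which is why a global vocabulary is also needed to combine per-partition counts.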


In [26]:
del uniqueWordsByPartition # free up some memory

### 2.3.2. Make docMaps :  the set of all the documents

In [27]:
from builder import makeDocsMaps, makeDocsMapsFolder

In [28]:
makeDocsMapsFolder() # Instantiate the documents' folder

In [29]:
%%time

corpus.mapPartitionsWithIndex(makeDocsMaps).collect()

CPU times: user 7.91 ms, sys: 0 ns, total: 7.91 ms
Wall time: 819 ms


['docMap 0 successfully built',
 'docMap 1 successfully built',
 'docMap 2 successfully built',
 'docMap 3 successfully built',
 'docMap 4 successfully built',
 'docMap 5 successfully built',
 'docMap 6 successfully built',
 'docMap 7 successfully built',
 'docMap 8 successfully built',
 'docMap 9 successfully built']

### 2.3.3. Test if vocabularies and docMaps are correctly built

Now that the vocabularies and docMaps have been built successfully, let's load them.

In [30]:
%%time
vocabAll = load("matrix/vocabulary/vocabAll")

vocabs = [load("matrix/vocabulary/vocab__%04d__"%ind) for ind in range(nbPartitions)] 

CPU times: user 413 ms, sys: 28.3 ms, total: 441 ms
Wall time: 459 ms


In [31]:
print("Total words in Vocab : ", len(vocabAll))

Total words in Vocab :  20715


In [32]:
%%time
from builder import loadDocsAll
docsAll = loadDocsAll(nbPartitions)

docs = [load("matrix/docsMap/docs__%04d__"%ind) for ind in range(nbPartitions)] 

CPU times: user 137 ms, sys: 20.5 ms, total: 158 ms
Wall time: 159 ms


In [33]:
%%time
nbDocs = list(map(len, docs)) # Number of documents per partition
nbVocabs = list(map(len, vocabs)) # Number of unique words (vocabulary) per partition
print(nbDocs[:2], nbVocabs[:2])

[14993, 15008] [8935, 8976]
CPU times: user 236 µs, sys: 29 µs, total: 265 µs
Wall time: 205 µs


## 2.4. Prepare the data for the training step
> This step involves encoding the corpus and adding topics : using ids instead of full text

### 2.4.1. Encoding the corpus

In [34]:
from builder  import encodeAddTopics

In [35]:
%%time 

# The corpus is still in full text here; we encode it in the next step
corpus.take(1)

CPU times: user 5.82 ms, sys: 0 ns, total: 5.82 ms
Wall time: 47.1 ms


[('000920-20030223',
  array(['pope', 'war', 'avert', 'urg', 'blair'], dtype='<U5'))]

In [36]:
nbTopics = 10

In [37]:
# After this step, all words are encoded as integer ids and a topic is attached to each word
corpus2 = corpus.mapPartitionsWithIndex(lambda ind, part : encodeAddTopics(ind, part,docs[ind],
                                                                           vocabs[ind], nbTopics), 
                                       preservesPartitioning = True)

In [38]:
%%time
corpus2.take(1) # Only word and doc ids now; topics have been added too

CPU times: user 8.34 s, sys: 226 ms, total: 8.56 s
Wall time: 9.23 s


[(0, (6724, array([6001, 8581,  522, 8373,  866]), array([2, 3, 1, 5, 3])))]
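
The inner triple of the record above, `(doc id, word ids, topics)`, can be reproduced in miniature. The sketch below is an assumption about what `encodeAddTopics` roughly does (its source is not shown): tokens are looked up in a vocabulary, the document id in a doc map, and one topic per token is drawn uniformly at random. The names `encode_with_topics`, `vocab`, and `doc_index` are hypothetical.

```python
import numpy as np


def encode_with_topics(doc_id, tokens, doc_index, vocab, nb_topics, rng):
    """Replace the doc id and tokens by integer ids and draw one random topic per token."""
    word_ids = np.array([vocab[t] for t in tokens])
    topics = rng.integers(0, nb_topics, size=len(tokens))
    return doc_index[doc_id], word_ids, topics


# Toy vocabulary and doc map (assumed shapes: word -> id, doc id -> index).
vocab = {"avert": 0, "blair": 1, "pope": 2, "urg": 3, "war": 4}
doc_index = {"000920-20030223": 0}
rng = np.random.default_rng(0)

encoded = encode_with_topics(
    "000920-20030223", ["pope", "war", "avert", "urg", "blair"],
    doc_index, vocab, nb_topics=10, rng=rng,
)
print(encoded)
```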

### 2.4.2. Save the whole work for the next step 

In [39]:
import fileUtils
importlib.reload(fileUtils)

<module 'fileUtils' from '/home/nerk/Documents/3A_ENSAE/mapReduceLda/fileUtils.py'>

In [40]:
from fileUtils import saveAsPickleFile, saveByPartition

In [41]:
%%time
# saveAsPickleFile(corpus2)
if os.path.exists("initial_train"):
    shutil.rmtree("initial_train")
corpus2.saveAsPickleFile("initial_train", 1)

CPU times: user 8.35 s, sys: 59.6 ms, total: 8.41 s
Wall time: 17.5 s


In [42]:
# %%time

# corpus2.mapPartitionsWithIndex(lambda ind, part :
#                     saveByPartition(ind, part, "corpus/train2", batchsize=10))\
#                             .collect()

> This is the end of the data preprocessing: the data is now in the right format and we can run our `Gibbs samplers`. Let's start.

In [43]:
del data, corpus, corpus2 # free up some memory

# 3. Parallel LDA (mapReduce version)

> This is the machine-learning part.

## 3.1. Define some parameters

In [44]:
nbVocabAll = len(vocabAll)
alpha = 0.5
beta = 0.5
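
As a reminder of what each sampler computes with these parameters, here is a minimal single-machine sketch of one collapsed Gibbs sweep. It assumes `alpha` and `beta` play their usual roles as symmetric Dirichlet smoothing on the document-topic and word-topic counts. It is not the project's `pldaMap0`, whose source is not shown; all names and the toy corpus are illustrative.

```python
import numpy as np


def gibbs_sweep(docs, topics, doc_topic, word_topic, topic_totals, alpha, beta, rng):
    """Run one collapsed Gibbs sweep in place over every token of every document."""
    vocab_size, nb_topics = word_topic.shape
    for d, (words, z) in enumerate(zip(docs, topics)):
        for n, w in enumerate(words):
            k_old = z[n]
            # Remove the current assignment from the counts.
            doc_topic[d, k_old] -= 1
            word_topic[w, k_old] -= 1
            topic_totals[k_old] -= 1
            # Conditional p(z = k | everything else), up to a constant.
            weights = (doc_topic[d] + alpha) * (word_topic[w] + beta) \
                / (topic_totals + vocab_size * beta)
            k_new = rng.choice(nb_topics, p=weights / weights.sum())
            # Record the new assignment.
            z[n] = k_new
            doc_topic[d, k_new] += 1
            word_topic[w, k_new] += 1
            topic_totals[k_new] += 1


rng = np.random.default_rng(0)
vocab_size, nb_topics, alpha, beta = 5, 2, 0.5, 0.5
docs = [np.array([0, 1, 2]), np.array([2, 3, 4, 4])]
topics = [rng.integers(0, nb_topics, size=len(d)) for d in docs]

# Count matrices implied by the initial random assignment.
doc_topic = np.zeros((len(docs), nb_topics), dtype=int)
word_topic = np.zeros((vocab_size, nb_topics), dtype=int)
for d, (words, z) in enumerate(zip(docs, topics)):
    np.add.at(doc_topic[d], z, 1)
    np.add.at(word_topic, (words, z), 1)
topic_totals = word_topic.sum(axis=0)

for _ in range(20):
    gibbs_sweep(docs, topics, doc_topic, word_topic, topic_totals, alpha, beta, rng)
print(word_topic)
```

The word-topic matrix is stored with words as rows and topics as columns, matching the layout used in the post-training analysis of this notebook.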

In [45]:
from builder  import init

In [46]:
# from builder import makeConfig, updateConfig, get_now
# makeConfig(id = "all", countWordsUpdated = {str(ind):False for ind in range(nbPartitions)}, time = get_now())

## 3.2. Training

In [47]:
import importlib, model, builder, fileUtils
importlib.reload(model)
importlib.reload(builder)
importlib.reload(fileUtils)
from fileUtils import saveAsPickleFile
from model import pldaMap0
from builder import updateCountWordsAll, init

In [48]:
# pldaMap(0, 1, alpha, beta, len(vocabAll), nbTopics)

In [49]:
# rdd = spark.pickleFile("pickle/")
# rdd.getNumPartitions()

In [50]:
rdd = spark.pickleFile("initial_train")
# format: (key, (doc_id, doc_words, doc_topics)); the key is what partitionBy uses below
rdd.take(1)

[(3, (14685, array([8740, 1817, 3997, 1154]), array([2, 3, 1, 5])))]

In [None]:
%%time

t0 = time.time()
rdd = spark.pickleFile("initial_train").partitionBy(nbPartitions).map(lambda x: x[1])
init(rdd, vocabs, nbDocs, nbVocabs, len(vocabAll), nbTopics)


for i in range(50):
    rdd = rdd.mapPartitionsWithIndex(lambda ind, part : pldaMap0(ind, part, alpha, beta, nbVocabAll, nbTopics),
                       preservesPartitioning= True )
    saveAsPickleFile(rdd)
    rdd = spark.pickleFile("pickle/").partitionBy(nbPartitions).map(lambda x: x[1])
    updateCountWordsAll()
    if i%10 == 0 :
        print("iteration : {0}, Elapsed : {1}".format(i, time.time() - t0))
print("Total time : {}".format(time.time() - t0))

iteration : 0, Elapsed : 35.27850651741028
iteration : 10, Elapsed : 339.45840883255005
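
Within an iteration, each partition's sampler works on its own copy of the word-topic counts, so the copies have to be reconciled before the next iteration. In the approximate distributed scheme that PLDA builds on, this is done by adding every partition's change to the previous global counts. The sketch below illustrates that rule with NumPy. It is an assumption about what `updateCountWordsAll` does, since its source is not shown, and `merge_counts` is a hypothetical name.

```python
import numpy as np


def merge_counts(global_counts, local_counts):
    """Return global + sum over partitions of (local - global)."""
    deltas = [local - global_counts for local in local_counts]
    return global_counts + sum(deltas)


# Toy example: 3 words x 2 topics, two partitions.
global_counts = np.array([[2, 0], [1, 1], [0, 3]])
local_a = np.array([[1, 1], [1, 1], [0, 3]])  # partition A moved one token of word 0
local_b = np.array([[2, 0], [1, 1], [1, 2]])  # partition B moved one token of word 2

merged = merge_counts(global_counts, [local_a, local_b])
print(merged)
```

Because each partition only reassigns topics for its own tokens, the total number of tokens in the merged matrix is unchanged.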


## 3.3. Post-training analysis

In [None]:
words = np.array(list(vocabAll.items())) 

In [None]:
countWords = load("matrix/countWords/words_all")
countWords = countWords/countWords.sum(0)

In [None]:
import pandas as pd

In [None]:
ntop = 15
order = np.argsort(countWords, 0)[::-1]

topics = pd.DataFrame(words[order[:ntop], 0])
topics.columns = ["topic%i"%i for i  in topics.columns]
topics
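
The normalization and ranking used in the cells above can be checked on a toy matrix. This is a minimal sketch with made-up counts and a five-word vocabulary; only the two NumPy operations (column normalization and `argsort` along axis 0) mirror the notebook.

```python
import numpy as np

vocab = ["crash", "car", "iraq", "war", "police"]
# Toy word-topic counts: rows are words, columns are topics.
counts = np.array([[8, 0], [6, 1], [0, 9], [1, 7], [3, 3]], dtype=float)

# Column-normalize so each column is a distribution over words for one topic.
probs = counts / counts.sum(axis=0)

# Row indices sorted by decreasing probability, independently for each topic.
order = np.argsort(probs, axis=0)[::-1]

ntop = 2
top_words = np.array(vocab)[order[:ntop]]
print(top_words)
```

Each column of `top_words` lists the most probable words of one topic, from most to least probable.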

> As we can see, the model finds meaningful topics in fewer than a hundred Gibbs sampling steps. `topic0` seems to be about **car crashes**, and `topic9` is clearly about **Iraq**...

# 4. Conclusion

>This project was very instructive. Not only did it allow us to deepen our knowledge of Spark, it also let us implement one of the most widely used topic-modeling algorithms.

>The parallel Gibbs sampling scheme introduced by Wang et al. works around the sequential nature of MCMC algorithms and makes it possible to take advantage of tools such as Spark.

>A natural next step would be to measure the speed-up gained from this parallelized scheme.