# LSA in pyspark

In this notebook, we'll implement the Latent Semantic Analysis (LSA) in pyspark

In [36]:
from pyspark.mllib.linalg import Vectors,DenseVector
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.stem.snowball import SnowballStemmer
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.feature_extraction.text import CountVectorizer
from scipy.sparse import csr_matrix

## Pre processing and computing the tf-idf matrix

In [46]:
stemmer = SnowballStemmer("english")

def pre_process(line):
    undesired = ["[", "]", "/", "'", "''", '"', '""', "&", ";", ",", ".", ""]
    return [stemmer.stem(word) for word in word_tokenize(line) if word not in stopwords.words("english") and word not in undesired]
    
documents = sc.textFile("anarchism_clean.txt").map(pre_process)
cv = CountVectorizer()
X_data = []
for prgph in documents.collect():
    X_data.append(" ".join(word for word in prgph))
vectorized = cv.fit_transform(X_data)

In [47]:
#hashingTF = HashingTF()
#tf = hashingTF.transform(documents)
#idf = IDF().fit(tf)
#tfidf = idf.transform(tf)
#tfidf.cache()

tf_operator = TfidfTransformer().fit(vectorized)
X = tf_operator.transform(vectorized)

In [5]:
tfidf.collect()[0].indices[0], tfidf.collect()[0].values[0]

(19605, 4.4697322150929875)

In [6]:
tfidf.collect()[0]

SparseVector(1048576, {19605: 4.4697, 33950: 3.7766, 36751: 3.0034, 58131: 4.4697, 71479: 4.8752, 78557: 4.4697, 80303: 2.9293, 103856: 4.8752, 104239: 4.8752, 104251: 4.3236, 110561: 2.7958, 120541: 4.1821, 121485: 4.4697, 129749: 3.0034, 130175: 3.1704, 169818: 4.8752, 172174: 4.8752, 174017: 3.6224, 183018: 4.8752, 183064: 3.0034, 186843: 11.183, 204364: 8.3641, 213079: 11.7171, 219717: 4.8752, 234843: 3.3711, 241604: 4.4697, 246259: 8.3641, 251525: 3.6224, 274871: 3.4889, 293620: 2.5726, 297254: 7.2449, 297909: 3.7766, 300012: 4.8752, 364791: 6.7084, 368996: 3.3711, 369003: 2.5238, 381981: 4.4697, 382964: 2.8603, 382966: 3.3711, 382967: 3.7766, 385317: 3.6224, 393139: 1.8548, 405588: 4.8752, 418086: 2.9293, 441877: 8.3641, 449119: 4.8752, 449905: 4.4697, 474779: 6.7422, 480946: 4.8752, 490231: 3.9589, 494521: 4.1821, 525521: 4.8752, 533989: 4.8752, 535067: 3.7766, 535972: 2.9293, 541251: 4.8752, 542678: 4.8752, 550870: 3.3711, 552359: 13.6757, 555981: 4.8752, 562497: 4.4697, 563337

In [62]:
docs = [[j for i in range(len(tfidf.collect()[j].indices))] for j in range(len(tfidf.collect()))]

In [63]:
indices = [[i for i in tfidf.collect()[j].indices] for j in range(len(tfidf.collect()))]
values = [[i for i in tfidf.collect()[j].values] for j in range(len(tfidf.collect()))]

In [64]:
indices_list = [index for line in indices for index in line]
values_list = [value for line in values for value in line]
docs_list = [doc for line in docs for doc in line]

#df = sqlContext.createDataFrame(tfidf.collect(), ['features'])

In [66]:
indices_set = set(indices_list)
indices_map = {}
counter = 0
for index in indices_set:
    if index not in indices_map:
        indices_map.update({index: counter})
        counter += 1

indices_list_mapped = [indices_map[index] for index in indices_list]
indices_list_mapped[1]

407

In [67]:
indices_list_mapped[0], values_list[0], docs_list[0]

(1168, 4.4697322150929875, 0)

In [68]:
m = csr_matrix((values_list,(docs_list, indices_list_mapped)))

In [70]:
svd = TruncatedSVD(n_components = 500)
svd.fit(m)

TruncatedSVD(algorithm='randomized', n_components=500, n_iter=5,
       random_state=None, tol=0.0)