In [None]:
import findspark
findspark.init()

from operator import add
from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.mllib.linalg import Vectors,DenseMatrix
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

import numpy as np
from scipy.sparse import csr_matrix as sp

from sklearn.preprocessing import normalize

from math import log

In [None]:
# getOrCreate() reuses an already-running context, so re-running this cell on a
# live kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [None]:
class SVD(JavaModelWrapper):
    """Thin Python view of the JVM singular value decomposition result.

    Every property forwards to the wrapped Java object through ``self.call``.
    """

    @property
    def U(self):
        """Left singular vectors (the columns of a RowMatrix), or None when the JVM returns no U (computeU was False)."""
        java_u = self.call("U")
        return RowMatrix(java_u) if java_u is not None else None

    @property
    def s(self):
        """Singular values as a DenseVector, largest first."""
        return self.call("s")

    @property
    def V(self):
        """Right singular vectors, one per column of the returned DenseMatrix."""
        return self.call("V")


In [None]:
def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
    """
    Compute the singular value decomposition of a RowMatrix on the JVM side.

    The (m x n) matrix A held by ``row_matrix`` is factored as A = U * diag(s) * V^T, where
    * s: DenseVector of singular values (square roots of the eigenvalues of A^T A), in descending order.
    * U: (m x k) left singular vectors, a RowMatrix whose columns are the eigenvectors of (A A^T).
    * V: (n x k) right singular vectors, a local Matrix whose columns are the eigenvectors of (A^T A).
    :param row_matrix: the RowMatrix to decompose.
    :param k: number of singular values to keep. Fewer than k may be returned if some singular values are numerically zero.
    :param computeU: whether or not to compute U. When True, U is computed as A * V * diag(s)^-1.
    :param rCond: reciprocal condition number. Singular values smaller than rCond * s[0] (s[0] is the largest) are treated as zero.
    :returns: SVD object wrapping the Java result.
    """
    java_wrapper = row_matrix._java_matrix_wrapper
    return SVD(java_wrapper.call("computeSVD", int(k), computeU, float(rCond)))

def pre_process(line):
    """Tokenize one line of text, drop English stop words and stem the remaining tokens.

    :param line: raw text of one document (one line of the input file).
    :returns: list of stemmed tokens in their original order.
    """
    # Build the stop-word set once per line rather than re-reading the NLTK
    # corpus list for every token; set membership is also O(1).
    stop_words = set(stopwords.words('english'))
    # NLTK's English stop-word list is lower-case, so compare case-insensitively;
    # otherwise sentence-initial "The", "And", ... slip through as terms.
    return [stemmer.stem(word) for word in word_tokenize(line)
            if word.lower() not in stop_words]

In [None]:
def map_tf(document):
    """Count how often each term occurs in one document.

    :param document: sequence of (already pre-processed) terms.
    :returns: list of (term, count) pairs, one per distinct term, in order of first appearance.
    """
    counts = {}
    for token in document:
        counts[token] = counts.get(token, 0) + 1
    return list(counts.items())

In [None]:
k = 50
stemmer = SnowballStemmer('english')
documents = sc.textFile("anarchism_clean.txt").map(pre_process)
docTermFreqs = documents.map(map_tf).cache()

# Document frequency: every document contributes 1 for each distinct term it
# contains (map_tf already yields each term once per document). Summing the raw
# counts instead would give corpus-wide term frequencies, which can exceed
# num_docs and make log(num_docs / df) negative.
docFreqs = docTermFreqs.flatMap(lambda tf: [(term, 1) for term, _ in tf]).reduceByKey(add)
num_docs = docTermFreqs.count()

In [None]:
# Inverse document frequency of each term: log(N / df).
idfs = docFreqs.map(lambda x: (x[0], log(num_docs / x[1])))

# Assign a column index to every term. zipWithIndex over a shuffled (unordered)
# RDD may hand out different indices if that RDD is re-evaluated, so collect the
# (term, id) pairs once and derive both lookup tables from the same list.
# NOTE: despite the names, dict_id_terms maps term -> id and dict_terms_id maps
# id -> term; the names are kept because the rest of the notebook uses them.
idTerms = idfs.keys().zipWithIndex().cache()
term_id_pairs = idTerms.collect()
term_ids = idTerms.map(lambda x: (x[1], x[0]))

dict_id_terms = dict(term_id_pairs)
dict_terms_id = {term_id: term for term, term_id in term_id_pairs}
dict_term_freqs = dict(docFreqs.collect())
dict_idfs = dict(idfs.collect())
num_terms = len(dict_id_terms)

In [None]:
def mapping_function(termFreqs):
    """Build the TF-IDF row vector of one document.

    :param termFreqs: list of (term, count) pairs for a single document, as produced by map_tf.
    :returns: SparseVector of length num_terms whose entry for each term is
        (count of the term in this document / total terms in this document) * idf(term).
    """
    docTotalTerms = sum(count for _, count in termFreqs)
    # Use the term's count *within this document*, not the corpus-wide
    # dict_term_freqs value, so each weight is a per-document TF-IDF.
    # (An empty document yields an empty list, so no division by zero occurs.)
    return Vectors.sparse(num_terms,
                          [(dict_id_terms[term], dict_idfs[term] * count / docTotalTerms)
                           for term, count in termFreqs])

vecs = docTermFreqs.map(mapping_function).cache()


In [None]:
mat = RowMatrix(vecs)
# Keep the top-k singular triplets; U is required by the document queries below.
svd = computeSVD(row_matrix=mat, k=k, computeU=True)

In [None]:
def topTerms(svd, numConcepts, numTerms, termsIds):
    """Return the highest-weighted terms of each of the first concepts.

    :param svd: SVD object; svd.V.toArray() is the (num_terms x num_concepts)
        array whose column i holds every term's weight in concept i.
    :param numConcepts: number of concepts to inspect; capped at the number of
        columns V actually has.
    :param numTerms: number of terms kept per concept.
    :param termsIds: mapping from term index (row of V) to term.
    :returns: flat list of (term, weight) pairs, numTerms per concept, concept
        after concept, each concept's terms sorted by descending weight.
    """
    v = svd.V.toArray()
    n_terms, n_available = v.shape
    result = []
    for i in range(min(numConcepts, n_available)):
        # Column i of V (not a row-major slice) holds the term weights of concept i.
        weights = v[:, i]
        termWeights = [(termsIds[j], weights[j]) for j in range(n_terms)]
        # Rank by the weight value itself, largest first.
        result += sorted(termWeights, key=lambda tw: tw[1], reverse=True)[:numTerms]
    return result
        
def topDocsInTopConcepts(svd, numConcepts, numDocs, docIds):
    """Return the documents with the largest weight in each of the first concepts.

    :param svd: SVD object whose U (a RowMatrix, one row per document) was computed.
    :param numConcepts: number of concepts (columns of U) to inspect; capped at
        the number of columns U actually has.
    :param numDocs: number of documents kept per concept.
    :param docIds: not used; kept so existing calls keep working.
    :returns: flat list of (doc_index, weight) pairs, numDocs per concept,
        concept after concept; doc_index is the 0-based row position in U.
    """
    u = svd.U
    topDocs = []
    for i in range(min(numConcepts, u.numCols())):
        # zipWithIndex gives contiguous row positions (0..n-1), the same indexing
        # topDocsForDoc uses on the collected U; zipWithUniqueId would give
        # gapped ids. Bind i as a default so the closure does not rely on late binding.
        docWeights = u.rows.map(lambda row, col=i: row[col]).zipWithIndex()
        topDocs += [(doc_index, weight) for weight, doc_index in docWeights.top(numDocs)]
    return topDocs



In [None]:
tt = topTerms(svd, k, 10, dict_terms_id)
# topDocsInTopConcepts does not use docIds, so pass None rather than the
# unrelated term lookup table.
td = topDocsInTopConcepts(svd, k, 10, None)

In [None]:
def topTermsforTerm(normalizedVS, termId, k=10):
    """Return the k terms most similar to a given term.

    :param normalizedVS: 2-D array with one L2-normalised row per term.
    :param termId: row index of the query term.
    :param k: number of terms to return.
    :returns: list of (term_index, score) pairs sorted by descending dot-product
        score (cosine similarity for normalised rows).
    """
    rowVec = normalizedVS[termId, :]
    # One score per term (row), not per concept (column): enumerate the scores
    # themselves so no term is dropped.
    termScores = list(enumerate(normalizedVS.dot(rowVec)))
    return sorted(termScores, key=lambda x: x[1], reverse=True)[:k]

def multiplyByDiagonalMatrix(A, D):
    """Return the dense product A * diag(D) as a numpy array (column j of A scaled by D[j]).

    :param A: either a distributed RowMatrix (e.g. svd.U), whose rows are
        collected to the driver, or a local matrix exposing ``toArray()``
        (e.g. the DenseMatrix svd.V); anything else is passed to ``np.asarray``.
    :param D: diagonal entries, e.g. the DenseVector svd.s, a list or a numpy
        array. Must have at least as many entries as A has columns; extra
        entries are ignored.
    :returns: float numpy array of shape (n_rows, n_cols).
    """
    if hasattr(A, "toArray"):
        # Local matrix. pyspark's DenseMatrix.__getitem__ only accepts an (i, j)
        # tuple, so A[i][j] fails; go through a numpy array instead.
        dense = np.asarray(A.toArray(), dtype=float)
    elif isinstance(A, RowMatrix):
        n_cols = A.numCols()
        rows = [np.asarray(row.toArray(), dtype=float) for row in A.rows.collect()]
        # reshape keeps a (0, n_cols) shape when the matrix has no rows.
        dense = np.array(rows, dtype=float).reshape(len(rows), n_cols)
    else:
        dense = np.asarray(A, dtype=float)

    diag = np.asarray(D.toArray() if hasattr(D, "toArray") else D, dtype=float)
    # Broadcasting multiplies every column j by diag[j].
    return dense * diag[:dense.shape[1]]

def printRelevantTerms(term, svd, k=10):
    """Print the k vocabulary terms closest to ``term`` in the scaled concept space (V * diag(s)).

    Prints "Term unknown" when the stemmed term is not in the vocabulary.
    """
    stem = stemmer.stem(term)
    if stem in dict_id_terms:
        term_index = dict_id_terms[stem]
        scaled_terms = multiplyByDiagonalMatrix(svd.V, svd.s)
        unit_terms = normalize(scaled_terms, axis=1, norm="l2")
        nearest = topTermsforTerm(unit_terms, term_index, k)

        for other_index, score in nearest:
            print(dict_terms_id[other_index], ' -> ', score)
    else:
        print("Term unknown")


In [None]:
def topDocsForDoc(normalizedUS, docId, k=10):
    """Return the k documents most similar to document ``docId``.

    :param normalizedUS: 2-D array with one L2-normalised row per document.
    :param docId: row index of the reference document.
    :param k: number of documents to return.
    :returns: list of (doc_index, score) pairs sorted by descending score.
    """
    target = normalizedUS[docId, :]
    similarities = normalizedUS.dot(target)
    ranked = sorted(enumerate(similarities), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]

def printRelevantDocsforDoc(doc,svd,k=10):
    """Print the k documents most similar to document index ``doc`` in the scaled space U * diag(s)."""
    scaled_docs = multiplyByDiagonalMatrix(svd.U, svd.s)
    unit_docs = normalize(scaled_docs, axis=1, norm="l2")

    for doc_index, similarity in topDocsForDoc(unit_docs, doc, k):
        print('Documento:', doc_index, '\tSimilaridade:\t', similarity)

In [None]:
def topDocsForTerm(svd,termId,k=10):
    """Return the k documents scoring highest for a single term.

    The term's row of V is dotted with the L2-normalised rows of U * diag(s).

    :returns: list of (doc_index, score) pairs sorted by descending score.
    """
    term_concepts = svd.V.toArray()[termId]
    scaled_docs = multiplyByDiagonalMatrix(svd.U, svd.s)
    unit_docs = normalize(scaled_docs, axis=1, norm="l2")
    scores = unit_docs.dot(term_concepts)
    ranked = sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]
    
def printRelevantDocsforTerm(term,svd,k=10):
    """Print the k documents most relevant to ``term``, or "Term unknown" if its stem is not in the vocabulary."""
    stem = stemmer.stem(term)
    if stem in dict_id_terms:
        ranked = topDocsForTerm(svd, dict_id_terms[stem], k)
        for doc_index, relevance in ranked:
            print('Documento:', doc_index, '\tRelevancia:\t', relevance)
    else:
        print("Term unknown")


In [None]:
def termsToQueryVector(terms):
    """Turn a list of raw query words into an IDF-weighted term vector.

    Each word is stemmed with the same stemmer used for the corpus; words whose
    stem is not in the vocabulary are ignored.

    :param terms: iterable of query words.
    :returns: SparseVector of length len(dict_id_terms) holding, at each known
        query term's index, that term's IDF.
    """
    weights = {}
    for t in terms:
        stemmed = stemmer.stem(t)
        if stemmed in dict_id_terms:
            # Keyed by term id so a repeated query word yields a single entry
            # instead of a duplicate index in the sparse vector.
            weights[dict_id_terms[stemmed]] = dict_idfs[stemmed]
    return Vectors.sparse(len(dict_id_terms), weights)

In [None]:
def topDocsForTermQuery(svd, query, k=10):
    """Score every document against a term query in the reduced concept space.

    :param svd: SVD object with U computed.
    :param query: vector over the term vocabulary (e.g. from termsToQueryVector).
    :param k: number of top documents to return (default 10, the previous fixed value).
    :returns: list of (doc_index, score) pairs sorted by descending score.
    """
    # Project the query into concept space: V^T q, one value per concept.
    termRowVec = svd.V.toArray().T.dot(query.toArray())
    US = multiplyByDiagonalMatrix(svd.U, svd.s)
    docScores = US.dot(termRowVec)
    return sorted(enumerate(docScores), key=lambda x: x[1], reverse=True)[:k]

def printRelevantDocs(terms):
    """Print the top-ranked documents (as returned by topDocsForTermQuery) for a list of query words.

    Reads the notebook-level ``svd``.
    """
    query = termsToQueryVector(terms)
    print("Para os termos ", terms)
    ranked = topDocsForTermQuery(svd, query)
    for doc_index, relevance in ranked:
        print("Documento {},\tpossui relevancia {}".format(doc_index, relevance))