## TF-IDF ##

In [120]:
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix, DistributedMatrix
from pyspark.mllib.linalg import Vectors
import math

**Peso de um termo dado um documento**

In [8]:
def termDocWeight(termFrequencyInDoc, totalTermsInDoc, termFreqInCorpus, totalDocs):
    """Return the TF-IDF weight of one term within one document.

    :param termFrequencyInDoc: number of occurrences of the term in the document.
    :param totalTermsInDoc: total number of terms in the document.
    :param termFreqInCorpus: corpus-wide count for the term; used as the divisor of the IDF ratio.
    :param totalDocs: number of documents in the corpus.
    :returns: ``(termFrequencyInDoc / totalTermsInDoc) * log(totalDocs / termFreqInCorpus)`` as a float.
    :raises ZeroDivisionError: if ``totalTermsInDoc`` or ``termFreqInCorpus`` is 0.
    :raises ValueError: if the IDF ratio is not strictly positive (``math.log`` domain).
    """
    # float() forces true division. Under Python 2, int / int truncates, which
    # turned the term frequency into 0 for almost every term and floored the
    # IDF ratio before the logarithm was taken.
    tf = float(termFrequencyInDoc) / totalTermsInDoc
    docFreq = float(totalDocs) / termFreqInCorpus
    idf = math.log(docFreq)
    return tf * idf

**Carregando o arquivo de entrada**

In [13]:
# The input file has already been pre-processed.

# Each line of the file is one document; splitting on a single space yields
# that document's list of terms.
documents = (
    sc.textFile("data_works.txt")
    .map(lambda textLine: textLine.split(" "))
)

**Criando dicionário de termos por documento**

In [77]:
# Builds the term -> occurrence-count dictionary for a single document.
def caclDocTermFreq(doc):
    """Count how many times each term occurs in ``doc``.

    :param doc: iterable of hashable terms (one document).
    :returns: dict mapping each distinct term to its number of occurrences.
    """
    counts = {}
    for token in doc:
        # get() supplies 0 the first time a token is seen.
        counts[token] = counts.get(token, 0) + 1
    return counts
         
# One term-frequency dictionary per document.
docTermFreqs = documents.map(caclDocTermFreq)

# It is used at least twice more below, so keep it in memory.
docTermFreqs.cache()

# Document frequency of each term: every document contributes each of its
# distinct terms once, and the ones are summed per term.
docFreqs = (
    docTermFreqs
    .flatMap(lambda termFreqs: termFreqs.keys())
    .map(lambda term: (term, 1))
    .reduceByKey(lambda left, right: left + right)
)


**Calculando o inverso da frequência de documentos (IDF)**

In [128]:
# Number of documents in the corpus (one term-frequency dict per document).
numDocs = docTermFreqs.count()

def inverseDocFreq(termCount):
    """Map a ``(term, document frequency)`` pair to ``(term, idf)``.

    :param termCount: 2-tuple ``(term, count)`` where ``count`` is the number
        of documents containing ``term``.
    :returns: ``(term, log(numDocs / count))`` using the module-level ``numDocs``.
    :raises ZeroDivisionError: if ``count`` is 0.
    """
    # Unpacked in the body instead of in the signature: tuple parameters are a
    # SyntaxError on Python 3, and callers still pass one tuple argument.
    term, count = termCount
    # float() forces true division. Under Python 2, numDocs / count on two ints
    # truncates, so the logarithm was taken of a floored ratio.
    return (term, math.log(float(numDocs) / count))

# idfs is a plain dict term -> IDF, gathered from the RDD with collectAsMap().
idfs = docFreqs.map(inverseDocFreq).collectAsMap()

# termIds maps integer id -> term; idTerms is the reverse mapping, term -> integer id.
termIds = dict(enumerate(idfs.keys()))
idTerms = dict((term, termId) for termId, term in termIds.iteritems())

# The term/id map is fairly large and is used in a few different places, so it
# is passed through sc.broadcast together with the IDFs.
# NOTE(review): `.value` is read immediately, so bIdfs and bIdTerms are bound to
# the underlying dicts, not to the Broadcast handles. Confirm whether the handles
# were meant to be kept and dereferenced inside the worker function instead.
bIdfs = sc.broadcast(idfs).value
bIdTerms = sc.broadcast(idTerms).value


**TF-IDF para cada documento**

In [149]:
def generateVectors(termFreqs):
    """Turn one document's term-frequency dict into a sparse TF-IDF vector.

    :param termFreqs: dict mapping term -> occurrence count within the document.
    :returns: sparse vector of size ``len(bIdTerms)``. For every term of the
        document that is a key of ``bIdTerms``, index ``bIdTerms[term]`` holds
        ``bIdfs[term] * count / docTotalTerms``; other terms are skipped.
    """
    # Denominator of the term frequency: every occurrence in the document,
    # including terms that are later skipped.
    docTotalTerms = sum(termFreqs.values())

    def calcScores(TF):
        """Return {term id: tf-idf score} for the terms of TF known to bIdTerms."""
        # `in` replaces the deprecated dict.has_key(), and one filtered pass
        # replaces the intermediate filtered dict of the earlier version.
        return dict(
            (bIdTerms[term], bIdfs[term] * count / docTotalTerms)
            for term, count in TF.items()
            if term in bIdTerms
        )

    return Vectors.sparse(len(bIdTerms), calcScores(termFreqs))
    
# One sparse TF-IDF vector per document, marked for caching.
vecs = docTermFreqs.map(generateVectors)
vecs.cache()

PythonRDD[279] at RDD at PythonRDD.scala:43

**SVD**

In [2]:


# Term frequencies computed by MLlib's HashingTF from the tokenised documents.
hashingTF = HashingTF()
tf = hashingTF.transform(documents).cache()  # read by both fit() and transform() below

# Fit the IDF model on the term frequencies, then apply it to obtain TF-IDF.
idf = IDF().fit(tf)
tfidf = idf.transform(tf)


#idTerms = idTerms.groupByKey().collect()
#print idTerms.collect()
#idfIgnore = IDF(minDocFreq=2).fit(tf)
#tfidfIgnore = idfIgnore.transform(tf)

In [3]:
class SVD(JavaModelWrapper):
    """Python-side wrapper around the Scala SVD case class."""

    @property
    def U(self):
        """Left singular vectors as the columns of a RowMatrix.

        Returns None when the wrapped object has no U (i.e. computeU was not
        set to True).
        """
        left = self.call("U")
        if left is None:
            return None
        return RowMatrix(left)

    @property
    def s(self):
        """Singular values as a DenseVector, in descending order."""
        singular_values = self.call("s")
        return singular_values

    @property
    def V(self):
        """Right singular vectors as the columns of a DenseMatrix."""
        right = self.call("V")
        return right

def computeSVD(row_matrix, k, computeU=True, rCond=1e-9):
    """
    Compute the singular value decomposition of a RowMatrix.

    The row matrix A, of dimension (m x n), is decomposed into U * s * V^T where
    * s: DenseVector of singular values (square roots of the eigenvalues) in descending order.
    * U: (m x k) RowMatrix of left singular vectors; its columns are the eigenvectors of (A x A').
    * V: (n x k) Matrix of right singular vectors; its columns are the eigenvectors of (A' x A).

    :param row_matrix: the RowMatrix to decompose.
    :param k: number of singular values to keep; passed through ``int()``.
        Fewer than k may be returned if some singular values are numerically zero.
    :param computeU: whether or not to compute U. When True, U is computed as A * V * sigma^-1.
    :param rCond: the reciprocal condition number; passed through ``float()``.
        Singular values smaller than rCond * sigma(0) are treated as zero,
        where sigma(0) is the largest singular value.
    :returns: SVD object wrapping the result.
    """
    wrapper = row_matrix._java_matrix_wrapper
    svd_result = wrapper.call("computeSVD", int(k), computeU, float(rCond))
    return SVD(svd_result)

In [4]:
# NOTE(review): collect() brings every TF-IDF vector back to the driver. In the
# visible part of this notebook `l` is only referenced by commented-out
# debugging code; drop this line if nothing else needs it.
l = tfidf.collect()

rowM = RowMatrix(tfidf)

# The earlier call was computeSVD(rowM, True): the positional True was bound to
# `k` and coerced by int(k) to 1, so a single singular value was requested.
# The arguments are now named, with the same effective values.
# NOTE(review): raise k to the number of singular values actually wanted.
matrixSVD = computeSVD(rowM, k=1, computeU=True)

#for i in l:
 #   print i


In [5]:
#print matrixSVD.U.rows.collect()

In [6]:
# V from the decomposition; per SVD.V its columns are the right singular vectors.
v = matrixSVD.V
# presumably an array copy of V's entries — TODO confirm against the pyspark Matrix API
arr = v.toArray()




# Starts empty; the only code that would fill it is the commented-out loop below.
topTerms = []


#for i in range(0, v.numCols):
    #offs = i * v.numRows
    #termWeights = arr[offs : offs+v.numRows]
    #sortWeights =  termWeights.sort()
    
    #sortedWeights = termWeights.sort()
    #topTerms.append(sortedWeights)
    