1 parent 94fa817 commit 31ecd70b16553d5c9f70c60b5ad29ed8db7ef837 flavianv committed
29 tutorial/MatrixTutorial1.scala
 package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.mathematics.Matrix

/*
 * MatrixTutorial1.scala
 *
 * Loads a directed graph as an adjacency matrix, where a[i,j] = 1 if there is
 * an edge from node i to node j, and computes the co-follows between any two
 * nodes.
 *
 * ../scripts/scald.rb --local MatrixTutorial1.scala --input data/graph.tsv --output data/cofollows.tsv
 *
 */
class CofollowsJob(args : Args) extends Job(args) {

  import Matrix._

  // Every input line is a (user1, user2, rel) triple: row id, column id, value.
  val adjacencyMatrix = {
    val edges = Tsv(args("input"), ('user1, 'user2, 'rel)).read
    edges.toMatrix[Long, Long, Double]('user1, 'user2, 'rel)
  }

  // A * A^T: entry (i, j) is the inner product of row i with row j.
  locally {
    val cofollows = adjacencyMatrix * adjacencyMatrix.transpose
    cofollows.write(Tsv(args("output")))
  }
}
40 tutorial/MatrixTutorial2.scala
 package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.mathematics.Matrix

/*
 * MatrixTutorial2.scala
 *
 * Loads a directed graph as an adjacency matrix, where a[i,j] = 1 if there is
 * an edge from node i to node j, and returns a graph containing only the
 * nodes with outdegree smaller than a given value.
 *
 * ../scripts/scald.rb --local MatrixTutorial2.scala --input data/graph.tsv --maxOutdegree 1000 --output data/graphFiltered.tsv
 *
 */
class FilterOutdegreeJob(args : Args) extends Job(args) {

  import Matrix._

  // Read and parse the threshold once, up front, rather than inside the
  // filter function where it would be looked up and re-parsed for every
  // value. A missing or malformed --maxOutdegree now surfaces when the job
  // is set up instead of while values are being filtered.
  private val maxOutdegree = args("maxOutdegree").toDouble

  val adjacencyMatrix = Tsv( args("input"), ('user1, 'user2, 'rel) )
    .read
    .toMatrix[Long,Long,Double]('user1, 'user2, 'rel)

  // Each row holds the outgoing edges of a node, so summing the column
  // vectors yields one value per row: the outdegree.
  val outdegree = adjacencyMatrix.sumColVectors

  // The column vector is converted to a (single column) matrix so that the
  // matrix method filterValues can be used. The surviving non-zero values
  // are then turned into ones and the result is converted back to a column
  // vector.
  val outdegreeFiltered = outdegree.toMatrix[Int](1)
    .filterValues{ _ < maxOutdegree }
    .binarizeAs[Double].getCol(1)

  // Multiplying on the left with the diagonal matrix built from that vector
  // keeps only the rows whose outdegree is smaller than maxOutdegree.
  (outdegreeFiltered.diag * adjacencyMatrix).write(Tsv( args("output") ) )

}
39 tutorial/MatrixTutorial3.scala
33 tutorial/MatrixTutorial4.scala
 package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.mathematics.Matrix

/*
 * MatrixTutorial4.scala
 *
 * Loads a directed graph as an adjacency matrix, where a[i,j] = 1 if there is
 * an edge from node i to node j, and computes the cosine of the angle between
 * every pair of row vectors.
 *
 * ../scripts/scald.rb --local MatrixTutorial4.scala --input data/graph.tsv --output data/cosineSim.tsv
 *
 */
class ComputeCosineJob(args : Args) extends Job(args) {

  import Matrix._

  // Every input line is a (user1, user2, rel) triple: row id, column id, value.
  val adjacencyMatrix = {
    val edges = Tsv(args("input"), ('user1, 'user2, 'rel)).read
    edges.toMatrix[Long, Long, Double]('user1, 'user2, 'rel)
  }

  // The adjacency matrix with every row L2-normalized.
  val normMatrix = adjacencyMatrix.rowL2Normalize

  // The inner product of the normalized matrix with itself is the cosine:
  // for rows a_i and a_j it equals (a_i . a_j) / (||a_i|| * ||a_j||).
  locally {
    val cosine = normMatrix * normMatrix.transpose
    cosine.write(Tsv(args("output")))
  }

}
44 tutorial/MatrixTutorial5.scala
 package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.mathematics.Matrix

/*
 * MatrixTutorial5.scala
 *
 * Loads a directed graph as an adjacency matrix, where a[i,j] = 1 if there is
 * an edge from node i to node j, and computes the Jaccard similarity between
 * every pair of row vectors:
 *
 *   J(i, j) = |row_i n row_j| / (|row_i| + |row_j| - |row_i n row_j|)
 *
 * ../scripts/scald.rb --local MatrixTutorial5.scala --input data/graph.tsv --output data/jaccardSim.tsv
 *
 */
class ComputeJaccardJob(args : Args) extends Job(args) {

  import Matrix._

  val adjacencyMatrix = Tsv( args("input"), ('user1, 'user2, 'rel) )
    .read
    .toMatrix[Long,Long,Double]('user1, 'user2, 'rel)

  // Work on 0/1 values so that sums count set members.
  val aBinary = adjacencyMatrix.binarizeAs[Double]

  // intersectMat(i, j) holds the size of the intersection of row i and row j.
  // Both its rows and its columns are indexed by row ids of aBinary.
  val intersectMat = aBinary * aBinary.transpose

  // |row_i| as a column vector, indexed by row id.
  val aSumVct = aBinary.sumColVectors

  // |row_j| as a row vector, also indexed by row id. The transpose is
  // required: intersectMat's columns are row ids of aBinary, whereas
  // aBinary.sumRowVectors would produce column sums keyed by aBinary's
  // column ids and the union below would use the wrong set size.
  val bSumVct = aBinary.transpose.sumRowVectors

  // Using zip to repeat the column and row vector values on the right hand
  // side for all non-zeroes of the left hand matrix.
  val xMat = intersectMat.zip(aSumVct).mapValues( pair => pair._2 )
  val yMat = intersectMat.zip(bSumVct).mapValues( pair => pair._2 )

  // |row_i u row_j| = |row_i| + |row_j| - |row_i n row_j|
  val unionMat = xMat + yMat - intersectMat

  // We are guaranteed to have Double both in the intersection and in the union matrix
  intersectMat.zip(unionMat)
    .mapValues( pair => pair._1 / pair._2 )
    .write(Tsv( args("output") ))

}
40 tutorial/MatrixTutorial6.scala
 package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.mathematics.Matrix

/*
 * MatrixTutorial6.scala
 *
 * Loads a document to word matrix, where a[i,j] = frequency of word j in
 * document i, computes the Tf-Idf score of each word with respect to each
 * document and keeps the top nrWords in each document
 * (see http://en.wikipedia.org/wiki/Tf*idf for more info).
 *
 * ../scripts/scald.rb --local MatrixTutorial6.scala --input data/docBOW.tsv --nrWords 300 --output data/featSelectedMatrix.tsv
 *
 */
class TfIdfJob(args : Args) extends Job(args) {

  import Matrix._

  // Every input line is a (doc, word, count) triple: row id, column id, value.
  val docWordMatrix = {
    val counts = Tsv(args("input"), ('doc, 'word, 'count)).read
    counts.toMatrix[Long, String, Double]('doc, 'word, 'count)
  }

  // Summing the row (document) vectors gives one total per word (column);
  // the tutorial uses this as the word's overall document frequency.
  val docFreq = docWordMatrix.sumRowVectors

  // Inverse document frequency: L1-normalize the frequency vector, then take
  // log2 of the reciprocal of every entry.
  val invDocFreqVct = docFreq.toMatrix(1).rowL1Normalize.mapValues { share => log2(1 / share) }

  // Zip the idf row vector along the entire document-word matrix and keep
  // only the idf side of each pair.
  val invDocFreqMat = docWordMatrix.zip(invDocFreqVct.getRow(1)).mapValues(_._2)

  // Element-wise product of term frequency and inverse document frequency,
  // keeping the top nrWords entries of every row.
  locally {
    val tfIdf = docWordMatrix.hProd(invDocFreqMat)
    val topWords = tfIdf.topRowElems(args("nrWords").toInt)
    topWords.write(Tsv(args("output")))
  }

  // Base-2 logarithm, via the change-of-base formula.
  def log2(x : Double) = math.log(x) / math.log(2.0)

}
9 tutorial/data/docBOW.tsv
 @@ -0,0 +1,9 @@ +1 hello 2 +1 twitter 1 +2 conversation 1 +2 celebrities 1 +2 twitter 1 +3 elections 1 +3 debate 1 +3 twitter 1 +3 political 1
4 tutorial/data/graph.tsv
 @@ -1,3 +1,5 @@ 1 2 1 1 3 1 -2 3 1 +3 2 1 +4 2 2 +
3 tutorial/data/graph2.tsv
 @@ -0,0 +1,3 @@ +1 2 1 +1 3 1 +2 3 1
3 tutorial/data/helloDoc.txt
 @@ -0,0 +1,3 @@ +1 Hello world +2 See ya soon world +3 Hello again world